生产者 配置文件org.springframework.boot spring-boot-starter-webcom.alibaba fastjson1.2.75 org.springframework.boot spring-boot-starter-amqp
server: port: 7001 spring: rabbitmq: host: localhost port: 5672 username: xx password: xx virtual-host: /常量类
package com.example.rabbitmq.constant; public class Constant { //队列名称 public static final String DIRECT_QUEUE_NAME02 = "testDirectQueue02"; //队列名称 public static final String DIRECT_QUEUE_NAME = "testDirectQueue"; //交换机名称 public static final String DIRECT_EXCHANGE_NAME = "testDirectExchange"; //交换机绑定rout public static final String DIRECT_ROUTING = "testDirectRouting"; //交换机绑定rout public static final String DIRECT_ROUTING02 = "testDirectRouting02"; }rabbitmq配置类
package com.example.rabbitmq.config; import com.example.rabbitmq.constant.Constant; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false @Bean public Queue testDirectQueue() { //第一个队列 return new Queue(Constant.DIRECT_QUEUE_NAME, true); } // 路由模式 start // 生产者通过路由方式 把队列绑定到交换机上 // 消息对于多个消费者来说 消息只能被消费一次 // 生产消息的时候指定交换机和路由,就会发送到绑定的队列上面 // 消费者只要消费指定的队列就可以了 不会消费同一个交换机上的其他队列 // 也就是说一个交换机可以绑定多个队列 //Direct交换机 起名:testDirectExchange @Bean public DirectExchange testDirectExchange() { return new DirectExchange(Constant.DIRECT_EXCHANGE_NAME, true, false); } //把第一个队列和交换机绑定 @Bean public Binding bindingDirect() { return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(Constant.DIRECT_ROUTING); } // 路由模式 end @Bean public Queue testDirectQueue02(){ //第二个队列 return new Queue(Constant.DIRECT_QUEUE_NAME02,true); } //第二个队列和交换机绑定 @Bean public Binding bindingDirect02() { return BindingBuilder.bind(testDirectQueue02()).to(testDirectExchange()).with(Constant.DIRECT_ROUTING02); } }controller 这里总共调用了10次接口,发现消费者总共消费了10次,因为虽然是同一个交换机 但是绑定的队列不一样,消费者消费的是队列 不是交换机
package com.example.rabbitmq.controller; import com.alibaba.fastjson.JSON; import com.example.rabbitmq.constant.Constant; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; import java.util.UUID; @RestController public class RabbitmqController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/directMq") public void directMq() { Map消费者 配置文件map = new HashMap<>(); map.put("messageId", UUID.randomUUID()); map.put("messageData", "hello testDirect"); map.put("createTime", System.currentTimeMillis()); //Constant.DIRECT_ROUTING 对应的队列是testDirectQueue rabbitTemplate.convertAndSend(Constant.DIRECT_EXCHANGE_NAME, Constant.DIRECT_ROUTING, JSON.toJSonString(map)); //Constant.DIRECT_ROUTING02 对应的队列是testDirectQueue02 rabbitTemplate.convertAndSend(Constant.DIRECT_EXCHANGE_NAME, Constant.DIRECT_ROUTING02, JSON.toJSonString(map)); //rabbitTemplate.convertAndSend(Constant.DIRECT_QUEUE_NAME,JSON.toJSonString(map)); } }
server: port: 7002 spring: rabbitmq: host: localhost port: 5672 username: xx password: xx virtual-host: / listener: simple: acknowledge-mode: manual retry: enabled: true prefetch: 1消费者1代码
package com.example.rabbitmq.mq; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; import java.nio.charset.StandardCharsets; @Component public class RabbitmqListener { @RabbitListener(queues = "testDirectQueue") public void listener(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("01==" + msg); try { channel.basicAck(deliveryTag,false); } catch (IOException e) { e.printStackTrace(); } } }消费者2代码
package com.example.rabbitmq.mq; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; import java.nio.charset.StandardCharsets; @Component public class RabbitmqListener { @RabbitListener(queues = "testDirectQueue") public void listener(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("02==" + msg); try { channel.basicAck(deliveryTag,false); } catch (IOException e) { e.printStackTrace(); } } }
发送10条消息查看消费情况
消费者1
消费者2
这里修改下代码,
消费者1消费队列testDirectQueue
消费者2消费队列testDirectQueue02
只修改消费者2的代码即可
package com.example.rabbitmq.mq; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; import java.nio.charset.StandardCharsets; @Component public class RabbitmqListener { @RabbitListener(queues = "testDirectQueue02") public void listener(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) { String msg = new String(message.getBody(), StandardCharsets.UTF_8); System.out.println("02==" + msg); try { channel.basicAck(deliveryTag,false); } catch (IOException e) { e.printStackTrace(); } } }
再次请求生产者接口10次 发现打印结果是
消费者1
消费者2
再次证明消费者消费的是队列不是交换机
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)