目录
简单列队
广播列队
订阅列队
主题列队
Headers列队
学习使用rabbitmq(mac系统)在这里呦!!!
@RestController @RequestMapping("/") public class test{ @Autowired private MQSender mQSender; @GetMapping("/test/{msg}") @ResponseBody public String test(@PathVariable("msg") Object msg){ mQSender.send(msg); return String.valueOf(msg); } }2.编写配置文件:
spring: rabbitmq: username: guestweb password: 123456 port: 15672 host: 127.0.0.1 virtual-host: / listener: simple: #消费者最小数量 concurrency: 10 #消费者最大数量 max-concurrency: 10 #限制消费者每次只处理一条消息,处理完再继续下一条消息 prefetch: 1 #启动时是否默认启动容器,默认true auto-startup: true #被拒绝时重新进入队列 default-requeue-rejected: true # 配置模版 template: retry: #发布重试,默认false enabled: true #重试时间 默认1000ms initial-interval: 1000 #重试最大次数,默认3次 max-attempts: 3 #重试最大间隔时间,默认10000ms max-interval: 10000 #重试间隔的乘数。比如配2.0 第一次等10s,第二次等20s,第三次等40s multiplier: 1.03.简单列队: 3.1:编写Config
@Configuration public class RabbitMqConfig { //列队 public static final String MAIL_QUEUE_NAME = "queue"; @Bean public Queue queue(){ //配置队列名 并设置消息持久化 return new Queue(MAIL_QUEUE_NAME,true); } }3.2:编写简单列队生产者和消费者:
@Service @Slf4j public class MQSender { @Autowired private RabbitTemplate rabbitTemplate; //生产者 public void send(Object msg){ log.info("==========>>>>>>>>>>发送消息:"+msg); rabbitTemplate.convertAndSend("queue",msg); } //消费者 @RabbitListener(queues = "queue") public void receiver(Object msg){ log.info("============>>>>>>>>>接收消息:"+msg); } }4.广播列队 (消息会分发到绑定此交换机的queue上) 4.1:编写Config
@Configuration public class RabbitmqConfig { private static final String QUEUE01= "queue01"; private static final String QUEUE02= "queue02"; private static final String EXCHANGE_NAME = "fanout_exchange"//交换机 @Bean public Queue queue1(){ return new Queue(QUEUE01); } @Bean public Queue queue2(){ return new Queue(QUEUE02); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(EXCHANGE_NAME); } @Bean public Binding binding01(){ return BindingBuilder.bind(queue1()).to(fanoutExchange()); } @Bean public Binding binding02(){ return BindingBuilder.bind(queue2()).to(fanoutExchange()); } }4.2:编写工作列队生产者和消费者
@Service @Slf4j public class MQSender { @Autowired private RabbitTemplate rabbitTemplate; //生产者 public void send(Object msg){ log.info("==========>>>>>>>>>>发送消息:"+msg); //没有绑定路由键 需要填写路由键为空 rabbitTemplate.convertAndSend("fanout_exchange","",msg); } //消费者01 @RabbitListener(queues = "queue01") public void receiver01(Object msg){ log.info("============>>>>>>>>>queue01接收消息:"+msg); } //消费者02 @RabbitListener(queues = "queue02") public void receiver02(Object msg){ log.info("============>>>>>>>>>queue02接收消息:"+msg); } }5.创建订阅列队 5.1:编写Config
@Configuration public class RabbitmqConfig { private static final String QUEUE01= "queue01"; private static final String QUEUE02= "queue02"; private static final String EXCHANGE_NAME = "direct_exchange"//交换机 //路由键 private static final String ROUTINGKEY01 = "queue.red"; private static final String ROUTINGKEY02 = "queue.pink"; @Bean public Queue queue01(){ return new Queue(QUEUE01); } @Bean public Queue queue02(){ return new Queue(QUEUE02); } @Bean public DirectExchange directExchange(){ return new DirectExchange(EXCHANGE_NAME); } @Bean public Binding binding1(){ //将列队01绑定到交换机上为给他设置路由键 return BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01); } @Bean public Binding binding2(){ return BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02); } }5.2:编写订阅列队生产者和消费者
@Service @Slf4j public class MQSender { @Autowired private RabbitTemplate rabbitTemplate; //生产者 public void send(Object msg){ log.info("==========>>>>>>>>>>发送消息:"+msg); rabbitTemplate.convertAndSend("direct_exchange","queue.red",msg); rabbitTemplate.convertAndSend("direct_exchange","queue.pink",msg); } //消费者01 @RabbitListener(queues = "queue01") public void receiver01(Object msg){ log.info("============>>>>>>>>>queue01接收消息:"+msg+"路由键名为:queue.red"); } //消费者02 @RabbitListener(queues = "queue02") public void receiver02(Object msg){ log.info("============>>>>>>>>>queue02接收消息:"+msg+"路由键名为:queue.pink"); } }6.主题列队(使用最多的模式)
(与订阅列队相似,但是对路由键加模糊查找)通配符:*匹配.之前之后的一个字符、#匹配.之前之后的一个或多个字符和空字符。
6.1:编写Config:@Configuration public class RabbitmqConfig { private static final String QUEUE01= "queue01"; private static final String QUEUE02= "queue02"; private static final String EXCHANGE_NAME = "topic_exchange"//交换机 //主题路由键 private static final String ROUTINGKEY01 = "*.red.*"; private static final String ROUTINGKEY02 = "#.red.*"; @Bean public Queue queue01(){ return new Queue(QUEUE01); } @Bean public Queue queue02(){ return new Queue(QUEUE02); } @Bean public TopicExchange topicExchange(){ return new TopicExchange(EXCHANGE_NAME); } @Bean public Binding binding1(){ return BindingBuilder.bind(queue01()).to(directExchange()).with(ROUTINGKEY01); } @Bean public Binding binding2(){ return BindingBuilder.bind(queue02()).to(directExchange()).with(ROUTINGKEY02); } }6.2:编写主题列队生产者和消费者
@Service @Slf4j public class MQSender { @Autowired private RabbitTemplate rabbitTemplate; //生产者 public void send(Object msg){ log.info("==========>>>>>>>>>>发送消息:"+msg); rabbitTemplate.convertAndSend("topic_exchange","queue.red.demo",msg); rabbitTemplate.convertAndSend("topic_exchange","test.queue.red.demo",msg); } //消费者01 @RabbitListener(queues = "queue01") public void receiver01(Object msg){ log.info("============>>>>>>>>>queue01接收消息:"+msg+"主题路由键名为:*.red.*"); } //消费者02 @RabbitListener(queues = "queue02") public void receiver02(Object msg){ log.info("============>>>>>>>>>queue02接收消息:"+msg+"主题路由键名为:#.red.*"); } }7.Headers模式(性能较低基本上不会使用) 7.1:编写Config
@Configuration public class RabbitMqConfig { private static final String QUEUE01= "queue01"; private static final String QUEUE02= "queue02"; private static final String EXCHANGE_NAME = "headersExchange"//交换机 //主题路由键 private static final String ROUTINGKEY01 = "*.red.*"; private static final String ROUTINGKEY02 = "#.red.*"; @Bean public Queue queue01() { return new Queue(QUEUE01); } @Bean public Queue queue02() { return new Queue(QUEUE02); } @Bean public HeadersExchange headersExchange(){ return new HeadersExchange(EXCHANGE_NAME); } @Bean public Binding binding01(){ Map7.2:编写Headers模式生产者和消费者map = new HashMap<>(); map.put("queueName","queue01"); map.put("bindType","whereAll"); return BindingBuilder.bind(queue01()).to(headersExchange()).whereAll(map).match(); } @Bean public Binding binding02(){ Map map = new HashMap<>(); map.put("queueName","queue02"); map.put("bindType","whereAny"); return BindingBuilder.bind(queue02()).to(headersExchange()).whereAny(map).match(); } }
@Service @Slf4j public class MQSender { @Autowired private RabbitTemplate rabbitTemplate; public void send(Object msg){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("queueName","queue01"); messageProperties.setHeader("bindType","whereAll"); messageProperties.setHeader("queueName","queue02"); messageProperties.setHeader("bindType","whereAll"); Message message = new Message(msg, messageProperties); rabbitTemplate.send("headersExchange",null,message); } @RabbitListener(queues = "queue01") public void receiver01(byte[] msg){ log.info("============>>>>>>>>>queue01接收消息:"+new String(msg); } @RabbitListener(queues = "queue02") public void receiver02(byte[] msg){ log.info("============>>>>>>>>>queue02接收消息:"+new String(msg); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)