- ①. SpringBoot案例 - 发布与订阅模式
- ②. SpringBoot案例 - 路由模式
- ③. SpringBoot案例 - 通配符模式
-
①. 生产和消费者工程如下
-
②. 导入依赖
org.springframework.boot spring-boot-starter-amqporg.springframework.boot spring-boot-starter-web
- ③. 编写yaml(生产者和消费者一样)
server: port: 8080 spring: rabbitmq: host: 139.198.169.136 port: 5672 virtual-host: /myvitrualhost username: tang password: 9602111022yxTZ@
- ④. 生产者配置文件如下
@Configuration public class FanoutRabbitConfig { //1. 声明交换机 @Bean public FanoutExchange fanoutOrderExchange() { return new FanoutExchange("fanout_order_exchange", true, false); } //2. 声明队列 @Bean public Queue emailQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("email.fanout.queue", true); } @Bean public Queue smsQueue() { return new Queue("sms.fanout.queue", true); } @Bean public Queue weixinQueue() { return new Queue("weixin.fanout.queue", true); } //3. 将队列和交换机绑定 //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean public Binding bindingEmail() { return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()); } @Bean public Binding bindingSms() { return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()); } @Bean public Binding bindingWeixin() { return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()); } }
- ⑤. service代码以及启动后的效果如下
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; //1. 发布与订阅模式 public void makeOrder(Long userId, Long productId) { // 1: 模拟用户下单 String orderNumer = UUID.randomUUID().toString(); System.out.println("用户 " + userId + ",订单编号是:" + orderNumer); // 发送订单信息给RabbitMQ fanout rabbitTemplate.convertAndSend("fanout_order_exchange", "", orderNumer); } }
@SpringBootTest class ProducerApplicationTests { @Autowired private OrderService orderService; @Test void contextLoads() { orderService.makeOrder(1L,1L); } }
- ⑥. 三个消费者代码如下
@Service @RabbitListener(queues = {"email.fanout.queue"}) public class EmailService { @RabbitHandler public void messageRevice(String message){ System.out.println("email----------"+message); } }
@Component @RabbitListener(queues = {"sms.fanout.queue"}) public class SmsService { @RabbitHandler public void messageRevice(String message){ System.out.println("SMS----------"+message); } }
@Component @RabbitListener(queues = {"weixin.fanout.queue"}) public class WeixinService { @RabbitHandler public void messageRevice(String message){ System.out.println("Weixin----------"+message); } }
- ⑦. 先启动生产者,后启动消费者,可以看到如下演示:
- ①. 生产者代码如下
@Configuration public class DirectRabbitConfig { //1.声明交换机 @Bean public DirectExchange directExchange(){ return new DirectExchange("direct_order_exchange",true,false); } //2.声明队列 @Bean public Queue emailDirectQueue() { return new Queue("email.direct.queue", true); } @Bean public Queue smsDirectQueue() { return new Queue("sms.direct.queue", true); } @Bean public Queue weChatDirectQueue() { return new Queue("weChat.direct.queue", true); } //3.队列和交换机绑定 @Bean public Binding bingDirectEmail(){ return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email"); } @Bean public Binding bingDirectSms(){ return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms"); } @Bean public Binding bindDirectWeChat(){ return BindingBuilder.bind(weChatDirectQueue()).to(directExchange()).with("weChat"); } }
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; //2. direct模式 public void makeDirectOrder(Long userId, Long productId) { // 1: 模拟用户下单 String orderNumer = UUID.randomUUID().toString(); System.out.println("用户 " + userId + ",订单编号是:" + orderNumer); // 发送订单信息给RabbitMQ Sms和微信发送消息 rabbitTemplate.convertAndSend("direct_order_exchange", "sms", orderNumer); rabbitTemplate.convertAndSend("direct_order_exchange", "weChat", orderNumer); } }
// Direct模式 @Test public void DirectTest(){ orderService.makeDirectOrder(1L,1L); }
- ②. 消费者代码如下
@Service @RabbitListener(queues = {"email.direct.queue"}) public class EmailService { @RabbitHandler public void messageDirectRevice(String message){ System.out.println("email direct----------"+message); } }
@Component @RabbitListener(queues = {"sms.direct.queue"}) public class SmsService { @RabbitHandler public void messageDirectRevice(String message){ System.out.println("SMS direct----------"+message); } }
@Component @RabbitListener(queues = {"weChat.direct.queue"}) public class WeixinService { @RabbitHandler public void messageDirectRevice(String message){ System.out.println("weChat direct----------"+message); } }③. SpringBoot案例 - 通配符模式
- ①. 生产者代码如下
//3. topic模式 public void makeTopicOrder(Long userId, Long productId){ // 1: 模拟用户下单 String orderNumer = UUID.randomUUID().toString(); System.out.println("用户 " + userId + ",订单编号是:" + orderNumer); // 发送订单信息给RabbitMQ Sms和微信发送消息 // 匹配规则:#.sms.#、#.weChat.# rabbitTemplate.convertAndSend("topic_order_exchange", "weChat.sms", orderNumer); }
- ②. 消费者使用注解的方式替换配置类
// bindings其实就是用来确定队列和交换机绑定关系 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "email.topic.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "topic_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.TOPIC), key = "#.email.#" )) @Component public class TopicEmailConsumer { @RabbitHandler public void reviceMessage(String message){ System.out.println("email------topic模式:"+message); } }
@Component // bindings其实就是用来确定队列和交换机绑定关系 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "sms.topic.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "topic_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.TOPIC), key = "#.sms.#" )) public class TopicSmsConsumer{ @RabbitHandler public void reviceMessage(String message){ System.out.println("sms------topic模式:"+message); } }
@Component // bindings其实就是用来确定队列和交换机绑定关系 @RabbitListener(bindings =@QueueBinding( // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。 value = @Queue(value = "weChat.topic.queue",autoDelete = "false"), // order.fanout 交换机的名字 必须和生产者保持一致 exchange = @Exchange(value = "topic_order_exchange", // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式 type = ExchangeTypes.TOPIC), key = "#.sms.#" )) public class TopicWeChatConsumer { @RabbitHandler public void reviceMessage(String message){ System.out.println("weChat------topic模式:"+message); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)