RabbitMQ03

RabbitMQ03,第1张

RabbitMQ03

文章目录
  • ①. SpringBoot案例 - 发布与订阅模式
  • ②. SpringBoot案例 - 路由模式
  • ③. SpringBoot案例 - 通配符模式

①. SpringBoot案例 - 发布与订阅模式
  • ①. 生产和消费者工程如下

  • ②. 导入依赖

    
        org.springframework.boot
        spring-boot-starter-amqp
    
    
        org.springframework.boot
        spring-boot-starter-web
    
  • ③. 编写yaml(生产者和消费者一样)
server:
  port: 8080
spring:
  rabbitmq:
    host: 139.198.169.136
    port: 5672
    virtual-host: /myvitrualhost
    username: tang
    password: 9602111022yxTZ@
  • ④. 生产者配置文件如下
@Configuration
public class FanoutRabbitConfig {
    //1. 声明交换机
    @Bean
    public FanoutExchange fanoutOrderExchange() {
        return new FanoutExchange("fanout_order_exchange", true, false);
    }

    //2. 声明队列
    @Bean
    public Queue emailQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        // return new Queue("TestDirectQueue",true,true,false);
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue weixinQueue() {
        return new Queue("weixin.fanout.queue", true);
    }

    //3. 将队列和交换机绑定
    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    public Binding bindingEmail() {
        return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange());
    }
    @Bean
    public Binding bindingSms() {
        return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange());
    }
    @Bean
    public Binding bindingWeixin() {
        return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange());
    }
}
  • ⑤. service代码以及启动后的效果如下
@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //1. 发布与订阅模式
    public void makeOrder(Long userId, Long productId) {
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend("fanout_order_exchange", "", orderNumer);
    }
}
@SpringBootTest
class ProducerApplicationTests {
    @Autowired
    private OrderService orderService;
    @Test
    void contextLoads() {
        orderService.makeOrder(1L,1L);
    }
}

  • ⑥. 三个消费者代码如下
@Service
@RabbitListener(queues = {"email.fanout.queue"})
public class EmailService {
    @RabbitHandler
    public void messageRevice(String message){
        System.out.println("email----------"+message);
    }
}
@Component
@RabbitListener(queues = {"sms.fanout.queue"})
public class SmsService {
    @RabbitHandler
    public void messageRevice(String message){
        System.out.println("SMS----------"+message);
    }
}
@Component
@RabbitListener(queues = {"weixin.fanout.queue"})
public class WeixinService {
    @RabbitHandler
    public void messageRevice(String message){
        System.out.println("Weixin----------"+message);
    }
}
  • ⑦. 先启动生产者,后启动消费者,可以看到如下演示:
②. SpringBoot案例 - 路由模式
  • ①. 生产者代码如下
@Configuration
public class DirectRabbitConfig {

    //1.声明交换机
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("direct_order_exchange",true,false);
    }
    //2.声明队列
    @Bean
    public Queue emailDirectQueue() {
        return new Queue("email.direct.queue", true);
    }
    @Bean
    public Queue smsDirectQueue() {
        return new Queue("sms.direct.queue", true);
    }
    @Bean
    public Queue weChatDirectQueue() {
        return new Queue("weChat.direct.queue", true);
    }
    //3.队列和交换机绑定
    @Bean
    public Binding bingDirectEmail(){
        return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");
    }
    @Bean
    public Binding bingDirectSms(){
        return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");
    }
    @Bean
    public Binding bindDirectWeChat(){
        return BindingBuilder.bind(weChatDirectQueue()).to(directExchange()).with("weChat");
    }
}
@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //2. direct模式
    public void makeDirectOrder(Long userId, Long productId) {
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        // 发送订单信息给RabbitMQ Sms和微信发送消息
        rabbitTemplate.convertAndSend("direct_order_exchange", "sms", orderNumer);
        rabbitTemplate.convertAndSend("direct_order_exchange", "weChat", orderNumer);
    }
}
  // Direct模式
  @Test
  public void DirectTest(){
      orderService.makeDirectOrder(1L,1L);
  }

  • ②. 消费者代码如下
@Service
@RabbitListener(queues = {"email.direct.queue"})
public class EmailService {
    @RabbitHandler
    public void messageDirectRevice(String message){
        System.out.println("email direct----------"+message);
    }
}
@Component
@RabbitListener(queues = {"sms.direct.queue"})
public class SmsService {
    @RabbitHandler
    public void messageDirectRevice(String message){
        System.out.println("SMS direct----------"+message);
    }
}
@Component
@RabbitListener(queues = {"weChat.direct.queue"})
public class WeixinService {

    @RabbitHandler
    public void messageDirectRevice(String message){
        System.out.println("weChat direct----------"+message);
    }
}
③. SpringBoot案例 - 通配符模式
  • ①. 生产者代码如下
    //3. topic模式
    public void makeTopicOrder(Long userId, Long productId){
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        // 发送订单信息给RabbitMQ Sms和微信发送消息
        // 匹配规则:#.sms.#、#.weChat.#
        rabbitTemplate.convertAndSend("topic_order_exchange", "weChat.sms", orderNumer);
    }
  • ②. 消费者使用注解的方式替换配置类
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value = @Queue(value = "email.topic.queue",autoDelete = "false"),
        // order.fanout 交换机的名字 必须和生产者保持一致
        exchange = @Exchange(value = "topic_order_exchange",
                // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type = ExchangeTypes.TOPIC),
        key = "#.email.#"
))
@Component
public class TopicEmailConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("email------topic模式:"+message);
    }
}
@Component
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value = @Queue(value = "sms.topic.queue",autoDelete = "false"),
        // order.fanout 交换机的名字 必须和生产者保持一致
        exchange = @Exchange(value = "topic_order_exchange",
                // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type = ExchangeTypes.TOPIC),
        key = "#.sms.#"
))
public class TopicSmsConsumer{
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("sms------topic模式:"+message);
    }
}
@Component
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value = @Queue(value = "weChat.topic.queue",autoDelete = "false"),
        // order.fanout 交换机的名字 必须和生产者保持一致
        exchange = @Exchange(value = "topic_order_exchange",
                // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type = ExchangeTypes.TOPIC),
        key = "#.sms.#"
))
public class TopicWeChatConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("weChat------topic模式:"+message);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5669396.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存