rabbitmq基础

rabbitmq基础,第1张

安装和配置的过程省略。
简单的使用:
1.创建配置类(消费者和生产者都需要):

/*
* 队列和交换机在消费者还是生产者创建都无所谓 主要是 要等两个都创建好过后才能够发消息
*
* */
@Configuration
public class RabbitMqConfig {
    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
    public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
    public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";

    //创建业务交换机
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);//扇出交换机 发布订阅
    }

    //创建死信交换机
    @Bean("deadExchange")
    public DirectExchange deadExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);//直接相连的交换机 只有 routerKey相同才会通信
    }

    //创建业务队列A
    @Bean("businessQueueA")
    public Queue businessQueueA(){
        Map<String,Object> map=new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUTING_KEY);//表明死信交换机那个routerKey 是我要发送没用消息的队列
        /*return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();*/
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME)
                .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey(DEAD_LETTER_QUEUEA_ROUTING_KEY)/*.withArguments(map)*/.build();
    }

    //创建业务队列A
    @Bean("businessQueueB")
    public Queue businessQueueB(){
        Map<String,Object> map=new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);//绑定要发送死信到哪个交换机
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUTING_KEY);//表明死信交换机那个routerKey 是我要发送没用消息的队列
        /*return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();*/
        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME)
                .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey(DEAD_LETTER_QUEUEB_ROUTING_KEY)/*.withArguments(map)*/.build();
    }

    //创建死信队列A
    @Bean("deadQueueA")
    public Queue deadQueueA(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUEA_NAME).build();
    }

    //创建死信队列B
    @Bean("deadQueueB")
    public Queue deadQueueB(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUEB_NAME).build();
    }

    //绑定业务交换机和队列 发布订阅模式不需要routerkey
    @Bean
    public Binding businessBindingA(@Qualifier("businessExchange") FanoutExchange exchange,
                                    @Qualifier("businessQueueA") Queue queueA){
        return BindingBuilder.bind(queueA).to(exchange);
    }

    /*在bind的时候一定要注意 不管是怎么做 最后的返回值一定要是Binding对象 否则无法绑定 导致绑定失败*/
    //绑定业务交换机和队列
    @Bean
    public Binding businessBindingB(@Qualifier("businessExchange") FanoutExchange exchange,
                                    @Qualifier("businessQueueB") Queue queueB){
        return BindingBuilder.bind(queueB).to(exchange);
    }

    //绑定死信队列和它的交换机
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadExchange") DirectExchange exchange,
                                      @Qualifier("deadQueueA") Queue queueA){
        return BindingBuilder.bind(queueA).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

    //绑定死信队列和它的交换机
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadExchange") DirectExchange exchange,
                                      @Qualifier("deadQueueB") Queue queueB){
        return BindingBuilder.bind(queueB).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}

版本自己决定
		<dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-amqpartifactId>
        dependency>
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-webartifactId>
        dependency>
生产者:
spring:
  application:
    name: nacos-consumer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        default-requeue-rejected: false #被拒绝后是否重新入队
        acknowledge-mode: manual #手动应答

消息发送的类:

@Component
public class BusinessMessageSender {
    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg){
        rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
    }
}
======================================================================================
发送消息接口
@RequestMapping("rabbitmq")
@RestController
public class RabbitMqController {

    @Autowired
    private BusinessMessageSender sender;

    @RequestMapping("sendmsg")
    public String sendMsg(String msg){
        System.out.println(msg);
        sender.sendMsg(msg);
        return msg;
    }
}
消费者A:
spring:
  application:
    name: nacos-provider-9001
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        default-requeue-rejected: false #被拒绝后是否重新入队
        acknowledge-mode: manual #手动应答
配置类:
@Configuration
public class RabbitMqConfig {
    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";

    //创建业务交换机
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);//扇出交换机 发布订阅
    }

    //创建死信交换机
    @Bean("deadExchange")
    public DirectExchange deadExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);//直接相连的交换机 只有 routerKey相同才会通信
    }

    //创建业务队列A
    @Bean("businessQueueA")
    public Queue businessQueueA(){
        Map<String,Object> map=new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUTING_KEY);//表明死信交换机那个routerKey对应的队列 是我要发送没用消息的队列
        /*return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();*/
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME)
                .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey(DEAD_LETTER_QUEUEA_ROUTING_KEY).build();
    }

    //创建死信队列A
    @Bean("deadQueueA")
    public Queue deadQueueA(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUEA_NAME).build();
    }


    //绑定业务交换机和队列 发布订阅模式不需要routerkey
    @Bean
    public Binding businessBindingA(@Qualifier("businessExchange") FanoutExchange exchange,
                                    @Qualifier("businessQueueA") Queue queueA){
        return BindingBuilder.bind(queueA).to(exchange);
    }

    //绑定死信队列和它的交换机
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadExchange") DirectExchange exchange,
                                      @Qualifier("deadQueueA") Queue queueA){
        return BindingBuilder.bind(queueA).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }
}

业务队列A的消息接收类:

@Component
@Slf4j
public class BunsinessMessageReceive {

    @RabbitListener(queues = {RabbitMqConfig.BUSINESS_QUEUEA_NAME})
    public void ReceiveA(Message message, Channel channel) throws Exception{
        String msg=new String(message.getBody());
        log.info("接收到消息A: "+msg);
        boolean ack=true;//是否应答
        Exception exception=null;
        try {
            if (msg.contains("deadletter")){
                throw new IllegalArgumentException("dead letter exception");
            }
        }catch (Exception e){
            ack=false;//拒绝应答
            exception=e;
        }
        if(ack){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }else {
            log.error("消息发生异常 , error message : "+exception.getMessage());
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//拒绝应答
        }
    }
}

业务队列A对应的死信队列A消息接收代码:

@Component
public class DeadLetterMessageReceive {

    //死信队列A消息的收到
    @RabbitListener(queues = {DEAD_LETTER_QUEUEA_NAME})
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息A:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

消费者B:

@Configuration
public class RabbitMqConfig {
    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
    public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";

    //创建业务交换机
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);//扇出交换机 发布订阅
    }

    //创建死信交换机
    @Bean("deadExchange")
    public DirectExchange deadExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);//直接相连的交换机 只有 routerKey相同才会通信
    }

    //创建业务队列B
    @Bean("businessQueueB")
    public Queue businessQueueB(){
        Map<String,Object> map=new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);//绑定要发送死信到哪个交换机
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUTING_KEY);//表明死信交换机那个routerKey对应的队列 是我要发送没用消息的队列
        //两种方式绑定  本质上都是一样的
        /*return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();*/
        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME)
                .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey(DEAD_LETTER_QUEUEB_ROUTING_KEY).build();
    }

    //创建死信队列B
    @Bean("deadQueueB")
    public Queue deadQueueB(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUEB_NAME).build();
    }

    //绑定业务交换机和队列
    @Bean
    public Binding businessBindingB(@Qualifier("businessExchange") FanoutExchange exchange,
                                    @Qualifier("businessQueueB") Queue queueB){
        return BindingBuilder.bind(queueB).to(exchange);
    }

    //绑定死信队列和它的交换机
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadExchange") DirectExchange exchange,
                                                                        @Qualifier("deadQueueB") Queue queueB){
        return BindingBuilder.bind(queueB).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}

业务队列B的消息接收类:

//接收消息
@Component
@Slf4j
public class BunsinessMessageReceive {

    @RabbitListener(queues = {RabbitMqConfig.BUSINESS_QUEUEB_NAME})
    public void ReceiveA(Message message, Channel channel) throws Exception{
        String msg=new String(message.getBody());
        log.info("接收到消息B: "+msg);
        boolean ack=true;//是否应答
        Exception exception=null;
        try {
            if (msg.contains("deadletter")){
                throw new IllegalArgumentException("dead letter exception");
            }
        }catch (Exception e){
            ack=false;//拒绝应答
            exception=e;
        }
        if(ack){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }else {
            log.error("消息发生异常 , error message : "+exception.getMessage());
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//拒绝应答
        }
    }
}

业务队列A对应的死信队列A消息接收代码:


@Component
public class DeadLetterMessageReceive {

    //死信队列A消息的收到
    @RabbitListener(queues = {RabbitMqConfig.DEAD_LETTER_QUEUEB_NAME})
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

访问接口:http://localhost:8080/rabbitmq/sendmsg?msg=msg

再看这个:http://localhost:8080/rabbitmq/sendmsg?msg=deadletter
死信队列也同时处理成功了

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/905209.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-15
下一篇 2022-05-15

发表评论

登录后才能评论

评论列表(0条)

保存