RabbitMQ消息中间件

RabbitMQ消息中间件,第1张

RabbitMQ消息中间件 一、概述 1. MQ

MQ:Message Queue消息队列,是在消息传输过程中保存消息的容器。用于分布式系统间通信,应用程序和应用程序之间的通信方法。

作用:在项目中,可将一些无需及时返回且耗时的 *** 作提取出来,进行异步处理,从而节省了服务器的请求响应时间。

优势和劣势

优势

应用解耦:提高系统容错性和可维护性

异步提速:将消息交给MQ后,直接响应请求,而无需消息处理完成。提高系统容错性和可维护性

削峰填谷:提高系统稳定性

劣势

降低系统的可用性:MQ是外部依赖,一旦宕机会对系统造成影响系统复杂度提高:MQ的引入是系统的消息从同步变为异步,如何保证消息未被重复消费、消息的顺序性、消息丢失的情况?一致性问题:MQ的多个消费者处理结果有成功有失败,如何保证消息处理一致性?

MQ消息通信模型

AMQP高级消息队列协议,进程间传递异步消息的网络协议,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,不受客户端、中间件、开发语言的限制。JMS,JAVA消息服务应用接口,是JAVA中关于面向消息中间件的API,用于两个应用程序之间,或分布式系统中发送消息,进行异步通信。

消息队列产品

2. RabbitMQ

基础架构

Broker:接受和分发消息的应用,RabbitMQ Server就是 Message BrokerVirtual host:虚拟主机,类似于namespacce,当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,每个用户在自己的 vhost 创建 exchange/queue 等Connection:publisher/consumer 和 broker 之间的 TCP 连接Channel:Channel 作为轻量级的 Connection
极大减少了 *** 作系统建立 TCP connection 的开销Exchange:根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。Queue:消息最终被送到这里等待 consumer 取走exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。

工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

二、入门案例

生产者

    创建Connection创建Channel创建Queue发送消息释放资源
//1. 创建Connection
	//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
	//设置参数
factory.setHost("RabbitMQ主机ip");
factory.setPort(5672); //端口  默认值 5672
factory.setVirtualHost("/demo");//虚拟机 默认值/
factory.setUsername("jeff");//用户名 默认 guest
factory.setPassword("jeff");//密码 默认值 guest
	//获取连接
Connection connection = factory.newConnection();
//2. 创建Channel
Channel channel = connection.createChannel();
//3. 创建队列:若没有该名字会创建队列
//队列名、持久化(MQ重启后还在)、独占(只有一个消费者)、自动删除、其它参数
channel.queueDeclare("hello_world",true,false,false,null);
//4. 发送消息
//交换机名称(简单模式使用默认)、路由名称、配置信息、发送的消息
String body = "hello rabbitmq~~~";
channel.basicPublish("","hello_world",null,body.getBytes());
//5. 释放资源
channel.close();
connection.close();

消费者:不需要关闭资源,因为需要一直监听MQ

//1. 创建Connection
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.98.133");//ip  默认值 localhost
factory.setPort(5672); //端口  默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("demo");//用户名 默认 guest
factory.setPassword("demo");//密码 默认值 guest
Connection connection = factory.newConnection();
//2. 创建Channel
Channel channel = connection.createChannel();
//3. 创建Queue:生产者已经创建了,消费者就不会创建了
channel.queueDeclare("hello_world",true,false,false,null);
//4. 接收消息
	//创建消费者
Consumer consumer = new DefaultConsumer(channel){
    //1. consumerTag:标识
    //2. envelope:获取一些信息,交换机,路由key...
    //3. properties:配置信息
    //4. body:数据
     @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("consumerTag:"+consumerTag);
        System.out.println("Exchange:"+envelope.getExchange());
        System.out.println("RoutingKey:"+envelope.getRoutingKey());
        System.out.println("properties:"+properties);
        System.out.println("body:"+new String(body));
    }
}
	//接收消息
channel.basicConsume("hello_world",true,consumer);

三、工作模式 1. Work queues

工作队列模式:与入门程序的简单模式相比,多了一个或一些消费端。

多个消费端共同消费同一个队列中的消息,消费者之间对于同一个消息是竞争关系,即同一个消息不能被两个消费者使用。对于任务过重或任务较多情况,使用工作队列可以提高任务处理速度工作队列模式的交换机使用的是默认交换机

代码示例

Work queues模式的多个消费端的代码相同无需修改,只要复制多个消费者的代码同时运行,多个消费者就会按照顺序逐个到队列中获取消息。

2. Pub/Sub

订阅模式:生产者将消息发送给指定的交换机,所有消费者共享绑定队列中的消息。

Exchange交换机:接收生产者发送的消息;将消息交按照指定的规则交给指定的消息队列,或者丢弃。

交换机只负责转发消息,不负责存储消息,因此若没有队列与交换机绑定,或则没有符合路由规则的队列,则消息就会丢失

Exchange交换机类型:

Fanout:广播,将消息交给所有绑定到交换机的队列Direct:定向,将消息交给符合指定routing key的队列Topic:通配符,将消息交给符合routing pattern(路由模式)的队列

代码示例

生产者

//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("172.16.98.133");//ip  默认值 localhost
factory.setPort(5672); //端口  默认值 5672
factory.setVirtualHost("/itcast");//虚拟机 默认值/
factory.setUsername("demo");//用户名 默认 guest
factory.setPassword("demo");//密码 默认值 guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();

String exchangeName = "test_fanout";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6. 创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机

channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");

String body = "日志信息:张三调用了findAll方法...日志级别:info...";
//8. 发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());

//9. 释放资源
channel.close();
connection.close();

消费者:不同的消费者只要监听不同的消息队列即可

...
// 获取queue2的消息
channel.basicConsume(queue2Name,true,consumer);
...
// 获取queue1的消息
channel.basicConsume(queue1Name,true,consumer);
3. Routing

路由模式:

生产者与交换机发送消息、交换机与队列绑定,都需要指定一个Routing Key交换机根据Routing Key将消息交给绑定的队列

代码示例

生产者

//1.创建连接工厂
//2. 设置参数
//3. 创建连接 Connection
//4. 创建Channel
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//6. 创建队列

...
    
//7. 绑定队列和交换机

//队列1绑定 error
channel.queueBind(queue1Name,exchangeName,"error");
//队列2绑定 info  error  warning
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");

String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";
//8. 发送消息:指定routing key为warning
channel.basicPublish(exchangeName,"warning",null,body.getBytes());

//9. 释放资源
channel.close();
connection.close();

消费者:不同的消费者只要监听不同的消息队列即可

...
// 获取queue2的消息
channel.basicConsume(queue2Name,true,consumer);
...
// 获取queue1的消息
channel.basicConsume(queue1Name,true,consumer);
4. Topics 通配符模式

通配符模式:Topic类型与Direct相比,Topic类型Exchange可以将队列绑定在Routing key的时候使用。通过Topic模式的通配符可以实现Pub/Sub模式与Routing模式,更加的灵活

#匹配一个或多个词*匹配一个词

代码示例

生产者

//1.创建连接工厂
//2. 设置参数
//3. 创建连接 Connection
//4. 创建Channel

String exchangeName = "test_topic";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//6. 创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");

String body = "日志信息:张三调用了findAll方法...日志级别:info...";
//8. 发送消息
channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());

//9. 释放资源
channel.close();
connection.close();

消费者:不同的消费者只要监听不同的消息队列即可

...
// 获取queue2的消息
channel.basicConsume(queue2Name,true,consumer);
...
// 获取queue1的消息
channel.basicConsume(queue1Name,true,consumer);
四、框架整合 1. Spring整合

生产者



    
    

    
    
    
    

    
    


    
    
    

    
    

    
    
        
            
            
        
    

    

    
    
    
    
    
    
    

    
        
            
            
            
        
    

    
    

rebbitmq.properties:MQ的主机信息

rabbitmq.host=172.16.98.133
rabbitmq.port=5672
rabbitmq.username=jeff
rabbitmq.password=123456
rabbitmq.virtual-host=/demo

spring-rabbitmq-properties.xml

    引入rebbit命名空间

    
    

    加载配置文件

    
    

    定义connection 工厂

    
    
    
    

    定义队列

    
    
    

    定义交换机,并绑定队列

        
    
    
    
        
            
            
        
    
    
    
    
        
            
        
    
    
    
    
        
            
            
        
    
    

    定义rabbitTemplate对象 *** 作,以便发送消息

    
    

消费者



    
    

    
    

    
    
    
        
       
    

    加载配置文件

    
    

    定义rabbitmq connection Factory

    
    

    导入监听类

    监听程序

    public class SpringQueueListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            //打印消息
            System.out.println(new String(message.getBody()));
        }
    }
    

    Spring维护

    
    

    创建监听容器,监听队列

    
            
    
    

    测试

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
    public class ConsumerTest {
        @Test
        public void test1(){
            boolean flag = true;
            while (true){}
        }
    }
    
2. SpringBoot整合

生产者

    导入依赖

        
            org.springframework.boot
            spring-boot-starter-parent
            2.1.4.RELEASE
        
    
        
            
                org.springframework.boot
                spring-boot-starter-amqp
            
            
                org.springframework.boot
                spring-boot-starter-test
            
        
    

    配置yml基本配置

    spring:
      rabbitmq:
        host: 172.16.98.133
        username: guest
        password: guest
        virtual-host: /
        port: 5672
    

    定义交换机、队列、绑定关系的配置类

    @Configuration
    public class RabbitMQConfig {
        //交换机
        @Bean("bootExchange")	//返回值加载到Spring
        public Exchange bootExchange() {
            Exchange exchange = ExchangeBuilder.topicExchange("boot_topic_exchange").durable(true).build;
            return exchange;
        }
        //队列
        @Bean("bootQueue")
        public Queue bootQueue() {
            return QueueBuilder.durable("boot_queue").build;
        }
        //绑定
        @Bean
        public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifer("bootExchange") Exchange exchange) {
             return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();	
        }
    }
    

    注入RabbitTemplate,调用方法,完成消息发送

    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class ProducerTest {
        //注入RabbitTemplate
        private RabbitTemplate rabbitTemplate;
        @Test
        public void testSend() {
            rabbitTemplate.convertAndSend("bootExchange_name", "routing_key", "MQ message");
        }
    }
    

消费者

    导入依赖

        
            org.springframework.boot
            spring-boot-starter-parent
            2.1.4.RELEASE
        
    
        
            
                org.springframework.boot
                spring-boot-starter-amqp
            
            
                org.springframework.boot
                spring-boot-starter-test
            
        
    

    配置yml基本配置

    spring:
      rabbitmq:
        host: 172.16.98.133
        username: guest
        password: guest
        virtual-host: /
        port: 5672
    

    定义监听类,监听队列

    @Component
    public class RabbitMQListener {
        public void ListenerQueue() {
            @RabbitListener(queues = "boot_queue")
            public void ListenerQueue(Message message) {
                sout(message);
            }
        }
    }
    
五、高级特性 1. 生产者消息的可靠投递

消息从 product 到 exchange 则会返回一个 confirmCallback 。

在回调函数中根据成功失败做业务处理

消息从 exchange 到 queue 投递失败则会返回一个 returnCallback 。

只有在失败的时候才会调用,因此此处不做业务处理,只要写到日志中即可

/confirm/i确认模式

    创建/confirm/i回调函数:实现RabbitTemplate./confirm/iCallback接口

    @Component
    public class My/confirm/iCallback implements RabbitTemplate./confirm/iCallback {
        //回调函数
    
        
        @Override
        public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                System.out.println("成功======");
            } else {
                System.out.println("失败,失败的原因:" + cause);
            }
        }
    }
    

    开启/confirm/i模式,设置回调函数

    开启/confirm/i模式

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        publisher-/confirm/is: true	#开启/confirm/i模式
    server:
      port: 8080
    

    设置回调函数

    @RestController
    @RequestMapping("/test")
    public class TestController {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        //注入回调函数的类
        @Autowired
        private My/confirm/iCallback my/confirm/iCallback;
    
        @RequestMapping("/send1")
        public String send1(){
            //设置/confirm/i回调函数
            rabbitTemplate.set/confirm/iCallback(my/confirm/iCallback);
    		//发送消息
            rabbitTemplate.convertAndSend("exchange_direct_demo01","item.insert","数据");
    
            return "ok";
        }
    }
    

    测试:访问对应的网址

return退回模式

    创建return回调函数:只有出错了才会调用

    @Component
    public class MyReturnCallBack  implements RabbitTemplate.ReturnCallback {
        
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("消息本身:"+new String(message.getBody()));
            System.out.println("退回的replyCode是:"+replyCode);
            System.out.println("退回的replyText是:"+replyText);
            System.out.println("退回的exchange是:"+exchange);
            System.out.println("退回的routingKey是:"+routingKey);
        }
    }
    

    开启return退回模式,设置return回调函数

    spring:
      rabbitmq:
        virtual-host: /pay
        username: lisi
        password: lisi
        port: 5672
        publisher-/confirm/is: true # 开启confirm 模式
        publisher-returns: true # 开启return模式
    
    @RestController
    @RequestMapping("/test")
    public class TestController {
        @Autowired
        private MyReturnCallBack myReturnCallBack;
    
        @RequestMapping("/send1")
        public String send1(){
    
            //设置return模式
            rabbitTemplate.setReturnCallback(myReturnCallBack);
    
            rabbitTemplate.convertAndSend("exchange_direct_demo01","item.insert","数据");
    
            return "ok";
        }
    }
    

    测试,访问网址

2. 消费者ACK确认机制

生产者通过可靠性投递,避免在消息发送过程中出现问题,但消费者也可能未收到消息,或者在收到消息后执行代码出现异常。因此需要消费者进行确认签收消息。

ACK机制:

自动确认none :只要消息被消费者接收到,就会自动确认收到。不关心消费者业务处理是否异常手动确认manual:在业务处理成功/失败后,调用对应的方法手动确认/拒绝。

channel.basicAck()确认签收channel.basicNack()拒绝签收

代码实现:消费者

    设置确认模式

    spring:
      rabbitmq:
        virtual-host: /pay
        username: lisi
        password: lisi
        port: 5672
        publisher-/confirm/is: true # 开启confirm 模式
        publisher-returns: true # 开启return模式
        listener:
          simple:
            acknowledge-mode: manual  # 手动模式 需要消费端自己自定义ack
            prefetch: 1 # 设置每一个消费端 最多处理的未确认的消息的数量
    
    

    消费者监听队列:ACK确认

    @Component
    @RabbitListener(queues = "queue_demo01",concurrency = "10-100") //指定要监听的队列名称
    public class MyRabbitListener {
        //消息、通道、消息内容
        @RabbitHandler//处理消息
        public void msg(Message message, Channel channel, String msg){
            //模拟处理业务
            try {
                int i=1/0;
    			//确认签收  参数:消息序号 消息批量确认
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    //拒绝签收 参数:消息序号 消息批量拒签 消息丢弃
                    //channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                } catch (IOException ioException) {
                    ioException.printStackTrace();
                }
            }
        }
    }
    
    

保证消息高可靠传输

    持久化:Exchange、Queue、Message生产者确认:/confirm/i、Return消费者确认:ACK
3. 消费端限流

如果并发量大的情况下,生产方不停的发送消息,此时消息在队列中堆积很多,当消费端启动,消费端有可能因为瞬间涌入大量消息而垮掉。此时我们可以在消费端进行限流 *** 作,每秒钟放行多少个消息。

spring:
  rabbitmq:
    virtual-host: /pay
    username: lisi
    password: lisi
    port: 5672
    publisher-/confirm/is: true # 开启confirm 模式
    publisher-returns: true # 开启return模式
    listener:
      simple:
        acknowledge-mode: manual  # 手动模式 需要消费端自己自定义ack
        prefetch: 1 # 设置每一个消费端 最多处理的未确认的消息的数量

4. 延迟队列

延迟队列:即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

在rabbitmq中,并没有延迟队列概念,但可以使用ttl 和死信队列的方式进行达到延迟的效果。

4.1 TTL过期时间

TTL:当消息到达存活时间后,还没有被消费,会被自动清除。RabbitMQ设置过期时间有两种:

针对某一个队列设置过期时间 ;队列中的所有消息在过期时间到之后,如果没有被消费则被全部清除针对某一个特定的消息设置过期时间;队列中的消息设置过期时间之后,如果这个消息没有被消息则被清除。

由于消息存储在消息队列中,因此消息的处理与删除都是按照先进先出的原则,所以即便当前消息过期时间到达了,如果其前面的消息未被消费或删除,则当前消息也不会被删除。因此可能会造成时间差。

代码示例

创建过期队列:createqueuettl1()

@Configuration
public class TtlConfig {
    //创建过期队列
    @Bean
    public Queue createqueuettl1(){
        //设置队列过期时间为10000 10S钟
        return QueueBuilder.durable("queue_demo02").withArgument("x-message-ttl",10000).build();
    }

    //创建交换机
    @Bean
    public DirectExchange createExchangettl(){
        return new DirectExchange("exchange_direct_demo02");
    }

    //创建绑定
    @Bean
    public Binding createBindingttl(){
        return BindingBuilder.bind(createqueuettl1()).to(createExchangettl()).with("item.ttl");
    }
}

创建过期消息

@RestController
@RequestMapping("/test")
public class TestController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private My/confirm/iCallback my/confirm/iCallback;

    @Autowired
    private MyReturnCallBack myReturnCallBack;

    @RequestMapping("/send5")
    public String send5(){
        //生产者 发送过期消息
        rabbitTemplate.convertAndSend("queue_order_queue1", (Object) "延迟队列的消息:orderId的值:123456", new MessagePostProcessor() {
            //设置过期时间 设置消息的一些属性
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("10000");//设置过期时间 单位是毫秒
                return message;
            }
        });
        return "ok";
    }
}
4.2 死信队列

死信队列:当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是Dead Letter Exchange(死信交换机 简写:DLX)。成为死信的三种条件:

队列消息长度到达限制;消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;原队列存在消息过期设置,消息到达超时时间未被消费;

代码示例

    创建接收生产者发送信息的消息队列queue1,要设置死信交换机

    @Bean
    public Queue createqueuetdelq1(){
        return QueueBuilder
            .durable("queue_demo03_deq")	//设置队列名称
            .withArgument("x-max-length",1)//设置队列的长度,超过长度就是死信
            .withArgument("x-message-ttl",10000)//设置队列的消息过期时间 10S
            .withArgument("x-dead-letter-exchange","exchange_direct_demo03_dlx")//设置死信交换机名称
            .withArgument("x-dead-letter-routing-key","item.dlx")//设置死信路由key  item.dlx 就是routingkey
            .build();
    }
    

    创建接收死信交换机发送的死信队列queue2

    @Bean
    public Queue createqueuetdlq2(){
        return QueueBuilder.durable("queue_demo03").build();
    }
    

    创建转发死信的交换机DLX

    @Bean
    public DirectExchange createExchangedel(){
        return new DirectExchange("exchange_direct_demo03_dlx");
    }
    

    绑定死信交换机和死信队列:注意routing key要与转发的相同

    @Bean
    public Binding createBindingdel(){
        return BindingBuilder.bind(createqueuetdlq()).to(createExchangedel()).with("item.dlx");
    }
    
    
    
4.3 延迟队列

举例:下订单之后,30分钟如果还未支付则,取消订单回滚库存。

    导入依赖

    
    
        4.0.0
    
        com.itheima
        itheima-springboot-rabbitmq-demo01
        1.0-SNAPSHOT
    
        
            org.springframework.boot
            spring-boot-starter-parent
            2.1.4.RELEASE
        
    
        
            
                org.springframework.boot
                spring-boot-starter-amqp
            
            
                org.springframework.boot
                spring-boot-starter-web
            
    
    
            
                org.springframework.boot
                spring-boot-starter-test
            
        
    
    
    
    

    配置文件

    spring:
      rabbitmq:
        virtual-host: /pay
        username: lisi
        password: lisi
        port: 5672
        publisher-/confirm/is: true # 开启confirm 模式
        publisher-returns: true # 开启return模式
        listener:
          simple:
            acknowledge-mode: manual  # 手动模式 需要消费端自己自定义ack
            prefetch: 1 # 设置每一个消费端 最多处理的未确认的消息的数量
    
    

    发送过期消息

    @RestController
    @RequestMapping("/test")
    public class TestController {
    
        @Autowired
        private MyReturnCallBack myReturnCallBack;
        
        @RequestMapping("/send6")
        public String send6() {
            //发送消息
            rabbitTemplate.convertAndSend("queue_order_queue1", (Object) "检查是否有支付", new MessagePostProcessor() {
                //创建过期消息
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setExpiration("10000");//设置该消息的过期时间
                    return message;
                }
            });
            return "用户下单成功,10秒钟之后如果没有支付,则过期,回滚订单";
        }
    }
    
    

    定义交换机、队列、绑定关系的配置类

    @Configuration
    public class DelayConfig {
        //创建正常队列
        @Bean
        public Queue createQueue2(){
            return QueueBuilder.durable("queue_order_queue2").build();
        }
    
        //死信队列   --->将来消息发送到这里  这里不设置过期时间,我们应该在发送消息时设置某一个消息(某一个用户下单的)的过期时间
        @Bean
        public Queue createQueue1(){
            return QueueBuilder
                    .durable("queue_order_queue1")
                    .withArgument("x-dead-letter-exchange","exchange_order_delay")//设置死信交换机
                    .withArgument("x-dead-letter-routing-key","item.order")//设置死信路由key
                    .build();
        }
    
        //创建交换机
        @Bean
        public DirectExchange createOrderExchangeDelay(){
            return new DirectExchange("exchange_order_delay");
        }
    
        //创建绑定 将正常队列绑定到死信交换机上
        @Bean
        public Binding createBindingDelay(){
            return BindingBuilder.bind(createQueue2()).to(createOrderExchangeDelay()).with("item.order");
        }
    }
    

    接收消息

    @Component
    @RabbitListener(queues = "queue_order_queue2")
    public class OrderListener {
    
        @RabbitHandler
        public void orderhandler(Message message, Channel channel, String msg) {
            System.out.println("获取到消息:" + msg + ":时间为:" + new Date());
            try {
                System.out.println("模拟检查开始=====start");
                Thread.sleep(1000);
                System.out.println("模拟检查结束=====end");
                System.out.println("用户没付款,检查没通过,进入回滚库存处理");
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

幂等性:指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

以转账为例:

    发送消息消息内容包含了id 和 版本和 金额.消费者接收到消息,则根据ID 和版本执行sql语句,update account set money=money-?,version=version+1 where id=? and version=?如果消费第二次,那么同一个消息内容是修改不成功的。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5708971.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存