一、RabbitMq
1、入门案列2、消息应答
1、自动应答2、手动应答 3、持久化
1、队列持久化2、消息持久化 4、发布确认5、交换机
1、无名交换机2、Fanout2、Direct exchange3、Topic exchange 6、死信队列
1、演示TTL过期2、延迟队列 二、SpringBoot整合RabbitMq
1、SpringBoot演示延迟队列2、SpringBoot发布确认3、SpringBoot消息应答3、RabbitMq备份交换机
一、RabbitMq安装好后,访问http://localhost:15672/,账号密码默认都是guest
pom.xml配置
1、入门案列org.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-configuration-processororg.springframework.boot spring-boot-starter-testtest org.mybatis.spring.boot mybatis-spring-boot-starter2.1.0 mysql mysql-connector-javaorg.projectlombok lombokprovided com.alibaba fastjson1.2.51 org.mybatis.generator mybatis-generator-core1.4.0 org.springframework.boot spring-boot-starter-amqp
生产者
import com.rabbitmq.client.*; public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("admin"); factory.setPassword("admin"); //channel 实现了自动 close 接口 自动关闭 不需要显示关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello world"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("消息发送完毕"); } }
消费者
import com.rabbitmq.client.*; public class Consumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); //channel 实现了自动 close 接口 自动关闭 不需要显示关闭 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //推送的消息如何进行消费的接口回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { System.out.println(consumerTag); String message = new String(delivery.getBody()); System.out.println(message); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }2、消息应答
1、自动应答消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并且只完成了一半,突然它挂掉了,会发生什么情况?
RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制。消息应答就是 :消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡。自动应答性能高数据存在丢失情况。手动应答性能低数据不丢失。 企业中一半都选用手动应答。上面入门案例(默认)就是自动应答。
2、手动应答代码案列:
手动应答效果演示:生产者发送两条信息,消费者1和消费者2会轮询消费。此时把消费者1停掉,消费者1的消息不会丢,因为消费者1模拟了睡眠10秒,没有消费完消息就停掉了,但消息不会丢失。消费者2会把两条消息都消费掉
工具类
public class RabbitMqUtils { //得到一个连接的 channel public static Channel getChannel() throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
生产者
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import java.util.Scanner; public class Task02 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel();) { channel.queueDeclare(QUEUE_NAME, true, false, false, null); //从控制台当中接受信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("发送消息完成:" + message); } } } }
消费者1
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; public class Worker03 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String receivedMessage = new String(delivery.getBody()); System.out.println("任务三:接收到消息:" + receivedMessage); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); }; System.out.println("C2 消费者启动等待消费.................. "); boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback); } }
消费者2
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; public class Worker04 { private static final String QUEUE_NAME="hello"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback=(consumerTag, delivery)->{ String receivedMessage = new String(delivery.getBody()); System.out.println("任务四:接收到消息:"+receivedMessage); if (receivedMessage.equals("1")){ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } }; CancelCallback cancelCallback=(consumerTag)-> {System.out.println(consumerTag + "消费者取消消费接口回调逻辑");}; System.out.println("C2 消费者启动等待消费.................. "); boolean autoAck = false; channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback); } }
手动应答方法:
1、Channel.basicAck(用于肯定确认),RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了。
2、Channel.basicNack(用于否定确认)。
3、Channel.basicReject(用于否定确认),与Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。
之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化
//声明一个队列时,把第二个参数设置成true。标识该队列持久化 channel.queueDeclare(QUEUE_NAME, true, false, false, null);2、消息持久化
//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN,表示消息持久化 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());4、发布确认
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
发布确认的代码我就不单独写了,在后面整合SpringBoot的时候再写。
5、交换机RabbitMQ消息传递模型的核心思想:生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中
临时队列:每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除
//声明一个临时队列 String queueName = channel.queueDeclare().getQueue();
绑定(bindings):binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系,下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定
Exchanges 的类型 : 直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout);
1、无名交换机
前面我们没学交换机,但仍然能够将消息发送到队列。之前能实现的原因是因为我们使用的是默认交换,我们通过空字符串("")进行标识。
//第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());2、Fanout
生产者
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import java.util.Scanner; public class Produce { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); Scanner sc = new Scanner(System.in); System.out.println("请输入信息"); while (sc.hasNext()) { String message = sc.nextLine(); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } } }
消费者1
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; public class Consumer01 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); //把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串 channel.queueBind(queueName, EXCHANGE_NAME, ""); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("消费者1 : 控制台打印接收到的消息" + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
消费者2
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; public class Consumer02 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); //把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串 channel.queueBind(queueName, EXCHANGE_NAME, ""); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("消费者2 : 控制台打印接收到的消息" + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
先启动两个消费者,在启动生产者。当我们向交换机发送消息时,可以发现两个消费者都可以同时消费发送的消息。说白了,我们发送字符串"aaa",两个消费者都能消费该条信息,这就是Fanout交换机作用。
2、Direct exchange
生产者
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.util.HashMap; import java.util.Map; public class Producer { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //创建多个 bindingKey MapbindingKeyMap = new HashMap<>(); bindingKeyMap.put("info", "普通 info 信息"); bindingKeyMap.put("warning", "警告 warning 信息"); bindingKeyMap.put("error", "错误 error 信息"); //debug 没有消费这接收这个消息 所有就丢失了 bindingKeyMap.put("debug", "调试 debug 信息"); for (Map.Entry bindingKeyEntry : bindingKeyMap.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息:" + message + ";绑定key:" + bindingKey); } } } }
消费者1
public class Consumer01 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "test-queue-direct01"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "error"); System.out.println("等待接收消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message; System.out.println("错误日志已经接收" + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
消费者2
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; public class Consumer02 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "test-queue-direct02"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "warning"); channel.queueBind(queueName, EXCHANGE_NAME, "error"); System.out.println("等待接收消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" 接收绑定键 :" + delivery.getEnvelope().getRoutingKey() + ", 消 息:" + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
和Fanout交换机相比,Direct功能更强大,可以指定哪个消费者消费哪类消息。
3、Topic exchangeTopic比direct更强大,可以根据路由键*#号匹配。
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
通过下面代码理解*和#的区别。。。
生产者
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.util.HashMap; import java.util.Map; public class Producer { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //创建多个 bindingKey MapbindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到"); bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到"); bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到"); bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到"); bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次"); bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2"); for (Map.Entry bindingKeyEntry : bindingKeyMap.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息:" + message + ";绑定key:" + bindingKey); } } } }
消费者1
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; public class Consumer01 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //声明 Q1 队列与绑定关系 String queueName = "Q1"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); System.out.println("等待接收消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("接收队列 :" + queueName + ";绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
消费者2
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; public class Consumer02 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //声明 Q2 队列与绑定关系 String queueName = "Q2"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); System.out.println("等待接收消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("接收队列 :" + queueName + " 绑 定 键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
注意:当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了。
如果队列绑定键当中没有#和出现,那么该队列绑定类型就是 direct 了。
死信队列:当消息消费发生异常时,会把异常消息投入到一个队列中,防止消息丢失。
死信的来源
消息 TTL 过期队列达到最大长度(队列满了,无法再添加数据到 mq 中)消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false. 1、演示TTL过期
生产者
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; public class Producer { private static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); //设置消息的 TTL 时间 10秒 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); //该信息是用作演示队列个数限制 for (int i = 1; i < 11; i++) { String message = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes()); System.out.println("生产者发送消息:" + message); } } } }
消费者01
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.util.HashMap; import java.util.Map; public class Consumer01 { //普通交换机名称 private static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机名称 private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //声明死信和普通交换机 类型为 direct channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明死信队列 String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); //死信队列绑定死信交换机与 routingkey channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); //正常队列绑定死信队列信息 Mapparams = new HashMap<>(); //正常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信 routing-key 参数 key 是固定值 params.put("x-dead-letter-routing-key", "lisi"); String normalQueue = "normal-queue"; channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); System.out.println("等待接收消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer01 接收到消息" + message); }; channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { }); } }
消费者02
import cn.yx.zg.rabbitmq.utils.RabbitMqUtils; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); String deadQueue = "dead-queue"; channel.queueDeclare(deadQueue, false, false, false, null); channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi"); System.out.println("等待接收死信队列消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8"); System.out.println("Consumer02 接收死信队列的消息" + message); }; channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { }); } }
先启动消费者1创建死信交换机和死信队列,然后关掉消费者1模拟消息超过10秒没消费,启动生产者。10秒后发现生产者消息进入了死信队列,然后启动消费者2,消费死信队列的消息
2、延迟队列
二、SpringBoot整合RabbitMq 1、SpringBoot演示延迟队列延迟队列,实际就是TTL过期之后,把数据放到死信队列中
延迟队列图解
application.yml
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
配置文件类代码
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class TtlQueueConfig { public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD"; // 声明 xExchange @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } // 声明 xExchange @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //声明队列 A ttl 为 10s 并绑定到对应的死信交换机 @Bean("queueA") public Queue queueA() { Mapargs = new HashMap<>(3); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //声明队列的 TTL args.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(args).build(); } // 声明队列 A 绑定 X 交换机 @Bean public Binding queueaBindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } //声明队列 B ttl 为 40s 并绑定到对应的死信交换机 @Bean("queueB") public Queue queueB() { Map args = new HashMap<>(3); //声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //声明当前队列的死信路由 key args.put("x-dead-letter-routing-key", "YD"); //声明队列的 TTL args.put("x-message-ttl", 40000); return QueueBuilder.durable(QUEUE_B).withArguments(args).build(); } //声明队列 B 绑定 X 交换机 @Bean public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queue1B).to(xExchange).with("XB"); } //声明死信队列 QD @Bean("queueD") public Queue queueD() { return new Queue(DEAD_LETTER_QUEUE); } //声明死信队列 QD 绑定关系 @Bean public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } }
生产者代码
http://localhost/sendMsg/你好
@Autowired private RabbitTemplate rabbitTemplate; @GetMapping("sendMsg/{message}") public void sendMsg(@PathVariable String message){ log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message); rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: "+message); rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: "+message); }
消费死信队列代码
@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg); } }2、SpringBoot发布确认
application.yml
spring: rabbitmq: #可以确保生产者到交换器exchange消息有没有发送成功,如果发送到交换机失败,会触发回调 publisher-/confirm/i-type: correlated #消息没有被路由到指定的queue时将消息返回,而不是丢弃 publisher-returns: true #return回调 template: mandatory: true host: 127.0.0.1 port: 5672 username: guest password: guest
配置详解(以properties格式解释):
spring.rabbitmq.publisher-confirms=true,设置该属性,如果消息发送到交换机失败,会触发RabbitConfig类中的set/confirm/iCallback回调。
spring.rabbitmq.publisher-returns: true,spring.rabbitmq.template.mandatory: true。同时设置这两个配置,当交换机的消息发送不到队列时,会触发RabbitConfig类中的setReturnCallback回调。
注意:
spring.rabbitmq.template.mandatory属性的优先级高于spring.rabbitmq.publisher-returns的优先级spring.rabbitmq.template.mandatory属性可能会返回三种值null、false、true.spring.rabbitmq.template.mandatory结果为true、false时会忽略掉spring.rabbitmq.publisher-returns属性的值spring.rabbitmq.template.mandatory结果为null(即不配置)时结果由spring.rabbitmq.publisher-returns确定
队列配置类
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class /confirm/iConfig { public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange"; public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue"; //声明业务 Exchange @Bean("/confirm/iExchange") public DirectExchange /confirm/iExchange() { return new DirectExchange(/confirm/i_EXCHANGE_NAME); } // 声明确认队列 @Bean("/confirm/iQueue") public Queue /confirm/iQueue() { return QueueBuilder.durable(/confirm/i_QUEUE_NAME).build(); } // 声明确认队列绑定关系 @Bean public Binding queueBinding(@Qualifier("/confirm/iQueue") Queue queue, @Qualifier("/confirm/iExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("key1"); } }
RabbitConfig配置
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration public class RabbitConfig { @Bean(name = "myRabbitTemplete") public RabbitTemplate myRabbitTemplete(ConnectionFactory connectionFactory){ final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.set/confirm/iCallback((correlationData, ack, cause) -> { if (ack){ //表示消息发送成功,这里不做任何 *** 作 log.info("消息成功发送到交换机"); }else { //表示消息成功发送到服务器,但是没有找到交换器,这里可以记录日志,方便后续处理 log.warn("消息发布到交换器失败,错误原因为:{}"+cause); } }); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { //表示消息发送到交换机,但是没有找到队列,这里记录日志 log.warn("消息没有匹配到对应的队列 -> 消息{},发送到队列失败,应答码:{},原因:{},交换器: {},路由键:{}", message, replyCode, replyText, exchange, routingKey); }); return rabbitTemplate; } }
生产者-Controller
public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange"; @Autowired @Qualifier("myRabbitTemplete") private RabbitTemplate myRabbitTemplete; @GetMapping("sendMessage/{message}") public void sendMessage(@PathVariable String message) { CorrelationData correlationData1 = new CorrelationData("1"); String routingKey = "key1"; myRabbitTemplete.convertAndSend(/confirm/i_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1); CorrelationData correlationData2 = new CorrelationData("2"); routingKey = "key2"; myRabbitTemplete.convertAndSend(/confirm/i_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2); log.info("发送消息内容:{}", message); }
消费者
@Component @Slf4j public class /confirm/iConsumer { public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue"; @RabbitListener(queues = /confirm/i_QUEUE_NAME) public void receiveMsg(Message message, Channel channel) throws Exception{ String msg = new String(message.getBody()); log.info("消费者接受到队列 /confirm/i.queue 消息:{}", msg); } }3、SpringBoot消息应答
application.yml
Spring: rabbitmq: #可以确保生产者到交换器exchange消息有没有发送成功,如果发送到交换机失败,会触发回调 publisher-/confirm/i-type: correlated #消息没有被路由到指定的queue时将消息返回,而不是丢弃 publisher-returns: true #return回调 template: mandatory: true host: 127.0.0.1 port: 5672 username: guest password: guest listener: simple: #消息手动应答 acknowledge-mode: manual #如果消费发生异常,配置重试机制 retry: enabled: true max-attempts: 5 max-interval: 10000 # 重试最大间隔时间 initial-interval: 2000 # 重试初始间隔时间 multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
消费者
public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue"; @RabbitListener(queues = /confirm/i_QUEUE_NAME) public void receiveMsg(Message message, Channel channel) throws Exception{ String msg = new String(message.getBody()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.info("消费者接受到队列 /confirm/i.queue 消息:{}", msg); }3、RabbitMq备份交换机
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)