SpringBoot整合RabbitMQ
创建SpringBoot工程的时候选择Web依赖
导入mq依赖
org.springframework.boot spring-boot-starter-amqpyml配置文件配置mq五大参数,除了hostip和端口,补上用户名密码和虚拟主机/test
spring: rabbitmq: host: 192.168.200.129 port: 5672 username: test password: test virtual-host: /testJava配置类,声明exchange、queue,并且绑定它们
@Configuration public class RabbitMQConfig { //1. 创建exchange - topic @Bean public TopicExchange getTopicExchange(){ return new TopicExchange("boot-topic-exchange",true,false); } //2. 创建queue @Bean public Queue getQueue(){ return new Queue("boot-queue",true,false,false,null);//建议交换机和队列构造都选最长的,持久化赋值true,不认识给默认值 } //3. 绑定在一起 @Bean public Binding getBinding(TopicExchange topicExchange, Queue queue){ return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*"); } //4. 上面要注意的是交换机的名字和队列的名字不能是topic-exchange和topic-queue否则跟内部冲突启动失败 }写一个测试类来模拟生产者发布消息到RabbitMQ,以后这个单元测试类是一个另一个发送消息的工程代码↓
@SpringBootTest class SpringbootRabbitmqApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test void contextLoads() { rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!"); } }@Component public class Consumer { @RabbitListener(queues = "boot-queue") public void getMessage(Object message){ System.out.println("接收到消息:" + message); } }先启动应用程序监听消息,然后启动单元测试类发消息,控制台输出监听接收到的消息,说明整合成功
手动Ack,yml配置增加和java监听增加执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。
但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完成,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。
没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。
消息应答是默认打开的。我们通过显示的设置autoAsk=true可关闭这种机制。现即自动应答开,一旦我们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听队列的下一个消费者。
yml文件配置手动ack,而不是默认的自动ack↓
spring: rabbitmq: host: 192.168.200.129 port: 5672 username: test password: test virtual-host: /test listener: simple: acknowledge-mode: manual代码手动ack,修改消费者的监听方法,增加client包的Channel接口和和amqp包的Message↓
@Component public class Consumer { @RabbitListener(queues = "boot-queue") public void getMessage(String msg, Channel channel, Message message) throws IOException { System.out.println("接收到消息:" + msg); //int i = 1 / 0; //手动ack //channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }为什么要改为手动ack?因为如果是自动ack,消费者即使出了异常,没有正常完成任务,mq也收到自动应答表示完成了,就删除mq中的消息,但是如果改为手动ack配置,当消费者出现异常就中断了,没有走后面手动ack的代码,就没有正确应答,mq不会把消息删除,如果消费者没有出现异常,即调用手动ack代码,给mq应答正常,删除消费消息
消息的可靠性
RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的 *** 作,效率太低,加了事务 *** 作后,比平时的 *** 作效率至少要慢100倍。
RabbitMQ除了事务,还提供了/confirm/i的确认机制,这个效率比事务高很多,所以先学习这个确认机制。
这个机制三种方式:1普通/confirm/i方式 2批量/confirm/i方式 3异步/confirm/i方式,接下来分别介绍一下↓
消息传递可靠性 普通/confirm/i方式↓
//3.1 开启confirm channel.confirmSelect(); //3.2 发送消息 String msg = "Hello-World!"; channel.basicPublish("","HelloWorld",null,msg.getBytes());//""这是默认交换机,改为xxx交换机测试即可 //3.3 判断消息发送是否成功 if(channel.waitForConfirms()){ System.out.println("消息发送成功"); }else{ System.out.println("发送消息失败"); }
批量/confirm/i方式↓
//3.1 开启confirm channel.confirmSelect(); //3.2 批量发送消息 for (int i = 0; i < 1000; i++) { String msg = "Hello-World!" + i; channel.basicPublish("","HelloWorld",null,msg.getBytes()); } //3.3 确定批量操作是否成功 channel.waitForConfirmsOrDie(); // 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException异步/confirm/i方式↓
//3.1 开启confirm channel.confirmSelect(); //3.2 批量发送消息 for (int i = 0; i < 1000; i++) { String msg = "Hello-World!" + i; channel.basicPublish("","HelloWorld",null,msg.getBytes()); } //3.3 开启异步回调 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple); } });Return机制,监听消息是否从exchange送到了指定的queue↓
/confirm/i只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。
而且exchange是不能持久化消息的,queue是可以持久化消息的。
可以采用Return机制来监听消息是否从exchange送到了指定的queue中
开启Return机制,并在发送消息时,指定mandatory为true
// 开启return机制 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { // 当消息没有送达到queue时,才会回调执行。 System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!"); } }); // 在发送消息时,换另外一个,方法重载,指定mandatory参数为true channel.basicPublish("","HelloWorld",true,null,"msg".getBytes()); System.in.read();上个例子的基础上,增加返回机制代码,故意设置一个不存在的路由键找不到队列,来进行消息没有到达队列的回调
SpringBoot实现/confirm/i确认机制和return返回机制编写配置文件,增加/confirm/i和return
spring: rabbitmq: host: 192.168.200.129 port: 5672 username: test password: test virtual-host: /test listener: simple: acknowledge-mode: manual publisher-/confirm/i-type: simple publisher-returns: true开启/confirm/i和Return的Java代码配置,核心用到了RabbitTemplate来搞定
@Component public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct//构造本类对象后,调用初始化方法,设置回调来触发下面确认方法和返回方法的调用↓ public void initMethod(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("消息已经送达到Exchange"); }else{ System.out.println("消息没有送达到Exchange"); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("消息没有送达到Queue"); } }避免消息重复消费重复消费消息的原因是,消费者没有给RabbitMQ一个ack↓
重复消费 重复消费消息,会对非幂等行 *** 作造成问题↓
幂等性 *** 作:比如数据库删除, *** 作一次和多次效果是一样的。
非幂等性 *** 作:比如添加,而且主键是自增的。
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,先将消息的id放到Redis中,比如
id为0表示(正在执行业务)
id为1表示(执行业务成功)
如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
生产者,发送消息时,指定messageId↓
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(1) //指定消息是否需要持久化 1 - 需要持久化 2 - 不需要持久化 .messageId(UUID.randomUUID().toString()) .build(); String msg = "Hello-World!"; channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());消费者,在消费消息时,根据具体业务逻辑去 *** 作redis↓
DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Jedis jedis = new Jedis("192.168.200.129",6379);//记得修改为自己的Linux服务器ip String messageId = properties.getMessageId(); //1. setnx到Redis中,默认指定value-0 String result = jedis.set(messageId, "0", "NX", "EX", 10); if(result != null && result.equalsIgnoreCase("OK")) { System.out.println("接收到消息:" + new String(body, "UTF-8")); //消费消息了,打印消息 //2. 消费成功,set messageId 1 jedis.set(messageId,"1"); channel.basicAck(envelope.getDeliveryTag(),false); }else { //3. 如果1中的setnx失败,获取key对应的value,如果是0,return,如果是1则手动ack String s = jedis.get(messageId); if("1".equalsIgnoreCase(s)){ channel.basicAck(envelope.getDeliveryTag(),false); } } } };public class Publisher { @Test public void publish() throws Exception { //1. 获取Connection Connection connection = RabbitMQClient.getConnection(); //2. 创建Channel Channel channel = connection.createChannel(); //3. 发布消息到exchange,同时指定路由的规则 //修改的部分↓ AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(1) //指定消息是否需要持久化 1 - 需要持久化 2 - 不需要持久化 .messageId(UUID.randomUUID().toString()) .build(); String msg = "Hello-World!"; channel.basicPublish("","HelloWorld",true,properties,msg.getBytes()); //修改的部分↑ System.out.println("生产者发布消息成功!"); //4. 释放资源 channel.close(); connection.close(); } }3消费者根据jedis调用的api结果,做响应的业务逻辑↓
public class Consumer {//其他消费者 @Test public void consume() throws Exception { //1. 获取连接对象 Connection connection = RabbitMQClient.getConnection(); //2. 创建channel final Channel channel = connection.createChannel(); //3. 声明队列-HelloWorld channel.queueDeclare("HelloWorld",true,false,false,null); //4. 开启监听Queue //修改的部分↓ DefaultConsumer consume = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Jedis jedis = new Jedis("192.168.200.129",6379);//记得修改为自己的Linux服务器ip String messageId = properties.getMessageId(); //1. setnx到Redis中,默认指定value为0 String result = jedis.set(messageId, "0", "NX", "EX", 10); if(result != null && result.equalsIgnoreCase("OK")) { System.out.println("接收到消息:" + new String(body, "UTF-8")); //2. 消费成功,set messageId 1 jedis.set(messageId,"1"); channel.basicAck(envelope.getDeliveryTag(),false); }else { //3. 如果1中的setnx失败,获取key对应的value,如果是0啥都不做,如果是1则手动ack String s = jedis.get(messageId); if("1".equalsIgnoreCase(s)){ channel.basicAck(envelope.getDeliveryTag(),false); } } } }; //修改的部分↑ channel.basicConsume("HelloWorld",true,consume); System.out.println("消费者开始监听队列!"); // System.in.read(); System.in.read();//让程序不停止,只有键盘录入数据才会往下走 //5. 释放资源 channel.close(); connection.close(); } }欢迎分享,转载请注明来源:内存溢出
评论列表(0条)