- 一、绪论
- 二、生产者
- 2.1事务机制
- 2.2/confirm/i模式
- 串行模式
- 批量模式
- 异步模式
- 三、消费者
- 3.1手动ACK
上篇文章介绍了rabbitmq的基本知识、交换机类型实战《【消息队列之rabbitmq】学习RabbitMQ必备品之一》
这篇文章主要围绕着消息确认机制为中心,展开实战;接触过消息中间件的伙伴都知道,消息会存在以下问题:
1、消息丢失问题和可靠性投递问题;
2、消息如何保证顺序消费;
3、消息如何保证幂等性问题,即重复消费问题等等…
本文主要以Rabbitmq消息中间件解决问题一的实践,其他问题小编会重新写文章总结;
故从业务代码设计层面,我们需要保证生产者发送消息可靠性投递到MQ中间件中,其次保证消费者可以从MQ中获取消息并消费成功;
从生产者角度控制消息的可靠性投递实践;rabbitmq提供了以下方式:事务机制和/confirm/i机制;
其他的工具类等相关代码,请移步到《【消息队列之rabbitmq】学习RabbitMQ必备品之一》
-
基础知识:
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
声明启动事务模式:channel.txSelect()
提交事务:channel.txComment()
回滚事务:channel.txRollback() -
实践代码
生产者端代码如下:
package com.itwx.mq.tx; import com.itwx.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class ProviderTx { private static final String QUEUE_NAME = "test_tx_queue"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { // 获取到连接 connection = ConnectionUtil.getConnection(); // 获取通道 channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 启动事务,必须用txCommit()或者txRollback()回滚 channel.txSelect(); // 假设这里处理业务逻辑 String message = "hello, tx message!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //todo 测试异常 int i = 1/ 0; // 提交事务 channel.txCommit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (channel != null) { // 回滚。如果未异常会提交事务,此时回滚无影响 channel.txRollback(); channel.close(); } if (connection != null) { connection.close(); } } catch (Exception e) { } } } }
代码中含有TODO注释,大家可以结合rabbitmq管理界面,自测生产者事务是否生效等等;
1、业务异常产生,消息回滚测试;
2、生产者无异常产生,测试消息是否发送成功;
缺点:
开始事务属于同步 *** 作,消息发送成功后,生产者端处于阻塞状态,需要等待消息中间件接收消息的响应,降低生产者的吞吐量和性能;
/confirm/i主要存在以下三种方式:
方式一:channel.waitForConfirms()普通发送方确认模式(串行模式);
方式二:channel.waitForConfirmsOrDie()批量确认模式;
方式三:channel.addConfirmListener()异步监听发送方确认模式;
使用/confirm/i模式,大家可以考虑一下如果消息发送失败之后,如何处理补偿机制重新发送?redis+定时任务
串行模式:producer每发送一条消息后,调用waitForConfirms()方法,等待broker端/confirm/i,如果服务器端返回false或者在超时时间内未返回,客户端进行消息重传;
1、启动生产者确认模式channel.confirmSelect();
2、等待消息中间件响应结果channel.waitForConfirms();
3、处理返回结果或者捕获异常,触发补偿任务;
- 生产者代码
package com.itwx.mq./confirm/i; import com.itwx.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; public class Provider/confirm/i { private static final String QUEUE_NAME = "test_one_/confirm/i_queue"; public static void main(String[] args) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 启动发送者确认模式 channel./confirm/iSelect(); String message = "hello,message! /confirm/iSelect"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); // 阻塞线程,等待服务器返回响应。该方法可以指定一个等待时间,发送成功返回true,否则返回false boolean sendResult = channel.waitFor/confirm/is(); if (sendResult) { System.out.print("发送成功"); } channel.close(); connection.close(); } }批量模式
批量模式:producer每发送一批消息后,调用waitForConfirmsOrDie()方法,而此种模式方法无返回值,只能根据异常进行判断。如果确认失败会抛出IOException和InterruptedException。源码如下:
void waitFor/confirm/isOrDie() throws IOException, InterruptedException;
此外注意,写测试demo时,由于存在消息延迟等现象,故发送消息结束之后,主线程休眠5000s或者更多,之后再关闭信道连接;
- 生产者代码
package com.itwx.mq./confirm/i; import com.itwx.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.util.concurrent.TimeUnit; public class ProviderBatch/confirm/i { private static final String QUEUE_NAME = "test_batch_/confirm/i_queue"; public static void main(String[] args) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 启动发送者确认模式 channel./confirm/iSelect(); String message = "hello,message! /confirm/iSelect"; for (int i = 1; i<=5; i++) { channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("发送第" + i +"条消息成功"); } // 阻塞线程,等待服务器返回响应。该方法可以指定一个等待时间。该方法无返回值,只能根据抛出的异常进行判断。 try { channel.waitFor/confirm/isOrDie(); } catch (Exception e) { e.printStackTrace(); } TimeUnit.SECONDS.sleep(5000); //TODO,补偿机制只能依赖于捕获超时异常进行消息补发; channel.close(); connection.close(); } }异步模式
异步模式,开发者可以定义/confirm/iListener实现类处理消息发送成功或者失败情况,重写handleNack和handleAck方法;
handleNack():消息接收失败的通知方法,开发者可以在这里重新投递消息;
handleAck():消息发送成功之前,需要把消息先存起来,比如用KV存储,接收到ack后删除;
- 生产者代码
package com.itwx.mq./confirm/i; import com.itwx.mq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client./confirm/iListener; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeUnit; public class ProviderAsync/confirm/i { private static final String QUEUE_NAME = "test_async_/confirm/i_queue"; public static void main(String[] args) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); // 获取通道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 启动发送者确认模式 channel./confirm/iSelect(); String message = "hello,message! /confirm/iSelect"; for (int i = 1; i<=5; i++) { channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("发送第" + i +"条消息成功"); } //异步监听确认和未确认的消息 channel.add/confirm/iListener(new /confirm/iListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { //消息接收失败的通知方法,用户可以在这里重新投递消息 System.out.println(String.format("未确认消息,序号:%d,是否多个消息:%b", deliveryTag, multiple)); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { //发送端投递消息前,需要把消息先存起来,比如用KV存储,接收到ack后删除 System.out.println(String.format("确认消息,序号:%d,是否多个消息:%b", deliveryTag, multiple)); } }); //主线程休眠,等待异步回调消息 TimeUnit.SECONDS.sleep(10000); channel.close(); connection.close(); } }三、消费者 3.1手动ACK
如果触发手动ACK机制,需要改动以下东西:
- 将自动ACK改为false;
channel.basicConsume(QUEUE_NAME, false, consumer);
-
考虑以下情况:
1、若未设置手动ACK,消费者获取消息后,发生异常,会发生什么情况?(消息丢失)
2、若设置手动ACK,消费者发生异常,会发生什么情况?(未消费状态)
3、设置手动ACK,消费者宕机,未即使发送ACK确认回调,会发生什么情况?(已消费,未确认)
(消息中间件会将消息标记为待确认状态,不会被重复消息);若再想消费该消息,重启消费者,消息中间件会将该消息标记为待消费状态(从unacked->ready) -
小编demo写了TODO测试用例,注意测试
-
消费者代码
package com.itwx.mq.ack; import com.itwx.mq.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class ConsumerACK { private final static String QUEUE_NAME = "wx_test_queue"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnectionUtil.getConnection(); //创建会话通道,生产者和mq服务所有通信都在channel通道中完成 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //实现消费方法 DefaultConsumer consumer = new DefaultConsumer(channel){ // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //TODO 手动抛异常,造成消息丢失现象 //测试情况2 // int i= 1 / 0; //交换机 String exchange = envelope.getExchange(); //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收 long deliveryTag = envelope.getDeliveryTag(); // body 即消息体 String msg = new String(body,"utf-8"); System.out.println("consumer receive message:" + msg + ",messageId:" + deliveryTag + ",exchange name:" + exchange); //消息消费成功,手动ACK, //测试情况三,注释 // channel.basicAck(envelope.getDeliveryTag(), false); } }; // 监听队列,第二个参数:是否自动进行消息确认。 //设置成手动ACK,避免重要消息丢失 channel.basicConsume(QUEUE_NAME, false, consumer); } }
参考资料:
RabbitMQ系列(四)RabbitMQ事务和/confirm/i发送方消息确认——深入解读
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)