目录
RabbitMQ持久化
概念
队列实现持久化
消息实现持久化
不公平分发
预取值
发布确认
发布确认的策略
开启发布确认的方法
单个确认发布
批量确认发布
异步确认发布(重点)
如何处理异步未确认消息
以上 3 种发布确认速度对比
RabbitMQ持久化 概念
我们已经看到了RabbitMQ如何处理任务不丢失的情况,就是依赖于它的消息自动重新入队机制,但是如何保障当 RabbitMQ 服务停掉以后消 息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。
队列实现持久化在创建队列的参数中进行设置
channel.queueDeclare("hello", true, false, false, null);
注意:如果之前创建过该名称的队列并且是不支持持久化的,则不能直接将参数修改为true,需要将原来已经存在了的队列进行删除
队列持久化后如下
这个时候即使重启 rabbitmq 队列也依然存在
消息实现持久化要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性。
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点此时并没有真正写入磁盘。
如果需要更强有力的持久化策略,参考后边的发布确认。
不公平分发RabbitMQ默认是轮训分发模式,就是两个消费者轮流接受消息,但是这种模式在某些场景下是不恰当的,如果其中一个消费者执行的时间很长,而消息的产生有很快,就会导致大量消息的堆积,效率很低,是不科学的
为了避免上述情况的出现,我们应使用不公平分发(能者多劳)
//设置为不公平分发 channel.basicQos(1);
演示结果如下
预取值
该值定义通道上允许的未确认消息的最大数量,一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认
也就是说,定义一个信道上的最大数量是5,则信道中最多只能有5个待消费的消息
定义方法与不公平分发相同,参数值不同
channel.basicQos(5);//表示最大只能有5个待处理的消息发布确认
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的 消息都将会被指派一个唯一的 ID(从 1 开始),使生产者知道消息已经正确到达目的队列,
如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信 道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调 方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消 息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
发布确认的策略 开启发布确认的方法发布确认默认是没有开启的,如果要开启需要调用方法 /confirm/iSelect,每当你要想使用发布 确认,都需要在 channel 上调用该方法
//开启发布确认 channel.confirmSelect();单个确认发布
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它 被确认发布,后续的消息才能继续发布
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会 阻塞所有后续消息的发布
批量确认发布与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量
缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现了问题,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。
这种方案仍然是同步的,也一样阻塞消息的发布。
异步确认发布(重点)异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功
如何处理异步未确认消息最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用 ConcurrentlinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。
具体实现
public static void publishMessageAsync() throws Exception { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置工厂的IP,连接RabbitMQ的队列 factory.setHost("192.168.31.65"); //设置用户名 factory.setUsername("admin"); //密码 factory.setPassword("123"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); channel.queueDeclare("hello", true, false, false, null); //开启发布确认 channel.confirmSelect(); long start = System.currentTimeMillis(); ConcurrentSkipListMapconcurrentSkipListMap = new ConcurrentSkipListMap<>(); //消息确认成功回调函数 ConfirmCallback AckCallback = (var1, var2) -> { if (var2) { //如果是批量 *** 作进行批量删除 //删除已经确认的消息 ConcurrentNavigableMap map = concurrentSkipListMap.headMap(var1); //根据编号得到被确认的消息 map.clear(); } else { //不是批量就直接删除 concurrentSkipListMap.remove(var1); } System.out.println("确认的消息" + var1); }; //消息确认失败的回调函数 ConfirmCallback nackCallback = (var1, var2) -> { System.out.println("未确认的消息" + var1); }; //准备消息的监听器 channel.addConfirmListener(AckCallback, nackCallback); //异步通知 //发送1000条消息 for (int i = 0; i < 1000; i++) { String message = i + ""; channel.basicPublish("", "hello", null, message.getBytes()); //记录要发送的所有消息 concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message); } long end = System.currentTimeMillis(); System.out.println("TIME:" + (end - start)); }
执行
public static void main(String[] args) throws Exception { publishMessageAsync(); }
结果可以看到异步的效果
以上 3 种发布确认速度对比单独发布消息
同步等待确认,简单,但吞吐量非常有限。
批量发布消息
批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。
异步处理:
最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)