消息可能在哪些阶段丢失呢?可能会在这三个阶段发生丢失: 生产阶段 、 存储阶段 、 消费阶段
所以要从这三个阶段考虑:
在生产阶段,主要通过请求确认机制,来保证消息的可靠传递。
存储阶段,可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步
从 Consumer 角度分析,如何保证消息被成功消费?
Consumer 保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。
对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,就是所谓的 有且仅有一次 。 RocketMQ 择了确保一定投递,保证消息不丢失,但有可能造成消息重复。
处理消息重复问题,主要有业务端自己保证,主要的方式有两种: 业务幂等 和 消息去重
发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:
顺序消息是指消息的 消费顺序 和 产生顺序 相同,在有些业务逻辑下,必须保证顺序,比如订单的生成、付款、发货,这个消息必须按顺序处理才行。
顺序消息分为 全局顺序消息 和 部分顺序消息 :
部分顺序消息相对比较好实现,生产端需要做到把同 ID 的消息发送到同一个 Message Queue ;在消费过程中,要做到从同一个 Message Queue 读取的消息顺序处理——消费端不能并发处理顺序消息,这样才能达到部分有序。
发送端使用 MessageQueueSelector 类来控制 把消息发往哪个 Message Queue
消费端通过使用 MessageListenerOrderly 来解决单 Message Queue 的消息被并发处理的问题
RocketMQ 默认情况下不保证顺序,比如创建一个 Topic ,默认八个写队列,八个读队列,这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个 Consumer ,每个 Consumer 也可能启动多个线程并行处理,所以消息被哪个 Consumer 消费,被消费的顺序和写人的顺序是否一致是不确定的。
要保证全局顺序消息, 需要先把 Topic 的读写队列数设置为 一,然后 Producer Consumer 的并发设置,也要是一。简单来说,为了保证整个 Topic 全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 ,这时候就完全牺牲 RocketMQ 的高并发、高吞吐的特性了。
有两种方案:
对消息的过滤有三种方式:
电商的订单超时自动取消,就是一个典型的利用延时消息的例子,用户提交了一个订单,就可以发送一个延时消息, 1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
RocketMQ 是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:
但是目前 RocketMQ 支持的延时级别是有限的:
那么 RocketMQ 怎么实现延时消息的
简单,八个字: 临时存储 + 定时任务
Broker 收到延时消息了,会先发送到主题( SCHEDULE_TOPIC_XXXX )的相应时间段的 Message Queue 中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标 Topic 的队列中,然后消费者就可以正常消费这些消息。
半消息 :是指暂时还不能被 Consumer 消费的消息, Producer 成功发送到 Broker 端的消息,但是此消息被标记为 暂不可投递 状态,只有等 Producer 端执行完本地事务后经过二次确认了之后, Consumer 才能消费此条消息。
依赖半消息,可以实现分布式消息事务,其中的关键在于二次确认以及消息回查:
RocketMQ 实现消息事务 :
死信队列 用于处理无法被正常消费的消息,即 死信消息
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为 死信队列
死信消息的特点:
死信队列的特点:
RocketMQ 控制台提供对死信消息的查询、导出和重发的功能。
NameServer 因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。
RocketMQ 的高可用主要是在体现在 Broker 的读和写的高可用, Broker 的高可用是通过集群和主从实现的。
也就是说 Producer 只能向 Master 角色的 Broker 写入消息, Cosumer 可以从 Master 和 Slave 角色的 Broker 读取消息。
Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave 。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后, Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。
如何达到发送端写的高可用性
注意 : RocketMQ 目前还不支持 Broker 把 Slave 自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker
package cnsdeitservlet;
import javaioIOException;
import javaioOutputStream;
import javaioPrintWriter;
import javasqlConnection;
import javasqlPreparedStatement;
import javasqlSQLException;
import javautilArrayList;
import javaxservletServletException;
import javaxservletannotationWebServlet;
import javaxservlet>
通过rocketmq控制台查看。
除了控制台查看,还可以通过主机地址进行查看,具体步骤如下:1、首先登陆nameserver主机地址。2、执行以下命令(shmqadminconsumerProgress-gconsumeGroupNameeg:shmqadminconsumerProgress-gtest)查看。3、出来的结果BrokerOffset为生产的条数,ConsumerOffset为消费的条数,Diff为堆积的条数。
RocketMQ是阿里巴巴开源的分布式消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。
你用的是IBMMQ还是Apache的?一般通过JAVA的JMS可以取得。
例如IBMMQ里有个MQQueue对象
//获取队列实例
MQQueuequeue=qMgraessQueue("TEST_QUEUE",openOptions);
//获取当前队列最长消息的长度
queue()
//获取当前队列最长深度
queue()
等等功能都是提供的,具体你下载个WebSphereMQAPI找到MQQueue一看便知。
以上就是关于阿里云消息中间件(MQ)探秘全部的内容,包括:阿里云消息中间件(MQ)探秘、如何使用 WebSphere MQ 的引用消息传输文件、RocketMQ使用之消息保证,重复读,积压,顺序,过滤,延时,事务,死信等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)