阿里云消息中间件(MQ)探秘

阿里云消息中间件(MQ)探秘,第1张

消息可能在哪些阶段丢失呢?可能会在这三个阶段发生丢失: 生产阶段 、 存储阶段 、 消费阶段

所以要从这三个阶段考虑:

在生产阶段,主要通过请求确认机制,来保证消息的可靠传递。

存储阶段,可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步

从 Consumer 角度分析,如何保证消息被成功消费?

Consumer 保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。

对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,就是所谓的 有且仅有一次 。 RocketMQ 择了确保一定投递,保证消息不丢失,但有可能造成消息重复。

处理消息重复问题,主要有业务端自己保证,主要的方式有两种: 业务幂等 和 消息去重

发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:

顺序消息是指消息的 消费顺序 和 产生顺序 相同,在有些业务逻辑下,必须保证顺序,比如订单的生成、付款、发货,这个消息必须按顺序处理才行。

顺序消息分为 全局顺序消息 和 部分顺序消息 :

部分顺序消息相对比较好实现,生产端需要做到把同 ID 的消息发送到同一个 Message Queue ;在消费过程中,要做到从同一个 Message Queue 读取的消息顺序处理——消费端不能并发处理顺序消息,这样才能达到部分有序。

发送端使用 MessageQueueSelector 类来控制 把消息发往哪个 Message Queue

消费端通过使用 MessageListenerOrderly 来解决单 Message Queue 的消息被并发处理的问题

RocketMQ 默认情况下不保证顺序,比如创建一个 Topic ,默认八个写队列,八个读队列,这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个 Consumer ,每个 Consumer 也可能启动多个线程并行处理,所以消息被哪个 Consumer 消费,被消费的顺序和写人的顺序是否一致是不确定的。

要保证全局顺序消息, 需要先把 Topic 的读写队列数设置为 一,然后 Producer Consumer 的并发设置,也要是一。简单来说,为了保证整个 Topic 全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 ,这时候就完全牺牲 RocketMQ 的高并发、高吞吐的特性了。

有两种方案:

对消息的过滤有三种方式:

电商的订单超时自动取消,就是一个典型的利用延时消息的例子,用户提交了一个订单,就可以发送一个延时消息, 1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

RocketMQ 是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:

但是目前 RocketMQ 支持的延时级别是有限的:

那么 RocketMQ 怎么实现延时消息的

简单,八个字: 临时存储 + 定时任务

Broker 收到延时消息了,会先发送到主题( SCHEDULE_TOPIC_XXXX )的相应时间段的 Message Queue 中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标 Topic 的队列中,然后消费者就可以正常消费这些消息。

半消息 :是指暂时还不能被 Consumer 消费的消息, Producer 成功发送到 Broker 端的消息,但是此消息被标记为 暂不可投递 状态,只有等 Producer 端执行完本地事务后经过二次确认了之后, Consumer 才能消费此条消息。

依赖半消息,可以实现分布式消息事务,其中的关键在于二次确认以及消息回查:

RocketMQ 实现消息事务 :

死信队列 用于处理无法被正常消费的消息,即 死信消息

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为 死信队列

死信消息的特点:

死信队列的特点:

RocketMQ 控制台提供对死信消息的查询、导出和重发的功能。

NameServer 因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。

RocketMQ 的高可用主要是在体现在 Broker 的读和写的高可用, Broker 的高可用是通过集群和主从实现的。

也就是说 Producer 只能向 Master 角色的 Broker 写入消息, Cosumer 可以从 Master 和 Slave 角色的 Broker 读取消息。

Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave 。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后, Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。

如何达到发送端写的高可用性

注意 : RocketMQ 目前还不支持 Broker 把 Slave 自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker

package cnsdeitservlet;

import javaioIOException;

import javaioOutputStream;

import javaioPrintWriter;

import javasqlConnection;

import javasqlPreparedStatement;

import javasqlSQLException;

import javautilArrayList;

import javaxservletServletException;

import javaxservletannotationWebServlet;

import javaxservlet>

通过rocketmq控制台查看。

除了控制台查看,还可以通过主机地址进行查看,具体步骤如下:1、首先登陆nameserver主机地址。2、执行以下命令(shmqadminconsumerProgress-gconsumeGroupNameeg:shmqadminconsumerProgress-gtest)查看。3、出来的结果BrokerOffset为生产的条数,ConsumerOffset为消费的条数,Diff为堆积的条数。

RocketMQ是阿里巴巴开源的分布式消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。

你用的是IBMMQ还是Apache的?一般通过JAVA的JMS可以取得。

例如IBMMQ里有个MQQueue对象

//获取队列实例

MQQueuequeue=qMgraessQueue("TEST_QUEUE",openOptions);

//获取当前队列最长消息的长度

queue()

//获取当前队列最长深度

queue()

等等功能都是提供的,具体你下载个WebSphereMQAPI找到MQQueue一看便知。

以上就是关于阿里云消息中间件(MQ)探秘全部的内容,包括:阿里云消息中间件(MQ)探秘、如何使用 WebSphere MQ 的引用消息传输文件、RocketMQ使用之消息保证,重复读,积压,顺序,过滤,延时,事务,死信等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/web/10156229.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-05
下一篇 2023-05-05

发表评论

登录后才能评论

评论列表(0条)

保存