分布式消息mq的两种订阅方式

分布式消息mq的两种订阅方式,第1张

分布式消息 MQ 的两种订阅方式如下:

一、点对点模式:

1、场景:

客户端A和客户端B使用同一队列,进行消息通讯,客户端 A 发布消息,客户端 B 接收消息。

2、点对点模式包含三个角色——消息队列,发送者,接收者:

发送者发送消息到消息队列中,接收者从消息队列中取出消息进行接收,消息接收后,消息队列中将不再存储该消息,其他接收者不可能再接收到这条消息。

3、特点:

(1)每个消息只有一个接收者。

(2)发送者和接收者之间没有依赖性,发送者发送消息后,消息直接存储在消息队列中,接收者是否在线并不影响发送。

(3)接收者成功接收消息之后,需要向消息队列应答成功,以便消息队列删除该条消息。

二、发布订阅模式:

1、场景:

客户端 A,客户端 B,客户端 N 等订阅同一主题,进行消息发布和接收。

2、点对点模式包含三个角色——角色主题(topic),发布者(publisher),订阅者(subscriber):

发送者将消息发送到topic,系统将这些消息传递给多个订阅者。

3、特点:

(1)每个消息可以有多个订阅者。

(2)发布者和订阅者之间有时间上的依赖性。针对某个主题的订阅者,它必须创建一个订阅者之后,才可以接收发布者发布的消息。

(3)为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行。

分布式消息(服务)介绍:

分布式消息服务是一个用来处理分布式系统中的消息,分布式应用程序通过网络访问位于不同服务器上的消息队列,就像访问本地系统一样。

分布式消息服务是一个专为企业级应用开发的软件服务,具有高可用,高扩展,高性能,可根据需要灵活部署和伸缩的特点。分布式消息服务是一个托管的高性能消息队列服务,拥有高吞吐,高可用,高可靠,可根据需要灵活配置的队列服务,满足不同应用场景的需要。

分布式消息服务是一个高吞吐、高可用的消息中间件服务,使用消息队列通信,具有大规模、高可靠、高并发访问、可扩展、高安全、可d性伸缩、便捷管理的特点。

百度百科-分布式

百度百科-分布式信息系统

开篇之前奉上几条黄金链接:

MQ参考文档

>

消息队列RabbitMQ版对集群、接口调用以及字符进行了限制,您在使用消息队列RabbitMQ版时注意不要超过相应的限制值,以免程序出现异常。

集群限制

限制项 专业版 企业版 铂金版

单实例的Vhost数量 单实例的Vhost数量取值范围为[Queue数量,200]。 200 4096

单实例的Connection数量 等于Queue数量×4。

单实例的Connection数量取值范围为[1000,5000]。

根据实例规格购买。

单实例至少有2000的免费额度,最大连接数为1万。

根据实例规格购买。

单实例有5万的免费额度,最大连接数为10万。

单Vhost的Exchange数量 等于Queue数量。 等于Queue数量。 无

单Vhost的Binding数量 等于Queue数量×10。

单Vhost的Binding数量取值范围为[4096,20000]。

等于Queue数量×10。

单Vhost的Binding数量取值范围为[4096,20000]。

单Exchange绑定的Queue数量 256 256 256

如需定制,请提交工单。

单Queue的Consumer数量 等于Queue数量×4。

单Queue的Consumer数量取值范围为[50,5000]。

等于Queue数量×4。

单Queue的Consumer数量取值范围为[800,5000]。

单Connection的Channel数量 单Connection的Channel数量取值为64或Queue数量。

如果Queue数量小于或等于64,则单Connection的Channel数量等于64。

如果Queue数量大于64,则单Connection的Channel数量等于Queue数量。

等于Queue数量。

单Connection的Channel数量取值范围为[Queue数量,2048]。

3000

消息大小 64 KB 1 MB

说明 当消息小于64 KB时,算一次请求;当消息大于64 KB时,每4 KB算一次请求。

1 MB

延时消息最大延时时间 24小时 24小时 7天

消息最大保留时间 3天 3天 3天

消息最大重入Queue次数 16 16 64

消息超时重试间隔 60秒 5分钟 30分钟

接口调用限制

注意

预付费专业版和企业版:单个接口的调用量受限制,且所有接口的调用总量受限于购买的TPS流量峰值。

预付费铂金版:单个接口的调用量无限制,但所有接口的调用总量受限于购买的TPS流量峰值。

限制项 限制项接口 专业版 企业版 铂金版

单实例发送消息 basicPublish 25000 TPS 25000 TPS 无

单实例同步获取消息 basicGet 500 TPS 500 TPS 无

单实例清Queue purgeQueue 500 TPS 500 TPS 无

单实例创建Exchange exchangeDeclare 500 TPS 500 TPS 无

单实例删除Exchange exchangeDelete 500 TPS 500 TPS 无

单实例创建Queue queueDeclare 500 TPS 500 TPS 无

单实例删除Queue queueDelete 500 TPS 500 TPS 无

单实例创建Binding queueBind 500 TPS 500 TPS 无

单实例删除Binding queueUnbind 500 TPS 500 TPS 无

单实例恢复消息 basicRecover 500 TPS 500 TPS 无

单实例重入Queue消息

basicReject(requeue=true)

basicNack(requeue=true)

20 TPS 20 TPS 无

字符限制

限制项 限制值

Queue名称 只能包含字⺟、数字、短划线(-)、下划线(_)、半角句号()、井号(#)、正斜线(/)、at符号(@),⻓度限制1~255字符。

Exchange名称 只能包含字⺟、数字、短划线(-)、下划线(_)、半角句号()、井号(#)、正斜线(/)、at符号(@),⻓度限制1~255字符。

Vhost名称 只能包含字⺟、数字、短划线(-)、下划线(_)、半角句号()、井号(#)、正斜线(/)、at符号(@),⻓度限制1~255字符。

Binding Key名称

普通类型:只能包含字⺟、数字、短划线(-)、下划线(_)、半角句号()、正斜线(/)、at符号(@),⻓度限制1~255字符。

Topic类型:只能包含字母、数字、短划线(-)、下划线(_)、星号()、半角句号()、井号(#)、正斜线(/)、at符号(@),长度限制1~255字符。

不能以半角句号()开头或结尾。对于井号(#)或星号(),如果以其开头,则其后需有半角句号(),如果以其结尾,则其前需有半角句号(),如果既不是开头也不是结尾,则其前后均需有半角句号()。

Consumer Tag名称 只能包含字⺟、数字、短划线(-)、下划线(_)、半角句号()、井号(#)、正斜线(/)、at符号(@),⻓度限制1~255字符。

IBM MQ多实例失败的原因可能有很多,比如:1、网络问题,多实例之间的网络连接不稳定或者网络不通;2、安装问题,多实例之间的安装不一致;3、设置问题,多实例之间的设置不一致;4、资源问题,多实例之间的资源分配不均衡;5、硬件问题,多实例之间的硬件设备不兼容;6、程序问题,多实例之间的程序出现错误;7、其他原因,如系统更新、数据库升级等。

IBM的mq管理创建管理器时间的太长,那主要是我们的设置问题,我们应该把这个嗯设备然后打开,打开之后我们找到设置,然后进行嗯,重新安置安排创设把这个mbmmq,创建管理器的时间,然后变成正常的速度或者是正常的时间,这样的话我们就不会有出现时间太短时间太长的情况。

第一步:

canal原理是把自己伪装成一个mysql的从节点来读取mysql主库的binlog日志。

所以需要mysql主库先开启binlog日志功能。可以参考其他帖子打开binlog功能。

ps!!!!! 这里有一个非常值得注意的问题就是canal采集到MQ数据中使用的是binlog的的row模式

一定要是row模式。并且canal中配置canalinstancefilterregex 如果配制指定采集某几个表一定要在mysql中配置binlog_rows_query_log_events是OFF模式的。否则canal中的canalinstancefilterregex过滤器不生效。

第二步:

canal服务解压之后

其中canal_localproperties是canal控制台配置文件

canalproperties 是canal基础服务配置文件

ht_order_sync文件夹 是canal的服务实例

ht_product_sync文件夹 是canal的服务实例

上面这些是比较关键的文件

其中ht_order_sync 和 ht_product_sync 是我自己创建的文件名字可以随便叫什么都可以

如果是全新的canal解压之后 有一个example文件那个就是样例文件。

我创建了 ht_order_sync 和 ht_product_sync 是因为我有两个业务需求是 同步订单业务 和 商品业务 所以创建了两个实例,canal启动之后会加载自己创建的文件夹。

cd 进入 ht_order_sync文件夹后 如下图

我们一般需要改的只有 instanceproperties 这个文件。其他文件是记录binlog的同步位置的文件。删除之后就重置binlog的采集位置,所以不要轻易删除。

下面打开instanceproperties 如下图

图中

1 是要采集的mysql的账号和密码

2 是要采集的哪张表可以配置全部也可以配置部分我是配置了部分表可以直接写库和表名,我库名用了变量后面讲怎么传进来的

3 是黑明单结合上面那个白名单用的 我固定了采集某几张表所以不要配置

4 是采集的每一行的变动会发送到配置的mq的topic中作为mysql的一条改动数据(增删改)

mq数据如下图会包含改动前和改动后的数据表名库名等等。

上图是我做的一个insert的样例数据,type类型就是insert,还有update和delete 有字段类型描述

old 是改动前的数据 因为insert *** 作所以是null 如果是update *** 作此处会有值 可以做变更监听逻辑

data是当前改动后的数据。

上面说的都是具体实例配置

下面贴出canal的实例上层配置文件也就是canal服务配置文件 canalproperties

如下图 一张图截图放不下放了三张图

图上编号

1 是代表canal采集到信息推送到哪里。tcp是代表可以推送到程序采集模式

如果是mq配置成对应的mq比如 kafka活着rocketmq等等

2 是canaldestinations 扫描上面说的自己创建的实例 配置几个文件夹扫描几个

canalautoscan = true 的意思是自动扫描自己创建的实例。

所以应该可以把canalautoscan 配置成false然后配置canaldestinations自己创建的文件夹

即可

3 canalmqflatMessage = true

关注一下

采集的消息的消息方式之前好像不设置成true没采集到,已经忘了 也需要配置 mq读取 也使用这个方式。 可以参考一下官网等等。

4 配置成了rocketMQ那么就配置 相关的主题等等

结束

在MQ中,我们把应用程序交由MQ传输的数据定义为消息,我们可以定义消息的内容并对消息进行广义的理解,比如:用户的各种类型的数据文件,某个应用向其它应用发出的处理请求等都可以作为消息。消息有两部分组成:

消息描述符(Message Discription或Message Header),描述消息的特征,如:消息的优先级、生命周期、消息Id等;

消息体(Message Body),即用户数据部分。在MQ中,消息分为两种类型,非永久性(non-persistent)消息和永久性(persistent)消息,非永久性消息是存储在内存中的,它是为了提高性能而设计的,当系统掉电或MQ队列管理器重新启动时,将不可恢复。当用户对消息的可靠性要求不高,而侧重系统的性能表现时,可以采用该种类型的消息,如:当发布股票信息时,由于股票信息是不断更新的,我们可能每若干秒就会发布一次,新的消息会不断覆盖旧的消息。永久性消息是存储在硬盘上,并且纪录数据日志的,它具有高可靠性,在网络和系统发生故障等情况下都能确保消息不丢、不重。

此外,在MQ中,还有逻辑消息和物理消息的概念。利用逻辑消息和物理消息,我们可以将大消息进行分段处理,也可以将若干个本身完整的消息在应用逻辑上归为一组进行处理。

3) 队列

队列是消息的安全存放地,队列存储消息直到它被应用程序处理。

我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用,同时网络环境也是不稳定的,造成了我们多个机器之间的数据同步问题,这就是典型的分布式事务问题。

在分布式事务中事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。分布式事务就是要保证不同节点之间的数据一致性。

1、2PC(二阶段提交)方案 - 强一致性

2、3PC(三阶段提交)方案

3、TCC (Try-Confirm-Cancel)事务 - 最终一致性

4、Saga事务 - 最终一致性

5、本地消息表 - 最终一致性

6、MQ事务 - 最终一致性

消息的生产方,除了维护自己的业务逻辑之外,同时需要维护一个消息表。这个消息表里面记录的就是需要同步到别的服务的信息,当然这个消息表,每个消息都有一个状态值,来标识这个消息有没有被成功处理。

发送放的业务逻辑以及消息表中数据的插入将在一个事务中完成,这样避免了业务处理成功 + 事务消息发送失败,或业务处理失败 + 事务消息发送成功,这个问题。

举个栗子:

我们假定目前有两个服务,订单服务,购物车服务,用户在购物车中对几个商品进行合并下单,之后需要情况购物车中刚刚已经下单的商品信息。

1、消息的生产方也就是订单服务,完成了自己的逻辑(对商品进行下单 *** 作)然后把这个消息通过 mq 发送到需要进行数据同步的其他服务中,也就是我们栗子中的购物车服务。

2、其他服务(购物车服务)会监听这个队列;

1、如果收到这个消息,并且数据同步执行成功了,当然这也是一个本地事务,就通过 mq 回复消息的生产方(订单服务)消息已经处理了,然后生产方就能标识本次事务已经结束。如果是一个业务上的错误,就回复消息的生产方,需要进行数据回滚了。

2、很久没收到这个消息,这种情况是不会发生的,消息的发送方会有一个定时的任务,会定时重试发送消息表中还没有处理的消息;

3、消息的生产方(订单服务)如果收到消息回执;

1、成功的话就修改本次消息已经处理完,也就是本次分布式事务的同步已经完成;

2、如果消息的结果是执行失败,同时在本地回滚本次事务,标识消息已经处理完成;

3、如果消息丢失,也就是回执消息没有收到,这种情况也不太会发生,消息的发送方(订单服务)会有一个定时的任务,定时重试发送消息表中还没有处理的消息,下游的服务需要做幂等,可能会收到多次重复的消息,如果一个回复消息生产方中的某个回执信息丢失了,后面持续收到生产方的 mq 消息,然后再次回复消息的生产方回执信息,这样总能保证发送者能成功收到回执,消息的生产方在接收回执消息的时候也要做到幂等性。

这里有两个很重要的 *** 作:

1、服务器处理消息需要是幂等的,消息的生产方和接收方都需要做到幂等性;

2、发送放需要添加一个定时器来遍历重推未处理的消息,避免消息丢失,造成的事务执行断裂。

该方案的优缺点

优点:

1、在设计层面上实现了消息数据的可靠性,不依赖消息中间件,弱化了对 mq 特性的依赖。

2、简单,易于实现。

缺点:

主要是需要和业务数据绑定到一起,耦合性比较高,使用相同的数据库,会占用业务数据库的一些资源。

下面分析下几种消息队列对事务的支持

RocketMQ 中的事务,它解决的问题是,确保执行本地事务和发消息这两个 *** 作,要么都成功,要么都失败。并且,RocketMQ 增加了一个事务反查的机制,来尽量提高事务执行的成功率和数据一致性。

主要是两个方面,正常的事务提交和事务消息补偿

正常的事务提交

1、发送消息(half消息),这个 half 消息和普通消息的区别,在事务提交 之前,对于消费者来说,这个消息是不可见的。

2、MQ SERVER写入信息,并且返回响应的结果;

3、根据MQ SERVER响应的结果,决定是否执行本地事务,如果MQ SERVER写入信息成功执行本地事务,否则不执行;

如果MQ SERVER没有收到 Commit 或者 Rollback 的消息,这种情况就需要进行补偿流程了

补偿流程

1、MQ SERVER如果没有收到来自消息发送方的 Commit 或者 Rollback 消息,就会向消息发送端也就是我们的服务器发起一次查询,查询当前消息的状态;

2、消息发送方收到对应的查询请求,查询事务的状态,然后把状态重新推送给MQ SERVER,MQ SERVER就能之后后续的流程了。

相比于本地消息表来处理分布式事务,MQ 事务是把原本应该在本地消息表中处理的逻辑放到了 MQ 中来完成。

Kafka 中的事务解决问题,确保在一个事务中发送的多条信息,要么都成功,要么都失败。也就是保证对多个分区写入 *** 作的原子性。

通过配合 Kafka 的幂等机制来实现 Kafka 的 Exactly Once,满足了读取-处理-写入这种模式的应用程序。当然 Kafka 中的事务主要也是来处理这种模式的。

什么是读取-处理-写入模式呢?

栗如:在流计算中,用 Kafka 作为数据源,并且将计算结果保存到 Kafka 这种场景下,数据从 Kafka 的某个主题中消费,在计算集群中计算,再把计算结果保存在 Kafka 的其他主题中。这个过程中,要保证每条消息只被处理一次,这样才能保证最终结果的成功。Kafka 事务的原子性就保证了,读取和写入的原子性,两者要不一起成功,要不就一起失败回滚。

这里来分析下 Kafka 的事务是如何实现的

它的实现原理和 RocketMQ 的事务是差不多的,都是基于两阶段提交来实现的,在实现上可能更麻烦

先来介绍下事务协调者,为了解决分布式事务问题,Kafka 引入了事务协调者这个角色,负责在服务端协调整个事务。这个协调者并不是一个独立的进程,而是 Broker 进程的一部分,协调者和分区一样通过选举来保证自身的可用性。

Kafka 集群中也有一个特殊的用于记录事务日志的主题,里面记录的都是事务的日志。同时会有多个协调者的存在,每个协调者负责管理和使用事务日志中的几个分区。这样能够并行的执行事务,提高性能。

下面看下具体的流程

事务的提交

1、协调者设置事务的状态为PrepareCommit,写入到事务日志中;

2、协调者在每个分区中写入事务结束的标识,然后客户端就能把之前过滤的未提交的事务消息放行给消费端进行消费了;

事务的回滚

1、协调者设置事务的状态为PrepareAbort,写入到事务日志中;

2、协调者在每个分区中写入事务回滚的标识,然后之前未提交的事务消息就能被丢弃了;

这里引用一下消息队列高手课中的

RabbitMQ 中事务解决的问题是确保生产者的消息到达MQ SERVER,这和其他 MQ 事务还是有点差别的,这里也不展开讨论了。

先来分析下一条消息在 MQ 中流转所经历的阶段。

生产阶段 :生产者产生消息,通过网络发送到 Broker 端。

存储阶段 :Broker 拿到消息,需要进行落盘,如果是集群版的 MQ 还需要同步数据到其他节点。

消费阶段 :消费者在 Broker 端拉数据,通过网络传输到达消费者端。

发生网络丢包、网络故障等这些会导致消息的丢失

在生产者发送消息之前,通过channeltxSelect开启一个事务,接着发送消息, 如果消息投递 server 失败,进行事务回滚channeltxRollback,然后重新发送, 如果 server 收到消息,就提交事务channeltxCommit

不过使用事务性能不好,这是同步 *** 作,一条消息发送之后会使发送端阻塞,以等待RabbitMQ Server的回应,之后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大降低。

使用确认机制,生产者将信道设置成 confirm 确认模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(BasicAck)给生产者(包含消息的唯一 deliveryTag 和 multiple 参数),这就使得生产者知晓消息已经正确到达了目的地了。

multiple 为 true 表示的是批量的消息确认,为 true 的时候,表示小于等于返回的 deliveryTag 的消息 id 都已经确认了,为 false 表示的是消息 id 为返回的 deliveryTag 的消息,已经确认了。

确认机制有三种类型

1、同步确认

2、批量确认

3、异步确认

同步模式的效率很低,因为每一条消息度都需要等待确认好之后,才能处理下一条;

批量确认模式相比同步模式效率是很高,不过有个致命的缺陷,一旦回复确认失败,当前确认批次的消息会全部重新发送,导致消息重复发送;

异步模式就是个很好的选择了,不会有同步模式的阻塞问题,同时效率也很高,是个不错的选择。

Kafaka 中引入了一个 broker。 broker 会对生产者和消费者进行消息的确认,生产者发送消息到 broker,如果没有收到 broker 的确认就可以选择继续发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

只要正确处理 Broker 的确认响应,就可以避免消息的丢失。

RocketMQ 提供了3种发送消息方式,分别是:

同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。

异步发送:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。

Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer 只负责把请求发出去,而不处理响应结果。

在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

防止在存储阶段消息额丢失,可以做持久化,防止异常情况(重启,关闭,宕机)。。。

RabbitMQ 持久化中有三部分:

消息的持久化,在投递时指定 delivery_mode=2(1是非持久化),消息的持久化,需要配合队列的持久,只设置消息的持久化,重启之后队列消失,继而消息也会丢失。所以如果只设置消息持久化而不设置队列的持久化意义不大。

对于持久化,如果所有的消息都设置持久化,会影响写入的性能,所以可以选择对可靠性要求比较高的消息进行持久化处理。

不过消息持久化并不能百分之百避免消息的丢失

比如数据在落盘的过程中宕机了,消息还没及时同步到内存中,这也是会丢数据的,这种问题可以通过引入镜像队列来解决。

镜像队列的作用:引入镜像队列,可已将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能够自动切换到镜像中的另一个节点上来保证服务的可用性。(更细节的这里不展开讨论了)

*** 作系统本身有一层缓存,叫做 Page Cache,当往磁盘文件写入的时候,系统会先将数据流写入缓存中。

Kafka 收到消息后也会先存储在也缓存中(Page Cache)中,之后由 *** 作系统根据自己的策略进行刷盘或者通过 fsync 命令强制刷盘。如果系统挂掉,在 PageCache 中的数据就会丢失。也就是对应的 Broker 中的数据就会丢失了。

处理思路

1、控制竞选分区 leader 的 Broker。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。

2、控制消息能够被写入到多个副本中才能提交,这样避免上面的问题1。

1、将刷盘方式改成同步刷盘;

2、对于多个节点的 Broker,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

消费阶段就很简单了,如果在网络传输中丢失,这个消息之后还会持续的推送给消费者,在消费阶段我们只需要控制在业务逻辑处理完成之后再去进行消费确认就行了。

总结:对于消息的丢失,也可以借助于本地消息表的思路,消息产生的时候进行消息的落盘,长时间未处理的消息,使用定时重推到队列中。

消息在 MQ 中的传递,大致可以归类为下面三种:

1、At most once: 至多一次。消息在传递时,最多会被送达一次。是不安全的,可能会丢数据。

2、At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。

3、Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

大部分消息队列满足的都是At least once,也就是可以允许重复的消息出现。

我们消费者需要满足幂等性,通常有下面几种处理方案

1、利用数据库的唯一性

根据业务情况,选定业务中能够判定唯一的值作为数据库的唯一键,新建一个流水表,然后执行业务 *** 作和流水表数据的插入放在同一事务中,如果流水表数据已经存在,那么就执行失败,借此保证幂等性。也可先查询流水表的数据,没有数据然后执行业务,插入流水表数据。不过需要注意,数据库读写延迟的情况。

2、数据库的更新增加前置条件

3、给消息带上唯一ID

每条消息加上唯一ID,利用方法1中通过增加流水表,借助数据库的唯一性来处理重复消息的消费。

以上就是关于分布式消息mq的两种订阅方式全部的内容,包括:分布式消息mq的两种订阅方式、linux系统如何启动mq、mq重入队列次数限制等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zz/10132372.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-05
下一篇 2023-05-05

发表评论

登录后才能评论

评论列表(0条)

保存