消息队列原理及选型

消息队列原理及选型,第1张

消息队列(Message Queue)是一种进程间通信或同一进程的不同线程间的通信方式。

Broker(消息服务器)
Broker的概念来自与Apache ActiveMQ,通俗的讲就是MQ的服务器。

Producer(生产者)
业务的发起方,负责生产消息传输给broker

Consumer(消费者)
业务的处理方,负责从broker获取消息并进行业务逻辑处理

Topic(主题)
发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅 者,实现消息的广播

Queue(队列)
PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收。

Message(消息体)
根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输

点对点模型用于消息生产者和消息消费者之间点到点的通信。

点对点模式包含三个角色:

每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,可以放在内存 中也可以持久化,直到他们被消费或超时。

特点:

发布订阅模型包含三个角色:

多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

特点:

AMQP即Advanced Message Queuing Protocol,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

优点:可靠、通用

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

优点:格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统

STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。STOMP提供一个可互 *** 作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。

优点:命令模式(非topic\queue模式)

XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时 *** 作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其 *** 作系统和浏览器不同。

优点:通用公开、兼容性强、可扩展、安全性高,但XML编码格式占用带宽大

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

RabbitMQ 是一个开源的 AMQP 实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

Channel(通道)
道是两个管理器之间的一种单向点对点的的通信连接,如果需要双向交流,可以建立一对通道。

Exchange(消息交换机)
Exchange类似于数据通信网络中的交换机,提供消息路由策略。

RabbitMq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。

Exchange有4种类型:direct(默认),fanout, topic, 和headers。
不同类型的Exchange转发消息的策略有所区别:

Binding(绑定)
所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系。

Routing Key(路由关键字)
exchange根据这个关键字进行消息投递。

vhost(虚拟主机)
在RabbitMq server上可以创建多个虚拟的message broker,又叫做virtual hosts (vhosts)。每一个vhost本质上是一个mini-rabbitmq server,分别管理各自的exchange,和bindings。vhost相当于物理的server,可以为不同app提供边界隔离,使得应用安全的运行在不同的vhost实例上,相互之间不会干扰。producer和consumer连接rabbit server需要指定一个vhost。

假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。
基本的通信流程大概如下所示:

Consumer收到消息时需要显式的向rabbit broker发送basic。ack消息或者consumer订阅消息时设置auto_ack参数为true。

在通信过程中,队列对ACK的处理有以下几种情况:

即消息的Ackownledge确认机制,为了保证消息不丢失,消息队列提供了消息Acknowledge机制,即ACK机制,当Consumer确认消息已经被消费处理,发送一个ACK给消息队列,此时消息队列便可以删除这个消息了。如果Consumer宕机/关闭,没有发送ACK,消息队列将认为这个消息没有被处理,会将这个消息重新发送给其他的Consumer重新消费处理。

消息的收发处理支持事务,例如:在任务中心场景中,一次处理可能涉及多个消息的接收、处理,这应该处于同一个事务范围内,如果一个消息处理失败,事务回滚,消息重新回到队列中。

消息的持久化,对于一些关键的核心业务来说是非常重要的,启用消息持久化后,消息队列宕机重启后,消息可以从持久化存储恢复,消息不丢失,可以继续消费处理。

fanout 模式
模式特点:

direct 模式
任何发送到Direct Exchange的消息都会被转发到routing_key中指定的Queue。

如果一个exchange 声明为direct,并且bind中指定了routing_key,那么发送消息时需要同时指明该exchange和routing_key。

简而言之就是:生产者生成消息发送给Exchange, Exchange根据Exchange类型和basic_publish中的routing_key进行消息发送 消费者:订阅Exchange并根据Exchange类型和binding key(bindings 中的routing key) ,如果生产者和订阅者的routing_key相同,Exchange就会路由到那个队列。

topic 模式
前面讲到direct类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。

topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同。
它约定:

以上图中的配置为例,routingKey=”quickorangerabbit”的消息会同时路由到Q1与Q2,routingKey=”lazyorangefox”的消息会路由到Q1,routingKey=”lazybrownfox”的消息会路由到Q2,routingKey=”lazypinkrabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quickbrownfox”、routingKey=”orange”、routingKey=”quickorangemalerabbit”的消息将会被丢弃,因为它们没有匹配任何bindingKey。

RabbitMQ,部署分三种模式:单机模式,普通集群模式,镜像集群模式。

普通集群模式
多台机器部署,每个机器放一个rabbitmq实例,但是创建的queue只会放在一个rabbitmq实例上,每个实例同步queue的元数据。

如果消费时连的是其他实例,那个实例会从queue所在实例拉取数据。这就会导致拉取数据的开销,如果那个放queue的实例宕机了,那么其他实例就无法从那个实例拉取,即便开启了消息持久化,让rabbitmq落地存储消息的话,消息不一定会丢,但得等这个实例恢复了,然后才可以继续从这个queue拉取数据, 这就没什么高可用可言,主要是提供吞吐量 ,让集群中多个节点来服务某个queue的读写 *** 作。

镜像集群模式

queue的元数据和消息都会存放在多个实例,每次写消息就自动同步到多个queue实例里。这样任何一个机器宕机,其他机器都可以顶上,但是性能开销太大,消息同步导致网络带宽压力和消耗很重,另外,没有扩展性可言,如果queue负载很重,加机器,新增的机器也包含了这个queue的所有数据,并没有办法线性扩展你的queue。此时,需要开启镜像集群模式,在rabbitmq管理控制台新增一个策略,将数据同步到指定数量的节点,然后你再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

Kafka 是 Apache 的子项目,是一个高性能跨语言的分布式发布/订阅消息队列系统(没有严格实现 JMS 规范的点对点模型,但可以实现其效果),在企业开发中有广泛的应用。高性能是其最大优势,劣势是消息的可靠性(丢失或重复),这个劣势是为了换取高性能,开发者可以以稍降低性能,来换取消息的可靠性。

一个Topic可以认为是一类消息,每个topic将被分成多个partition(区),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息。它唯一的标记一条消息。kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行“随机读写”。

Kafka和JMS(Java Message Service)实现(activeMQ)不同的是:即使消息被消费,消息仍然不会被立即删除。日志文件将会根据broker中的配置要求,保留一定的时间之后删除;比如log文件保留2天,那么两天后,文件会被清除,无论其中的消息是否被消费。kafka通过这种简单的手段,来释放磁盘空间,以及减少消息消费之后对文件内容改动的磁盘IO开支。

对于consumer而言,它需要保存消费消息的offset,对于offset的保存和使用,有consumer来控制;当consumer正常消费消息时,offset将会"线性"的向前驱动,即消息将依次顺序被消费。事实上consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值。(offset将会保存在zookeeper中,参见下文)

kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响。

partitions的设计目的有多个。最根本原因是kafka基于文件存储。通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions,来消息保存/消费的效率。此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力。(具体原理参见下文)。

一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写 *** 作;此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性。

基于replicated方案,那么就意味着需要对多个备份进行调度;每个partition都有一个server为"leader";leader负责所有的读写 *** 作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可。由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个"leader",kafka会将"leader"均衡的分散在每个实例上,来确保整体的性能稳定。

Producers
Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于"round-robin"方式或者通过其他的一些算法等。

Consumers
本质上kafka只支持Topic。每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer。发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费。

如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡。

如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者。

在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息。kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的。事实上,从Topic角度来说,消息仍不是有序的。

Kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。

Guarantees

Kafka就比较适合高吞吐量并且允许少量数据丢失的场景,如果非要保证“消息可靠传输”,可以使用JMS。

Kafka Producer 消息发送有两种方式(配置参数 producertype):

对于同步方式(producertype=sync)?Kafka Producer 消息发送有三种确认方式(配置参数 acks):

kafka的设计初衷是希望作为一个统一的信息收集平台,能够实时的收集反馈信息,并需要能够支撑较大的数据量,且具备良好的容错能力。

持久性
kafka使用文件存储消息,这就直接决定kafka在性能上严重依赖文件系统的本身特性。且无论任何OS下,对文件系统本身的优化几乎没有可能。文件缓存/直接内存映射等是常用的手段。因为kafka是对日志文件进行append *** 作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。

性能
需要考虑的影响性能点很多,除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题。kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息。不过消息量的大小可以通过配置文件来指定。对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换。 其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。可以将任何在网络上传输的消息都经过压缩。kafka支持gzip/snappy等多种压缩方式。

生产者
负载均衡: producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何“路由层“。事实上,消息被路由到哪个partition上,有producer客户端决定。比如可以采用“random““key-hash““轮询“等,如果一个topic中有多个partitions,那么在producer端实现“消息均衡分发“是必要的。

其中partition leader的位置(host:port)注册在zookeeper中,producer作为zookeeper client,已经注册了watch用来监听partition leader的变更事件。
异步发送:将多条消息暂且在客户端buffer起来,并将他们批量的发送到broker,小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。不过这也有一定的隐患,比如说当producer失效时,那些尚未发送的消息将会丢失。

消费者
consumer端向broker发送“fetch”请求,并告知其获取消息的offset;此后consumer将会获得一定条数的消息;consumer端也可以重置offset来重新消费消息。

在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端。不过在kafka中,采用了pull方式,即consumer在和broker建立连接之后,主动去pull(或者说fetch)消息;这中模式有些优点,首先consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以良好的控制消息消费的数量,batch fetch。

其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态。这就要求JMS broker需要太多额外的工作。在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的。当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset。由此可见,consumer客户端也很轻量级。

对于JMS实现,消息传输担保非常直接:有且只有一次(exactly once)。
在kafka中稍有不同:

at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理。那么此后"未处理"的消息将不能被fetch到,这就是"at most once"。

at least once: 消费者fetch消息,然后处理消息,然后保存offset。如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存 *** 作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once",原因offset没有及时的提交给zookeeper,zookeeper恢复正常还是之前offset状态。

exactly once: kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka中是没有必要的。

通常情况下“at-least-once”是我们首选。(相比at most once而言,重复接收数据总比丢失数据要好)。

kafka高可用由多个broker组成,每个broker是一个节点;

创建一个topic,这个topic会划分为多个partition,每个partition存在于不同的broker上,每个partition就放一部分数据。

kafka是一个分布式消息队列,就是说一个topic的数据,是分散放在不同的机器上,每个机器就放一部分数据。

在08版本以前,是没有HA机制的,就是任何一个broker宕机了,那个broker上的partition就废了,没法写也没法读,没有什么高可用性可言。

08版本以后,才提供了HA机制,也就是就是replica副本机制。每个partition的数据都会同步到其他的机器上,形成自己的多个replica副本。然后所有replica会选举一个leader出来,那么生产和消费都跟这个leader打交道,然后其他replica就是follower。

写的时候,leader会负责把数据同步到所有follower上去,读的时候就直接读leader上数据即可。

kafka会均匀的将一个partition的所有replica分布在不同的机器上,从而提高容错性。

如果某个broker宕机了也没事,它上面的partition在其他机器上都有副本的,如果这上面有某个partition的leader,那么此时会重新选举一个新的leader出来,大家继续读写那个新的leader即可。这就有所谓的高可用性了。

写数据的时候,生产者就写leader,然后leader将数据落地写本地磁盘,接着其他follower自己主动从leader来pull数据。一旦所有follower同步好数据了,就会发送ack给leader,leader收到所有follower的ack之后,就会返回写成功的消息给生产者。

消息丢失会出现在三个环节,分别是生产者、mq中间件、消费者:

RabbitMQ

Kafka
大体和RabbitMQ相同。

Rabbitmq
需要保证顺序的消息投递到同一个queue中,这个queue只能有一个consumer,如果需要提升性能,可以用内存队列做排队,然后分发给底层不同的worker来处理。

Kafka
写入一个partition中的数据一定是有序的。生产者在写的时候 ,可以指定一个key,比如指定订单id作为key,这个订单相关数据一定会被分发到一个partition中去。消费者从partition中取出数据的时候也一定是有序的,把每个数据放入对应的一个内存队列,一个partition中有几条相关数据就用几个内存队列,消费者开启多个线程,每个线程处理一个内存队列。

虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。

消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。

消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。

生产者提供一个消息id,消费者要把已经处理过的消息 ID 号记录下来。当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。

为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。

在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(1) 循环)。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。为了解决这个问题,Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。

发送消息日志

消费消息日志

Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列 *** 作命令。

XADD:插入消息,保证有序,可以自动生成全局唯一 ID;

XREAD:用于读取消息,可以按 ID 读取数据;

XREADGROUP:按消费组形式读取消息;

XPENDING :XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,

XACK:XACK 命令用于向消息队列确认消息处理已完成。

监听器

监听器容器

初始化消息队列(integral-queue)和消费者组(integral-group),执行RedisMqTestinitMessageQueue()即可

可参考List实现的处理方式

可使用RedisMqTestgetPendingQueue()获取未成功消费的消息队列进行处理

消息可能在哪些阶段丢失呢?可能会在这三个阶段发生丢失: 生产阶段 、 存储阶段 、 消费阶段

所以要从这三个阶段考虑:

在生产阶段,主要通过请求确认机制,来保证消息的可靠传递。

存储阶段,可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步

从 Consumer 角度分析,如何保证消息被成功消费?

Consumer 保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。

对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,就是所谓的 有且仅有一次 。 RocketMQ 择了确保一定投递,保证消息不丢失,但有可能造成消息重复。

处理消息重复问题,主要有业务端自己保证,主要的方式有两种: 业务幂等 和 消息去重

发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:

顺序消息是指消息的 消费顺序 和 产生顺序 相同,在有些业务逻辑下,必须保证顺序,比如订单的生成、付款、发货,这个消息必须按顺序处理才行。

顺序消息分为 全局顺序消息 和 部分顺序消息 :

部分顺序消息相对比较好实现,生产端需要做到把同 ID 的消息发送到同一个 Message Queue ;在消费过程中,要做到从同一个 Message Queue 读取的消息顺序处理——消费端不能并发处理顺序消息,这样才能达到部分有序。

发送端使用 MessageQueueSelector 类来控制 把消息发往哪个 Message Queue

消费端通过使用 MessageListenerOrderly 来解决单 Message Queue 的消息被并发处理的问题

RocketMQ 默认情况下不保证顺序,比如创建一个 Topic ,默认八个写队列,八个读队列,这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个 Consumer ,每个 Consumer 也可能启动多个线程并行处理,所以消息被哪个 Consumer 消费,被消费的顺序和写人的顺序是否一致是不确定的。

要保证全局顺序消息, 需要先把 Topic 的读写队列数设置为 一,然后 Producer Consumer 的并发设置,也要是一。简单来说,为了保证整个 Topic 全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 ,这时候就完全牺牲 RocketMQ 的高并发、高吞吐的特性了。

有两种方案:

对消息的过滤有三种方式:

电商的订单超时自动取消,就是一个典型的利用延时消息的例子,用户提交了一个订单,就可以发送一个延时消息, 1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

RocketMQ 是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:

但是目前 RocketMQ 支持的延时级别是有限的:

那么 RocketMQ 怎么实现延时消息的
简单,八个字: 临时存储 + 定时任务
Broker 收到延时消息了,会先发送到主题( SCHEDULE_TOPIC_XXXX )的相应时间段的 Message Queue 中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标 Topic 的队列中,然后消费者就可以正常消费这些消息。

半消息 :是指暂时还不能被 Consumer 消费的消息, Producer 成功发送到 Broker 端的消息,但是此消息被标记为 暂不可投递 状态,只有等 Producer 端执行完本地事务后经过二次确认了之后, Consumer 才能消费此条消息。

依赖半消息,可以实现分布式消息事务,其中的关键在于二次确认以及消息回查:

RocketMQ 实现消息事务 :

死信队列 用于处理无法被正常消费的消息,即 死信消息

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为 死信队列

死信消息的特点:

死信队列的特点:

RocketMQ 控制台提供对死信消息的查询、导出和重发的功能。

NameServer 因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。

RocketMQ 的高可用主要是在体现在 Broker 的读和写的高可用, Broker 的高可用是通过集群和主从实现的。

也就是说 Producer 只能向 Master 角色的 Broker 写入消息, Cosumer 可以从 Master 和 Slave 角色的 Broker 读取消息。

Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave 。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后, Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。

如何达到发送端写的高可用性

注意 : RocketMQ 目前还不支持 Broker 把 Slave 自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker

安装sql2005一直失败,以为提示的问题是这个com+目录问题警告所致,找了很久找到这个问题的解决方案sql2005_STD_X86在XPSP2下安装失败的一点经验软环境是XPSP2,安装SQL2005_STD_X86版。故障提示:1。如果 SQL Server 安装程序失败,安装程序将回滚所安装的系统,但可能不会删除所有 manifest 文件。解决方法是重命名这些文件,然后重新运行安装程序。有关详细信息,请参阅“如何处理 SQL Server 安装过程中的 COM+ 检查失败问题”。如果未运行 Microsoft 分布式事务处理协调器 (MS DTC),或者,在使用 Microsoft 群集服务器的情况下,如果 MS DTC 不是群集资源,则可能会发生 COM+ 错误。COM+ 依赖于 MS DTC,而 Integration Services 中的消息队列任务依赖于 COM +。如果出现 COM+ 错误,则只有将 COM+ 系统正确配置后,Integration Services 中的消息队列任务才可用。 2。对性能监视器计数器注册表值执行系统配置检查失败。有关详细信息,请参阅自述文件或 SQL Server 联机丛书中的“如何在 SQL Server 2005 中为安装程序增加计数器注册表项值”。安装中止。查找联机丛书,有如下提示: 1。Microsoft SQL Server 2005 安装程序检查 COM+ 是否已正确配置。如果发现配置错误,安装程序仍将继续,但是在系统配置检查 (SCC) 报告中显示以下警告:“如果 SQL Server 安装程序失败,安装程序将回滚所进行的安装,但可能不会删除所有的 manifest 文件。解决方法是重命名这些文件,然后重新运行安装程序。”如果未运行 Microsoft 分布式事务处理协调器 (MS DTC),或者,在使用 Microsoft 群集服务器的情况下,如果 MS DTC 不是群集资源,则可能会发生 COM+ 错误。COM+ 依赖于 MS DTC,而 Integration Services 中的消息队列任务依赖于 COM +。如果出现 COM+ 错误,则只有将 COM+ 系统正确配置后,Integration Services 中的消息队列任务才可用。 若要使用消息队列(亦称 MSMQ),请确保 MS DTC 正在运行并且已正确配置。如果 SQL Server 安装在群集上,则 MS DTC 必须是群集资源。 按照下列过程重新安装 COM+。 安装组件服务管理单元 在 Windows 桌面上,单击“开始”,然后单击“运行”。 在“打开”框中,键入 MMC,然后单击“确定”。 在“控制台”窗口中,单击菜单栏上的“文件”,然后单击“添加/删除管理单元”。 在“添加/删除管理单元”窗口,单击“添加”。 在“添加独立管理单元”窗口,从管理单元列表中选择“组件服务”,然后单击“添加”。单击“关闭”以关闭“添加独立管理单元”窗口,然后单击“确定”以关闭“添加/删除管理单元”窗口。 在“控制台根节点\组件服务”窗口,展开“组件服务”树。这就是当 COM+ 出现问题时,错误消息可能发生的地方。再次运行 SQL Server 2005 安装程序。如果收到错误消息,请重新安装 COM+。重新安装 COM+从控制面板的“添加或删除程序”中,单击“添加/删除 Windows 组件”。在“Windows 组件向导”中,不对选择做任何更改,单击“下一步”。一直单击以完成向导,然后再次运行 SQL Server 2005 安装程序。 2。在 SQL Server 安装开始前,Microsoft SQL Server 安装程序中的安装配置检查器 (SCC) 会验证计数器注册表项的值。如果 SCC 无法验证现有的注册表项,或 SCC 无法运行 lodctrexe 系统程序,则 SCC 检查会失败,致使安装受阻。错误编辑注册表会严重损坏您的系统。更改注册表项之前,建议您备份计算机中的所有重要数据。 手动设置计数器注册表项的增量在 Microsoft Windows 2003 或 Windows XP 桌面上,依次单击“开始”、“运行
>

昨天线上除了一点故障,因为突发性的用户增加,导致系统负载暴增,小部分机器上面因此CPU跟IO暴增,出现了不少的延迟、超时,导致消息队列MQ的部分任务重复执行,对用户推送了重复的消息,造成一定的影响。

虽然这是一个小故障,但仍然暴露着一些问题,在分布式系统中,随着用户量的增加,系统的稳定性、可用性、一致性都会遇到极大的挑战,即便是一个非常小的功能,稍有不慎,便会造成严重的后果。消息队列MQ是我们常用的一种分布式解耦神器,设计MQ的时候有一点常常被我们被我们忽略,便是MQ的幂等性。举一个简单的例子,经常我们手机上的APP某个软件无缘无故地重复推送某个消息,很大一部分原因就是因为没有做消息队列的幂等。例如当一个电商后台系统降价的时候,我们给用户推一个apppush,假设已经执行成功了,但是返回给消息队列系统的状态为成功的时候丢包了,消息队列系统误以为这个任务还没执行成功,又重新发起任务,重新向用户发送Push,便会造成重复的推送。

什么是幂等性呢?简单来说,就是一次或者多次请求某个资源,执行某次 *** 作,最终的结果应该一致。用数学上的语言来说,就是F(x)=F(F(x))。例如求一个数的绝对值,无论执行多少次,结果都是一样的。

为什么说消息队列的幂等性非常重要呢?消息不都是执行成功就是执行失败么?不,还有超时的情况,超时的时候消息队列并不知道消息是否已经消费成功,所以会继续消费,这种情况,在网络波动或者突发的流量最容易发生,而且通常一旦爆发就一大片。带来灾难性的影响。

如果只是简单的Push一条退款消息还好,只是造成了用户体验上的不适。如果因为幂等性问题重复给用户退款了,那就要造成资损了。

一般为了解决幂等性问题,我们一般有两种解决思路。 每次执行任务之前,查询下游系统 ,每次要执行MQ任务之前,先去下游系统问一问,这个任务之前是否已经执行过了,如果已经执行过了,那就直接返回成功。 另外一种解决方案,是下游提供幂等的接口 我也不关心是否之前已经执行过了,直接调用下游系统,由下游系统去保证幂等。

要做到幂等,我们往往需要唯一的标识,来标识是某一次 *** 作。如果用户量小,我们一般采用随机生成十几位字符即可。如果用户量大,请求量非常大,我们可能需要一个全局的唯一id生成算法,这里我推荐Twitter的Snowflake,github已经封装了不同语言的不同版本,非常容易使用。

好了,回到我们 一开始遇到的问题,假如我们生成MQ任务的时候,就生成一个全局的ID,在推送Push的时候,我们可以使用Redis等缓存组件把这个ID标识为已经发送,如果已经发送过,那就直接返回成功,在用不用担心给用户重复推送了。

在我们的开发中,消息队列或者其他分布式的调用随处可见,如果有重试逻辑,我们一定要注意幂等设计。在写代码的时候多10分钟去实现一次幂等 *** 作,可以省下后面10个小时查问题跟修复数据的时间。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/13401961.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-07-29
下一篇 2023-07-29

发表评论

登录后才能评论

评论列表(0条)

保存