目录
1. 消息可靠性保证
1.1. 异常捕获机制
1.2. RabbitMQ/AMQP的事务机制
1.3. 发送端的消息确认机制
1.4. 消息持久化机制
1.5. 消费端的消息确认机制
1.6. 消费端限流
1.7. 消息可靠性保障
1.8. 消息幂等
2. 消息消费性能提升
2.1. 如何提升下游消费效率
3. 消息可靠性分析
3.1. Firehose分析消息丢失原因
3.2. rabbitmq_tracing 插件
一般我们使用支付宝或微信转账支付的时候,都是扫码,支付,然后立刻得到结果,说你支付了多少钱,如果你绑定的是yhk,可能这个时候你并没有收到支付的确认消息。往往是在一段时间之后,你会收到yhk发来的短信,告诉你支付的信息。 支付平台如何保证这笔帐不出问题呢?
一般是通过以下几种方式来保证数据一致性: 1.分布式锁 这种方式比较容易理解,就是在 *** 作某条数据时先锁定,可以通过Redis或者Zookeeper等常用框架来实现。比如修改账单时,先锁定账单,如果账单有并发 *** 作,后面的 *** 作只能等到锁释放之后才能进行后续 *** 作。 优点 :保证数据强一致性 缺点 :高并发场景下会有性能问题 2.消息队列 通过消息队列保障数据的最终一致性,因此就需要确保消息队列在发送的时候肯定发送成功,在消费消息的时候也是需要保障发送成功的,并且在中间的过程中消息不会丢失。 优点 :异步,高并发 缺点 :弱一致性,且存在一定延迟,并且需确保该 *** 作一定能完成不能失败因此我们在使用rabbitmq来保障消息一致性的时候需要从以下几方面保障消息的可靠性: 1.客户端异常捕获,包括生产者与消费者 2.RabbitMQ/AMQP的事务机制 3.发送端的消息确认机制 4.消息持久化机制 5.Broker的高可用集群 6. 消费端的消息确认机制 7.消费端限流 8.消息幂等性1. 消息可靠性保证 1.1. 异常捕获机制 消息发送过程通过 try catch 方式捕获异常, 在异常处理的代码块中执行回滚业务 *** 作或者执行重发 *** 作等。这是一种最大努力确保的方式, 并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。 1.2. RabbitMQ/AMQP的事务机制
可以通过原生的接口通过事务的方式来保障消息投递成功,但是这种方式在性能方面的开销比较大,一般不推荐使用
1.3. 发送端的消息确认机制1.4. 消息持久化机制 持久化是提高RabbitMQ 可靠性的基础,否则当 RabbitMQ 遇到异常时(如:重启、断电、停机等)数据将会丢失。主要从以下几个方面来保障消息的持久性: 1. Exchange 的持久化。通过定义时设置 durable 参数为 ture 来保证 Exchange 相关的元数据不丢失。 2. Queue 的持久化。也是通过定义时设置 durable 参数为 ture 来保证 Queue 相关的元数据不丢失。 3. 消息 的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性 ) 设置为 2 即可实现消息的持久化,保证消息自身不丢失。详情可参考:RabbitMQ中消息确认机制_李嘉图呀李嘉图的博客-CSDN博客
注:Exchange 和 Queue 的持久化只能保证 Exchange 跟 Queue 在RabbitMQ重启之后仍然存在,如果消息没有设置持久化的话,仅设置 Exchange 和 Queue 的持久化,消息仍然会丢失,想要保证消息不丢失, 交换机,队列,消息 三者的持久化缺一不可 详情可参考: RabbitMQ的基本架构与实现原理_李嘉图呀李嘉图的博客-CSDN博客1.5. 消费端的消息确认机制
1.6. 消费端限流详情可参考:RabbitMQ中消息确认机制_李嘉图呀李嘉图的博客-CSDN博客
在电商秒杀活动中,活动会有大量并发请求发到服务端,服务器肯定无法同时处理这么多请求,因此需要对消息进行削峰处理,但是如何削峰呢?
当消息投递速度远大于消费速度时,随着时间积累就会出现“消息积压”,消息中间件本身具备一定得缓冲能力,但是这个能力受限于服务器的设置的容量,长时间消息积压则会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,进而导致更大的崩溃......
因此我们需要从多个角度进行限流,防止以上问题的发生。
1.在RabbitMQ中对内存和磁盘使用量设置阈值,当到达一定得阈值之后,生产者被阻塞(block), 直到对应项指标恢复正常。在全局上防止超大流量,消息积压冲垮Broker。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停发布消息的已连接客户端的套接字读取数据,连接心跳监视也将会被禁用。所有网络连接将会在rabbitmqctl和管理插件中显示为“已阻止”。
# 设置磁盘可用空间大小,单位字节。当磁盘可用空间低于这个值的时候。 # 发出磁盘警告,引发限流。 # 如果设置了相对大小,则忽略此绝对大小。 disk_free_limit.absolute = 50000 # 使用计量单位, 从rabbitmq 3.6.0 开始有效。对 vm_memory_high_watermark 同样有效 # disk_free_limit.absolute = 500KB # disk_free_limit.absolute = 50mb # disk_free_limit.absolute = 5GB # 还可以使用相对于总可用内存的相对值来设置,注意:此相对值不要低于1.0! # 当磁盘可用空间低于总可用内存的2.0倍的时候,出发限流 # disk_free_limit.relative = 2.0 # 内存限流阈值设置 # 0.4标识阈值总可用内存的比值,总可用内存标识 *** 作系统给每隔进程分配的大小,或实际内存大小 # vm_memory_high_watemark.relative = 0.4 # # 还可以直接通过绝对值限制可用内存大小,单位字节 # vm_memory_high_watermark.absolute = 107374184 # # 从RabbitMQ 3.6.0 开始,绝对值支持计量单位,如果设置了相对值,则忽略此相对值。 # vm_memory_high_watermark.absolute = 2GB
2.RabbitMQ默认提供了一种基于 credit flow 的流控机制,针对每一个连接进行流控,当单个队列达到最大流速时,或者多个队列达到总流速时,都会出发流控,出发单个连接的流控可能是应为 connection、channel、queue 的某一个过程处于 flow 状态,这些状态都可以从监控平台看到。
3.RabbitMQ中有一种Qos保证机制,可以限制 Channel 上接收的未被 ACK 的消息数量,如果超过这个数量限制 RabbitMQ将不会再往消费端推送消息,这样可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费者)。需要注意的是,Qos机制仅对于消费端推模式有效,对拉模式无效。且不支持NONE Ack模式。在执行channel.basicConsume 方法之前通过 channel.basicQos方法可以设置该数量。 通过对Qos 的 prefetchCount 进行设置,保证消费者待处理消息永远小于 prefetchCount 个
1.7. 消息可靠性保障 消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级:
1. At most once:最多一次。消息可能会丢失,但绝不会重复传输
2. At least once:最少一次。消息绝不会丢失,但可能会重复传输
3. Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次
RabbitMQ 支持其中的“最多一次”和“最少一次“
其中“最少一次”投递实现需要考虑以下这个几个方面的内容:
1. 消息生产者需要开启事务机制或者publisher confirm 机制,以确保消息可以可靠地传 输到RabbitMQ 中。
2. 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路 由到队列中,进而能够保存下来而不会被丢弃。
3. 消息和队列都需要进行持久化处理,以确保RabbitMQ 服务器在遇到异常情况时不会 造成消息丢失。
4. 消费者在消费消息的同时需要将autoAck 设置为false,然后通过手动确认的方式去确 认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。
“最多一次”的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样 很难确保消息不会丢失。(估计有不少公司的业务系统都是这样的)
“恰好一次”是RabbitMQ 目前无法保障的。
考虑这样一种情况,消费者在消费完一条消息之后向RabbitMQ 发送确认Basic.Ack 命令,此时由于网络断开或者其他原因造成RabbitMQ 并没有收到这个确认命令,那么RabbitMQ 不会将此条消息标记删除。在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费。
再考虑一种情况,生产者在使用publisher /confirm/i机制的时候,发送完一条消息等待RabbitMQ返
回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样
RabbitMQ 中就有两条同样的消息,在消费的时候消费者就会重复消费。
由于 RabbitMQ 并没有去重机制来保证消息恰好一次,因此我们只能在业务中自行进行处理,相对于金融类业务之外,很多业务场景都是可以容忍重复消费的。因此常规的解决办法就是在消费端让我们的消费消息的 *** 作具备幂等性。
幂等性问题并不是消息系统独有,而是(分布式)系统中普遍存在的问题。例如:RPC框架调用超时重试机制,HTTP请求重复发起。
幂等是一个数学上的概念:
如果一个函数f(x) 满足:f(f(x)) = f(x),则函数f(x) 满足幂等性。这个概念被拓展到计算机领域,被用来描述一个 *** 作、方法或者服务。
一个幂等 *** 作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。对于幂等的方法,不用担心重复执行会对系统造成任何改变。
举个简单的例子(在不考虑并发问题的情况下):
#这两条sql语句就是天然幂等的,它本身的重复执行并不会引起什么改变。而update就要看情况的 select * from xx where id=1 delete from xx where id=1 #这条语句执行1次和100次都是一样的结果(最终余额都还是100),所以它是满足幂等性的。 update xxx set amount = 100 where id =1 #这条语句就不满足幂等性的 update xxx set amount = amount + 100 where id =1
业界对于幂等性的一些常见做法:
1. 借助数据库唯一索引,重复插入直接报错,事务回滚。还是举经典的转账的例子,为了保证不重复扣款或者重复加钱,维护一张“资金变动流水表”,里面至少需要交易单号、变动账户、变动金额等3个字段。选择交易单号和变动账户做联合唯一索引(单号是上游生成的可保证唯性),这样如果同一笔交易发生重复请求时就会直接报索引冲突,事务直接回滚。现实中,数据库唯一索引的方式通常做为兜底保证;
2. 前置检查机制。这个很容易理解且有几种实现办法。还是引用转账的例子,在执行更改账户余额之前,我得先检查下资金变动流水表(或者Tair中)中是否已经存在这笔交易相关的记录, select * from xxx where accountNumber=xxx and orderId=yyy ,如果已经存在,那么直接返回,否则执行正常的更新余额的动作。为了防止并发问题,我们通常需要借助“排他锁”来完成。在支付宝有一条铁律叫:一锁、二判、三 *** 作。当然,我们也可以使用乐观锁或CAS机制,乐观锁一般会使用扩展一个版本号字段做判断条件
3. 唯一Id机制,比较通用的方式。对于每条消息都可以生成唯一Id,消费前判断Tair中是否存在(MsgId做Tair排他锁的key),消费成功后将状态写入Tair中,这样就可以防止重复消费了。
对于接口请求类的幂等性保证要相对更复杂,通常要求上游请求时传递一个类GUID的请求号(或TOKEN),如果我们发现已经存在了并且上一次请求处理结果是成功状态的(有时候上游的重试请求是正常诉求,我们不能将上一次异常/失败的处理结果返回或者直接提示“请求异常”,如果这样重试就变得没意义了)则不继续往下执行,直接返回 “重复请求” 的提示和上次的处理结果(上游通常是由于请求超时等未知情况才发起重试的,所以直接返回上次请求的处理结果就好了)。如果请求ID都不存在或者上次处理结果是失败/异常的,那就继续处理流程,并最终记录最终的处理结果。这个请求序号由上游自己生成,上游通用需要根据请求参数、时间间隔等因子来生成请求ID。同样也需要利用这个请求ID做分布式锁的KEY实现排他。
2. 消息消费性能提升 2.1. 如何提升下游消费效率1.优化应用程序性能,缩短响应时间(需要时间)
2.增加消费者节点实例(成本增加,且底层数据库 *** 作也可能是瓶颈)
3.调整并发消费者线程数(线程数并非越大越好,需要大量压测调优至合理值,且需要 注意 channel 内非线程安全)
ConnectionFactory factory = new ConnectionFactory(); //设置channel的并发请求最大数 factory.setRequestedChannelMax(10); //设置自定义线程工厂 ThreadFactory threadFactory = Executors.defaultThreadFactory(); factory.setThreadFactory(threadFactory);
@Bean public RabbitListenerContainerFactory(ConnectionFactory connectionFactory){ // SimpleRabbitListenerContainerFactory 发现消息中有 connect_type 有 text 就会默认将其转 // 换成 String 类型的,没有 content_type 都按 byte[] 类型 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // 设置并发线程数 factory.setConcurrentConsumers(10); // 设置最大并发线程数 factory.setMaxConcurrentConsumers(20); return factory; }3. 消息可靠性分析 3.1. Firehose分析消息丢失原因
在使用任何消息中间件的过程中,难免会出现消息丢失等异常情况,这个时候就需要有一个良好的机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。
在 RabbitMQ 中可以使用 Firehose 功能来实现消息追踪,Firehose 可以记录每一次发送或者消费消息的记录,方便 RabbitMQ 的使用者进行调试、排错等。
Firehose 的原理是将生产者投递给 RabbitMQ 的消息,或者 RabbitMQ 投递给消费者的消息按照指定的格式发送到默认的交换器上。这个默认的交换器的名称为 amq.rabbitmq.trace,它是一个 topic 类型的交换器。发送到这个交换器上的消息的路由键为 publish.{exchangename} 和 deliver.{queuename} 。其中 exchangename 和 queuename 为交换器和队列的名称,分别对应生产者投递到交换器的消息和消费者从队列中获取的消息。
#开启Firehose命令: rabbitmqctl trace_on [-p vhost] #关闭Firehose命令: rabbitmqctl trace_off [-p vhost]
Firehose 默认情况下处于关闭状态且状态是非持久化的,会在RabbitMQ服务重启的时候还原成默认的状态。由于 Firehose 开启之后会影响整体服务性能,因此在生产环境上是不允许开启的。
3.2. rabbitmq_tracing 插件rabbitmq_tracing 插件相当于 Firehose 的 GUI 版本,它同样能跟踪 RabbitMQ 中消息的流入流出情况。rabbitmq_tracing 插件同样会对流入流出的消息进行封装,然后将封装后的消息日志存入相应的 trace 文件中。
# 启动rabbitmq_ tracing 插件 rabbitmq-plugins enable rabbitmq_tracing # 关闭该插件 rabbitmq-plugins disable rabbitmq_tracing
Name表示rabbitmq_tracing的一个条目的名称,Format可以选择 Text 或 JSON
Pattern:发布的消息:publish. 欢迎分享,转载请注明来源:内存溢出
Pattern:消费的消息:deliver.
评论列表(0条)