目录
一:普通消息
二:顺序消息
三:延时消息
四:事务消息
五:批量消息
六:消息过滤
七:消息发送重试机制
八:消息消费重试机制
九:死信队列
一:普通消息
消息发送分类
Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。
1.同步发送
同步发送消息是指,Producer发出一条消息后,会在收到MQ返回的ACK之后才发一条消息。该方式的消息可靠性最高,但消息发送效率太低。
2.异步发送
异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下一条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。
3.单向发送
单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但消息可靠性较差。
代码举例
1.定义同步消息发送生产者
public static void main(String[] args) throws Exception { // 创建一个在名为pg组的生产者 DefaultMQProducer producer = new DefaultMQProducer("pg"); // 制定nameServer的地址 producer.setNamesrvAddr("MQ:9876"); // 设置生产者发送消息失败是重新发送消息的次数,默认为2次 producer.setRetryTimesWhenSendFailed(3); // 设置消息发送超时的时限,默认为3s producer.setSendMsgTimeout(5000); // 开启生产者 producer.start(); // 循环发送一百条消息 for (int i = 0; i < 100; i++) { // 消息发送结果 SendResult sendResult = new SendResult(); // 消息主体 byte[] body = ("Hi" + i).getBytes(); Message message = new Message("someTopic", "Sometag", body); // 为消息指定key message.setKeys("key-" + i); // 同步发送消息 sendResult = producer.send(message); System.out.println(sendResult); } // 关闭producer producer.shutdown(); }
2.定义异步消息发送生产者
{ // 创建生产者并且为其设置相关的属性 DefaultMQProducer producer = new DefaultMQProducer("pg"); producer.setNamesrvAddr("MQ:9876"); // 指定异步发送失败后不进行重试发送 producer.setRetryTimesWhenSendAsyncFailed(0); // 指定新创建的Topic的Queue数量为2,默认为4 producer.setDefaultTopicQueueNums(2); // producer.setRetryTimesWhenSendFailed(5); // producer.setSendMsgTimeout(5000); producer.start(); // 异步发送100条消息 for (int i = 0; i < 100; i++) { byte[] bytes = ("Hi"+ i).getBytes(); Message message = new Message("MyTopicA", "MyTag", bytes); message.setKeys("key-"+ i); // 使用异步发送消息,并且指定回调函数 producer.send(message, new SendCallback() { // 当producer接收到MQ发送过来的ACK之后会回调此函数 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { throwable.printStackTrace(); } }); } // sleep一会儿 // 由于采用的是异步发送,所以若这里不sleep, // 则消息还未发送就会将producer给关闭,报错 TimeUnit.SECONDS.sleep(3); producer.shutdown(); }
3.定义单向消息发送生产者
public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("pg"); producer.setNamesrvAddr("MQ:9876"); producer.start(); for (int i = 0; i < 10; i++) { byte[] bytes = ("Hi," + i).getBytes(); Message message = new Message("single", "sometag", bytes); message.setKeys("key-" + i); producer.sendoneway(message); } producer.shutdown(); System.out.println("producer shutdown"); }
4.定义消息消费者
public static void main(String[] args) throws Exception { // 定义一个pull的消费者 DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer("cg"); // 定义一个push的消费者 DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("cg"); // 指定nameServer pushConsumer.setNamesrvAddr("MQ:9876"); // 指定从第一条消息开始消费 pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 指定消费topic与tag pushConsumer.subscribe("someTopic", "*"); // 指定采用“广播模式”进行消费,默认为“集群模式” // consumer.setMessageModel(MessageModel.BROADCASTING); // 注册消息监听器 pushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List二:顺序消息list, ConsumeConcurrentlyContext context) { // 逐条消费消息 for (MessageExt message : list) { System.out.println(message); } // 返回消息消费成功消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 开启消费者消费消息 pushConsumer.start(); System.out.println("Consumer start"); }
顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。
默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。
当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。
采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票的场景。
四:事务消息在电商平台中,订单创建时会发送一条延迟消息。这条消息将会在30分钟后投递给后台业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完成,则取消订单,将商品再次放回到库存;如果完成支付,则忽略。
在12306平台中,车票预订成功后就会发送一条延迟消息。这条消息将会在45分钟后投递给后台业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成付。如果未完成,则取消预订,将车票再次放回到票池;如果完成支付,则忽略。
1.问题引入
这里的一个需求场景是:工行用户A向建行用户B转账1万元。
我们可以使用同步消息来处理该需求场景:
1. 工行系统发送一个给B增款1万元的同步消息M给Broker
2. 消息被Broker成功接收后,向工行系统发送成功ACK
3. 工行系统收到成功ACK后从用户A中扣款1万元
4. 建行系统从Broker中获取到消息M
5. 建行系统消费消息M,即向用户B中增加1万元
问题引入:若第3步中的扣款 *** 作失败,但消息已经成功发送到了Broker。对于MQ来
说,只要消息写入成功,那么这个消息就可以被消费。此时建行系统中用户B增加了1万元,而工行中用户并没有减少一万元,因此出现了数据不一致问题。
2.思路解决
解决思路是,让第1、2、3步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息。而该思路即使用事务消息。这里要使用分布式事务解决方案。
1. 事务管理器TM向事务协调器TC发起指令,开启全局事务
2. 工行系统发一个给B增款1万元的事务消息M给TC
3. TC会向Broker发送半事务消息prepareHalf,将消息M预提交到Broker。此时的建行系统是看不到Broker中的消息M的
4. Broker会将预提交执行结果Report给TC。
5. 如果预提交失败,则TC会向TM上报预提交失败的响应,全局事务结束;如果预提交成功,TC会调用工行系统的回调 *** 作,去完成工行用户A的预扣款1万元的 *** 作
6. 工行系统会向TC发送预扣款执行结果,即本地事务的执行状态
7. TC收到预扣款执行结果后,会将结果上报给TM。
预扣款 *** 作存在三种可能性:
public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果}
8. TM会根据上报结果向TC发出不同的确认指令
- 若预扣款成功(本地事务状态为COMMIT_MESSAGE),则TM向TC发送Global Commit指令
- 若预扣款失败(本地事务状态为ROLLBACK_MESSAGE),则TM向TC发送Global Rollback指令
- 若现未知状态(本地事务状态为UNKNOW),则会触发工行系统的本地事务状态回查 *** 作。回查 *** 作会将回查结果,即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report给TC。TC将结果上报给TM,TM会再向TC发送最终确认指令Global Commit或Global Rollback
9. TC在接收到指令后会向Broker与工行系统发出确认指令
- TC接收的若是Global Commit指令,则向Broker与工行系统发送Branch Commit指令。此时Broker中的消息M才可被建行系统看到;此时的工行用户A中的扣款 *** 作才真正被确认
- TC接收到的若是Global Rollback指令,则向Broker与工行系统发送Branch Rollback指令。此时Broker中的消息M将被撤销;工行用户A中的扣款 *** 作将被回滚
以上方案就是为了确保消息投递与扣款 *** 作能够在一个事务中,要成功都成功,有一个失败,则全部回滚。
以上方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的,而事务消息方案中的消息预提交与预扣款 *** 作间是同步的。
3.分布式事务
对于分布式事务,通俗地说就是,一次 *** 作由若干分支 *** 作组成,这些分支 *** 作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支 *** 作要么全部成功,要么全部失败。分布式事务与普通事务一样,就是为了保证 *** 作结果的一致性。
4.事务消息
RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式。
5.半事务消息
暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但是Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息。
6.本地事务状态
Producer回调 *** 作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认指令。
7.消息回查
消息回查,即重新查询本地事务的执行状态。本例就是重新到DB中查看预扣款 *** 作是否执行成功。
8.XA模式三剑客
XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,是基于XA协议的。XA协议由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式 *** 作扩展之后的Unix事务系统)首先提出的,并交给X/Open组织,作为资源管理器与事务管理器的接口标准。XA模式中有三个重要组件:TC、TM、RM。
1)TC
Transaction Coordinator,事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。
RocketMQ中Broker充当TC
2)TR
Transaction Manager,事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。
RocketMQ中事务消息的Producer充当TM
3)RM
Resource Manager,资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
RocketMQ中事务消息的Producer及Broker均是RM
9.代码举例
1)定义工行事务监听器
package com.lwz.rocketmq.transcation; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; public class ICBCTransactionListener implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("预提交消息成功:" + message); // 假设接收到TAGA的消息就表示扣款 *** 作成功,TAGB的消息表示扣款失败, // TAGC表示扣款结果不清楚,需要执行消息回查 if (StringUtils.equals("TAGA", message.getTags())) { return LocalTransactionState.COMMIT_MESSAGE; } else if (StringUtils.equals("TAGB", message.getTags())) { return LocalTransactionState.ROLLBACK_MESSAGE; } else if (StringUtils.equals("TAGC", message.getTags())) { return LocalTransactionState.UNKNOW; } return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("执行消息回查" + messageExt); return LocalTransactionState.COMMIT_MESSAGE; } }
2)定义事务消息生产者
package com.lwz.rocketmq.transcation; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import java.util.concurrent.*; public class TransactionProducer { public static void main(String[] args) throws Exception { TransactionMQProducer producer = new TransactionMQProducer("tpg"); producer.setNamesrvAddr("106.52.41.171:9876"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); // 为生产者指定一个线程池 producer.setExecutorService(executorService); // 为生产者添加事务监听器 producer.setTransactionListener(new ICBCTransactionListener()); producer.start(); String[] tags = {"TAGA","TAGB","TAGC"}; for (int i = 0; i < 3; i++) { byte[] body = ("Hi," + i).getBytes(); Message msg = new Message("TTopic", tags[i], body); // 发送事务消息 // 第二个参数用于指定在执行本地事务时要使用的业务参数(下面的代码相当于开启了全局事务) SendResult sendResult = producer.sendMessageInTransaction(msg,null); System.out.println("发送结果为:" + sendResult.getSendStatus()); } } }
3)定义消费者
package com.lwz.rocketmq.transcation; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class SomeConsumer { public static void main(String[] args) throws Exception { // 定义一个pull的消费者 DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer("cg"); // 定义一个push的消费者 DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("cg"); // 指定nameServer pushConsumer.setNamesrvAddr("106.52.41.171:9876"); // 指定从第一条消息开始消费 pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 指定消费topic与tag pushConsumer.subscribe("TTopic", "*"); // 指定采用“广播模式”进行消费,默认为“集群模式” // consumer.setMessageModel(MessageModel.BROADCASTING); // 注册消息监听器 pushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List五:批量消息list, ConsumeConcurrentlyContext context) { // 逐条消费消息 for (MessageExt message : list) { System.out.println(message); } // 返回消息消费成功消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 开启消费者消费消息 pushConsumer.start(); System.out.println("Consumer start"); } }
生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率。不过需要注意以下几点:
- 批量发送的消息必须具有相同的Topic
- 批量发送的消息必须具有相同的刷盘策略
- 批量发送的消息不能是延时消息与事务消息
消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤。
七:消息发送重试机制Producer对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略
八:消息消费重试机制Producer对消费失败的消息进行重新消费的机制,称为消息消费重试机制。消息消费重试机制有顺序消息的消费重试、无序消息的消费重试
九:死信队列当一条消息初次消费失败,消息队列会自动进行消费重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。这个队列就是死信队列(Dead-Letter Queue,DLQ),而其中的消息则称为死信消息(Dead-Letter Message,DLM)。
死信队列是用于处理无法被正常消费的消息的。
死信队列中的消息不会再被消费者正常消费,即DLQ对于消费者是不可见的
死信存储有效期与正常消息相同,均为 3 天(commitlog文件的过期时间),3 天后会被自动删除
死信队列就是一个特殊的Topic,名称为%DLQ%consumerGroup@consumerGroup,即每个消费者组都有一个死信队列
如果一个消费者组未产生死信消息,则不会为其创建相应的死信队列
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)