基于RocketMQ可以实现分布式事务的最终一致性解决方案,它使用TransactionMQProducer发送事务消息,事务消息的状态包括:Unknown 未知状态,初始发送的消息就是该状态,该状态下事务消息不会被消费者消费;Commit状态 提交状态,该状态下事务消息会被消费者消费;Rollback状态 回滚状态,事务消息会在broker端被移除,不会被消费者消费。它的处理流程是这样:首先生产者通过TransactionMQProducer发送事务消息到broker,此时事务消息的状态是Unkown状态,不会被消费者消费;在生产者发送事务消息成功后,生产者会通TransactionListner接口执行本地事务,根据执行结果发送事务消息状态到broker,如果是Commit状态,则事务消息此时会被消费者消费处理,如果是Rollback状态,则事务消息会被broker移除,并不会被消费者消费,如果事务消息状态一直处于Unkown状态,则broker会通过生产者的TransactionLisner接口回查事务状态,根据事务状态按照前面的方式进行处理。消费者消费消息和普通的消费方式一样,通过重试机制尽量确保消费成功,如果重试多次还是失败进行人工通知处理。
一 发送事务消息package com.tech.rocketmq.transaction; import com.tech.rocketmq.jms.JmsConfig; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.common.message.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; @Slf4j @RestController public class TransactionController { @Autowired private TransactionProducer transactionMQProducer; @GetMapping("tran") public Object callback(String tag,String otherParam) throws Exception{ Message message = new Message(JmsConfig.TOPIC, tag, tag + "_key", tag.getBytes()); TransactionSendResult sendResult = transactionMQProducer.getProducer().sendMessageInTransaction(message, otherParam); log.info("发送结果={}",sendResult); return new HashMap<>(); } }
方便测试 otherParam参数与事务消息状态对应 1:此时会提交事务消息,消费者可以消费到 2:此时会回滚事务消息,broker端会移除事务消息,消费者无法消费到该条消息 3:broker定时回查事务消息状态
package com.tech.rocketmq.transaction; import com.tech.rocketmq.jms.JmsConfig; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Component; import java.util.concurrent.*; @Slf4j @Component public class TransactionProducer { private String producerGroup = "trac_producer_group"; //事务监听器 private TransactionListener transactionListener = new TransactionListenerImpl(); private TransactionMQProducer producer = null; private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue二 消费者(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); public TransactionProducer() { producer=new TransactionMQProducer(producerGroup); producer.setNamesrvAddr(JmsConfig.NAME_SERVER); producer.setTransactionListener(transactionListener); producer.setExecutorService(executorService); start(); } public TransactionMQProducer getProducer(){ return this.producer; } public void start() { try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } public void shutdown(){ this.producer.shutdown(); } } @Slf4j class TransactionListenerImpl implements TransactionListener { //在发送消息时发送线程会调用该方法,执行本地事务 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("===================executeLocalTransaction====================="); String body = new String(msg.getBody()); String key = msg.getKeys(); String transactionId = msg.getTransactionId(); log.info("transactionId={},key={},body={}",transactionId,key,body); //执行本地事务 begin TODO //执行本地事务 end TODO int status = Integer.parseInt(arg.toString()); if(status == 1){ //已确认状态 对待确认的消息进行确认,消费者可以消费该消息了 return LocalTransactionState.COMMIT_MESSAGE; } if(status==2){ //回滚状态 回滚消息,该状态下的消息会在broker端删除 return LocalTransactionState.ROLLBACK_MESSAGE; } if(status==3){ //未知状态,broker端会调用checkLocalTransaction回查本地事务 return LocalTransactionState.UNKNOW; } //超时未返回,和未知状态消息一样,也会在等待一定时间后调用checkLocalTransaction回查本地事务 try { TimeUnit.SECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return null; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("===============checkLocalTransaction================="); String body = new String(msg.getBody()); String key = msg.getKeys(); String transactionId = msg.getTransactionId(); log.info("transactionId={},key={},body={}",transactionId,key,body); //要么commit 要么rollback //可以根据key去检查本地事务消息是否完成 return LocalTransactionState.COMMIT_MESSAGE; } }
package com.tech.rocketmq.transaction; import com.tech.rocketmq.jms.JmsConfig; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Component; import java.util.List; @Slf4j @Component public class TransactionConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "tran_consumer_group"; public TransactionConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe(JmsConfig.TOPIC,"*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { MessageExt messageExt = msgs.get(0); String key = messageExt.getKeys(); try { log.info("Receive New Message: {}",new String(messageExt.getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { System.out.println("消费异常"); e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start ..."); } }
在测试时,通过给otherParam传不同的值,来观测Transaction接口回查事务消息状态方法的调用情况和消费者的消费情况。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)