RocketMQ实现分布式事务

RocketMQ实现分布式事务,第1张

RocketMQ实现分布式事务

        基于RocketMQ可以实现分布式事务的最终一致性解决方案,它使用TransactionMQProducer发送事务消息,事务消息的状态包括:Unknown 未知状态,初始发送的消息就是该状态,该状态下事务消息不会被消费者消费;Commit状态 提交状态,该状态下事务消息会被消费者消费;Rollback状态 回滚状态,事务消息会在broker端被移除,不会被消费者消费。它的处理流程是这样:首先生产者通过TransactionMQProducer发送事务消息到broker,此时事务消息的状态是Unkown状态,不会被消费者消费;在生产者发送事务消息成功后,生产者会通TransactionListner接口执行本地事务,根据执行结果发送事务消息状态到broker,如果是Commit状态,则事务消息此时会被消费者消费处理,如果是Rollback状态,则事务消息会被broker移除,并不会被消费者消费,如果事务消息状态一直处于Unkown状态,则broker会通过生产者的TransactionLisner接口回查事务状态,根据事务状态按照前面的方式进行处理。消费者消费消息和普通的消费方式一样,通过重试机制尽量确保消费成功,如果重试多次还是失败进行人工通知处理。

一 发送事务消息
package com.tech.rocketmq.transaction;

import com.tech.rocketmq.jms.JmsConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;


@Slf4j
@RestController
public class TransactionController {
    
    @Autowired
    private TransactionProducer transactionMQProducer;

    
    @GetMapping("tran")
    public Object callback(String tag,String otherParam) throws Exception{
        Message message = new Message(JmsConfig.TOPIC, tag, tag + "_key", tag.getBytes());
        TransactionSendResult sendResult = transactionMQProducer.getProducer().sendMessageInTransaction(message, otherParam);
        log.info("发送结果={}",sendResult);
        return new HashMap<>();
    }
}
方便测试 otherParam参数与事务消息状态对应
1:此时会提交事务消息,消费者可以消费到
2:此时会回滚事务消息,broker端会移除事务消息,消费者无法消费到该条消息
3:broker定时回查事务消息状态
package com.tech.rocketmq.transaction;

import com.tech.rocketmq.jms.JmsConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;


@Slf4j
@Component
public class TransactionProducer {
    private String producerGroup = "trac_producer_group";
    //事务监听器
    private TransactionListener transactionListener = new TransactionListenerImpl();
    private TransactionMQProducer producer = null;
    private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
            TimeUnit.SECONDS, new ArrayBlockingQueue(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });

    public TransactionProducer() {
        producer=new TransactionMQProducer(producerGroup);
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        producer.setTransactionListener(transactionListener);
        producer.setExecutorService(executorService);
        start();
    }
    
    public TransactionMQProducer getProducer(){
        return this.producer;
    }

    
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    
    public void shutdown(){
        this.producer.shutdown();
    }
    
}

@Slf4j
class TransactionListenerImpl implements TransactionListener {
    
    //在发送消息时发送线程会调用该方法,执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("===================executeLocalTransaction=====================");
        String body = new String(msg.getBody());
        String key = msg.getKeys();
        String transactionId = msg.getTransactionId();
        log.info("transactionId={},key={},body={}",transactionId,key,body);
        //执行本地事务 begin  TODO 
        //执行本地事务 end  TODO 
        int status = Integer.parseInt(arg.toString());
        if(status == 1){
            //已确认状态 对待确认的消息进行确认,消费者可以消费该消息了
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        if(status==2){
            //回滚状态 回滚消息,该状态下的消息会在broker端删除
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        if(status==3){
            //未知状态,broker端会调用checkLocalTransaction回查本地事务
            return LocalTransactionState.UNKNOW;
        }
        
        //超时未返回,和未知状态消息一样,也会在等待一定时间后调用checkLocalTransaction回查本地事务
        try {
            TimeUnit.SECONDS.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("===============checkLocalTransaction=================");
        String body = new String(msg.getBody());
        String key = msg.getKeys();
        String transactionId = msg.getTransactionId();
        log.info("transactionId={},key={},body={}",transactionId,key,body);
        //要么commit 要么rollback
        //可以根据key去检查本地事务消息是否完成
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
二 消费者
package com.tech.rocketmq.transaction;

import com.tech.rocketmq.jms.JmsConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.List;


@Slf4j
@Component
public class TransactionConsumer {
    private DefaultMQPushConsumer consumer;
    private String consumerGroup = "tran_consumer_group";

    public TransactionConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(JmsConfig.TOPIC,"*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                MessageExt messageExt = msgs.get(0);
                String key = messageExt.getKeys();
                try {
                    log.info("Receive New Message: {}",new String(messageExt.getBody()));
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    System.out.println("消费异常");
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.println("consumer start ...");
    }
}

在测试时,通过给otherParam传不同的值,来观测Transaction接口回查事务消息状态方法的调用情况和消费者的消费情况。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5597859.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存