<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-spring-boot-starterartifactId>
<version>2.0.4version>
dependency>
<dependency>
<groupId>org.apache.rocketmqgroupId>
<artifactId>rocketmq-clientartifactId>
<version>4.5.2version>
dependency>
配置
#RocketMQ配置
#消费者只需要配置mq的server地址即可,生产者也要配置
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=${spring.application.name}
# 发送时间超时时间
rocketmq.producer.send-message-timeout=300000
#异步消息发送失败重试次数
rocketmq.producer.retry-times-when-send-async-failed=0
#消息发送失败后的最大重试次数
rocketmq.producer.retry-times-when-send-failed=2
#消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节
rocketmq.producer.compress-message-body-threshold=4096
#消息最大容量
rocketmq.producer.max-message-size=4194304
rocketmq.producer.retry-next-server=true
事务消息生产者
/**
* @author: zhangzengxiu
* @createDate: 2022/4/25
*/
@Service
public class ProducerMsg {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发事务消息
*
* @param msg
* @return
*/
public boolean sendTransactionMsg(String msg) {
try {
Message message = new Message();
message.setBody(msg.getBytes());
System.out.println("========sending message=========");
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction("tx-group", "topic-tx", MessageBuilder.withPayload(message).build(), null);
System.out.println("========finish send =========");
return true;
} catch (MessagingException e) {
e.printStackTrace();
return false;
}
}
}
本地事务状态监听器
package com.test.msg;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
/**
* 事务消息Listener
*
* @author: zhangzengxiu
* @createDate: 2022/4/25
*/
@RocketMQTransactionListener(txProducerGroup = "tx-group")
public class TransactionMsgListener implements RocketMQLocalTransactionListener {
/**
* 执行本地事务
* 如果本地事务返回UNKNOWN,会进行事务补偿,自动执行下面的checkLocalTransaction方法
*
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("执行本地事务=====");
//模拟提交事务
//return RocketMQLocalTransactionState.COMMIT;
//模拟回滚事务
//return RocketMQLocalTransactionState.ROLLBACK;
//让去check本地事务状态 进行事务补偿
return RocketMQLocalTransactionState.UNKNOWN;
}
/**
* 检测本地事务状态
* 事务补偿过程
* 当消息服务器没有收到消息生产者的事务提交或者回滚确认时,会主动要求消息生产者进行确认,
* 消息生产者便会去检测本地事务状态,该过程称为事务补偿过程
*
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("执行事务补偿======");
//事务补偿提交
return RocketMQLocalTransactionState.COMMIT;
//事务补偿回滚
//return RocketMQLocalTransactionState.ROLLBACK;
//如果事务补偿过程还是UNKNOWN 就会一直进行事务补偿,60s一次
//return RocketMQLocalTransactionState.UNKNOWN;
}
}
事务消息消费者
package com.test.msg;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 消费事务消息的消费者
*
* @author: zhangzengxiu
* @createDate: 2022/4/25
*/
@Component
@RocketMQMessageListener(topic = "topic-tx", consumerGroup = "consumerMsg3")
public class ConsumerMsg3 implements RocketMQListener {
@Override
public void onMessage(Object message) {
System.out.println("xiaofei===");
System.out.println(message);
}
}
Java之RocketMQ详解
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)