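RocketMQ sends a transactional message in the following steps:
1. The producer sends a half message to the message queue (the broker).
2. The message queue returns the send result.
3. The producer executes the local transaction.
4. The producer reports the outcome: commit, rollback, or unknown.
5. If the outcome in step 4 is unknown, the message queue checks back with the producer on the local transaction. The producer looks up the transaction status and then tells the message queue whether to finally commit or roll back.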
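To make the five steps concrete, here is a minimal in-memory sketch of the flow. It does not use the RocketMQ client at all: `ToyBroker`, `TxState`, `run`, and the transaction ID `tx-1` are hypothetical names invented for illustration, and the real broker's storage and check-back scheduling are more involved than this. The only point is to show that a half message stays invisible to consumers until a commit arrives, either directly or through the check-back.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class HalfMessageFlowSketch {

    /** Possible outcomes reported in step 4 (hypothetical enum, not the RocketMQ type). */
    public enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    /** Toy broker: half messages are held back until their transaction is resolved. */
    public static class ToyBroker {
        private final Map<String, String> halfMessages = new HashMap<>();
        private final List<String> visibleMessages = new ArrayList<>();

        // Steps 1-2: store the half message and acknowledge it.
        public boolean sendHalf(String txId, String body) {
            halfMessages.put(txId, body);
            return true;
        }

        // Step 4: apply the producer's verdict; UNKNOWN leaves the message pending.
        public void endTransaction(String txId, TxState state) {
            if (state == TxState.UNKNOWN) {
                return;
            }
            String body = halfMessages.remove(txId);
            if (body != null && state == TxState.COMMIT) {
                visibleMessages.add(body);
            }
        }

        // Step 5: ask the producer about every transaction that is still pending.
        public void checkPending(Function<String, TxState> checker) {
            for (String txId : new ArrayList<>(halfMessages.keySet())) {
                endTransaction(txId, checker.apply(txId));
            }
        }

        public List<String> visible() {
            return new ArrayList<>(visibleMessages);
        }

        public int pendingCount() {
            return halfMessages.size();
        }
    }

    /** Runs one message through the flow and returns what a consumer could see. */
    public static List<String> run(TxState localResult, TxState checkResult) {
        ToyBroker broker = new ToyBroker();
        String txId = "tx-1";
        broker.sendHalf(txId, "Hello RocketMQ transaction"); // steps 1-2
        // Step 3 would execute the local transaction here; its outcome is localResult.
        broker.endTransaction(txId, localResult);             // step 4
        broker.checkPending(id -> checkResult);               // step 5, only affects pending messages
        return broker.visible();
    }

    public static void main(String[] args) {
        System.out.println(run(TxState.COMMIT, TxState.ROLLBACK));
        System.out.println(run(TxState.UNKNOWN, TxState.COMMIT));
        System.out.println(run(TxState.UNKNOWN, TxState.ROLLBACK));
    }
}
```

In this sketch the check-back result is ignored when the local transaction already committed or rolled back, because the message is no longer pending by then. The case `run(TxState.UNKNOWN, TxState.ROLLBACK)` corresponds to the producer example in this post: the message is never delivered.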
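Producer

import java.io.UnsupportedEncodingException;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
        TransactionMQProducer producer = new TransactionMQProducer("group_09");
        producer.setNamesrvAddr("localhost:9876");
        producer.setTransactionListener(new TransactionListener() {
            // Step 3: called after the half message has been sent.
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                System.out.println("Executing local transaction");
                // Returning UNKNOW makes the broker check back later (step 5).
                return LocalTransactionState.UNKNOW;
            }

            // Step 5: called when the broker checks back on an unresolved transaction.
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("Checking local transaction");
                // Rolling back discards the half message, so the consumer never sees it.
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });
        producer.start();

        Message msg = new Message("topic14", "tag1",
                ("Hello RocketMQ transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.println("Sent successfully: " + sendResult);
        // The producer is left running so that it can answer the broker's check-back.
    }
}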
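Consumer

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_03");
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to every tag on the topic the producer writes to.
        consumer.subscribe("topic14", "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // Only committed transactional messages are delivered here.
                for (MessageExt messageExt : list) {
                    System.out.println(messageExt);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}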