- 前言
- RocketMQ原生API使用
- 测试环境搭建
- RocketMQ的编程模型
- RocketMQ的消息样例
- 基本样例
- 顺序消息
- 广播消息
- 延迟消息
- 批量消息
- 过滤消息
- 事务消息
- 事务消息的实现机制
- 事务消息的作用
- ACL权限控制
记录RocketMQ
RocketMQ原生API使用使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里我们用SpringBoot来搭建一系列消息生产者和消息消费者,来访问我们之前搭建的RocketMQ集群。
测试环境搭建首先创建一个基于Maven的SpringBoot工程,引入如下依赖:
org.apache.rocketmq rocketmq-client4.7.1
另外还与一些依赖,例如openmessage、acl等扩展功能还需要添加对应的依赖。具体可以参见RocketMQ源码中的example模块。在RocketMQ源码包中的example模块提供了非常详尽的测试代码,也可以拿来直接调试。这里就用源码包中的示例来连接搭建的RocketMQ集群来进行演示。RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较老,但是还是都可以运行的。所以我们还是以官网上的顺序进行学习。
但是在调试这些代码的时候要注意一个问题:这些测试代码中的生产者和消费者都需要依赖NameServer才能运行,只需要将NameServer指向搭建的RocketMQ集群,而不需要管Broker在哪里,就可以连接自己的RocketMQ集群。而RocketMQ提供的生产者和消费者寻找NameServer的方式有两种:
- 在代码中指定namesrvAddr属性。例如consumer.setNamesrvAddr(“127.0.0.1:9876”);
- 通过NAMESRV_ADDR环境变量来指定。多个NameServer之间用分号连接。
然后RocketMQ的生产者和消费者的编程模型都是有个比较固定的步骤的,掌握这个固定的步骤,对于我们学习源码以及以后使用都是很有帮助的。
- 消息发送者的固定步骤
- 创建消息生产者producer,并制定生产者组名
- 指定Nameserver地址
- 启动producer
- 创建消息对象,指定主题Topic、Tag和消息体
- 发送消息
- 关闭生产者producer
- 消息消费者的固定步骤
- 创建消费者Consumer,制定消费者组名
- 指定Nameserver地址
- 订阅主题Topic和Tag
- 设置回调函数,处理消息
- 启动消费者consumer
那我们来逐一连接下RocketMQ都支持哪些类型的消息:
基本样例基本样例部分我们使用消息生产者分别通过三种方式发送消息,同步发送、异步发送以及单向发送。然后使用消费者来消费这些消息。
- 同步发送消息的样例见:org.apache.rocketmq.example.simple.Producer
//简单样例:同步发送消息 public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // producer.setNamesrvAddr("192.168.232.128:9876"); producer.start(); for (int i = 0; i < 20; i++) try { { Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); //同步传递消息,消息会发给集群中的一个Broker节点。 // SendResult sendResult = producer.send(msg); // System.out.printf("%s%n", sendResult); producer.sendoneway(msg); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
- 等待消息返回后再继续进行下面的 *** 作。异步发送消息的样例见:org.apache.rocketmq.example.simple.AsyncProducer
//简单样例:异步发送消息 public class AsyncProducer { public static void main( String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); // producer.setNamesrvAddr("192.168.232.128:9876"); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; //由于是异步发送,这里引入一个countDownLatch,保证所有Producer发送消息的回调方法都执行完了再停止Producer服务。 final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { try { final int index = i; Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); System.out.println("消息发送完成"); } catch (Exception e) { e.printStackTrace(); } } countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); } }
这个示例有个比较有趣的地方就是引入了一个countDownLatch来保证所有消息回调方法都执行完了再关闭Producer。 所以从这里可以看出,RocketMQ的Producer也是一个服务端,在往Broker发送消息的时候也要作为服务端提供服务。
- 单向发送消息的样例:
关键点就是使用producer.sendOneWay方式来发送消息,这个方法没有返回值,也没有回调。就是只管把消息发出去就行了。 - 使用消费者消费消息。
-消费者消费消息有两种模式,一种是消费者主动去Broker上拉取消息的拉模式,另一种是消费者等待Broker把消息推送过来的推模式。
拉模式的样例见:org.apache.rocketmq.example.simple.PullConsumer
public class PullConsumer { private static final MapOFFSE_TABLE = new HashMap (); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("192.168.232.128:9876"); consumer.start(); Set mqs = consumer.fetchSubscribeMessageQueues("TopicTest"); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if (offset != null) return offset; return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { OFFSE_TABLE.put(mq, offset); } }
推模式的样例见:org.apache.rocketmq.example.simple.PushConsumer
public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); // consumer.setNamesrvAddr("192.168.232.128:9876"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //wrong time format 2017_0422_221800 consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
通常情况下,用推模式比较简单,实际上RocketMQ的推模式也是由拉模式封装出来的。4.7.1版本中DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl。
顺序消息顺序消息生产者样例见:org.apache.rocketmq.example.order.Producer
public class Producer { public static void main(String[] args) throws UnsupportedEncodingException { try { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // producer.setNamesrvAddr("192.168.232.128:9876"); producer.start(); for (int i = 0; i < 10; i++) { int orderId = i; for(int j = 0 ; j <= 5 ; j ++){ Message msg = new Message("OrderTopicTest", "order_"+orderId, "KEY" + orderId, ("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(Listmqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } } producer.shutdown(); } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace(); } } }
顺序消息消费者样例见:org.apache.rocketmq.example.order.Consumer
public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setNamesrvAddr("192.168.232.128:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("OrderTopicTest", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for(MessageExt msg:msgs){ System.out.println("收到消息内容 "+new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); // 这样是保证不了最终消费顺序的。 // consumer.registerMessageListener(new MessageListenerConcurrently() { // @Override // public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { // for(MessageExt msg:msgs){ // System.out.println("收到消息内容 "+new String(msg.getBody())); // } // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // } // }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
验证时,可以启动多个Consumer实例,观察下每一个订单的消息分配以及每个订单下多个步骤的消费顺序。不管订单在多个Consumer实例之前是如何分配的,每个订单下的多条消息顺序都是固定从0~5的。RocketMQ保证的是消息的局部有序,而不是全局有序。先从控制台上看下List mqs是什么。再回看我们的样例,实际上,RocketMQ也只保证了每个OrderID的所有消息有序(发到了同一个queue),而并不能保证所有消息都有序。所以这就涉及到了RocketMQ消息有序的原理。要保证最终消费到的消息是有序的,需要从Producer、Broker、Consumer三个步骤都保证消息有序才行。首先在发送者端:在默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同的MessageQueue(分区队列),而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的。而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。**而Broker中一个队列内的消息是可以保证有序的。**然后在消费者端:消费者会从多个消息队列上去拿消息。这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的。消费者端要保证消息有序,就需要按队列一个一个
来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently这个消息监听器则不会锁队列,每次都是从多个Message中取一批数据(默认不超过32条)。因此也无法保证消息有序。
广播消息的消息生产者样例见org.apache.rocketmq.example.broadcast.PushConsumer
public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); } }
广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。在集群状态(MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟kafka和rabbitMQ的集群模式是一样的)。而广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。
延迟消息public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { // Instantiate a producer to send scheduled messages DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); // Launch producer producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // This message will be delivered to consumer 10 seconds later. message.setDelayTimeLevel(3); // Send the message producer.send(message); }// Shutdown producer after use. producer.shutdown(); } }
延迟消息实现的效果就是在调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去。这是RocketMQ特有的一个功能。
那会延迟多久呢?延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3);开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),而是只支持18个固定的延迟级别,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。这从哪里看出来的?其实从rocketmq-console控制台就能看出来。而这18个延迟级别也支持自行定义,不过一般情况下最好不要自定义修改。
批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。
批量消息的消息生产者样例见:org.apache.rocketmq.example.batch.SimpleBatchProducer
public class SplitBatchProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName"); producer.start(); //large batch String topic = "BatchTest"; Listmessages = new ArrayList<>(100 * 1000); for (int i = 0; i < 100 * 1000; i++) { messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes())); } // producer.send(messages); //split the large batch into small ones: ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { List listItem = splitter.next(); producer.send(listItem); } producer.shutdown(); } } class ListSplitter implements Iterator > { private int sizeLimit = 1000 * 1000; private final List
messages; private int currIndex; public ListSplitter(List messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map properties = message.getProperties(); for (Map.Entry entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; //for log overhead if (tmpSize > sizeLimit) { //it is unexpected that single message exceeds the sizeLimit //here just let it go, otherwise it will block the splitting process if (nextIndex - currIndex == 0) { //if the next sublist has no element, add this one and then break, otherwise just break nextIndex++; } break; } if (tmpSize + totalSize > sizeLimit) { break; } else { totalSize += tmpSize; } } List subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } @Override public void remove() { throw new UnsupportedOperationException("Not allowed to remove"); } }
org.apache.rocketmq.example.batch.SplitBatchProducer
public class SimpleBatchProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName"); producer.start(); //If you just send messages of no more than 1MiB at a time, it is easy to use batch //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support String topic = "BatchTest"; Listmessages = new ArrayList<>(); messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes())); messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes())); messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes())); producer.send(messages); producer.shutdown(); } }
相信大家在官网以及测试代码中都看到了关键的注释:如果批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。但是使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限
制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。
在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。
使用Tag过滤消息的消息生产者案例见:org.apache.rocketmq.example.filter.TagFilterProducer
public class TagFilterProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC"}; for (int i = 0; i < 15; i++) { Message msg = new Message("TagFilterTest", tags[i % tags.length], "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); } }
使用Tag过滤消息的消息消费者案例见:org.apache.rocketmq.example.filter.TagFilterConsumer
public class TagFilterConsumer { public static void main(String[] args) throws InterruptedException, MQClientException, IOException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); consumer.subscribe("TagFilterTest", "TagA || TagC"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
主要是看消息消费者。consumer.subscribe(“TagFilterTest”, “TagA || TagC”); 这句只订阅TagA和TagC的消息。
TAG是RocketMQ中特有的一个消息属性。RocketMQ的最佳实践中就建议,使用RocketMQ时,一个应用可以就用一个Topic,而应用中的不同业务就用TAG来区分。但是,这种方式有一个很大的限制,就是一个消息只能有一个TAG,这在一些比较复杂的场景就有点不足了。 这时候,可以使用SQL表达式来对消息进行过滤。
SQL过滤的消息生产者案例见:org.apache.rocketmq.example.filter.SqlFilterProducer
public class SqlFilterProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC"}; for (int i = 0; i < 15; i++) { Message msg = new Message("SqlFilterTest", tags[i % tags.length], ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); msg.putUserProperty("a", String.valueOf(i)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); } }
SQL过滤的消息消费者案例见:org.apache.rocketmq.example.filter.SqlFilterConsumer
public class SqlFilterConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // Don't forget to set enablePropertyFilter=true in broker consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + "and (a is not null and a between 0 and 3)")); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
这个模式的关键是在消费者端使用MessageSelector.bySql(String sql)返回的一个MessageSelector。这里面的sql语句是按照SQL92标准来执行的。sql中可以使用的参数有默认的TAGS和一个在生产者中加入的a属性。
SQL92语法:RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
数值比较,比如:>,>=,<,<=,BETWEEN,=;
字符比较,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
常量支持类型为:
数值,比如:123,3.1415;
字符,比如:‘abc’,必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE
使用注意:只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。
这个事务消息是RocketMQ提供的一个非常有特色的功能,需要着重理解。
首先,了解下什么是事务消息。官网的介绍是:事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个 *** 作的原子性,也就是这两个 *** 作一起成功或者一起失败。
其次,我们来理解下事务消息的编程模型。事务消息只保证消息发送者的本地事务与发消息这两个 *** 作的原子性,因此,事务消息的示例只涉及到消息发送者,对于消息消费者来说,并没有什么特别的。
事务消息生产者的案例见:org.apache.rocketmq.example.transaction.TransactionProducer
public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); // producer.setNamesrvAddr("127.0.0.1:9876"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } }
事务消息的关键是在TransactionMQProducer中指定了一个TransactionListener事务监听器,这个事务监听器就是事务消息的关键控制器。源码中的案例有点复杂,这里准备了一个更清晰明了的事务监听器示例
public class TransactionListenerImpl implements TransactionListener { //在提交完事务消息后执行。 //返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。 //返回ROLLBACK_MESSAGE状态的消息会被丢弃。 //返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String tags = msg.getTags(); //TagA的消息会立即被消费者消费到 if(StringUtils.contains(tags,"TagA")){ return LocalTransactionState.COMMIT_MESSAGE; //TagB的消息会被丢弃 }else if(StringUtils.contains(tags,"TagB")){ return LocalTransactionState.ROLLBACK_MESSAGE; //其他消息会等待Broker进行事务状态回查。 }else{return LocalTransactionState.UNKNOW; } }//在对UNKNOWN状态的消息进行状态回查时执行。返回的结果是一样的。 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String tags = msg.getTags(); //TagC的消息过一段时间会被消费者消费到 if(StringUtils.contains(tags,"TagC")){ return LocalTransactionState.COMMIT_MESSAGE; //TagD的消息也会在状态回查时被丢弃掉 }else if(StringUtils.contains(tags,"TagD")){ return LocalTransactionState.ROLLBACK_MESSAGE; //剩下TagE的消息会在多次状态回查后最终丢弃 }else{return LocalTransactionState.UNKNOW;}}}
然后,我们要了解下事务消息的使用限制:
- 事务消息不支持延迟消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
- 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SEConDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
- 事务性消息可能不止一次被检查或消费。
- 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
事务消息机制的关键是在发送消息时,会将消息转为一个half半消息,并存入RocketMQ内部的一个RMQ_SYS_TRANS_HALF_TOPIC 这个Topic,这样对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。
事务消息只保证了发送者本地事务和发送消息这两个 *** 作的原子性,但是并不保证消费者本地事务的原子性,所以,事务消息只保证了分布式事务的一半。但是即使这样,对于复杂的分布式事务,RocketMQ提供的事务消息也是目前业内最佳的降级方案。
ACL权限控制权限控制(ACL)主要为RocketMQ提供Topic资源级别的用户访问控制。用户在使用RocketMQ权限控制时,可以在Client客户端通过 RPCHook注入AccessKey和SecretKey签名;同时,将对应的权限控制属性(包括Topic访问权限、IP白名单和AccessKey和SecretKey签名等)设置在
$ROCKETMQ_HOME/conf/plain_acl.yml的配置文件中。Broker端对AccessKey所拥有的权限进行校验,校验不过,抛出异常; ACL客户端可以参考:org.apache.rocketmq.example.simple包下面的AclClient代码。
public class AclClient { private static final MapOFFSE_TABLE = new HashMap (); private static final String ACL_ACCESS_KEY = "RocketMQ"; private static final String ACL_SECRET_KEY = "1234567"; public static void main(String[] args) throws MQClientException, InterruptedException { producer(); pushConsumer(); pullConsumer(); } public static void producer() throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook()); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 128; i++) try { { Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } public static void pushConsumer() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5", getAclRPCHook(), new AllocateMessageQueueAveragely()); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // Wrong time format 2017_0422_221800 consumer.setConsumeTimestamp("20180422221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); printBody(msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } public static void pullConsumer() throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_6", getAclRPCHook()); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.start(); Set mqs = consumer.fetchSubscribeMessageQueues("TopicTest"); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: %s%n", mq); SINGLE_MQ: while (true) { try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.printf("%s%n", pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); printBody(pullResult); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static void printBody(PullResult pullResult) { printBody(pullResult.getMsgFoundList()); } private static void printBody(List msg) { if (msg == null || msg.size() == 0) return; for (MessageExt m : msg) { if (m != null) { System.out.printf("msgId : %s body : %s nr", m.getMsgId(), new String(m.getBody())); } } } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if (offset != null) return offset; return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { OFFSE_TABLE.put(mq, offset); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)); } }
如果要在自己的客户端中使用RocketMQ的ACL功能,还需要引入一个单独的依赖包
org.apache.rocketmq rocketmq-acl4.7.1
而Broker端具体的配置信息可以参见源码包下docs/cn/acl/user_guide.md。主要是在broker.conf中打开acl的标志:aclEnable=true。然后就可以用plain_acl.yml来进行权限配置了。并且这个配置文件是热加载的,也就是说要修改配置时,只要修改配置文件就可以了,不用重启Broker服务。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)