在分析完RocketMQ Consumer端的消息拉取流程之后,发现一个很有意思的钩子函数,可以用来自定义消息的过滤消费。那么我们来看一下RocketMQ是如何实现过滤消费的?
二、消息过滤方式 1、使用Tag标签在大多数情况下,TAG是一个简单而有用的设计,其可以来简单的选择我们想要的消息。
1)实现方式Consumer代码:
public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer"); consumer.setNamesrvAddr("10.10.100.100:9876"); // 消费者订阅topic时,添加Tag过滤规则 consumer.subscribe("saint-study-topic", "TAG-A"); // 集群模式,只有一个consumer能消费到消息。 consumer.setMessageModel(MessageModel.CLUSTERING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer start。。。。。。"); }
生产者代码:
public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("saint-study"); // 设置nameserver地址 producer.setNamesrvAddr("10.10.100.100:9876"); producer.start(); // topic 和body Message msg = new Message("saint-study-topic", "TAG-A", "key01", "study010".getBytes(StandardCharsets.UTF_8)); Message msg2 = new Message("saint-study-topic", "TAG-B", "key02", "study011".getBytes(StandardCharsets.UTF_8)); Message msg3 = new Message("saint-study-topic", "TAG-C", "key03", "study012".getBytes(StandardCharsets.UTF_8)); Message msg4 = new Message("saint-study-topic", "TAG-A", "key04", "study013".getBytes(StandardCharsets.UTF_8)); Listlist = new ArrayList<>(); list.add(msg); list.add(msg2); list.add(msg3); list.add(msg4); // 批量消息发送消息 SendResult send = producer.send(list); System.out.println("sendResult: " + send); // 关闭生产者 producer.shutdown(); System.out.println("已经停机"); }
运行生产者,我们可以发现成功生产了4条消息:
运行消费者,我们可以发现只消费到了TAG为“TAG-A”的消息,即:study010、study013
- 启动消费者1,消费组为study-consumer,订阅saint-study-topic的消息,tag设置为TAG-A
- 启动消费者2,消费组也为study-consumer,也订阅saint-study-topic的消息,但是tag设置为TAG-B
- 启动生产者,生产者发送含有TAG-A,TAG-B的消息各3条
- 消费者1没有收到任何消息,消费者2收到部分消息(tag为TAG-B)
从下图我们可以看到,消费者2消费到了tag为TAG-B的消息;而消费者1并没有消费到任何消息
同一个消费组,给不同的消费者设置不同的tag时,后启动的消费者会覆盖先启动的消费者设置的tag。
- tag是消息过滤的条件,经过服务端和客户端两层过滤,最后只有后启动的消费者才能收到部分消息。
-
同一个consumer group的订阅关系,保存在RebalanceImpl类的Map中。key为topic;
再看SubscriptionData中,有一个tagsSet用来表示当前topic对应消费组的包含的Tag信息。
-
不同的消费者启动后,依次注册订阅关系,因为tag不一样,导致Map中同一topic的tag被覆盖。比如:消费者1订阅tag1,消费者2订阅tag2。最后map中只保存tag2;
-
过滤的核心是是tag,tag被更新,过滤条件被改变。服务端过滤后只返回tag2的消息;
-
客户端接收消息后,再次过滤。先启动的消费者1订阅tagA,但是服务端返回tag2,所以消费者1收不到任何消息。消费者2能收到一半的消息(集群模式,假设消息平均分配,另外一半分给tag2)
Tag过滤语法简单,但是其灵活性也比较差,相对比较适合过滤场景简单 且 客户端对计算资源不是很敏感的用户。如果想实现更为复杂的消息过滤功能可以使用Sql Filter。
注意:只有使用push模式的消费者才能用使用SQL92标准的sql语句。
1)基本语法RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:‘abc’,必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
Producer代码:
public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("saint-study"); // 设置nameserver地址 producer.setNamesrvAddr("10.10.100.100:9876"); producer.start(); // topic 和body Listmessages = new ArrayList<>(); for (int i = 10; i < 30; i ++) { Message msg = new Message("saint-study-topic", "TAG-B", "key02", ("study000" + i).getBytes(StandardCharsets.UTF_8)); msg.putUserProperty("age", String.valueOf(i)); messages.add(msg); } SendResult send = producer.send(messages); System.out.println("sendResult: " + send); // 关闭生产者 producer.shutdown(); System.out.println("已经停机"); }
Consumer代码:
public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer"); consumer.setNamesrvAddr("10.10.100.100:9876"); // 给consumer关联SQL过滤规则 MessageSelector messageSelector = MessageSelector.bySql("age >= 18 and age <= 28"); consumer.subscribe("saint-study-topic", messageSelector); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List三、原理剖析 1、TAG过滤原理 1)服务端过滤msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer start。。。。。。"); }
留个坑,等研究完Broker端消息的存储再补充。
2)客户端过滤客户端过滤的代码实现体现在PullAPIWrapper#processPullResult()方法中,这个我们在《Rocket源码分析pullMessage:Consumer是如何从broker拉取消息的?》一文中有详细介绍Consumer拉取消息的流程。
如果内存中主题订阅信息中tagsSet不为空,并且Consumer订阅时不是ClassFilter会执行Tag过滤;Tag过滤就是单纯的字符串值的比对。
为什么服务端做了Tag过滤,客户端还要做呢?
- 服务端采用Hash的方式存储Tag信息,所以必然会存在hash冲突的情况,导致过滤存在不准确性,所以客户端需要进行精确过滤。
- 客户度通过tag的字符串值做对比,不相等的tag消息不返回给消费者。
存服务端过滤。
留个坑,等研究完Broker端消息的存储再补充。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)