RocketMQ过滤消费实现方式、原理以及过滤消费导致消息丢失问题

RocketMQ过滤消费实现方式、原理以及过滤消费导致消息丢失问题,第1张

RocketMQ过滤消费实现方式、原理以及过滤消费导致消息丢失问题 一、背景

在分析完RocketMQ Consumer端的消息拉取流程之后,发现一个很有意思的钩子函数,可以用来自定义消息的过滤消费。那么我们来看一下RocketMQ是如何实现过滤消费的?

二、消息过滤方式 1、使用Tag标签

在大多数情况下,TAG是一个简单而有用的设计,其可以来简单的选择我们想要的消息。

1)实现方式

Consumer代码:

public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer");
    consumer.setNamesrvAddr("10.10.100.100:9876");

    // 消费者订阅topic时,添加Tag过滤规则
    consumer.subscribe("saint-study-topic", "TAG-A");

    // 集群模式,只有一个consumer能消费到消息。
    consumer.setMessageModel(MessageModel.CLUSTERING);

    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();
    System.out.println("Consumer start。。。。。。");

}

生产者代码:

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("saint-study");
    // 设置nameserver地址
    producer.setNamesrvAddr("10.10.100.100:9876");
    producer.start();

    // topic 和body
    Message msg = new Message("saint-study-topic", "TAG-A", "key01", "study010".getBytes(StandardCharsets.UTF_8));
    Message msg2 = new Message("saint-study-topic", "TAG-B", "key02", "study011".getBytes(StandardCharsets.UTF_8));
    Message msg3 = new Message("saint-study-topic", "TAG-C", "key03", "study012".getBytes(StandardCharsets.UTF_8));
    Message msg4 = new Message("saint-study-topic", "TAG-A", "key04", "study013".getBytes(StandardCharsets.UTF_8));

    List list = new ArrayList<>();
    list.add(msg);
    list.add(msg2);
    list.add(msg3);
    list.add(msg4);

    // 批量消息发送消息
    SendResult send = producer.send(list);
    System.out.println("sendResult: " + send);

    // 关闭生产者
    producer.shutdown();
    System.out.println("已经停机");
}

运行生产者,我们可以发现成功生产了4条消息:

运行消费者,我们可以发现只消费到了TAG为“TAG-A”的消息,即:study010、study013

2)错误使用TAG过滤消费导致消息丢失问题! (1)问题描述
  1. 启动消费者1,消费组为study-consumer,订阅saint-study-topic的消息,tag设置为TAG-A
  2. 启动消费者2,消费组也为study-consumer,也订阅saint-study-topic的消息,但是tag设置为TAG-B
  3. 启动生产者,生产者发送含有TAG-A,TAG-B的消息各3条
  4. 消费者1没有收到任何消息,消费者2收到部分消息(tag为TAG-B)

从下图我们可以看到,消费者2消费到了tag为TAG-B的消息;而消费者1并没有消费到任何消息

(2)结论

同一个消费组,给不同的消费者设置不同的tag时,后启动的消费者会覆盖先启动的消费者设置的tag。

  • tag是消息过滤的条件,经过服务端和客户端两层过滤,最后只有后启动的消费者才能收到部分消息。
(3)原因总结
  • 同一个consumer group的订阅关系,保存在RebalanceImpl类的Map中。key为topic;

    再看SubscriptionData中,有一个tagsSet用来表示当前topic对应消费组的包含的Tag信息。

  • 不同的消费者启动后,依次注册订阅关系,因为tag不一样,导致Map中同一topic的tag被覆盖。比如:消费者1订阅tag1,消费者2订阅tag2。最后map中只保存tag2;

  • 过滤的核心是是tag,tag被更新,过滤条件被改变。服务端过滤后只返回tag2的消息;

  • 客户端接收消息后,再次过滤。先启动的消费者1订阅tagA,但是服务端返回tag2,所以消费者1收不到任何消息。消费者2能收到一半的消息(集群模式,假设消息平均分配,另外一半分给tag2)

2、使用SQL过滤

Tag过滤语法简单,但是其灵活性也比较差,相对比较适合过滤场景简单 且 客户端对计算资源不是很敏感的用户。如果想实现更为复杂的消息过滤功能可以使用Sql Filter。

注意:只有使用push模式的消费者才能用使用SQL92标准的sql语句。

1)基本语法

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE
2)实现方式

Producer代码:

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("saint-study");
    // 设置nameserver地址
    producer.setNamesrvAddr("10.10.100.100:9876");
    producer.start();

    // topic 和body
    List messages = new ArrayList<>();
    for (int i = 10; i < 30; i ++) {
        Message msg = new Message("saint-study-topic", "TAG-B", "key02", ("study000" + i).getBytes(StandardCharsets.UTF_8));
        msg.putUserProperty("age", String.valueOf(i));
        messages.add(msg);
    }

    SendResult send = producer.send(messages);
    System.out.println("sendResult: " + send);

    // 关闭生产者
    producer.shutdown();
    System.out.println("已经停机");
}

Consumer代码:

public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer");
    consumer.setNamesrvAddr("10.10.100.100:9876");

    // 给consumer关联SQL过滤规则
    MessageSelector messageSelector = MessageSelector.bySql("age >= 18 and age <= 28");
    consumer.subscribe("saint-study-topic", messageSelector);
    consumer.setMessageModel(MessageModel.CLUSTERING);

    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    consumer.start();
    System.out.println("Consumer start。。。。。。");

}
三、原理剖析 1、TAG过滤原理 1)服务端过滤

留个坑,等研究完Broker端消息的存储再补充。

2)客户端过滤

客户端过滤的代码实现体现在PullAPIWrapper#processPullResult()方法中,这个我们在《Rocket源码分析pullMessage:Consumer是如何从broker拉取消息的?》一文中有详细介绍Consumer拉取消息的流程。

如果内存中主题订阅信息中tagsSet不为空,并且Consumer订阅时不是ClassFilter会执行Tag过滤;Tag过滤就是单纯的字符串值的比对。

为什么服务端做了Tag过滤,客户端还要做呢?

  • 服务端采用Hash的方式存储Tag信息,所以必然会存在hash冲突的情况,导致过滤存在不准确性,所以客户端需要进行精确过滤。
  • 客户度通过tag的字符串值做对比,不相等的tag消息不返回给消费者。
2、TAG过滤原理

存服务端过滤。

留个坑,等研究完Broker端消息的存储再补充。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/4658626.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存