RocketMq拉取模式消费者模块

RocketMq拉取模式消费者模块,第1张

RocketMq拉取模式消费者模块

一、背景
推送模式如果消费者处理慢,大量消息会导致消费者宕机,因此采用消费者实时拉取。要么自己写while(true),要么用如下
二、代码如下(此为多线程 *** 作)

@Component
public class MyConsumer {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private RocketMQConfig rocketMQConfig;
   

    @PostConstruct
    public void DefaultMQPushConsumer() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyGroup");
            consumer.setNamesrvAddr(rocketMQConfig.getNamesrvAddr());
            consumer.setInstanceName("Consumer");
            consumer.subscribe("mytopic", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                    logger.info("---MyConsumer  接收到的消息---" + msgs.toString());
                    for (MessageExt message : msgs) {
                        String msg = new String(message.getBody());
                        logger.info("MyConsumer  consumeMessage:" + (new String(message.getBody())));
                        Product product = JSON.parseObject(new String(message.getBody()), Product.class);
                        logger.info("MyConsumer  product:" + product);
                        try {
                            //业务代码
                        } catch (Exception e) {
                            logger.info("---MyConsumer 异常,异常信息为:{}---", e.getMessage());
                            e.printStackTrace();
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            logger.info("MyConsumer  clientIP:" + consumer.getClientIP());
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        logger.info(" MyConsumer  Started. ");
    }
}

三、知识补充
1、@PostConstruct注解:(摘取自·传送门)
@PostConstruct是Java自己的注解。
Java中该注解的说明:@PostConstruct该注解被用来修饰一个非静态的void()方法。被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。PostConstruct在构造函数之后执行,init()方法之前执行。

通常我们会是在Spring框架中使用到@PostConstruct注解 该注解的方法在整个Bean初始化中的执行顺序:

Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
2、MessageListener (2、3、4、5皆参考自·传送门)
MessageListenerConcurrently 并发

ConsumeConcurrentlyStatus.CONSUME_SUCCESS:成功消费
ConsumeConcurrentlyStatus.RECONSUME_LATER:告诉使用者现在不使用它,以后需要重新使用它。然后可以继续使用其他消息

MessageListenerOrderly 线性

ConsumeOrderlyStatus.SUCCESS 消费成功
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT:因为关心顺序,所以不能跳过消息,但是可以返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 来告诉使用者等待一会儿

3、线程池的大小

消费端使用 ThreadPoolExecutor 在内部处理消费,因此可以通过设置 setConsumeThreadMin 或 setConsumeThreadMax 来更改它

4、ConsumeFromWhere
当一个新的Consumer Group建立时,它将需要决定是否需要使用Broker中已经存在的历史消息。同样通过set设置。

CONSUME_FROM_LAST_OFFSET:将忽略历史消息,并使用此后生成的任何消息。
CONSUME_FROM_FIRST_OFFSET:将消费 Broker 中存在的所有消息。
CONSUME_FROM_TIMESTAMP:来消费指定时间戳之后生成的消息

5、消息重复 Duplication
消息重复的原因

Producer 重新发送,如 FLUSH_SLAVE_TIMEOUT 情况
Consumer 消费了一些消息,这些消息消费的 offsets 没有来得及更新到Broker,这时 Consumer 挂了
因此,如果应用程序不能容忍重复,那么可能需要进行一些外部工作来处理这个问题。例如,可以检查数据库的主键、使用幂等 *** 作

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5687927.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存