一、背景
推送模式如果消费者处理慢,大量消息会导致消费者宕机,因此采用消费者实时拉取。要么自己写while(true),要么用如下
二、代码如下(此为多线程 *** 作)
@Component public class MyConsumer { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private RocketMQConfig rocketMQConfig; @PostConstruct public void DefaultMQPushConsumer() { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyGroup"); consumer.setNamesrvAddr(rocketMQConfig.getNamesrvAddr()); consumer.setInstanceName("Consumer"); consumer.subscribe("mytopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { logger.info("---MyConsumer 接收到的消息---" + msgs.toString()); for (MessageExt message : msgs) { String msg = new String(message.getBody()); logger.info("MyConsumer consumeMessage:" + (new String(message.getBody()))); Product product = JSON.parseObject(new String(message.getBody()), Product.class); logger.info("MyConsumer product:" + product); try { //业务代码 } catch (Exception e) { logger.info("---MyConsumer 异常,异常信息为:{}---", e.getMessage()); e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); logger.info("MyConsumer clientIP:" + consumer.getClientIP()); consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } logger.info(" MyConsumer Started. "); } }
三、知识补充
1、@PostConstruct注解:(摘取自·传送门)
@PostConstruct是Java自己的注解。
Java中该注解的说明:@PostConstruct该注解被用来修饰一个非静态的void()方法。被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。PostConstruct在构造函数之后执行,init()方法之前执行。
通常我们会是在Spring框架中使用到@PostConstruct注解 该注解的方法在整个Bean初始化中的执行顺序:
Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
2、MessageListener (2、3、4、5皆参考自·传送门)
MessageListenerConcurrently 并发
ConsumeConcurrentlyStatus.CONSUME_SUCCESS:成功消费 ConsumeConcurrentlyStatus.RECONSUME_LATER:告诉使用者现在不使用它,以后需要重新使用它。然后可以继续使用其他消息
MessageListenerOrderly 线性
ConsumeOrderlyStatus.SUCCESS 消费成功 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT:因为关心顺序,所以不能跳过消息,但是可以返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 来告诉使用者等待一会儿
3、线程池的大小
消费端使用 ThreadPoolExecutor 在内部处理消费,因此可以通过设置 setConsumeThreadMin 或 setConsumeThreadMax 来更改它
4、ConsumeFromWhere
当一个新的Consumer Group建立时,它将需要决定是否需要使用Broker中已经存在的历史消息。同样通过set设置。
CONSUME_FROM_LAST_OFFSET:将忽略历史消息,并使用此后生成的任何消息。 CONSUME_FROM_FIRST_OFFSET:将消费 Broker 中存在的所有消息。 CONSUME_FROM_TIMESTAMP:来消费指定时间戳之后生成的消息
5、消息重复 Duplication
消息重复的原因
Producer 重新发送,如 FLUSH_SLAVE_TIMEOUT 情况 Consumer 消费了一些消息,这些消息消费的 offsets 没有来得及更新到Broker,这时 Consumer 挂了 因此,如果应用程序不能容忍重复,那么可能需要进行一些外部工作来处理这个问题。例如,可以检查数据库的主键、使用幂等 *** 作
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)