// 创建连接 public void connect() { try { // 消费者组 consumer = new DefaultMQPushConsumer("consumerGroup-demo"); consumer.setNamesrvAddr("127.0.0.1"); // 消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("topic-demo", "*"); } catch (MQClientException e) { logger.error("RocketConsumerDemo:connect()->" + e.getMessage(), e); } }有序消费
// 有序消费MessageListenerOrderly consumer.registerMessageListener((MessageListenerOrderly) (msgs, content) -> { // msgs中只收集同一个topic,同一个tag,并且key相同的message try { for (MessageExt msg : msgs) { String body = new String(msg.getBody(), "utf-8"); //消费者获取消息 这里只输出 不做后面逻辑处理 System.out.println(body); } } catch (UnsupportedEncodingException e) { logger.error("RocketConsumerDemo:消费()->" + e.getMessage(), e); // 稍后消费 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; });并发消费
// 并发消费MessageListenerConcurrently consumer.registerMessageListener((MessageListenerConcurrently) (msgs, content) -{ // msgs中只收集同一个topic,同一个tag,并且key相同的message try { for (MessageExt msg : msgs) { String body = new String(msg.getBody(), "utf-8"); //消费者获取消息 这里只输出 不做后面逻辑处理 System.out.println(body); } } catch (UnsupportedEncodingException e) { logger.error("RocketConsumerDemo:消费()->" + e.getMessage(), e); // 稍后消费 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)