rocketmq顺序消费问题

rocketmq顺序消费问题,第1张

rocketmq顺序消费问题 场景举例

用户下单,付款,发货,完成,每一个步骤发送完成,发送mq消息到下游系统处理(消费客户端),同一个订单要保证顺序消费。

mq实现思路

选择mq顺序消息,实现思路,

生产者:发送消息的时候保证同一个订单的业务消息,发送到同一个队列;订单号对队列总和取模,找到指定队列

消费者:每一个消费线程可以保证只消费一个队列的消息,怎么保证的...可进一步探究

代码 生产者
@Slf4j
public class OrderedProducer {

    private static List buildMsg() {
	List orders = new ArrayList<>();
	Order order1 = new Order();
	order1.setOrderNo("1");
	order1.setNote("下单");
	order1.setCustomer("麦迪");

	Order order7 = new Order();
	order7.setOrderNo("3");
	order7.setNote("下单");
	order7.setCustomer("库里");

	Order order2 = new Order();
	order2.setOrderNo("1");
	order2.setNote("付款");
	order2.setCustomer("麦迪");
	Order order3 = new Order();
	order3.setOrderNo("1");
	order3.setNote("通知");
	order3.setCustomer("麦迪");

	Order order4 = new Order();
	order4.setOrderNo("2");
	order4.setNote("下单");
	order4.setCustomer("科比");
	Order order5 = new Order();
	order5.setOrderNo("2");
	order5.setNote("付款");
	order5.setCustomer("科比");

	Order order8 = new Order();
	order8.setOrderNo("3");
	order8.setNote("付款");
	order8.setCustomer("库里");

	Order order6 = new Order();
	order6.setOrderNo("2");
	order6.setNote("通知");
	order6.setCustomer("科比");

	Order order9 = new Order();
	order9.setOrderNo("3");
	order9.setNote("通知");
	order9.setCustomer("库里");

	orders.add(order1);
	orders.add(order7);
	orders.add(order2);
	orders.add(order3);
	orders.add(order4);
	orders.add(order5);
	orders.add(order8);
	orders.add(order6);
	orders.add(order9);
	return orders;
    }

    public static void main(String[] args) throws Exception {
	DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
	producer.setNamesrvAddr("47.103.143.191:8201;47.103.143.191:8202");
	producer.start();
	List orders = buildMsg();
	orders.forEach(x -> {
	    Message msg = new Message("TopicOrder", "TagOrder4", x.getOrderNo(), JSON.toJSonBytes(x));
	    try {
		SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
		    @Override
		    public MessageQueue select(List queues, Message message, Object arg) {
			Integer orderNo = Integer.valueOf((String) arg);
			int index = orderNo % queues.size();
			//			log.info("order: {}, queueTotal: {}, index: {}, queue: {}", JSON.toJSonString(x), queues.size(),
			//					index, queues.get(index));
			log.info("客户: {}, 发送到的队列: {}", x.getCustomer(), index);
			return queues.get(index);
		    }
		}, x.getOrderNo());
		log.info("sendResult: {}", JSON.toJSonString(sendResult));
	    } catch (MQClientException e) {
		e.printStackTrace();
	    } catch (RemotingException e) {
		e.printStackTrace();
	    } catch (MQBrokerException e) {
		e.printStackTrace();
	    } catch (InterruptedException e) {
		e.printStackTrace();
	    }

	});
	producer.shutdown();
    }

}
消费者
@Slf4j
public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
	consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
	consumer.setNamesrvAddr("47.103.143.191:8201;47.103.143.191:8202");
	consumer.subscribe("TopicOrder", "*");
	consumer.registerMessageListener(new MessageListenerOrderly() {

	    @SneakyThrows
	    @Override
	    public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
		context.setAutoCommit(false);
		log.info("线程:{}, 队列:{}, 消费消息:{}", Thread.currentThread().getName(), msgs.get(0).getQueueId(),
				new String(msgs.get(0).getBody(), "UTF-8"));
		return ConsumeOrderlyStatus.SUCCESS;
	    }
	});

	consumer.start();

	System.out.printf("Consumer Started.%n");
    }
}
结果验证

生产者发送日志,可以看出,同一个人的业务发送到了同一个队列,总共9条消息

 消费者

当生产者第一次发送9条消息时,看下图红字框中,没问题,3个线程,分别消费3个队列中的消息,但是当我再次发送消息时,消费者消费就乱了,每一个消息都由一个线程消费,不断的新建线程,4-13...再发送消息,继续创建新的线程,直到总量20,再从线程1开始消费

如果从新启动消费者,就是正常的3个线程123消费3个队列,如果不从新启动消费者,生产者不断发送,消费端就无法按顺序正常消费,而是不断创建新线程消费

疑问

1.消费者线程为什么是20个

2.消费者为什么创建新线程只消费一个,不是一个线程,消费一个队列

3.消费者多线程在哪里实现,真实的消费端多线程消费场景就是这样吗,不用管,自动会有多个线程消费,还是怎么实现多个消费线程消费

有知晓的同志请解答,谢谢

思考

队列与broker

每个broker默认有4个队列,双主双从,就是4*4 16个队列,所以稍微需要考虑一个消息均衡的发送到指定队列

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5479524.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-12
下一篇 2022-12-12

发表评论

登录后才能评论

评论列表(0条)

保存