用户下单,付款,发货,完成,每一个步骤发送完成,发送mq消息到下游系统处理(消费客户端),同一个订单要保证顺序消费。
mq实现思路选择mq顺序消息,实现思路,
生产者:发送消息的时候保证同一个订单的业务消息,发送到同一个队列;订单号对队列总和取模,找到指定队列
消费者:每一个消费线程可以保证只消费一个队列的消息,怎么保证的...可进一步探究
代码 生产者@Slf4j public class OrderedProducer { private static List消费者buildMsg() { List orders = new ArrayList<>(); Order order1 = new Order(); order1.setOrderNo("1"); order1.setNote("下单"); order1.setCustomer("麦迪"); Order order7 = new Order(); order7.setOrderNo("3"); order7.setNote("下单"); order7.setCustomer("库里"); Order order2 = new Order(); order2.setOrderNo("1"); order2.setNote("付款"); order2.setCustomer("麦迪"); Order order3 = new Order(); order3.setOrderNo("1"); order3.setNote("通知"); order3.setCustomer("麦迪"); Order order4 = new Order(); order4.setOrderNo("2"); order4.setNote("下单"); order4.setCustomer("科比"); Order order5 = new Order(); order5.setOrderNo("2"); order5.setNote("付款"); order5.setCustomer("科比"); Order order8 = new Order(); order8.setOrderNo("3"); order8.setNote("付款"); order8.setCustomer("库里"); Order order6 = new Order(); order6.setOrderNo("2"); order6.setNote("通知"); order6.setCustomer("科比"); Order order9 = new Order(); order9.setOrderNo("3"); order9.setNote("通知"); order9.setCustomer("库里"); orders.add(order1); orders.add(order7); orders.add(order2); orders.add(order3); orders.add(order4); orders.add(order5); orders.add(order8); orders.add(order6); orders.add(order9); return orders; } public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("example_group_name"); producer.setNamesrvAddr("47.103.143.191:8201;47.103.143.191:8202"); producer.start(); List orders = buildMsg(); orders.forEach(x -> { Message msg = new Message("TopicOrder", "TagOrder4", x.getOrderNo(), JSON.toJSonBytes(x)); try { SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List queues, Message message, Object arg) { Integer orderNo = Integer.valueOf((String) arg); int index = orderNo % queues.size(); // log.info("order: {}, queueTotal: {}, index: {}, queue: {}", JSON.toJSonString(x), queues.size(), // index, queues.get(index)); log.info("客户: {}, 发送到的队列: {}", x.getCustomer(), index); return queues.get(index); } }, x.getOrderNo()); log.info("sendResult: {}", JSON.toJSonString(sendResult)); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }); producer.shutdown(); } }
@Slf4j public class OrderedConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setNamesrvAddr("47.103.143.191:8201;47.103.143.191:8202"); consumer.subscribe("TopicOrder", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { @SneakyThrows @Override public ConsumeOrderlyStatus consumeMessage(List结果验证msgs, ConsumeOrderlyContext context) { context.setAutoCommit(false); log.info("线程:{}, 队列:{}, 消费消息:{}", Thread.currentThread().getName(), msgs.get(0).getQueueId(), new String(msgs.get(0).getBody(), "UTF-8")); return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
生产者发送日志,可以看出,同一个人的业务发送到了同一个队列,总共9条消息
消费者
当生产者第一次发送9条消息时,看下图红字框中,没问题,3个线程,分别消费3个队列中的消息,但是当我再次发送消息时,消费者消费就乱了,每一个消息都由一个线程消费,不断的新建线程,4-13...再发送消息,继续创建新的线程,直到总量20,再从线程1开始消费
如果从新启动消费者,就是正常的3个线程123消费3个队列,如果不从新启动消费者,生产者不断发送,消费端就无法按顺序正常消费,而是不断创建新线程消费
疑问1.消费者线程为什么是20个
2.消费者为什么创建新线程只消费一个,不是一个线程,消费一个队列
3.消费者多线程在哪里实现,真实的消费端多线程消费场景就是这样吗,不用管,自动会有多个线程消费,还是怎么实现多个消费线程消费
有知晓的同志请解答,谢谢
思考队列与broker
每个broker默认有4个队列,双主双从,就是4*4 16个队列,所以稍微需要考虑一个消息均衡的发送到指定队列
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)