1. 集群模式:同一消费者组内的多个消费者通过负载均衡共同消费消息,每条消息只会被其中一个消费者消费
2. 广播模式:一条消息会被复制成多份,分发给同一消费者组内的每一个消费者
消费者不同组时
一条消息会被复制成多份,分别发送给不同的消费者组
public class SyncProducer { public static void main(String[] args) throws Exception{ //Instantiate with a producer group name DefaultMQProducer producer = new DefaultMQProducer("producer_group_01"); //Specify name server addresses producer.setNamesrvAddr("localhost:9876"); //Launch the instance producer.start(); for(int i = 0; i < 10; i++){ //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); //Shut down once the producer instance is not longer in use. } producer.shutdown(); } }
public class Consumer { public static void main(String[] args) throws Exception{ //Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_01"); //Specify name server addresses. consumer.setNamesrvAddr("localhost:9876"); //Subscribe one more more topics to consume. consumer.subscribe("TopicTest", "*"); consumer.setMessageModel(MessageModel.BROADCASTING); //Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listlist, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list); for (MessageExt messageExt : list) { System.out.println("收到消息" + new String(messageExt.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started. %n"); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)