- 一、普通消息:
- 1.消息发送分类:
- (1)同步发送消息:
- (2)异步发送消息:
- (3)单向发送消息:
- 2.普通消息代码举例:
- 定义生产者:
- (1)同步发消息生产者:Sync
- (2)异步消息发送生产者:Async
- (3)单向消息发送生产者:
- 定义消费者:
- 二、顺序消息:
- 三、延迟消息:
- 四、事务消息:
(1)创建maven工程: rocketmq-test
(2)导入依赖:
org.apache.rocketmq rocketmq-client4.9.2
(3)准备工作:
然后单机启动rocketmq:
单机启动:
cd /usr/src/software/rocketmq-4.9.2
启动:nohup sh bin/mqnamesrv &
查看日志:tail -f ~/logs/rocketmqlogs/namesrv.log
cd /usr/src/software/rocketmq-4.9.2
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
关闭防火墙:systemctl stop firewalld
然后启动控制台:http://localhost:6789/#/
(4)代码:
定义生产者: (1)同步发消息生产者:Synccom.fan.general.SyncProducer :
package com.fan.general; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; //同步发行消息 public class SyncProducer { public static void main(String[] args) throws Exception{ //创建一个生产者producer,参数为创建一个生产者producer Group 名称 DefaultMQProducer producer = new DefaultMQProducer("pg"); //指定nameserver地址 producer.setNamesrvAddr("rocketmqOS:9876"); //设置当前发送失败后的重试次数,默认不设置是2次 producer.setRetryTimesWhenSendFailed(3); //设置发送超时实现为5秒,默认是3秒 producer.setSendMsgTimeout(5000); //开启生产者 producer.start(); //生产并发送100条消息 for (int i = 0; i < 100; i++) { byte[] body = ("Hi," + i).getBytes(); Message msg = new Message("someTopic", "sometag", body); //为消息指定key msg.setKeys("key-" + i ); //发送消息 SendResult sendResult = producer.send(msg); System.out.println(sendResult); } //关闭producer producer.shutdown(); } }
涉及的类:
测试:
在linux主机上查看:
查看发送来的消息:
(2)异步消息发送生产者:Asyncpackage com.fan.general; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class AsyncProducer { public static void main(String[] args)throws Exception { //创建一个生产者 DefaultMQProducer producer = new DefaultMQProducer("pg"); //指定服务器地址 producer.setNamesrvAddr("rocketmqOS:9876"); //指定异步发送失败后,不进行重试发送,也可改成2 producer.setRetryTimesWhenSendAsyncFailed(0); //指定新创建的topic的queue数量为2,默认为4 producer.setDefaultTopicQueueNums(2); //启动生产者 producer.start(); //发送100条消息 for (int i = 0; i < 100; i++) { //创建一个消息体 byte[] body = ("hi," + i).getBytes(); Message message = new Message("myTopicA", "myTagA", body); //异步发送需要有callback回调方法 producer.send(message, new SendCallback() { //当producer接收到mq发送来的ack后就会触发该回调方法的执行 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { throwable.printStackTrace(); } }); }//end-for //sleep一会了,由于是异步发送,这里如果不sleep,则消息还未发送就会将producer给关闭了 Thread.sleep(3000); producer.shutdown(); } }
运行测试:
控制台看看消息:
(3)单向消息发送生产者:package com.fan.general; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; public class onewayProducer { public static void main(String[] args)throws Exception { DefaultMQProducer producer = new DefaultMQProducer("pg"); producer.setNamesrvAddr("rocketmqOS:9876"); //启动生产者 producer.start(); for (int i = 0; i < 100; i++) { byte[] msgBody = ("hi," + i).getBytes(); Message message = new Message("single", "singleTag", msgBody); //单向发送,没有消息的返回值,所以也不用打印 producer.sendoneway(message); } producer.shutdown(); System.out.println("producer 关闭"); } }定义消费者:
SomeConsumer:
package com.fan.general; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class SomeConsumer { public static void main(String[] args)throws MQClientException { //定义一个pull消费者 //DefaultLitePullConsumer consumer1 = new DefaultLitePullConsumer("cg"); //定义一个push 消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg"); consumer.setNamesrvAddr("rocketmqOS:9876"); //从哪里开始消费,指定从第一条开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //指定消费topic与tag consumer.subscribe("someTopic","*"); //指定费用广播模式 进行消费,默认为集群模式的 //consumer.setMessageModel(MessageModel.BROADCASTING); //注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { //逐条消费消息 for (MessageExt msg : msgs ) { System.out.println(msg); } //返回消费状态:消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //开启消费者进行消费 consumer.start(); System.out.println("消费者开始了----"); } }
lite:简化的
运行测试:网页反应比较慢
二、顺序消息:
代码实现:
package com.fan.general; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import java.util.List; public class OrderedProducer { public static void main(String[] args)throws Exception { //创建一个生产者 DefaultMQProducer producer = new DefaultMQProducer("pg"); producer.setNamesrvAddr("rocketmqOS:9876"); //若为全局有序,需要设置queue数量为1 //producer.setDefaultTopicQueueNums(1); producer.start(); for (int i = 0; i < 100; i++) { Integer orderId = i; byte[] body = ("hi," + i).getBytes(); Message msg = new Message("TopicA", "TagA", body); //将orderid作为消息key msg.setKeys(orderId.toString()); //send()的第三个参数会传递给选择器的select()的第三个参数 ,该send为同步发送 SendResult sendResult = producer.send(msg, new MessageQueueSelector() { //具体的选择算法在该方法中定义 @Override public MessageQueue select(Listmqs, Message msg, Object arg) { //以下是使用消息key作为选择的选择算法 String keys = msg.getKeys(); Integer id = Integer.valueOf(keys); //以下是使用arg作为选择key的选择算法 //Integer id = (Integer)arg; int index = id % mqs.size(); return mqs.get(index); } },orderId); System.out.println(sendResult); } producer.shutdown(); } }
运行测试:启动mq,记得关闭防火墙:systemctl stop firewalld
三、延迟消息:具体步骤:
生产者:
package com.fan.delay; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import java.text.SimpleDateFormat; import java.util.Date; //消息的生产者 public class DelayProducer { public static void main(String[] args)throws Exception { //1.创造生产者 DefaultMQProducer producer = new DefaultMQProducer("pg"); //2.给生产者设置名称服务器 producer.setNamesrvAddr("rocketmqOS:9876"); //3.开启生产者 producer.start(); for (int i = 0; i < 1; i++) { byte[] body = ("hi," + i).getBytes(); //4.循环生产消息 Message message = new Message("TopicB", "TagB", body); //指定消息的延时等级为3级,即延迟10秒 message.setDelayTimeLevel(3); SendResult sendResult = producer.send(message); //输出消息被发送的时间 System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date() )); System.out.println(","+sendResult); } //5.生产完消息后关闭生产者 producer.shutdown(); } }
消费者:
package com.fan.delay; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; //消息的消费者 public class OtherConsumer { public static void main(String[] args)throws Exception { //0.创造消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg"); //1.给消费者设置名称服务器 consumer.setNamesrvAddr("rocketmqOS:9876"); //2.订阅的主题和子表达式是什么 consumer.subscribe("TopicB","*"); //subscribe订阅 //3.设置从哪里开始消费消息 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //4.注册消息监听器,参数是并发消息监听器,Concurrently并发的意思 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { //逐条消费消息 //输出消息的消费时间 System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date())); System.out.println(","+msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5.开启消费者 consumer.start(); System.out.println("消费者已开启消费---"); } }
rocketmq消费者注册监听有两种模式,有序消费MessageListenerOrderly和并发消费MessageListenerConcurrently,这两种模式返回值不同。
MessageListenerOrderly正确消费返回ConsumeOrderlyStatus.SUCCESS,
稍后消费返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
MessageListenerConcurrently正确消费返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
稍后消费返回ConsumeConcurrentlyStatus.RECONSUME_LATER
运行测试:先启动消费者,后启动生产者:
生产者39秒开始生产消息,消费者49秒的时候开始消费消息:
代码演示:
运行测试:
定义消费者:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)