RocketMQ 使用
- 简单的使用
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import java.nio.charset.Charset; public class BasicProducer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { // 创建一个Producer对象 DefaultMQProducer producer = new DefaultMQProducer("basic_producer1"); //设置producer建立连接的nameserv地址 producer.setNamesrvAddr("127.0.0.1:9876"); // 启动producer对象 producer.start(); // 发送消息 // a. 准备消息 Message message = new Message(); String topic = "test_mq"; message.setTopic(topic); // 向详细中放入数据 String data = "hello, rocket test send"; byte[] bytes = data.getBytes(Charset.forName("utf-8")); message.setBody(bytes); // b. 发送消息 SendResult send = producer.send(message); System.out.println("发送状态: " + send.getSendStatus() + ", msgId = " + send.getMsgId()); } }
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.nio.charset.Charset; import java.util.List; public class BasicConsumer { public static void main(String[] args) throws MQClientException { // 定义Consumer对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("basic_consumer1"); // 设置所要连接的nameserv地址 consumer.setNamesrvAddr("127.0.0.1:9876"); // 订阅Topic consumer.subscribe("test_mq", "*"); // 设置消费消息的监听器 consumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { // 实现消息的消费逻辑 try { // 1. 获取消息中的数据 MessageExt message = msgs.get(0); byte[] body = message.getBody(); // 解析接收到的字节数据 String s = new String(body, 0, body.length, Charset.forName("utf-8")); // 业务逻辑: 输出接收到的字符串 System.out.println("收到消息: msgId为" + message.getMsgId() + ":" + s); } catch (Exception e) { e.printStackTrace(); // 消费失败 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); } }
延迟消息:
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; public class DelayProducer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("delay_producer"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); String data = "test delay message"; Message message = new Message(); message.setTopic("test_delay"); message.setBody(data.getBytes("utf-8")); // 延迟级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h // 级别 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 message.setDelayTimeLevel(2); message.putUserProperty("startTime", System.currentTimeMillis() + ""); SendResult send = producer.send(message); System.out.println("发送了: " + send.getMsgId() + ", 发送结果" + send.getSendStatus()); } }
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.io.UnsupportedEncodingException; import java.util.List; public class DelayConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.subscribe("test_delay", "*"); consumer.setMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { try { MessageExt message = msgs.get(0); String startTime = message.getUserProperty("startTime"); long sendTime = Long.parseLong(startTime); System.out.println("message receive time span: " + (System.currentTimeMillis() - sendTime)); byte[] body = message.getBody(); String s = new String(body, 0, body.length, "utf-8"); //System.out.println("msgId: " + message.getMsgId() + ": " + s); } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
- 使用文章参考
https://www.cnblogs.com/fangyuan303687320/p/5495481.html
https://www.cnblogs.com/SimpleWu/p/12112351.html
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)