RocketMQ

RocketMQ,第1张

RocketMQ

RocketMQ 使用

    简单的使用
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.Charset;

public class BasicProducer {

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        // 创建一个Producer对象
        DefaultMQProducer producer = new DefaultMQProducer("basic_producer1");

        //设置producer建立连接的nameserv地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 启动producer对象
        producer.start();

        // 发送消息
        // a. 准备消息
        Message message = new Message();
        String topic = "test_mq";
        message.setTopic(topic);

        // 向详细中放入数据
        String data = "hello, rocket test send";
        byte[] bytes = data.getBytes(Charset.forName("utf-8"));
        message.setBody(bytes);

        // b. 发送消息
        SendResult send = producer.send(message);
        System.out.println("发送状态: " + send.getSendStatus()
                + ", msgId = " + send.getMsgId());


    }
}

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.Charset;
import java.util.List;

public class BasicConsumer {

    public static void main(String[] args) throws MQClientException {
        // 定义Consumer对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("basic_consumer1");

        // 设置所要连接的nameserv地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 订阅Topic
        consumer.subscribe("test_mq", "*");

        // 设置消费消息的监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                // 实现消息的消费逻辑

                try {
                    // 1. 获取消息中的数据
                    MessageExt message = msgs.get(0);

                    byte[] body = message.getBody();
                    // 解析接收到的字节数据
                    String s = new String(body, 0, body.length, Charset.forName("utf-8"));

                    // 业务逻辑: 输出接收到的字符串
                    System.out.println("收到消息: msgId为" + message.getMsgId() + ":" + s);
                } catch (Exception e) {
                    e.printStackTrace();
                    // 消费失败
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();
    }
}

延迟消息:

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;


public class DelayProducer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {

        DefaultMQProducer producer = new DefaultMQProducer("delay_producer");

        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.start();

        String data = "test delay message";
        Message message = new Message();
        message.setTopic("test_delay");
        message.setBody(data.getBytes("utf-8"));
        // 延迟级别: 1s 5s 10s 30s 1m 2m 3m 4m  5m  6m   7m  8m  9m  10m  20m  30m  1h 2h
        //  级别      1  2   3  4   5  6   7  8  9  10    11 12  13  14   15   16   17 18
        message.setDelayTimeLevel(2);

        message.putUserProperty("startTime", System.currentTimeMillis() + "");
        SendResult send = producer.send(message);
        System.out.println("发送了: " + send.getMsgId() + ", 发送结果" + send.getSendStatus());


    }
}

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class DelayConsumer {

    public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        consumer.subscribe("test_delay", "*");

        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                try {
                    MessageExt message = msgs.get(0);
                    String startTime = message.getUserProperty("startTime");
                    long sendTime = Long.parseLong(startTime);
                    System.out.println("message receive time span: " + (System.currentTimeMillis() - sendTime));

                    byte[] body = message.getBody();
                    String s = new String(body, 0, body.length, "utf-8");
                    //System.out.println("msgId: " + message.getMsgId() + ": " + s);
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

    使用文章参考
    https://www.cnblogs.com/fangyuan303687320/p/5495481.html
    https://www.cnblogs.com/SimpleWu/p/12112351.html

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5715640.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存