RocketMQ实战应用

RocketMQ实战应用,第1张

RocketMQ实战应用

目录
  • 一、普通消息:
    • 1.消息发送分类:
      • (1)同步发送消息:
      • (2)异步发送消息:
      • (3)单向发送消息:
    • 2.普通消息代码举例:
      • 定义生产者:
      • (1)同步发消息生产者:Sync
      • (2)异步消息发送生产者:Async
      • (3)单向消息发送生产者:
      • 定义消费者:
  • 二、顺序消息:
  • 三、延迟消息:
  • 四、事务消息:

一、普通消息: 1.消息发送分类:

(1)同步发送消息:

(2)异步发送消息:

(3)单向发送消息:

2.普通消息代码举例:

(1)创建maven工程: rocketmq-test

(2)导入依赖:

   
        
        
            org.apache.rocketmq
            rocketmq-client
            4.9.2
        
    

(3)准备工作:

然后单机启动rocketmq:
单机启动:

cd /usr/src/software/rocketmq-4.9.2
启动:nohup sh bin/mqnamesrv &
查看日志:tail -f ~/logs/rocketmqlogs/namesrv.log

cd /usr/src/software/rocketmq-4.9.2
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log

关闭防火墙:systemctl stop firewalld

然后启动控制台:http://localhost:6789/#/

(4)代码:

定义生产者: (1)同步发消息生产者:Sync

com.fan.general.SyncProducer :

package com.fan.general;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
//同步发行消息
public class SyncProducer {
    public static void main(String[] args) throws Exception{
        //创建一个生产者producer,参数为创建一个生产者producer Group 名称
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        //指定nameserver地址
        producer.setNamesrvAddr("rocketmqOS:9876");
        //设置当前发送失败后的重试次数,默认不设置是2次
        producer.setRetryTimesWhenSendFailed(3);
        //设置发送超时实现为5秒,默认是3秒
        producer.setSendMsgTimeout(5000);
        //开启生产者
        producer.start();
        //生产并发送100条消息
        for (int i = 0; i < 100; i++) {
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("someTopic", "sometag", body);
            //为消息指定key
            msg.setKeys("key-" + i );
            //发送消息
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        //关闭producer
        producer.shutdown();
    }
}

涉及的类:


测试:

在linux主机上查看:

查看发送来的消息:

(2)异步消息发送生产者:Async
package com.fan.general;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducer {
    public static void main(String[] args)throws  Exception {
        //创建一个生产者
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        //指定服务器地址
        producer.setNamesrvAddr("rocketmqOS:9876");
        //指定异步发送失败后,不进行重试发送,也可改成2
        producer.setRetryTimesWhenSendAsyncFailed(0);
        //指定新创建的topic的queue数量为2,默认为4
        producer.setDefaultTopicQueueNums(2);
        //启动生产者
        producer.start();
        //发送100条消息
        for (int i = 0; i < 100; i++) {
            //创建一个消息体
            byte[] body = ("hi," + i).getBytes();
            Message message = new Message("myTopicA", "myTagA", body);
            //异步发送需要有callback回调方法
            producer.send(message, new SendCallback() {
                //当producer接收到mq发送来的ack后就会触发该回调方法的执行
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });

        }//end-for
        //sleep一会了,由于是异步发送,这里如果不sleep,则消息还未发送就会将producer给关闭了
        Thread.sleep(3000);
        producer.shutdown();
    }
}

运行测试:

控制台看看消息:

(3)单向消息发送生产者:
package com.fan.general;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class onewayProducer {
    public static void main(String[] args)throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        //启动生产者
        producer.start();
        for (int i = 0; i < 100; i++) {
            byte[] msgBody = ("hi," + i).getBytes();
            Message message = new Message("single", "singleTag", msgBody);
            //单向发送,没有消息的返回值,所以也不用打印
            producer.sendoneway(message);
        }
        producer.shutdown();
        System.out.println("producer 关闭");
    }
}

定义消费者:

SomeConsumer:

package com.fan.general;


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class SomeConsumer {
    public static void main(String[] args)throws MQClientException {
        //定义一个pull消费者
        //DefaultLitePullConsumer consumer1 = new DefaultLitePullConsumer("cg");
        //定义一个push 消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        consumer.setNamesrvAddr("rocketmqOS:9876");
        //从哪里开始消费,指定从第一条开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //指定消费topic与tag
        consumer.subscribe("someTopic","*");
        //指定费用广播模式  进行消费,默认为集群模式的
        //consumer.setMessageModel(MessageModel.BROADCASTING);
        //注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                            ConsumeConcurrentlyContext context) {
                //逐条消费消息
                for (MessageExt msg : msgs ) {
                    System.out.println(msg);
                }
                //返回消费状态:消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //开启消费者进行消费
        consumer.start();
        System.out.println("消费者开始了----");
    }
}

lite:简化的

运行测试:网页反应比较慢

二、顺序消息:



代码实现:

package com.fan.general;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.List;

public class OrderedProducer {
    public static void main(String[] args)throws Exception {
        //创建一个生产者
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        //若为全局有序,需要设置queue数量为1
       //producer.setDefaultTopicQueueNums(1);
        producer.start();
        for (int i = 0; i < 100; i++) {
            Integer orderId = i;
            byte[] body = ("hi," + i).getBytes();
            Message msg = new Message("TopicA", "TagA", body);
            //将orderid作为消息key
            msg.setKeys(orderId.toString());
            //send()的第三个参数会传递给选择器的select()的第三个参数 ,该send为同步发送
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                //具体的选择算法在该方法中定义
                @Override
                public MessageQueue select(List mqs, Message msg, Object arg) {
                    //以下是使用消息key作为选择的选择算法
                    String keys = msg.getKeys();
                    Integer id = Integer.valueOf(keys);
                    //以下是使用arg作为选择key的选择算法
                    //Integer id = (Integer)arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            },orderId);
            System.out.println(sendResult);
        }
        producer.shutdown();
    }
}

运行测试:启动mq,记得关闭防火墙:systemctl stop firewalld

三、延迟消息:

具体步骤:



生产者:

package com.fan.delay;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.text.SimpleDateFormat;
import java.util.Date;
//消息的生产者
public class DelayProducer {
    public static void main(String[] args)throws Exception {
        //1.创造生产者
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        //2.给生产者设置名称服务器
        producer.setNamesrvAddr("rocketmqOS:9876");
        //3.开启生产者
        producer.start();
        for (int i = 0; i < 1; i++) {
            byte[] body = ("hi," + i).getBytes();
            //4.循环生产消息
            Message message = new Message("TopicB", "TagB", body);
            //指定消息的延时等级为3级,即延迟10秒
            message.setDelayTimeLevel(3);
            SendResult sendResult = producer.send(message);
            //输出消息被发送的时间
            System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date() ));
            System.out.println(","+sendResult);
        }
        //5.生产完消息后关闭生产者
        producer.shutdown();
    }
}

消费者:

package com.fan.delay;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
//消息的消费者
public class OtherConsumer {
    public static void main(String[] args)throws Exception {
        //0.创造消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        //1.给消费者设置名称服务器
        consumer.setNamesrvAddr("rocketmqOS:9876");
        //2.订阅的主题和子表达式是什么
        consumer.subscribe("TopicB","*"); //subscribe订阅
        //3.设置从哪里开始消费消息
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //4.注册消息监听器,参数是并发消息监听器,Concurrently并发的意思
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) { //逐条消费消息
                    //输出消息的消费时间
                    System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date()));
                    System.out.println(","+msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.开启消费者
        consumer.start();
        System.out.println("消费者已开启消费---");
    }
}

rocketmq消费者注册监听有两种模式,有序消费MessageListenerOrderly和并发消费MessageListenerConcurrently,这两种模式返回值不同。

MessageListenerOrderly正确消费返回ConsumeOrderlyStatus.SUCCESS,

稍后消费返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

MessageListenerConcurrently正确消费返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS

稍后消费返回ConsumeConcurrentlyStatus.RECONSUME_LATER

运行测试:先启动消费者,后启动生产者:

生产者39秒开始生产消息,消费者49秒的时候开始消费消息:

四、事务消息:









代码演示:





运行测试:

定义消费者:

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5669944.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存