Linux搭建 RocketMQ 2主2从集群

Linux搭建 RocketMQ 2主2从集群,第1张

Linux搭建 RocketMQ 2主2从集群 前置条件

1.配置jdk1.8 环境变量 ,如果没配置的话参考这个博客去配置: https://blog.csdn.net/qq_41489540/article/details/116399710

2.集群批量执行命令的脚本,看这个博客去整一个:

https://blog.csdn.net/qq_41489540/article/details/109094840

为什么让整这个脚本呢?原因是集群集群很多,如果你一个一个去执行命令安装的话,会累死你的.弄了这个脚本之后就可以实现在一台机器敲完命令之后,集群多台机器都能执行到这个命令.

3.xsync集群同步脚本:

https://blog.csdn.net/qq_41489540/article/details/109094046

搭建计划

这次搭建一个2主2从异步刷盘的集群

机器名nameServer节点部署broker节点部署zjj101nameserverzjj102nameserverbroker-a ,broker-b-szjj103nameserverbroker-b,brkoer-a-s

说明,broker-a 意思是amaster节点 , brkoer-a-s的意思是a主的slave节点

关闭集群中的防火墙

我用的 xcall脚本, 你也可以自己登录集群中的每一台机器,自己去执行 systemctl stop firewalld.service 关闭

[root@zjj101 /]# xcall systemctl stop firewalld.service
要执行的命令是systemctl stop firewalld.service
---------------------zjj101-----------------
---------------------zjj102-----------------
---------------------zjj103-----------------
[root@zjj101 /]# xcall firewall-cmd --state
要执行的命令是firewall-cmd --state
---------------------zjj101-----------------
not running
---------------------zjj102-----------------
not running
---------------------zjj103-----------------
not running
[root@zjj101 /]#

上传rocketMQ安装包到zjj101机器上

我直接在 /root/soft/ 目录下放了一个 rocketmq-all-4.7.1-bin-release.rar ,你们自己自行准备.

或者从下面百度云网盘里面下载
链接: https://pan.baidu.com/s/1xhcPh0Dk18THT1s-ArDGkQ 提取码: ua2c 复制这段内容后打开百度网盘手机App, *** 作更方便哦

解压

如果没有安装rar包的,参考: https://zjj1994.blog.csdn.net/article/details/120802578 博客

解压

[root@zjj101 soft]# unrar x rocketmq-all-4.7.1-bin-release.rar
集群分发过去

写相对路径,命令: sh xsync soft/

[root@zjj101 ~]# pwd
/root
[root@zjj101 ~]# ls
anaconda-ks.cfg       apache-tomcat-8.5.28.tar.gz  soft
apache-tomcat-8.5.28  script
[root@zjj101 ~]# sh xsync soft/
查看各个机器复制情况
[root@zjj101 ~]# sh xcall ls
要执行的命令是ls
---------------------zjj101-----------------
anaconda-ks.cfg
apache-tomcat-8.5.28
apache-tomcat-8.5.28.tar.gz
script
soft
---------------------zjj102-----------------
anaconda-ks.cfg
soft
---------------------zjj103-----------------
anaconda-ks.cfg
apache-tomcat-8.5.28
apache-tomcat-8.5.28.tar.gz
soft
[root@zjj101 ~]#

配置第一组broker-a 配置主节点

在zjj102上先配置borker-a的master节点。先配置2m-2s-async/broker-a.properties

进入 :/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async 路径下

[root@zjj102 2m-2s-async]# pwd
/root/soft/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async
[root@zjj102 2m-2s-async]# vim broker-a.properties

修改 broker-a.properties 文件

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=zjj101:9876;zjj102:9876;zjj103:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

配置从节点

这一组broker的主节点在zjj103上,所以需要配置zjj103上的config/2m-2s-async/broker-
b.properties

进入/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async 文件夹下

 cd /root/soft/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async

修改 broker-a-s.properties 文件

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=zjj101:9876;zjj102:9876;zjj103:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/storeSlave
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/storeSlave/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
配置第二组Broker-b 配置主节点

这一组broker的主节点在zjj103上,所以需要配置zjj103上的config/2m-2s-async/broker-
b.properties

在 /root/soft/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async 文件夹下修改 broker-
b.properties

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=zjj101:9876;zjj102:9876;zjj103:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
配置从节点

然后他对应的slave在zjj102上,修改zjj102上的 conf/2m-2s-async/broker-b-s.properties

#所属集群名字,名字一样的节点就在同一个集群内
brokerClusterName=rocketmq-cluster
#broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-b
#brokerid,0就表示是Master,>0的都是表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=zjj101:9876;zjj102:9876;zjj103:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/rocketmq/storeSlave
#commitLog 存储路径
storePathCommitLog=/app/rocketmq/storeSlave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
#消息索引存储路径
storePathIndex=/app/rocketmq/storeSlave/index
#checkpoint 文件存储路径
storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
#abort 文件存储路径
abortFile=/app/rocketmq/storeSlave/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
注意点

这样broker就配置完成了。
需要注意的配置项:1、同一机器上两个实例的store目录不能相同,否则会报错 Lock failed,MQ
already started
2、同一机器上两个实例的listenPort也不能相同。否则会报端口占用的错
nameserver不需要进行配置,直接启动就行。这也看出nameserver是无状态的。

配置环境变量

配置 : export ROCKETMQ_HOME=/root/soft/rocketmq-all-4.7.1-bin-release 这行, 路径写你RocketMQ位置

zjj101机器配置:

export PATH="$PATH:/root/script/"# RocketMQexport ROCKETMQ_HOME=/root/soft/rocketmq-all-4.7.1-bin-releaseexport PATH=${ROCKETMQ_HOME}/bin:$PATHexport PATH

配置完了之后集群分发一下 :

在/etc目录下执行 : sh xsync ./profile

[root@zjj101 etc]# sh xsync ./profile要分发的文件的路径是:/etc/profile---------------------zjj102---------------------sending incremental file listprofilesent 640 bytes  received 53 bytes  462.00 bytes/sectotal size is 1,941  speedup is 2.80---------------------zjj103---------------------sending incremental file listprofilesent 640 bytes  received 53 bytes  1,386.00 bytes/sectotal size is 1,941  speedup is 2.80[root@zjj101 etc]#

集群更新下配置文件 ,命令: sh xcall source /etc/profile

[root@zjj101 etc]# sh xcall source /etc/profile
要执行的命令是source /etc/profile
---------------------zjj101-----------------
---------------------zjj102-----------------
---------------------zjj103-----------------

验证是否配置成功 :

命令: sh xcall echo $ROCKETMQ_HOME ,看 打印结果 说明配置成功了

[root@zjj101 etc]# sh xcall echo $ROCKETMQ_HOME
要执行的命令是echo /root/soft/rocketmq-all-4.7.1-bin-release
---------------------zjj101-----------------
/root/soft/rocketmq-all-4.7.1-bin-release
---------------------zjj102-----------------
/root/soft/rocketmq-all-4.7.1-bin-release
---------------------zjj103-----------------
/root/soft/rocketmq-all-4.7.1-bin-release

启动RocketMQ集群

启动就比较简单了,直接调用bin目录下的脚本就行。只是启动之前要注意看下他们的JVM内存配置,默
认的配置都比较高。

先启动nameServer 修改zjj101机器上的runserver.sh配置

修改三个节点上的bin/runserver.sh,调整里面的jvm内存配置。找到下面这一行调整下内存,实际情况根据你们公司的电脑配置来调整,我这里是搞着玩的,所以jvm配置就调整小一些

修改 “/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh” 文件,JAVA_OPT="${JAVA_OPT} -server配置,大概是67行的位置

先修改zjj101机器的配置文件

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m"
集群分发给别的集群节点上

修改完了保存后,集群分发一下

命令: sh xsync “/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh”

[root@zjj101 bin]# sh xsync "/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh"
要分发的文件的路径是:/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh
---------------------zjj102---------------------
sending incremental file list
runserver.sh

sent 921 bytes  received 65 bytes  657.33 bytes/sec
total size is 3,527  speedup is 3.58
---------------------zjj103---------------------
sending incremental file list
runserver.sh

sent 921 bytes  received 65 bytes  1,972.00 bytes/sec
total size is 3,527  speedup is 3.58
[root@zjj101 bin]#

检查分发结果

分发完了不放心的话检查一下 ,用集群查看脚本 查看一下刚刚修改的配置是否分发成功了, 我这里查看内容修改成功了.

[root@zjj101 bin]# sh  xcall cat "/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh"

集群启动nameServer 编写启动脚本

在/root/soft/script/创建一个"startRocketMqNameServer.sh" 文件

内容是:

#!/bin/bash

# 我服务器名字是 zjj101 zjj102 zjj103  这里调整你们自己的服务器名字.
array=(zjj101 zjj102 zjj103)

echo  "开始启动RocketMQ集群的NamesrvStartup"
for((i=0;i<${#array[@]};i++))
do
	
	ssh ${array[i]}   nohup sh $ROCKETMQ_HOME/bin/mqnamesrv  > startNameServer.log 2>&1 &
done 
 
启动nameServer

执行刚刚写的脚本

# 执行集群
[root@zjj101 script]# sh startRocketMqNameServer.sh
开始启动RocketMQ集群的NamesrvStartup
# 查看是否都启动成功
[root@zjj101 script]# sh xcall jps -l
要执行的命令是jps -l
---------------------zjj101-----------------
78614 org.apache.rocketmq.namesrv.NamesrvStartup
78686 sun.tools.jps.Jps
---------------------zjj102-----------------
15555 org.apache.rocketmq.namesrv.NamesrvStartup
15577 sun.tools.jps.Jps
---------------------zjj103-----------------
7012 org.apache.rocketmq.namesrv.NamesrvStartup
7034 sun.tools.jps.Jps

再启动broker 修改 runbroker.sh内存

启动broker是使用的mqbroker指令,只是注意启动broker时需要通过-c 指定对应的配置文件。

在zjj102上启动broker-a的master节点和broker-b的slave节点

修改 zjj102 和 zjj103机器上的 “/root/soft/rocketmq-all-4.7.1-bin-release/bin/runbroker.sh” 的jvm启动内存,如果你不修改的话,默认内存设置的非常大,根据你电脑配置去修改

大概是第66行位置,修改 JAVA_OPT="${JAVA_OPT} -server 配置

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m"
启动

zjj102机器上 cd到 /root/soft/rocketmq-all-4.7.1-bin-release/bin 目录下

执行下面两个命令

nohup sh  mqbroker -c ../conf/2m-2s-async/broker-a.properties > broker-a.log 2>&1 &
nohup sh mqbroker -c ../conf/2m-2s-async/broker-b-s.properties  >broker-b-s.log 2>&1&

查看是否执行成功,下面发现有两个BrokerStartup,就说明执行成功了.

[root@zjj102 bin]# jps -l
20512 org.apache.rocketmq.broker.BrokerStartup
15555 org.apache.rocketmq.namesrv.NamesrvStartup
20357 org.apache.rocketmq.broker.BrokerStartup
21528 sun.tools.jps.Jps

zjj103机器上cd到 /root/soft/rocketmq-all-4.7.1-bin-release/bin 目录下

分别执行下面两个命令

nohup sh mqbroker -c ../conf/2m-2s-async/broker-b.properties > broker-b.log 2>&1 &
nohup sh mqbroker -c ../conf/2m-2s-async/broker-a-s.properties > broker-a-s.log 2>&1 &

查看是否启动成功,有两个 BrokerStartup 说明启动成功了.

[root@zjj103 bin]# jps -l
8080 sun.tools.jps.Jps
7012 org.apache.rocketmq.namesrv.NamesrvStartup
7748 org.apache.rocketmq.broker.BrokerStartup
7775 org.apache.rocketmq.broker.BrokerStartup

搭建Rocketmq-console可视化插件

这里windows搭建的: 直接看 https://zjj1994.blog.csdn.net/article/details/120809685 博客, 我就不演示了. 很简单

通过Rocketmq-console查看集群情况,

查看到集群搭建成功.

执行官方示例测试集群是否好用 说明

在RocketMQ的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服务。我们在
zjj102上进入RocketMQ的安装目录:
发送消息:默认会发1000条消息

bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 

接收消息:

bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 

注意,这是官方提供的Demo,但是官方的源码中,这两个类都是没有指定nameServer的,所以
运行会有点问题。要指定NameServer地址,可以配置一个环境变量NAMESRV_ADDR,这样默认
会读取这个NameServer地址。可以配到.bash_profile里或者直接临时指定。

export NAMESRV_ADDR='zjj101:9876;zjj102:9876;zjj103:9876'

然后就可以正常执行了。
这个NameServer地址的读取方式见源码中
org.apache.rocketmq.common.utils.NameServerAddressUtils

 public static String getNameServerAddresses() {
    return System.getProperty("rocketmq.namesrv.addr",
	System.getenv("NAMESRV_ADDR"));
 }

这个方法就是在DefaultMQProducer中默认的设置NameServer地址的方式,这个rokcetmq.namesrv.addr属性可以在java中使用System.setproperties指定,也可以在SpringBoot中配到配置文件里。

这个tools.sh就封装了一个简单的运行RocketMQ的环境,可以运行源码中的其他示例,然后自己
的例子也可以放到RocketMQ的lib目录下去执行。

开始演示

配置环境变量:

修改 /etc/profile 文件

export NAMESRV_ADDR='zjj101:9876;zjj102:9876;zjj103:9876'

配置完了之后,别忘了执行 source /etc/profile 让环境变量生效.

执行 echo $NAMESRV_ADDR 查看环境变量是否配置有问题,能打印出 "zjj101:9876;zjj102:9876;zjj103:9876 " 说明配置没问题

下面开始演示: 在RocketMq 安装目录下执行 : sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

# 查看配置的环境变量是否生效
[root@zjj102 rocketmq-all-4.7.1-bin-release]# echo $NAMESRV_ADDR
# 说明生效了
zjj101:9876;zjj102:9876;zjj103:9876
# 执行官方示例
[root@zjj102 rocketmq-all-4.7.1-bin-release]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# 下面开始打印日志了,说明生效了
12:58:13.013 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BCD0000, offsetMsgId=AC100A6600002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BEB0001, offsetMsgId=AC100A6600002A9F00000000000000CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BEF0002, offsetMsgId=AC100A6600002A9F0000000000000196, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BF10003, offsetMsgId=AC100A6700002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3C0F0004, offsetMsgId=AC100A6700002A9F00000000000000CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=1], queueOffset=0]

点击刷新查看多了个TopicTest,这个是执行官方示例自动生成的.

你再打开一个终端连接zjj102,执行下面的命令启动消费者

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

自己可以反复的启动 示例 , 生产消息和消费消息

点击 TopicTest的consumer管理来查看消费情况

下面显示的延迟1000 的意思就是有1000个没消费.你再执行 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 启动官方消费者示例之后,你就能看到延迟变少了,就说明消费了.

下面图就说明都被这个消费者消费了, 延迟为0说明没有消息积压了.

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/3988439.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-10-21
下一篇 2022-10-21

发表评论

登录后才能评论

评论列表(0条)

保存