1.配置jdk1.8 环境变量 ,如果没配置的话参考这个博客去配置: https://blog.csdn.net/qq_41489540/article/details/116399710
2.集群批量执行命令的脚本,看这个博客去整一个:
https://blog.csdn.net/qq_41489540/article/details/109094840
为什么让整这个脚本呢?原因是集群集群很多,如果你一个一个去执行命令安装的话,会累死你的.弄了这个脚本之后就可以实现在一台机器敲完命令之后,集群多台机器都能执行到这个命令.
3.xsync集群同步脚本:
https://blog.csdn.net/qq_41489540/article/details/109094046
搭建计划这次搭建一个2主2从异步刷盘的集群
说明,broker-a 意思是amaster节点 , brkoer-a-s的意思是a主的slave节点
关闭集群中的防火墙我用的 xcall脚本, 你也可以自己登录集群中的每一台机器,自己去执行 systemctl stop firewalld.service 关闭
[root@zjj101 /]# xcall systemctl stop firewalld.service 要执行的命令是systemctl stop firewalld.service ---------------------zjj101----------------- ---------------------zjj102----------------- ---------------------zjj103----------------- [root@zjj101 /]# xcall firewall-cmd --state 要执行的命令是firewall-cmd --state ---------------------zjj101----------------- not running ---------------------zjj102----------------- not running ---------------------zjj103----------------- not running [root@zjj101 /]#上传rocketMQ安装包到zjj101机器上
我直接在 /root/soft/ 目录下放了一个 rocketmq-all-4.7.1-bin-release.rar ,你们自己自行准备.
或者从下面百度云网盘里面下载
链接: https://pan.baidu.com/s/1xhcPh0Dk18THT1s-ArDGkQ 提取码: ua2c 复制这段内容后打开百度网盘手机App, *** 作更方便哦
如果没有安装rar包的,参考: https://zjj1994.blog.csdn.net/article/details/120802578 博客
解压
[root@zjj101 soft]# unrar x rocketmq-all-4.7.1-bin-release.rar集群分发过去
写相对路径,命令: sh xsync soft/
[root@zjj101 ~]# pwd /root [root@zjj101 ~]# ls anaconda-ks.cfg apache-tomcat-8.5.28.tar.gz soft apache-tomcat-8.5.28 script [root@zjj101 ~]# sh xsync soft/查看各个机器复制情况
[root@zjj101 ~]# sh xcall ls 要执行的命令是ls ---------------------zjj101----------------- anaconda-ks.cfg apache-tomcat-8.5.28 apache-tomcat-8.5.28.tar.gz script soft ---------------------zjj102----------------- anaconda-ks.cfg soft ---------------------zjj103----------------- anaconda-ks.cfg apache-tomcat-8.5.28 apache-tomcat-8.5.28.tar.gz soft [root@zjj101 ~]#配置第一组broker-a 配置主节点
在zjj102上先配置borker-a的master节点。先配置2m-2s-async/broker-a.properties
进入 :/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async 路径下
[root@zjj102 2m-2s-async]# pwd /root/soft/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async [root@zjj102 2m-2s-async]# vim broker-a.properties
修改 broker-a.properties 文件
#所属集群名字,名字一样的节点就在同一个集群内 brokerClusterName=rocketmq-cluster #broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-a #brokerid,0就表示是Master,>0的都是表示 Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=zjj101:9876;zjj102:9876;zjj103:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/app/rocketmq/store #commitLog 存储路径 storePathCommitLog=/app/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/app/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/app/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/app/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/app/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128配置从节点
这一组broker的主节点在zjj103上,所以需要配置zjj103上的config/2m-2s-async/broker-
b.properties
进入/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async 文件夹下
cd /root/soft/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async
修改 broker-a-s.properties 文件
#所属集群名字,名字一样的节点就在同一个集群内 brokerClusterName=rocketmq-cluster #broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-a #brokerid,0就表示是Master,>0的都是表示 Slave brokerId=1 #nameServer地址,分号分割 namesrvAddr=zjj101:9876;zjj102:9876;zjj103:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=11011 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/app/rocketmq/storeSlave #commitLog 存储路径 storePathCommitLog=/app/rocketmq/storeSlave/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue #消息索引存储路径 storePathIndex=/app/rocketmq/storeSlave/index #checkpoint 文件存储路径 storeCheckpoint=/app/rocketmq/storeSlave/checkpoint #abort 文件存储路径 abortFile=/app/rocketmq/storeSlave/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128配置第二组Broker-b 配置主节点
这一组broker的主节点在zjj103上,所以需要配置zjj103上的config/2m-2s-async/broker-
b.properties
在 /root/soft/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async 文件夹下修改 broker-
b.properties
#所属集群名字,名字一样的节点就在同一个集群内 brokerClusterName=rocketmq-cluster #broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-b #brokerid,0就表示是Master,>0的都是表示 Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=zjj101:9876;zjj102:9876;zjj103:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/app/rocketmq/store #commitLog 存储路径 storePathCommitLog=/app/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/app/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/app/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/app/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/app/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128配置从节点
然后他对应的slave在zjj102上,修改zjj102上的 conf/2m-2s-async/broker-b-s.properties
#所属集群名字,名字一样的节点就在同一个集群内 brokerClusterName=rocketmq-cluster #broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-b #brokerid,0就表示是Master,>0的都是表示 Slave brokerId=1 #nameServer地址,分号分割 namesrvAddr=zjj101:9876;zjj102:9876;zjj103:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=11011 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/app/rocketmq/storeSlave #commitLog 存储路径 storePathCommitLog=/app/rocketmq/storeSlave/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue #消息索引存储路径 storePathIndex=/app/rocketmq/storeSlave/index #checkpoint 文件存储路径 storeCheckpoint=/app/rocketmq/storeSlave/checkpoint #abort 文件存储路径 abortFile=/app/rocketmq/storeSlave/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128注意点
这样broker就配置完成了。
需要注意的配置项:1、同一机器上两个实例的store目录不能相同,否则会报错 Lock failed,MQ
already started
2、同一机器上两个实例的listenPort也不能相同。否则会报端口占用的错
nameserver不需要进行配置,直接启动就行。这也看出nameserver是无状态的。
配置 : export ROCKETMQ_HOME=/root/soft/rocketmq-all-4.7.1-bin-release 这行, 路径写你RocketMQ位置
zjj101机器配置:
export PATH="$PATH:/root/script/"# RocketMQexport ROCKETMQ_HOME=/root/soft/rocketmq-all-4.7.1-bin-releaseexport PATH=${ROCKETMQ_HOME}/bin:$PATHexport PATH
配置完了之后集群分发一下 :
在/etc目录下执行 : sh xsync ./profile
[root@zjj101 etc]# sh xsync ./profile要分发的文件的路径是:/etc/profile---------------------zjj102---------------------sending incremental file listprofilesent 640 bytes received 53 bytes 462.00 bytes/sectotal size is 1,941 speedup is 2.80---------------------zjj103---------------------sending incremental file listprofilesent 640 bytes received 53 bytes 1,386.00 bytes/sectotal size is 1,941 speedup is 2.80[root@zjj101 etc]#
集群更新下配置文件 ,命令: sh xcall source /etc/profile
[root@zjj101 etc]# sh xcall source /etc/profile 要执行的命令是source /etc/profile ---------------------zjj101----------------- ---------------------zjj102----------------- ---------------------zjj103-----------------
验证是否配置成功 :
命令: sh xcall echo $ROCKETMQ_HOME ,看 打印结果 说明配置成功了
[root@zjj101 etc]# sh xcall echo $ROCKETMQ_HOME 要执行的命令是echo /root/soft/rocketmq-all-4.7.1-bin-release ---------------------zjj101----------------- /root/soft/rocketmq-all-4.7.1-bin-release ---------------------zjj102----------------- /root/soft/rocketmq-all-4.7.1-bin-release ---------------------zjj103----------------- /root/soft/rocketmq-all-4.7.1-bin-release启动RocketMQ集群
启动就比较简单了,直接调用bin目录下的脚本就行。只是启动之前要注意看下他们的JVM内存配置,默
认的配置都比较高。
修改三个节点上的bin/runserver.sh,调整里面的jvm内存配置。找到下面这一行调整下内存,实际情况根据你们公司的电脑配置来调整,我这里是搞着玩的,所以jvm配置就调整小一些
修改 “/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh” 文件,JAVA_OPT="${JAVA_OPT} -server配置,大概是67行的位置
先修改zjj101机器的配置文件
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m"集群分发给别的集群节点上
修改完了保存后,集群分发一下
命令: sh xsync “/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh”
[root@zjj101 bin]# sh xsync "/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh" 要分发的文件的路径是:/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh ---------------------zjj102--------------------- sending incremental file list runserver.sh sent 921 bytes received 65 bytes 657.33 bytes/sec total size is 3,527 speedup is 3.58 ---------------------zjj103--------------------- sending incremental file list runserver.sh sent 921 bytes received 65 bytes 1,972.00 bytes/sec total size is 3,527 speedup is 3.58 [root@zjj101 bin]#检查分发结果
分发完了不放心的话检查一下 ,用集群查看脚本 查看一下刚刚修改的配置是否分发成功了, 我这里查看内容修改成功了.
[root@zjj101 bin]# sh xcall cat "/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh"集群启动nameServer 编写启动脚本
在/root/soft/script/创建一个"startRocketMqNameServer.sh" 文件
内容是:
#!/bin/bash # 我服务器名字是 zjj101 zjj102 zjj103 这里调整你们自己的服务器名字. array=(zjj101 zjj102 zjj103) echo "开始启动RocketMQ集群的NamesrvStartup" for((i=0;i<${#array[@]};i++)) do ssh ${array[i]} nohup sh $ROCKETMQ_HOME/bin/mqnamesrv > startNameServer.log 2>&1 & done启动nameServer
执行刚刚写的脚本
# 执行集群 [root@zjj101 script]# sh startRocketMqNameServer.sh 开始启动RocketMQ集群的NamesrvStartup # 查看是否都启动成功 [root@zjj101 script]# sh xcall jps -l 要执行的命令是jps -l ---------------------zjj101----------------- 78614 org.apache.rocketmq.namesrv.NamesrvStartup 78686 sun.tools.jps.Jps ---------------------zjj102----------------- 15555 org.apache.rocketmq.namesrv.NamesrvStartup 15577 sun.tools.jps.Jps ---------------------zjj103----------------- 7012 org.apache.rocketmq.namesrv.NamesrvStartup 7034 sun.tools.jps.Jps再启动broker 修改 runbroker.sh内存
启动broker是使用的mqbroker指令,只是注意启动broker时需要通过-c 指定对应的配置文件。
在zjj102上启动broker-a的master节点和broker-b的slave节点
修改 zjj102 和 zjj103机器上的 “/root/soft/rocketmq-all-4.7.1-bin-release/bin/runbroker.sh” 的jvm启动内存,如果你不修改的话,默认内存设置的非常大,根据你电脑配置去修改
大概是第66行位置,修改 JAVA_OPT="${JAVA_OPT} -server 配置
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m"启动
zjj102机器上 cd到 /root/soft/rocketmq-all-4.7.1-bin-release/bin 目录下
执行下面两个命令
nohup sh mqbroker -c ../conf/2m-2s-async/broker-a.properties > broker-a.log 2>&1 & nohup sh mqbroker -c ../conf/2m-2s-async/broker-b-s.properties >broker-b-s.log 2>&1&
查看是否执行成功,下面发现有两个BrokerStartup,就说明执行成功了.
[root@zjj102 bin]# jps -l 20512 org.apache.rocketmq.broker.BrokerStartup 15555 org.apache.rocketmq.namesrv.NamesrvStartup 20357 org.apache.rocketmq.broker.BrokerStartup 21528 sun.tools.jps.Jps
zjj103机器上cd到 /root/soft/rocketmq-all-4.7.1-bin-release/bin 目录下
分别执行下面两个命令
nohup sh mqbroker -c ../conf/2m-2s-async/broker-b.properties > broker-b.log 2>&1 & nohup sh mqbroker -c ../conf/2m-2s-async/broker-a-s.properties > broker-a-s.log 2>&1 &
查看是否启动成功,有两个 BrokerStartup 说明启动成功了.
[root@zjj103 bin]# jps -l 8080 sun.tools.jps.Jps 7012 org.apache.rocketmq.namesrv.NamesrvStartup 7748 org.apache.rocketmq.broker.BrokerStartup 7775 org.apache.rocketmq.broker.BrokerStartup搭建Rocketmq-console可视化插件
这里windows搭建的: 直接看 https://zjj1994.blog.csdn.net/article/details/120809685 博客, 我就不演示了. 很简单
通过Rocketmq-console查看集群情况,查看到集群搭建成功.
执行官方示例测试集群是否好用 说明在RocketMQ的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服务。我们在
zjj102上进入RocketMQ的安装目录:
发送消息:默认会发1000条消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息:
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
注意,这是官方提供的Demo,但是官方的源码中,这两个类都是没有指定nameServer的,所以
运行会有点问题。要指定NameServer地址,可以配置一个环境变量NAMESRV_ADDR,这样默认
会读取这个NameServer地址。可以配到.bash_profile里或者直接临时指定。
export NAMESRV_ADDR='zjj101:9876;zjj102:9876;zjj103:9876'
然后就可以正常执行了。
这个NameServer地址的读取方式见源码中
org.apache.rocketmq.common.utils.NameServerAddressUtils
public static String getNameServerAddresses() { return System.getProperty("rocketmq.namesrv.addr", System.getenv("NAMESRV_ADDR")); }
这个方法就是在DefaultMQProducer中默认的设置NameServer地址的方式,这个rokcetmq.namesrv.addr属性可以在java中使用System.setproperties指定,也可以在SpringBoot中配到配置文件里。
这个tools.sh就封装了一个简单的运行RocketMQ的环境,可以运行源码中的其他示例,然后自己
的例子也可以放到RocketMQ的lib目录下去执行。
配置环境变量:
修改 /etc/profile 文件
export NAMESRV_ADDR='zjj101:9876;zjj102:9876;zjj103:9876'
配置完了之后,别忘了执行 source /etc/profile 让环境变量生效.
执行 echo $NAMESRV_ADDR 查看环境变量是否配置有问题,能打印出 "zjj101:9876;zjj102:9876;zjj103:9876 " 说明配置没问题
下面开始演示: 在RocketMq 安装目录下执行 : sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# 查看配置的环境变量是否生效 [root@zjj102 rocketmq-all-4.7.1-bin-release]# echo $NAMESRV_ADDR # 说明生效了 zjj101:9876;zjj102:9876;zjj103:9876 # 执行官方示例 [root@zjj102 rocketmq-all-4.7.1-bin-release]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer # 下面开始打印日志了,说明生效了 12:58:13.013 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BCD0000, offsetMsgId=AC100A6600002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BEB0001, offsetMsgId=AC100A6600002A9F00000000000000CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BEF0002, offsetMsgId=AC100A6600002A9F0000000000000196, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BF10003, offsetMsgId=AC100A6700002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=0], queueOffset=0] SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3C0F0004, offsetMsgId=AC100A6700002A9F00000000000000CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=1], queueOffset=0]
点击刷新查看多了个TopicTest,这个是执行官方示例自动生成的.
你再打开一个终端连接zjj102,执行下面的命令启动消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
自己可以反复的启动 示例 , 生产消息和消费消息
点击 TopicTest的consumer管理来查看消费情况
下面显示的延迟1000 的意思就是有1000个没消费.你再执行 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 启动官方消费者示例之后,你就能看到延迟变少了,就说明消费了.
下面图就说明都被这个消费者消费了, 延迟为0说明没有消息积压了.
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)