- 前言
- 快速搭建RocketMQ集群
- 机器环境
- 创建用户
- 系统配置
- 关闭防火墙
- 安装java
- 安装RocketMQ
- 配置RocketMQ集群
- 配置第一组broker-a
- 配置第二组Broker-b
- 启动RocketMQ
- 先启动nameServer
- 再启动broker
- 启动状态检查
- 测试mqadmin管理工具
- 命令行快速验证
- 搭建管理控制台
记录RocketMQ
快速搭建RocketMQ集群 机器环境准备三台机器,root密码 root ;IP地址:
192.168.232.128 worker1 192.168.232.129 worker2 192.168.232.130 worker3创建用户
useradd oper passwd oper (密码输入 123qweasd)系统配置
切换oper用户,在worker1上 生成key
ssh-kengen
然后分发给其他机器
ssh-copy-id worker1 ssh-copy-id worker2 ssh-copy-id worker3
这样就可以在worker1上直接ssh 或者scp到另外的机器,不需要输密码了。
关闭防火墙systemctl stop firewalld.service firewall-cmd --state安装java
给oper创建/app目录
上传jdk的tar包
修改~/.bash_profile,配置环境变量。source生效。
export JAVA_HOME=/app/jdk1.8/安装RocketMQ
上传tar包,直接解压。然后配置环境变量
export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-4.7.1-bin-release 1
RocketMQ在4.5版本之前都不支持master宕机后slave自动切换。在4.5版本后,增加了基于Dleger实现的主从切换。
配置RocketMQ集群搭建一个2主2从异步刷盘的集群,所以我们会使用conf/2m-2s-async下的配
置文件,实际项目中,为了达到高可用,一般会使用dleger。预备设计的集群情况如下
所以修改的配置文件是进入rocketmq的config目录下修改2m-2s-async的配置文件。–只需要配置broker.conf。
在rocketmq的config目录下可以看到rocketmq建议的各种配置方式:
- 2m-2s-async: 2主2从异步刷盘(吞吐量较大,但是消息可能丢失),
- 2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全),
- 2m-noslave:2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置。
- 而dleger就是用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。
采用2m-2s-async的方式搭建集群
配置第一组broker-a在worker2上先配置borker-a的master节点。先配置2m-2s-async/broker-a.properties
#所属集群名字,名字一样的节点就在同一个集群内 brokerClusterName=rocketmq-cluster #broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-a #brokerid,0就表示是Master,>0的都是表示 Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=worker1:9876; worker2:9876;worker3:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/app/rocketmq/store #commitLog 存储路径 storePathCommitLog=/app/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/app/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/app/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/app/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/app/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
该节点对应的从节点在worker3上。修改2m-2s-async/broker-a-s.properties 只需要修改brokerId和 brokerRole
#所属集群名字,名字一样的节点就在同一个集群内 brokerClusterName=rocketmq-cluster #broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-a #brokerid,0就表示是Master,>0的都是表示 Slave brokerId=1 #nameServer地址,分号分割 namesrvAddr=worker1:9876; worker2:9876;worker3:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/app/rocketmq/store #commitLog 存储路径 storePathCommitLog=/app/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/app/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/app/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/app/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/app/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128配置第二组Broker-b
这一组broker的主节点在worker3上,所以需要配置worker3上的config/2m-2s-async/brokerb.properties
#所属集群名字,名字一样的节点就在同一个集群内 brokerClusterName=rocketmq-cluster #broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-b #brokerid,0就表示是Master,>0的都是表示 Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=worker1:9876;worker2:9876;worker3:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 1234567891011121314151617181920212223 然后他对应的slave在worker2上, 修改work2上的 conf/2m-2s-async/broker-b-s.properties mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/app/rocketmq/store #commitLog 存储路径 storePathCommitLog=/app/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/app/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/app/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/app/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/app/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
然后他对应的slave在worker2上,修改work2上的 conf/2m-2s-async/broker-b-s.properties
#所属集群名字,名字一样的节点就在同一个集群内 brokerClusterName=rocketmq-cluster #broker名字,名字一样的节点就是一组主从节点。 brokerName=broker-b #brokerid,0就表示是Master,>0的都是表示 Slave brokerId=1 #nameServer地址,分号分割 namesrvAddr=worker1:9876;worker2:9876;worker3:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 1234567891011121314151617181920212223 然后他对应的slave在worker2上, 修改work2上的 conf/2m-2s-async/broker-b-s.properties mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/app/rocketmq/store #commitLog 存储路径 storePathCommitLog=/app/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/app/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/app/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/app/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/app/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
这样broker就配置完成了。
需要注意的配置项:
- 同一机器上两个实例的store目录不能相同,否则会报错 Lock failed,MQ
already started - 同一机器上两个实例的listenPort也不能相同。否则会报端口占用的错nameserver不需要进行配置,直接启动就行。这也看出nameserver是无状态的。
启动就比较简单了,直接调用bin目录下的脚本就行。只是启动之前要注意看下他们的JVM内存配置,默认的配置都比较高。
先启动nameServer修改三个节点上的bin/runserver.sh,调整里面的jvm内存配置。找到下面这一行调整下内存
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m - XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m"
直接在三个节点上启动nameServer。
nohup bin/mqadminsrv &
启动完成后,在nohup.out里看到这一条关键日志就是启动成功了。
The Name Server boot success. serializeType=JSON
使用jps指令可以看到一个NamesrvStartup进程。
这里也看到,RocketMQ在runserver.sh中是使用的CMS垃圾回收期,而在runbroker.sh中使用的是G1垃圾回收期。
启动broker是使用的mqbroker指令,只是注意启动broker时需要通过-c 指定对应的配置文件。在worker2上启动broker-a的master节点和broker-b的slave节点
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties & nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &
在work3上启动broker-b的master节点和broker-a的slave节点
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties & nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &
启动slave时,如果遇到报错 Lock failed,MQ already started ,那是因为有多个实例共用了同一个storePath造成的,这时就需要调整store的路径。
启动状态检查使用jps指令,能看到一个NameSrvStartup进程和两个BrokerStartup进程。nohup.out中也有启动成功的日志。对应的日志文件:
# 查看nameServer日志 tail -500f ~/logs/rocketmqlogs/namesrv.log # 查看broker日志 tail -500f ~/logs/rocketmqlogs/broker.log 1234测试mqadmin管理工具
RocketMQ的源代码中并没有为我们提供类似于Nacos或者RabbitMQ那样的控制台,只提供了一个mqadmin指令来管理RocketMQ,命令在bin目录下。使用方式是 ./mqadmin {command} {args}
命令行快速验证在RocketMQ的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服务。在worker2上进入RocketMQ的安装目录:
- 发送消息:默认会发1000条消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 1
- 接收消息:
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
注意,这是官方提供的Demo,但是官方的源码中,这两个类都是没有指定nameServer的,所以运行会有点问题。要指定NameServer地址,可以配置一个环境变量NAMESRV_ADDR,这样默认会读取这个NameServer地址。可以配到.bash_profile里或者直接临时指定。
export NAMESRV_ADDR='worker1:9876;worker2:9876;worker3:9876'
然后就可以正常执行了。这个NameServer地址的读取方式见源码中
org.apache.rocketmq.common.utils.NameServerAddressUtils
public static String getNameServerAddresses() { return System.getProperty("rocketmq.namesrv.addr", System.getenv("NAMESRV_ADDR")); }
这个方法就是在DefaultMQProducer中默认的设置NameServer地址的方式,这个rokcetmq.namesrv.addr属性可以在java中使用System.setproperties指定,也可以在SpringBoot中配到配置文件里。这个tools.sh就封装了一个简单的运行RocketMQ的环境,可以运行源码中的其他示例,然后自己
的例子也可以放到RocketMQ的lib目录下去执行。
RocketMQ源代码中并没有提供控制台,但是有一个Rocket的社区扩展项目中提供了一个控制台,地址: https://github.com/apache/rocketmq-externals
下载下来后,进入其中的rocket-console目录,使用maven进行编译
mvn clean package -Dmaven.test.skip=true
编译完成后,获取target下的jar包,就可以直接执行。但是这个时候要注意,在这个项目的application.properties中需要指定nameserver的地址。默认这个属性是空的。可以在jar包的当前目录下增加一application.properties文件,覆盖jar包中默认的一个属性:
rocketmq.config.namesrvAddr=worker1:9876;worker2:9876;worker3:9876 1
然后执行:
java -jar rocketmq-console-ng-1.0.1.jar
启动完成后,可以访问 http://192.168.232.128:8080看到管理页面
如果需要搭建高可用,需要使用Dleger搭建高可用集群
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)