—
一、什么是RocketMQ?二、安装与配置RocketMQ
百度网盘分享官方下载地址安装配置 三、快速运行RocketMQ
RocketMQ组件 四、启动NameServer五、启动Broker六、命令行快速验证七、关闭RocketMQ服务八、RocketMQ集群架构
2、RocketMQ集群搭建
1、机器环境2、创建用户3、系统配置
关闭防火墙(生产环境不建议这么做,有被入侵的风险) 4、安装java5、安装RocketMQ6、配置RocketMQ集群
1、配置第一组broker-a2、配置第二组Broker-b 7、启动RocketMQ
2、再启动broker3、启动状态检查4、测试mqadmin管理工具5、命令行快速验证 8、搭建管理控制台
jar包运行docker镜像运行 9、Dleger高可用集群搭建
搭建方法 10、调整系统参数
1、配置RocketMQ的JVM内存大小:2、RocketMQ的其他一些核心参数3、Linux内核参数定制 3、RocketMQ的其他参考资料4、小总结 九、RocketMQ原生API使用
1、测试环境搭建2、RocketMQ的编程模型
消息发送者的固定步骤消息消费者的固定步骤 3、RocketMQ的消息样例
3.1 基本样例
1、同步发送消息的样例见:org.apache.rocketmq.example.simple.Producer2、异步发送消息的样例见:org.apache.rocketmq.example.simple.AsyncProducer3、单向发送消息的样例:4、使用消费者消费消息。 3.2 顺序消息3.3 广播消息3.4 延迟消息3.5 批量消息3.6 过滤消息3.7 事务消息3.8 ACL权限控制 十、SpringBoot整合RocketMQ
1、快速实战2、总结 十一、SpringCloudStream整合RocketMQ
1、快速实战2、总结 十二、高级原理
一、基础概念
1 消息模型(Message Model)2 消息生产者(Producer)3 消息消费者(Consumer)4 主题(Topic)5 代理服务器(Broker Server)6 名字服务(Name Server)7 消息(Message) 二、消息存储
1、何时存储消息2、消息存储介质
2.1磁盘保存文件慢吗?2.2零拷贝技术加速文件读写 3 消息存储结构4 刷盘机制5 消息主从复制
同步复制异步复制 6 负载均衡
6.1Producer负载均衡6.2 Consumer负载均衡
1、集群模式2、广播模式 7、消息重试9、消息幂等
1、幂等的概念2、消息幂等的必要性3、处理方式 十三、RockerMQ源码
1、源码环境搭建
1、源码拉取2、注解版源码引入3、源码调试
3.2 启动Broker3.3 发送消息3.4 消费消息3.5 如何看源码 2、NameServer启动
1、核心问题2、源码重点 3、Broker启动4、Broker注册5、Producer6、消息存储
6.2-分发ConsumeQueue和IndexFile6.3、文件同步刷盘与异步刷盘6.4、过期文件删除6.5、文件存储部分的总结 7、消费者
7.1、启动7.2、消息拉取7.3 长轮询拉取机制7.4 客户端负载均衡策略 8、延迟消息 十四、实践问题
一、使用RocketMQ如何保证消息不丢失?
1、哪些环节会有丢消息的可能?2、RocketMQ消息零丢失方案
1》 生产者使用事务消息机制保证消息零丢失2》RocketMQ配置同步刷盘+Dledger主从架构保证MQ自身不会丢消息3》消费者端不要使用异步消费机制4》RocketMQ特有的问题,NameServer挂了如何保证消息不丢失?5》RocketMQ消息零丢失方案总结 二、使用RocketMQ如何保证消息顺序
1、为什么要保证消息有序?2、如何保证消息有序? 三、使用RocketMQ如何快速处理积压消息?
1、如何确定RocketMQ有大量的消息积压?2、如何处理大量积压的消息? 四、RocketMQ的消息轨迹
1、RocketMQ消息轨迹数据的关键属性:2、消息轨迹配置3、消息轨迹数据存储
RocketMQ是阿里巴巴开源的一个消息中间件,在阿里内部历经了双十一等很多高并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的一个顶级项目。目前RocketMQ在阿里云上有一个购买即可用的商业版本,商业版本集成了阿里内部一些更深层次的功能及运维定制。我们这里学习的是Apache的开源版本。开源版本相对于阿里云上的商业版本,功能上略有缺失,但是大体上功能是一样的。
RocketMQ的官网地址: http://rocketmq.apache.org ,github地址是 https://github.com/apache/rocketmq
RocketMQ运行版本下载地址:
链接:https://pan.baidu.com/s/1mqfO8bpQs0yTnX6oeOsPCA?pwd=2022
提取码:2022
RocketMQ源码版本下载地址:
链接:https://pan.baidu.com/s/1NbgSuv89uSgtYALRzSYr0w?pwd=2022
提取码:2022
RocketMQ运行版本下载地址:
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
RocketMQ源码版本下载地址:
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip
查看linux版本
uname -a
创建一个 *** 作用户,用来运行自己的程序,与root用户区分开。使用root用户创建一个oper用户,并给他创建一个工作目录,设置用户密码
useradd liaozhiwei passwd liaozhiwei mkdir /opt/rocketmq cd /opt/rocketmq chown liaozhiwei:liaozhiwei /opt/rocketmq
运行RocketMQ需要先安装JDK。我们采用目前最稳定的JDK1.8版本。可以自行去Oracle官网上下载也可以使用我从官网拉下来的jdk版本。
链接:https://pan.baidu.com/s/10YA9SBV7Y6TKJ9keBrNVWw?pwd=2022
提取码:2022
然后用FTP或者WSP上传到liaozhiwei用户的工作目录下。由liaozhiwei用户解压到/opt/jdk目录下。
tar -zxvf jdk-8u301-linux-x64.tar.gz
继续上传RocketMQ到/opt/rocketmq目录下,由于我们上传的包是zip的包,解压需要通过解压工具,所以我们安装一个
yum install unzip zip
然后解压
unzip rocketmq-all-4.7.1-bin-release.zip
配置环境变量。使用vim /etc/profile编辑文件,在文件尾部添加以下内容
###java环境配置 export JAVA_HOME=/opt/jdk export JRE_HOME=$JAVA_HOME/jre export CLASSPATH=./:JAVA_HOME/lib:$JRE_HOME/lib ###RocketMQ环境配置 export ROCKETMQ_HOME=/opt/rocketmq ###path export PATH=/bin:/usr/bin:/sbin:/usr/sbin:$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
编辑完成后,通过:wq保存退出,执行source /etc/profile让环境变量生效。输入java -version能查看到以下内容表明JDK安装成功了。
ROCKETMQ_HOME的环境变量是必须要单独配置的,如果不配置的话,启动NameSever和Broker都会报错。
这个环境变量的作用是用来加载$ROCKETMQ_HOME/conf下的除broker.conf以外的几个配置文件。所以实际情况中,可以不按这个配置,但是一定要能找到配置文件。
运行之前,我们需要对RocketMQ的组件结构有个大致的了解。
![RocketMQ组件.png](https://img-blog.csdnimg.cn/img_convert/1e6fa243d7a4cc928f5b17424ad4b7d2.png#clientId=uca02257f-f78f-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u4a5468dc&margin=[object Object]&name=RocketMQ组件.png&originHeight=372&originWidth=904&originalType=binary&ratio=1&rotation=0&showTitle=false&size=59845&status=done&style=none&taskId=u6f3fb25f-b8a8-4cee-a7a5-c8286abf1b9&title=)
RocketMQ由以下这几个组件组成
NameServer : 提供轻量级的Broker路由服务。 Broker:实际处理消息存储、转发等服务的核心组件。 Producer:消息生产者集群。通常是业务系统中的一个功能模块。 Consumer:消息消费者集群。通常也是业务系统中的一个功能模块。
所以我们要启动RocketMQ服务,需要先启动NameServer。
四、启动NameServer启动NameServer非常简单, 在$ROCKETMQ_HOME/bin目录下有个mqnamesrv。直接执行这个脚本就可以启动RocketMQ的NameServer服务。
但是要注意,RocketMQ默认预设的JVM内存是4G,这是RocketMQ给我们的最佳配置。但是通常我们用虚拟机的话都是不够4G内存的,所以需要调整下JVM内存大小。修改的方式是直接修改runserver.sh。 用
vim /opt/rocketmq/bin/runserver.sh
编辑这个脚本,在脚本中找到这一行调整内存大小为512M
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m"
授权
chmod 777 /opt/rocketmq/bin/mqnamesrv
然后我们用静默启动的方式启动NameServer服务:
nohup /opt/rocketmq/bin/mqnamesrv > /opt/rocketmq/nameServerLog 2>&1 &
启动完成后,在nohup.out里看到这一条关键日志就是启动成功了。并且使用jps指令可以看到有一个NamesrvStartup进程。
cat /opt/rocketmq/nameServerLog
![图片.png](https://img-blog.csdnimg.cn/img_convert/52a97736cfcc2f275bdee6e1e9a114b8.png#clientId=ub4fc0e10-a510-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=53&id=u6c5e8852&margin=[object Object]&name=图片.png&originHeight=105&originWidth=1152&originalType=binary&ratio=1&rotation=0&showTitle=false&size=17937&status=done&style=none&taskId=ubd803de6-858d-46d8-8597-cb0060f3980&title=&width=576)
五、启动Broker主要尽量将文件夹的名字进行调整,不用多余的特殊字符
启动Broker的脚本是runbroker.sh。Broker的默认预设内存是8G,启动前,如果内存不够,同样需要调整下JVM内存。
vim /opt/rocketmq/bin/runbroker.sh
找到这一行,进行内存调整
![图片.png](https://img-blog.csdnimg.cn/img_convert/12f7e3d0a976e21ff603b6058c14b0a8.png#clientId=uc3734464-07a4-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=48&id=u41bceffe&margin=[object Object]&name=图片.png&originHeight=95&originWidth=644&originalType=binary&ratio=1&rotation=0&showTitle=false&size=10909&status=done&style=none&taskId=u0dbe7070-f57d-43a6-8f63-dc8bf93eadf&title=&width=322)
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
然后我们需要找到$ROCKETMQ_HOME/conf/broker.conf, vim指令进行编辑,在最下面加入一个配置:
vim /opt/rocketmq/conf/broker.conf
autoCreateTopicEnable=true
授权
chmod 777 /opt/rocketmq/bin/mqbroker
然后也以静默启动的方式启动runbroker.sh
nohup /opt/rocketmq/bin/mqbroker -n localhost:9876 > /opt/rocketmq/brokerlog 2>&1 &
启动完成后,同样是检查nohup.out日志,有这一条关键日志就标识启动成功了。 并且jps指令可以看到一个BrokerStartup进程。
![图片.png](https://img-blog.csdnimg.cn/img_convert/9b93cb2b60f69ad1cd2df83108fbcc87.png#clientId=ub4fc0e10-a510-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=42&id=u7462fa0d&margin=[object Object]&name=图片.png&originHeight=84&originWidth=344&originalType=binary&ratio=1&rotation=0&showTitle=false&size=5619&status=done&style=none&taskId=u9716a796-b1ad-4a78-bd57-884a405822b&title=&width=172)
查看brokerlog日志文件
cat /opt/rocketmq/brokerlog
![图片.png](https://img-blog.csdnimg.cn/img_convert/bd219894b3370a9c2946437d9d25ac78.png#clientId=ub4fc0e10-a510-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=40&id=u8d6a04c4&margin=[object Object]&name=图片.png&originHeight=79&originWidth=867&originalType=binary&ratio=1&rotation=0&showTitle=false&size=10434&status=done&style=none&taskId=u41b733a2-dae8-4f2c-b059-50ecb3e9483&title=&width=433.5)
在观察runserver.sh和runbroker.sh时,我们还可以查看到其他的JVM执行参数,这些参数都可以进行定制。例如我们观察到一个比较有意思的地方,nameServer使用的是CMS垃圾回收器,而Broker使用的是G1垃圾回收器。
在RocketMQ的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服务。我们在worker2上进入RocketMQ的安装目录:
授权命令
chmod 777 /opt/rocketmq/bin/tools.sh
然后启动消息生产者发送消息:默认会发1000条消息
/opt/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
我们可以看到发送消息的日志:
![图片.png](https://img-blog.csdnimg.cn/img_convert/08f0e08016304b6228e1a968d2db7f5d.png#clientId=ub4fc0e10-a510-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=432&id=ubc3711f1&margin=[object Object]&name=图片.png&originHeight=864&originWidth=1920&originalType=binary&ratio=1&rotation=0&showTitle=false&size=351870&status=done&style=none&taskId=u984f3600-ef68-4f14-8211-5c1764ad2e6&title=&width=960)
这日志中,上面部分就是我们发送的消息的内容。后面两句标识消息生产者正常关闭。
然后启动消息消费者接收消息:
/opt/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
启动后,可以看到消费到的消息。
![图片.png](https://img-blog.csdnimg.cn/img_convert/2477ceef6fe16a7cb14302984c061621.png#clientId=ub4fc0e10-a510-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=415&id=u3a0db81b&margin=[object Object]&name=图片.png&originHeight=829&originWidth=1920&originalType=binary&ratio=1&rotation=0&showTitle=false&size=348568&status=done&style=none&taskId=u0b8906f5-e5ea-479e-bb41-062bccf447b&title=&width=960)
日志中MessageExt后的整个内容就是一条完整的RocketMQ消息。我们要对这个消息的结构有个大概的了解,后面会对这个消息进行深入的理解。
其中比较关键的属性有:brokerName,queueId,msgId,topic,cluster,tags,body,transactionId。先找下这些属性在哪里。
而这个Consume指令并不会结束,他会继续挂起,等待消费其他的消息。我们可以使用CTRL+C停止该进程。
七、关闭RocketMQ服务要关闭RocketMQ服务可以通过mqshutdown脚本直接关闭
授权
chmod 777 /opt/rocketmq/bin/mqshutdown
1.关闭NameServer
sh /opt/rocketmq/bin/mqshutdown namesrv
2.关闭Broker
sh /opt/rocketmq/bin/mqshutdown broker八、RocketMQ集群架构
刚才的演示中,我们已经体验到了RocketMQ是如何工作的。这样,我们回头看RocketMQ的集群架构,就能够有更全面的理解了。
![RocketMQ集群.png](https://img-blog.csdnimg.cn/img_convert/154aea440a5bbcdd6a0ee81d301a5f33.png#clientId=uca02257f-f78f-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=uc25d0efb&margin=[object Object]&name=RocketMQ集群.png&originHeight=372&originWidth=904&originalType=binary&ratio=1&rotation=0&showTitle=false&size=59845&status=done&style=none&taskId=u0ba6a1d7-cade-40db-9f1e-3c27a43037c&title=)
1、RocketMQ集群中的各个角色
一个完整的RocketMQ集群中,有如下几个角色
- Producer:消息的发送者;举例:发信者Consumer:消息接收者;举例:收信者Broker:暂存和传输消息;举例:邮局NameServer:管理Broker;举例:各个邮局的管理机构Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息。我们之前的测试案例中,Topic是什么?topic=‘TopicTest’。现在你能看懂我们之前在broker.conf中添加的autoCreateTopicEnable=true这个属性的用处了吗?Message Queue:相当于是Topic的分区;用于并行发送和接收消息。在我们之前的测试案例中,一个queueId就代表了一个MessageQueue。有哪些queueId? 0,1,2,3四个MessageQueue,你都找到了吗?
准备三台虚拟机,root密码 root ;IP地址:
机器1的ip worker1机器2的ip worker2机器3的ip worker3
这里特意不把每个机器的机器名定义得太过规范,比如master slave这样的,有助于更理解各项配置。
2、创建用户useradd liaozhiwei
passwd liaozhiwei (密码输入 123qweasd)
3、系统配置免密登录
切换liaozhiwei用户,在worker1上 生成key
ssh-kengen
然后分发给其他机器
ssh-copy-id worker1ssh-copy-id worker2ssh-copy-id worker3
这样就可以在worker1上直接ssh 或者scp到另外的机器,不需要输密码了。
关闭防火墙(生产环境不建议这么做,有被入侵的风险)systemctl stop firewalld.servicefirewall-cmd --state4、安装java
给liaozhiweir创建/rocketmq目录
上传jdk的tar包
vim /etc/profile,配置环境变量。source /etc/profile生效。
export JAVA_HOME=/opt/jdk/
![图片.png](https://img-blog.csdnimg.cn/img_convert/c63990fb6e8ccedc156d30da71c7fb54.png#clientId=ua3452ceb-6890-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=116&id=ubc5add3f&margin=[object Object]&name=图片.png&originHeight=232&originWidth=706&originalType=binary&ratio=1&rotation=0&showTitle=false&size=21921&status=done&style=none&taskId=ud297424d-61fb-4525-96b8-09af1ff4570&title=&width=353)
上传tar包,直接解压。然后配置环境变量
export ROCKETMQ_HOME=/opt/rocketmq
RocketMQ在4.5版本之前都不支持master宕机后slave自动切换。在4.5版本后,增加了基于Dleger实现的主从切换。这里用的目前最新的4.7.1版本
我们为了便于观察,这次搭建一个2主2从异步刷盘的集群,所以我们会使用conf/2m-2s-async下的配置文件,实际项目中,为了达到高可用,一般会使用dleger。预备设计的集群情况如下:
所以修改的配置文件是进入rocketmq的config目录下修改2m-2s-async的配置文件。–只需要配置broker.conf。
在rocketmq的config目录下可以看到rocketmq建议的各种配置方式: 2m-2s-async: 2主2从异步刷盘(吞吐量较大,但是消息可能丢失), 2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全), 2m-noslave:2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置。 而dleger就是用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。
我们这次采用2m-2s-async的方式搭建集群。
1、配置第一组broker-a在worker2上先配置borker-a的master节点。先配置2m-2s-async/broker-a.properties
#所属集群名字,名字一样的节点就在同一个集群内brokerClusterName=rocketmq-cluster#broker名字,名字一样的节点就是一组主从节点。brokerName=broker-a#brokerid,0就表示是Master,>0的都是表示 SlavebrokerId=0#nameServer地址,分号分割namesrvAddr=worker1:9876;worker2:9876;worker3:9876#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭autoCreateTopicEnable=true#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=true#Broker 对外服务的监听端口listenPort=10911#删除文件时间点,默认凌晨 4点deleteWhen=04#文件保留时间,默认 48 小时fileReservedTime=120#commitLog每个文件的大小默认1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每个文件默认存30W条,根据业务情况调整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/opt/rocketmq/store#commitLog 存储路径storePathCommitLog=/opt/rocketmq/store/commitlog#消费队列存储路径存储路径storePathConsumeQueue=/opt/rocketmq/store/consumequeue#消息索引存储路径storePathIndex=/opt/rocketmq/store/index#checkpoint 文件存储路径storeCheckpoint=/opt/rocketmq/store/checkpoint#abort 文件存储路径abortFile=/opt/rocketmq/store/abort#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=ASYNC_MASTER#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128
该节点对应的从节点在worker3上。修改2m-2s-async/broker-a-s.properties 只需要修改brokerId和brokerRole
#所属集群名字,名字一样的节点就在同一个集群内brokerClusterName=rocketmq-cluster#broker名字,名字一样的节点就是一组主从节点。brokerName=broker-a#brokerid,0就表示是Master,>0的都是表示 SlavebrokerId=1#nameServer地址,分号分割namesrvAddr=worker1:9876;worker2:9876;worker3:9876#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭autoCreateTopicEnable=true#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=true#Broker 对外服务的监听端口listenPort=11011#删除文件时间点,默认凌晨 4点deleteWhen=04#文件保留时间,默认 48 小时fileReservedTime=120#commitLog每个文件的大小默认1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每个文件默认存30W条,根据业务情况调整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/opt/rocketmq/storeSlave#commitLog 存储路径storePathCommitLog=/opt/rocketmq/storeSlave/commitlog#消费队列存储路径存储路径storePathConsumeQueue=/opt/rocketmq/storeSlave/consumequeue#消息索引存储路径storePathIndex=/opt/rocketmq/storeSlave/index#checkpoint 文件存储路径storeCheckpoint=/opt/rocketmq/storeSlave/checkpoint#abort 文件存储路径abortFile=/opt/rocketmq/storeSlave/abort#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=SLAVE#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=1282、配置第二组Broker-b
这一组broker的主节点在worker3上,所以需要配置worker3上的config/2m-2s-async/broker-b.properties
#所属集群名字,名字一样的节点就在同一个集群内brokerClusterName=rocketmq-cluster#broker名字,名字一样的节点就是一组主从节点。brokerName=broker-b#brokerid,0就表示是Master,>0的都是表示 SlavebrokerId=0#nameServer地址,分号分割namesrvAddr=worker1:9876;worker2:9876;worker3:9876#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭autoCreateTopicEnable=true#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=true#Broker 对外服务的监听端口listenPort=10911#删除文件时间点,默认凌晨 4点deleteWhen=04#文件保留时间,默认 48 小时fileReservedTime=120#commitLog每个文件的大小默认1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每个文件默认存30W条,根据业务情况调整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/opt/rocketmq/store#commitLog 存储路径storePathCommitLog=/opt/rocketmq/store/commitlog#消费队列存储路径存储路径storePathConsumeQueue=/opt/rocketmq/store/consumequeue#消息索引存储路径storePathIndex=/opt/rocketmq/store/index#checkpoint 文件存储路径storeCheckpoint=/opt/rocketmq/store/checkpoint#abort 文件存储路径abortFile=/opt/rocketmq/store/abort#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=ASYNC_MASTER#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128
然后他对应的slave在worker2上,修改work2上的 conf/2m-2s-async/broker-b-s.properties
#所属集群名字,名字一样的节点就在同一个集群内brokerClusterName=rocketmq-cluster#broker名字,名字一样的节点就是一组主从节点。brokerName=broker-b#brokerid,0就表示是Master,>0的都是表示 SlavebrokerId=1#nameServer地址,分号分割namesrvAddr=worker1:9876;worker2:9876;worker3:9876#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭autoCreateTopicEnable=true#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=true#Broker 对外服务的监听端口listenPort=11011#删除文件时间点,默认凌晨 4点deleteWhen=04#文件保留时间,默认 48 小时fileReservedTime=120#commitLog每个文件的大小默认1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每个文件默认存30W条,根据业务情况调整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/opt/rocketmq/storeSlave#commitLog 存储路径storePathCommitLog=/opt/rocketmq/storeSlave/commitlog#消费队列存储路径存储路径storePathConsumeQueue=/opt/rocketmq/storeSlave/consumequeue#消息索引存储路径storePathIndex=/opt/rocketmq/storeSlave/index#checkpoint 文件存储路径storeCheckpoint=/opt/rocketmq/storeSlave/checkpoint#abort 文件存储路径abortFile=/opt/rocketmq/storeSlave/abort#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=SLAVE#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128
这样broker就配置完成了。
需要注意的配置项:
1、同一机器上两个实例的store目录不能相同,否则会报错 Lock failed,MQ already started
2、同一机器上两个实例的listenPort也不能相同。否则会报端口占用的错,nameserver不需要进行配置,直接启动就行。这也看出nameserver是无状态的。
3、其他的配置项参见《RcoketMQ全部配置表.pdf》
7、启动RocketMQ启动就比较简单了,直接调用bin目录下的脚本就行。只是启动之前要注意看下他们的JVM内存配置,默认的配置都比较高。
1、先启动nameServer。
修改三个节点上的bin/runserver.sh,调整里面的jvm内存配置。找到下面这一行调整下内存
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:metaspaceSize=128m -XX:MaxmetaspaceSize=320m"
直接在三个节点上启动nameServer。
nohup /opt/rocketmq/bin/mqnamesrv > /opt/rocketmq/nameServer.log 2>&1 &
启动完成后,在nohup.out里看到这一条关键日志就是启动成功了。
Java HotSpot™ 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release Java HotSpot™ 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release. The Name Server boot success. serializeType=JSON
使用jps指令可以看到一个NamesrvStartup进程。
这里也看到,RocketMQ在runserver.sh中是使用的CMS垃圾回收期,而在runbroker.sh中使用的是G1垃圾回收期。
2、再启动broker启动broker是使用的mqbroker指令,只是注意启动broker时需要通过-c 指定对应的配置文件。
在worker2上启动broker-a的master节点和broker-b的slave节点
nohup /opt/rocketmq/bin/mqnamesrv -c /opt/rocketmq/conf/2m-2s-async/broker-a.properties &nohup /opt/rocketmq/bin/mqnamesrv -c /opt/rocketmq/conf/2m-2s-async/broker-b-s.properties &
在work3上启动broker-b的master节点和broker-a的slave节点
nohup /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-b.properties &nohup /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/2m-2s-async/broker-a-s.properties &
启动slave时,如果遇到报错 Lock failed,MQ already started ,那是因为有多个实例共用了同一个storePath造成的,这时就需要调整store的路径。
3、启动状态检查使用jps指令,能看到一个NameSrvStartup进程和两个BrokerStartup进程。
nohup.out中也有启动成功的日志。
对应的日志文件:
查看nameServer日志tail -500f /opt/rocketmq/nameServer.log查看broker日志tail -500f /opt/rocketmq/broker.log4、测试mqadmin管理工具
RocketMQ的源代码中并没有为我们提供类似于Nacos或者RabbitMQ那样的控制台,只提供了一个mqadmin指令来管理RocketMQ,命令在bin目录下。使用方式是 ./mqadmin {command} {args}
5、命令行快速验证1、几乎所有指令都需要通过-n参数配置nameServer地址,格式为ip:port
2、几乎所有执行都可以通过-h参数获得帮助
3、当既有Broker地址(-b)又有集群名称clustername(-c)配合项,则优先以Broker地址执行指令。如果不配置Broker地址,则对集群中所有主机执行指令。
在RocketMQ的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服务。我们在worker2上进入RocketMQ的安装目录:
发送消息:默认会发1000条消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息:
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
注意,这是官方提供的Demo,但是官方的源码中,这两个类都是没有指定nameServer的,所以运行会有点问题。要指定NameServer地址,可以配置一个环境变量NAMESRV_ADDR,这样默认会读取这个NameServer地址。可以配到.bash_profile里或者直接临时指定。
export NAMESRV_ADDR='worker1:9876;worker2:9876;worker3:9876'
然后就可以正常执行了。
这个NameServer地址的读取方式见源码中org.apache.rocketmq.common.utils.NameServerAddressUtils
public static String getNameServerAddresses() { return System.getProperty("rocketmq.namesrv.addr", System.getenv("NAMESRV_ADDR")); }
这个方法就是在DefaultMQProducer中默认的设置NameServer地址的方式,这个rokcetmq.namesrv.addr属性可以在java中使用System.setproperties指定,也可以在SpringBoot中配到配置文件里。
8、搭建管理控制台 jar包运行这个tools.sh就封装了一个简单的运行RocketMQ的环境,可以运行源码中的其他示例,然后自己的例子也可以放到RocketMQ的lib目录下去执行。
RocketMQ源代码中并没有提供控制台,但是有一个Rocket的社区扩展项目中提供了一个控制台,地址:https://github.com/apache/rocketmq-externals
下载下来后,进入其中的rocket-console目录,使用maven进行编译
mvn clean package -Dmaven.test.skip=true
编译完成后,获取target下的jar包,就可以直接执行。但是这个时候要注意,在这个项目的application.properties中需要指定nameserver的地址。默认这个属性是空的。
那我们可以在jar包的当前目录下增加一个application.properties文件,覆盖jar包中默认的一个属性:
rocketmq.config.namesrvAddr=worker1:9876;worker2:9876;worker3:9876
然后执行:
java -jar rocketmq-console-ng-1.0.1.jar
启动完成后,可以访问 http://ip:8080看到管理页面
在管理页面的右上角可以选择语言。
这里我使用的是docker进行直接拉取镜像运行的方式,二行命令就搞定了
docker pull styletang/rocketmq-console-ngdocker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=139.224.233.121:9876 -Drocketmq.config.isVIPChannel=false" -p8080:8080 -t styletang/rocketmq-console-ng9、Dleger高可用集群搭建
通过这种方式,我们搭建了一个主从结构的RocketMQ集群,但是我们要注意,这种主从结构是只做数据备份,没有容灾功能的。也就是说当一个master节点挂了后,slave节点是无法切换成master节点继续提供服务的。注意这个集群至少要是3台,允许少于一半的节点发生故障。
如果slave挂了,对集群的影响不会很大,因为slave只是做数据备份的。但是影响也是会有的,例如,当消费者要拉取的数据量比较大时,RocketMQ有一定的机制会优先保证Master节点的性能,只让Master节点返回一小部分数据,而让其他部分的数据从slave节点去拉取。另外,需要注意,Dleger会有他自己的CommitLog机制,也就是说,使用主从集群累计下来的消息,是无法转移到Dleger集群中的。
而如果要进行高可用的容灾备份,需要采用Dledger的方式来搭建高可用集群。注意,这个Dledger需要在RocketMQ4.5以后的版本才支持,我们使用的4.7.1版本已经默认集成了dledger。搭建方法
要搭建高可用的Broker集群,我们只需要配置conf/dleger下的配置文件就行。
这种模式是基于Raft协议的,是一个类似于Zookeeper的paxos协议的选举协议,也是会在集群中随机选举出一个leader,其他的就是follower。只是他选举的过程跟paxos有点不同。Raft协议基于随机休眠机制的,选举过程会比paxos相对慢一点。
首先:[我们同样是需要修改runserver.sh和runbroker.sh](http://xn--runserver-947nw2gmts9m8bkijnsdy1kk96m302b.xn--shrunbroker-804s.sh),对JVM内存进行定制。然后:我们需要修改conf/dleger下的配置文件。 跟dleger相关的几个配置项如下:![图片.png](https://cdn.nlark.com/yuque/0/2022/png/22437817/1641722153880-9b61ef4d-5ade-44ad-b09e-ad33a5547398.png#clientId=ude9a8954-70ff-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=300&id=u1aff0111&margin=%5Bobject%20Object%5D&name=%E5%9B%BE%E7%89%87.png&originHeight=300&originWidth=993&originalType=binary&ratio=1&rotation=0&showTitle=false&size=17274&status=done&style=none&taskId=u553d5987-e4a8-4750-adab-f4acaf8126a&title=&width=993)配置完后,同样是使用 nohup bin/mqbroker -c $conf_name & 的方式指定实例文件。
10、调整系统参数在bin/dleger下有个fast-try.sh,这个脚本是在本地启动三个RocketMQ实例,搭建一个高可用的集群,读取的就是conf/dleger下的broker-no.conf,broker-n1.conf和broker-n2.conf。使用这个脚本同样要注意定制下JVM内存,他给每个实例默认定制的是1G内存,虚拟机肯定是不够的。
这种单机三实例的集群搭建完成后,可以使用 bin/mqadmin clusterList -n worker1.conf的方式查看集群状态。
单机状态下一般一次主从切换需要大概10S的时间。
到这里,我们的整个RocketMQ的服务就搭建完成了。但是在实际使用时,我们说RocketMQ的吞吐量、性能都很高,那要发挥RocketMQ的高性能,还需要对RocketMQ以及服务器的性能进行定制
之前提到过,在runserver.sh中需要定制nameserver的内存大小,在runbroker.sh中需要定制broker的内存大小。这些默认的配置可以认为都是经过检验的最优化配置,但是在实际情况中都还需要根据服务器的实际情况进行调整。这里以runbroker.sh中对G1GC的配置举例,在runbroker.sh中的关键配置:
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_broker_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
-XX:+UseG1GC: 使用G1垃圾回收器, -XX:G1HeapRegionSize=16m 将G1的region块大小设为16M,-XX:G1ReservePercent:在G1的老年代中预留25%空闲内存,这个默认值是10%,RocketMQ把这个参数调大了。-XX:InitiatingHeapOccupancyPercent=30:当堆内存的使用率达到30%之后就会启动G1垃圾回收器尝试回收垃圾,默认值是45%,RocketMQ把这个参数调小了,也就是提高了GC的频率,但是避免了垃圾对象过多,一次垃圾回收时间太长的问题。
然后,后面定制了GC的日志文件,确定GC日志文件的地址、打印的内容以及控制每个日志文件的大小为30M并且只保留5个文件。这些在进行性能检验时,是相当重要的参考内容。
例如在conf/dleger/broker-n0.conf中有一个参数:sendMessageThreadPoolNums=16。这一个参数是表明RocketMQ内部用来发送消息的线程池的线程数量是16个,其实这个参数可以根据机器的CPU核心数进行适当调整,例如如果你的机器核心数超过16个,就可以把这个参数适当调大。
我们在部署RocketMQ的时候,还需要对Linux内核参数进行一定的定制。例如
ulimit,需要进行大量的网络通信和磁盘IO。vm.extra_free_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。vm.max_map_count,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。vm.swappiness,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。File descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350。
3、RocketMQ的其他参考资料这些参数在CentOS7中的配置文件都在 /proc/sys/vm目录下。
另外,RocketMQ的bin目录下有个os.sh里面设置了RocketMQ建议的系统内核参数,可以根据情况进行调整。
还记得我们之前把RocketMQ的源代码也下载下来了吗?我们现在不需要去看源代码,但是在源码中有个docs目录,里面有非常有用的资料。例如,在他的docs/cn/architecture.md文档中,有对RocketMQ架构的更详细的介绍。这里面的内容就不再搬运了,我们直接看看把。
4、小总结我们要对MQ的优缺点以及适用场景开始要有逐渐清晰的概念。成熟的MQ产品上手使用都很简单,所以,使用和面试的重点从来都不会是怎么编程,而是能结合项目场景完整落地,这才是考验程序员功力的地方。而这个功力的要点就在于对异步消息驱动场景的理解深度。
然后,我们要对RocketMQ整体的产品架构以及应用生态有个大致的了解。商业版本的RocketMQ提供了购买即用的高可用特性,并且功能也比开源版本略有改进。而在RocketMQ的开源版本之外,围绕RocketMQ的扩展生态包括管理控制台,大都整合在了rocketmq-externals社区项目中。关于RocketMQ的周边生态,其实跟kafka和RabbitMQ还是有差距的,但是RocketMQ相比这两个产品,不管是开发语言还是架构思维,对我们都更为友好,而且周边生态发展也有后发优势,所以对RocketMQ要抱着学习,改进的态度,从点到面横向拓宽技术视野。
最后,我们要对RocketMQ的整体架构有一个全面的了解。并且在后续的细节学习时,要保持对第一个问题的好奇心。
使用RocketMQ的原生API开发是最简单也是目前看来最牢靠的方式。这里我们用SpringBoot来搭建一系列消息生产者和消息消费者,来访问我们之前搭建的RocketMQ集群。
1、测试环境搭建首先创建一个基于Maven的SpringBoot工程,引入如下依赖:
org.apache.rocketmq rocketmq-client4.7.1
另外还与一些依赖,例如openmessage、acl等扩展功能还需要添加对应的依赖。具体可以参见RocketMQ源码中的example模块。在RocketMQ源码包中的example模块提供了非常详尽的测试代码,也可以拿来直接调试。我们这里就用源码包中的示例来连接我们自己搭建的RocketMQ集群来进行演示。
RocketMQ的官网上有很多经典的测试代码,这些代码虽然依赖的版本比较老,但是还是都可以运行的。所以我们还是以官网上的顺序进行学习。
但是在调试这些代码的时候要注意一个问题:这些测试代码中的生产者和消费者都需要依赖NameServer才能运行,只需要将NameServer指向我们自己搭建的RocketMQ集群,而不需要管Broker在哪里,就可以连接我们自己的自己的RocketMQ集群。而RocketMQ提供的生产者和消费者寻找NameServer的方式有两种:
1、在代码中指定namesrvAddr属性。例如:consumer.setNamesrvAddr(“127.0.0.1:9876”);
2、通过NAMESRV_ADDR环境变量来指定。多个NameServer之间用分号连接。
然后RocketMQ的生产者和消费者的编程模型都是有个比较固定的步骤的,掌握这个固定的步骤,对于我们学习源码以及以后使用都是很有帮助的。
消息发送者的固定步骤消息消费者的固定步骤1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
3、RocketMQ的消息样例1创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
那我们来逐一连接下RocketMQ都支持哪些类型的消息:
3.1 基本样例基本样例部分我们使用消息生产者分别通过三种方式发送消息,同步发送、异步发送以及单向发送。
然后使用消费者来消费这些消息。
1、同步发送消息的样例见:org.apache.rocketmq.example.simple.Producer2、异步发送消息的样例见:org.apache.rocketmq.example.simple.AsyncProducer等待消息返回后再继续进行下面的 *** 作。
3、单向发送消息的样例:这个示例有个比较有趣的地方就是引入了一个countDownLatch来保证所有消息回调方法都执行完了再关闭Producer。 所以从这里可以看出,RocketMQ的Producer也是一个服务端,在往Broker发送消息的时候也要作为服务端提供服务。
public class OnewayProducer { public static void main(String[] args) throws Exception{ //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" , "TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); //Call send message to deliver message to one of brokers. producer.sendoneway(msg); } //Wait for sending to complete Thread.sleep(5000); producer.shutdown(); }}
4、使用消费者消费消息。关键点就是使用producer.sendOneWay方式来发送消息,这个方法没有返回值,也没有回调。就是只管把消息发出去就行了。
消费者消费消息有两种模式,一种是消费者主动去Broker上拉取消息的拉模式,另一种是消费者等待Broker把消息推送过来的推模式。
拉模式的样例见:org.apache.rocketmq.example.simple.PullConsumer
推模式的样例见:org.apache.rocketmq.example.simple.PushConsumer
3.2 顺序消息通常情况下,用推模式比较简单。
实际上RocketMQ的推模式也是由拉模式封装出来的。
4.7.1版本中DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl。
顺序消息生产者样例见:org.apache.rocketmq.example.order.Producer
顺序消息消费者样例见:org.apache.rocketmq.example.order.Consumer
验证时,可以启动多个Consumer实例,观察下每一个订单的消息分配以及每个订单下多个步骤的消费顺序。
不管订单在多个Consumer实例之前是如何分配的,每个订单下的多条消息顺序都是固定从0~5的。
RocketMQ保证的是消息的局部有序,而不是全局有序。
先从控制台上看下List mqs是什么。
再回看我们的样例,实际上,RocketMQ也只保证了每个OrderID的所有消息有序(发到了同一个queue),而并不能保证所有消息都有序。所以这就涉及到了RocketMQ消息有序的原理。要保证最终消费到的消息是有序的,需要从Producer、Broker、Consumer三个步骤都保证消息有序才行。
首先在发送者端:在默认情况下,消息发送者会采取Round Robin轮询方式把消息发送到不同的MessageQueue(分区队列),而消费者消费的时候也从多个MessageQueue上拉取消息,这种情况下消息是不能保证顺序的。而只有当一组有序的消息发送到同一个MessageQueue上时,才能利用MessageQueue先进先出的特性保证这一组消息有序。
而Broker中一个队列内的消息是可以保证有序的。
3.3 广播消息然后在消费者端:消费者会从多个消息队列上去拿消息。这时虽然每个消息队列上的消息是有序的,但是多个队列之间的消息仍然是乱序的。消费者端要保证消息有序,就需要按队列一个一个来取消息,即取完一个队列的消息后,再去取下一个队列的消息。而给consumer注入的MessageListenerOrderly对象,在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。MessageListenerConcurrently这个消息监听器则不会锁队列,每次都是从多个Message中取一批数据(默认不超过32条)。因此也无法保证消息有序。
广播消息的消息生产者样例见:org.apache.rocketmq.example.broadcast.PushConsumer
3.4 延迟消息广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。在集群状态(MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟kafka和rabbitMQ的集群模式是一样的)。而广播模式则是把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。
延迟消息的生产者案例
public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { // Instantiate a producer to send scheduled messages DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); // Launch producer producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // This message will be delivered to consumer 10 seconds later. message.setDelayTimeLevel(3); // Send the message producer.send(message); } // Shutdown producer after use. producer.shutdown(); } }
延迟消息实现的效果就是在调用producer.send方法后,消息并不会立即发送出去,而是会等一段时间再发送出去。这是RocketMQ特有的一个功能。
那会延迟多久呢?延迟时间的设置就是在Message消息对象上设置一个延迟级别message.setDelayTimeLevel(3);
开源版本的RocketMQ中,对延迟消息并不支持任意时间的延迟设定(商业版本中支持),而是只支持18个固定的延迟级别,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。这从哪里看出来的?其实从rocketmq-console控制台就能看出来。而这18个延迟级别也支持自行定义,不过一般情况下最好不要自定义修改。
3.5 批量消息那这么好用的延迟消息是怎么实现的?这18个延迟级别除了在延迟消息中用,还有什么地方用到了?别急,我们会在后面部分进行详细讲解。
批量消息是指将多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。
批量消息的消息生产者样例见:org.apache.rocketmq.example.batch.SimpleBatchProducer和org.apache.rocketmq.example.batch.SplitBatchProducer
相信大家在官网以及测试代码中都看到了关键的注释:如果批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB
3.6 过滤消息实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。但是使用批量消息时,这个消息长度确实是必须考虑的一个问题。而且批量消息的使用是有一定限制的,这些消息应该有相同的Topic,相同的waitStoreMsgOK。而且不能是延迟消息、事务消息等。
在大多数情况下,可以使用Message的Tag属性来简单快速的过滤信息。
使用Tag过滤消息的消息生产者案例见:org.apache.rocketmq.example.filter.TagFilterProducer
使用Tag过滤消息的消息消费者案例见:org.apache.rocketmq.example.filter.TagFilterConsumer
主要是看消息消费者。consumer.subscribe(“TagFilterTest”, “TagA || TagC”); 这句只订阅TagA和TagC的消息。
TAG是RocketMQ中特有的一个消息属性。RocketMQ的最佳实践中就建议,使用RocketMQ时,一个应用可以就用一个Topic,而应用中的不同业务就用TAG来区分。
但是,这种方式有一个很大的限制,就是一个消息只能有一个TAG,这在一些比较复杂的场景就有点不足了。 这时候,可以使用SQL表达式来对消息进行过滤。
SQL过滤的消息生产者案例见:org.apache.rocketmq.example.filter.SqlFilterProducer
SQL过滤的消息消费者案例见:org.apache.rocketmq.example.filter.SqlFilterConsumer
这个模式的关键是在消费者端使用MessageSelector.bySql(String sql)返回的一个MessageSelector。这里面的sql语句是按照SQL92标准来执行的。sql中可以使用的参数有默认的TAGS和一个在生产者中加入的a属性。
SQL92语法:
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
数值比较,比如:>,>=,<,<=,BETWEEN,=; 字符比较,比如:=,<>,IN; IS NULL 或者 IS NOT NULL; 逻辑符号 AND,OR,NOT;
常量支持类型为:
数值,比如:123,3.1415; 字符,比如:'abc',必须用单引号包裹起来; NULL,特殊的常量 布尔值,TRUE 或 FALSE
使用注意:只有推模式的消费者可以使用SQL过滤。拉模式是用不了的。
3.7 事务消息大家想一下,这个消息过滤是在Broker端进行的还是在Consumer端进行的?
这个事务消息是RocketMQ提供的一个非常有特色的功能,需要着重理解。
首先,我们了解下什么是事务消息。官网的介绍是:事务消息是在分布式系统中保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两个 *** 作的原子性,也就是这两个 *** 作一起成功或者一起失败。
其次,我们来理解下事务消息的编程模型。事务消息只保证消息发送者的本地事务与发消息这两个 *** 作的原子性,因此,事务消息的示例只涉及到消息发送者,对于消息消费者来说,并没有什么特别的。
事务消息生产者的案例见:org.apache.rocketmq.example.transaction.TransactionProducer
事务消息的关键是在TransactionMQProducer中指定了一个TransactionListener事务监听器,这个事务监听器就是事务消息的关键控制器。源码中的案例有点复杂,我这里准备了一个更清晰明了的事务监听器示例。
public class TransactionListenerImpl implements TransactionListener { //在提交完事务消息后执行。 //返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。 //返回ROLLBACK_MESSAGE状态的消息会被丢弃。 //返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String tags = msg.getTags(); //TagA的消息会立即被消费者消费到 if(StringUtils.contains(tags,"TagA")){ return LocalTransactionState.COMMIT_MESSAGE; //TagB的消息会被丢弃 }else if(StringUtils.contains(tags,"TagB")){ return LocalTransactionState.ROLLBACK_MESSAGE; //其他消息会等待Broker进行事务状态回查。 }else{ return LocalTransactionState.UNKNOW; } } //在对UNKNOWN状态的消息进行状态回查时执行。返回的结果是一样的。 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String tags = msg.getTags(); //TagC的消息过一段时间会被消费者消费到 if(StringUtils.contains(tags,"TagC")){ return LocalTransactionState.COMMIT_MESSAGE; //TagD的消息也会在状态回查时被丢弃掉 }else if(StringUtils.contains(tags,"TagD")){ return LocalTransactionState.ROLLBACK_MESSAGE; //剩下TagE的消息会在多次状态回查后最终丢弃 }else{ return LocalTransactionState.UNKNOW; } }}
然后,我们要了解下事务消息的使用限制:
1、事务消息不支持延迟消息和批量消息。
2、为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
回查次数是由BrokerConfig.transactionCheckMax这个参数来配置的,默认15次,可以在broker.conf中覆盖。 然后实际的检查次数会在message中保存一个用户属性MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES。这个属性值大于transactionCheckMax,就会丢弃。 这个用户属性值会按回查次数递增,也可以在Producer中自行覆盖这个属性。
3、事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SEConDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
由BrokerConfig.transactionTimeOut这个参数来配置。默认6秒,可以在broker.conf中进行修改。 另外,也可以给消息配置一个MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS属性来给消息指定一个特定的消息回查时间。 msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, “10000”); 这样就是10秒。
4、事务性消息可能不止一次被检查或消费。
5、提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
6、事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
接下来,我们还要了解下事务消息的实现机制,参见下图:
![RocketMQ事务消息.png](https://img-blog.csdnimg.cn/img_convert/93e4f4867091137c7f200b82e2a558b8.png#clientId=ude9a8954-70ff-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=ua67610b3&margin=[object Object]&name=RocketMQ事务消息.png&originHeight=338&originWidth=751&originalType=binary&ratio=1&rotation=0&showTitle=false&size=31878&status=done&style=none&taskId=ua03a6b23-c489-4e71-bb77-279ce40a96a&title=)
事务消息机制的关键是在发送消息时,会将消息转为一个half半消息,并存入RocketMQ内部的一个 RMQ_SYS_TRANS_HALF_TOPIC 这个Topic,这样对消费者是不可见的。再经过一系列事务检查通过后,再将消息转存到目标Topic,这样对消费者就可见了。
最后,我们还需要思考下事务消息的作用。
大家想一下这个事务消息跟分布式事务有什么关系?为什么扯到了分布式事务相关的两阶段提交上了?事务消息只保证了发送者本地事务和发送消息这两个 *** 作的原子性,但是并不保证消费者本地事务的原子性,所以,事务消息只保证了分布式事务的一半。但是即使这样,对于复杂的分布式事务,RocketMQ提供的事务消息也是目前业内最佳的降级方案。
权限控制(ACL)主要为RocketMQ提供Topic资源级别的用户访问控制。用户在使用RocketMQ权限控制时,可以在Client客户端通过 RPCHook注入AccessKey和SecretKey签名;同时,将对应的权限控制属性(包括Topic访问权限、IP白名单和AccessKey和SecretKey签名等)设置在$ROCKETMQ_HOME/conf/plain_acl.yml的配置文件中。Broker端对AccessKey所拥有的权限进行校验,校验不过,抛出异常; ACL客户端可以参考:org.apache.rocketmq.example.simple包下面的AclClient代码。
注意,如果要在自己的客户端中使用RocketMQ的ACL功能,还需要引入一个单独的依赖包。
org.apache.rocketmq rocketmq-acl4.7.1
而Broker端具体的配置信息可以参见源码包下docs/cn/acl/user_guide.md。主要是在broker.conf中打开acl的标志:aclEnable=true。然后就可以用plain_acl.yml来进行权限配置了。并且这个配置文件是热加载的,也就是说要修改配置时,只要修改配置文件就可以了,不用重启Broker服务。我们来简单分析下源码中的plan_acl.yml的配置:
#全局白名单,不受ACL控制#通常需要将主从架构中的所有节点加进来globalWhiteRemoteAddresses:- 10.10.103.*- 192.168.0.*accounts:#第一个账户- accessKey: RocketMQ secretKey: 123456789 whiteRemoteAddress: admin: false defaultTopicPerm: DENY #默认Topic访问策略是拒绝 defaultGroupPerm: SUB #默认Group访问策略是只允许订阅 topicPerms: - topicA=DENY #topicA拒绝 - topicB=PUB|SUB #topicB允许发布和订阅消息 - topicC=SUB #topicC只允许订阅 groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB#第二个账户,只要是来自192.168.2.*的IP,就可以访问所有资源- accessKey: rocketmq2 secretKey: 123456789 whiteRemoteAddress: 192.168.2.* # if it is admin, it could access all resources admin: true十、SpringBoot整合RocketMQ 1、快速实战
这部分我们看下SpringBoot如何快速集成RocketMQ。
在使用SpringBoot的starter集成包时,要特别注意版本。因为SpringBoot集成RocketMQ的starter依赖是由Spring社区提供的,目前正在快速迭代的过程当中,不同版本之间的差距非常大,甚至基础的底层对象都会经常有改动。例如如果使用rocketmq-spring-boot-starter:2.0.4版本开发的代码,升级到目前最新的rocketmq-spring-boot-starter:2.1.1后,基本就用不了了。
我们创建一个maven工程,引入关键依赖:
org.apache.rocketmq rocketmq-spring-boot-starter2.1.1 org.springframework.boot spring-boot-starterorg.springframework spring-coreorg.springframework spring-webmvcorg.springframework.boot spring-boot-starter-web2.1.6.RELEASE io.springfox springfox-swagger-ui2.9.2 io.springfox springfox-swagger22.9.2
rocketmq-spring-boot-starter:2.1.1引入的SpringBoot包版本是2.0.5.RELEASE,这里把SpringBoot的依赖包升级了一下。
然后我们以SpringBoot的方式,快速创建一个简单的Demo
启动类:
@SpringBootApplicationpublic class RocketMQScApplication { public static void main(String[] args) { SpringApplication.run(RocketMQScApplication.class,args); }}
配置文件 application.properties
#NameServer地址rocketmq.name-server=192.168.211.985:9876#默认的消息生产者组rocketmq.producer.group=springBootGroup
消息生产者
package com.roy.rocket.basic;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.io.UnsupportedEncodingException;@Componentpublic class SpringProducer { @Resource private RocketMQTemplate rocketMQTemplate; //发送普通消息的示例 public void sendMessage(String topic,String msg){ this.rocketMQTemplate.convertAndSend(topic,msg); } //发送事务消息的示例 public void sendMessageInTransaction(String topic,String msg) throws InterruptedException { String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { Messagemessage = MessageBuilder.withPayload(msg).build(); String destination =topic+":"+tags[i % tags.length]; SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(destination, message,destination); System.out.printf("%s%n", sendResult); Thread.sleep(10); } }}
消息消费者
package com.roy.rocket.basic;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")public class SpringConsumer implements RocketMQListener{ @Override public void onMessage(String message) { System.out.println("Received message : "+ message); }}
SpringBoot集成RocketMQ,消费者部分的核心就在这个@RocketMQMessageListener注解上。所有消费者的核心功能也都会集成到这个注解中。所以我们还要注意下这个注解里面的属性:
例如:消息过滤可以由里面的selectorType属性和selectorexpression来定制
消息有序消费还是并发消费则由consumeMode属性定制。
消费者是集群部署还是广播部署由messageModel属性定制。
然后关于事务消息,还需要配置一个事务消息监听器:
package com.roy.rocket.config;import org.apache.commons.lang3.StringUtils;import org.apache.rocketmq.client.producer.LocalTransactionState;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.support.RocketMQUtil;import org.springframework.messaging.Message;import org.springframework.messaging.converter.StringMessageConverter;import java.util.concurrent.ConcurrentHashMap;@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")public class MyTransactionImpl implements RocketMQLocalTransactionListener { private ConcurrentHashMap
这样我们启动应用后,就能够通过访问 http://localhost:8080/MQTest/sendMessage?message=123 接口来发送一条简单消息。并在SpringConsumer中消费到。
也可以通过访问http://localhost:8080/MQTest/sendTransactionMessage?message=123 ,来发送一条事务消息。
2、总结这里可以看到,对事务消息,SpringBoot进行封装时,就缺少了transactionId,这在事务控制中是非常关键的。
十一、SpringCloudStream整合RocketMQSpringBoot 引入org.apache.rocketmq:rocketmq-spring-boot-starter依赖后,就可以通过内置的RocketMQTemplate来与RocketMQ交互。相关属性都以rockemq.开头。具体所有的配置信息可以参见org.apache.rocketmq.spring.autoconfigure.RocketMQProperties这个类。
SpringBoot依赖中的Message对象和RocketMQ-client中的Message对象是两个不同的对象,这在使用的时候要非常容易弄错。例如RocketMQ-client中的Message里的TAG属性,在SpringBoot依赖中的Message中就没有。Tag属性被移到了发送目标中,与Topic一起,以Topic:Tag的方式指定。
最后强调一次,一定要注意版本。rocketmq-spring-boot-starter的更新进度一般都会略慢于RocketMQ的版本更新,并且版本不同会引发很多奇怪的问题。apache有一个官方的rocketmq-spring示例,地址:https://github.com/apache/rocketmq-spring.git 以后如果版本更新了,可以参考下这个示例代码。
SpringCloudStream是Spring社区提供的一个统一的消息驱动框架,目的是想要以一个统一的编程模型来对接所有的MQ消息中间件产品。我们还是来看看SpringCloudStream如何来集成RocketMQ。
创建Maven工程,引入依赖:
org.apache.rocketmq rocketmq-client4.7.1 org.apache.rocketmq rocketmq-acl4.7.1 com.alibaba.cloud spring-cloud-starter-stream-rocketmq2.2.3.RELEASE org.apache.rocketmq rocketmq-clientorg.apache.rocketmq rocketmq-aclorg.springframework.boot spring-boot-starter-web2.3.3.RELEASE
应用启动类:
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Sink;import org.springframework.cloud.stream.messaging.Source;@EnableBinding({Source.class, Sink.class})@SpringBootApplicationpublic class ScRocketMQApplication { public static void main(String[] args) { SpringApplication.run(ScRocketMQApplication.class,args); }}
注意这个@EnableBinding({Source.class, Sink.class})注解,这是SpringCloudStream引入的Binder配置。
然后增加配置文件application.properties
#ScStream通用的配置以spring.cloud.stream开头spring.cloud.stream.bindings.input.destination=TestTopicspring.cloud.stream.bindings.input.group=scGroupspring.cloud.stream.bindings.output.destination=TestTopic#rocketMQ的个性化配置以spring.cloud.stream.rocketmq开头#spring.cloud.stream.rocketmq.binder.name-server=192.168.211.127:9876;192.168.211.128:9876;192.168.211.129:9876spring.cloud.stream.rocketmq.binder.name-server=192.168.211.129:9876
SpringCloudStream中,一个binding对应一个消息通道。这其中配置的input,是在Sink.class中定义的,对应一个消息消费者。而output,是在Source.class中定义的,对应一个消息生产者。
然后就可以增加消息消费者:
package com.roy.scrocket.basic;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.cloud.stream.messaging.Sink;import org.springframework.stereotype.Component;@Componentpublic class ScConsumer { @StreamListener(Sink.INPUT) public void onMessage(String messsage){ System.out.println("received message:"+messsage+" from binding:"+ Sink.INPUT); }}
消息生产者:
package com.roy.scrocket.basic;import org.apache.rocketmq.common.message.MessageConst;import org.springframework.cloud.stream.messaging.Source;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHeaders;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.util.HashMap;import java.util.Map;@Componentpublic class ScProducer { @Resource private Source source; public void sendMessage(String msg){ Mapheaders = new HashMap<>(); headers.put(MessageConst.PROPERTY_TAGS, "testTag"); MessageHeaders messageHeaders = new MessageHeaders(headers); Message message = MessageBuilder.createMessage(msg, messageHeaders); this.source.output().send(message); }}
最后增加一个Controller类用于测试:
package com.roy.scrocket.controller;import com.roy.scrocket.basic.ScProducer;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;@RestController@RequestMapping("/MQTest")public class MQTestController { @Resource private ScProducer producer; @RequestMapping("/sendMessage") public String sendMessage(String message){ producer.sendMessage(message); return "消息发送完成"; }}
启动应用后,就可以访问http://localhost:8080/MQTest/sendMessage?message=123,给RocketMQ发送一条消息到TestTopic,并在ScConsumer中消费到了。
关于SpringCloudStream。这是一套几乎通用的消息中间件编程框架,例如从对接RocketMQ换到对接Kafka,业务代码几乎不需要动,只需要更换pom依赖并且修改配置文件就行了。但是,由于各个MQ产品都有自己的业务模型,差距非常大,所以使用使用SpringCloudStream时要注意业务模型转换。并且在实际使用中,要非常注意各个MQ的个性化配置属性。例如RocketMQ的个性化属性都是以spring.cloud.stream.rocketmq开头,只有通过这些属性才能用上RocketMQ的延迟消息、排序消息、事务消息等个性化功能。SpringCloudStream是Spring社区提供的一套统一框架,但是官方目前只封装了kafka、kafka Stream、RabbitMQ的具体依赖。而RocketMQ的依赖是交由厂商自己维护的,也就是由阿里巴巴自己来维护。这个维护力度显然是有不小差距的。所以一方面可以看到之前在使用SpringBoot时着重强调的版本问题,在使用SpringCloudStream中被放大了很多。spring-cloud-starter-stream-rocketmq目前最新的2.2.3.RELEASE版本中包含的rocketmq-client版本还是4.4.0。这个差距就非常大了。另一方面,RocketMQ这帮大神不屑于写文档的问题也特别严重,SpringCloudStream中关于RocketMQ的个性化配置几乎很难找到完整的文档。总之,对于RocketMQ来说,SpringCloudStream目前来说还并不是一个非常好的集成方案。这方面跟kafka和Rabbit还没法比。所以使用时要慎重。
十二、高级原理
这一部分我们先来总结下RocketMQ的一些重要的基础概念:
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
生产者中,会把同一类Producer组成一个集合,叫做生产者组,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
拉取式消费的应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
推动式消费模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
消费者同样会把同一类Consumer组成一个集合,叫做消费者组,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
4 主题(Topic)集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
同一个Topic下的数据,会分片保存到不同的Broker上,而每一个分片单位,就叫做MessageQueue。MessageQueue是生产者发送消息与消费者消费消息的最小单位。
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Broker Server是RocketMQ真正的业务核心,包含了多个重要的子模块:
Remoting Module:整个Broker的实体,负责处理来自clients端的请求。Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
而Broker Server要保证高可用需要搭建主从集群架构。RocketMQ中有两种Broker架构模式:
普通集群:
这种集群模式下会给每个节点分配一个固定的角色,master负责响应客户端的请求,并存储消息。slave则只负责对master的消息进行同步保存,并响应部分客户端的读请求。消息同步方式分为同步同步和异步同步。
这种集群模式下各个节点的角色无法进行切换,也就是说,master节点挂了,这一组Broker就不可用了。
Dledger高可用集群:
Dledger是RocketMQ自4.5版本引入的实现高可用集群的一项技术。这个模式下的集群会随机选出一个节点作为master,而当master节点挂了后,会从slave中自动选出一个节点升级成为master。
Dledger技术做的事情:
1、接管Broker的CommitLog消息存储
2、从集群中选举出master节点
3、完成master节点往slave节点的消息同步。
Dledger的关键部分是在他的节点选举上。Dledger是使用Raft算法来进行节点选举的。这里简单介绍下Raft算法的选举过程:
首先:每个节点有三个状态,Leader,follower和candidate(候选人)。正常运行的情况下,集群中会有一个leader,其他都是follower,follower只响应Leader和Candidate的请求,而客户端的请求全部由Leader处理,即使有客户端请求到了一个follower,也会将请求转发到leader。
集群刚启动时,每个节点都是follower状态,之后集群内部会发送一个timeout信号,所有follower就转成candidate去拉取选票,获得大多数选票的节点选为leader,其他候选人转为follower。如果一个timeout信号发出时,没有选出leader,将会重新开始一次新的选举。而Leader节点会往其他节点发送心跳信号,确认他的leader状态。
然后会启动定时器,如果在指定时间内没有收到Leader的心跳,就会转为Candidate状态,然后向其他成员发起投票请求,如果收到半数以上成员的投票,则Canddate会晋升为Leader。然后leader也有可能会退化成follower。
然后,在Raft协议中,会将时间分为一些任意时间长度的时间片段,叫做term。term会使用一个全局唯一,连续递增的编号作为标识,也就是起到了一个逻辑时钟的作用。
![RaftTerms.png](https://img-blog.csdnimg.cn/img_convert/984562cd210cc12bf898ea586ed43694.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u40dea1ea&margin=[object Object]&name=RaftTerms.png&originHeight=307&originWidth=994&originalType=binary&ratio=1&rotation=0&showTitle=false&size=23925&status=done&style=none&taskId=u83be2477-5995-40ef-b4db-8cc1adaae5b&title=)RaftTerms
在每一个term时间片里,都会进行新的选举,每一个Candidate都会努力争取成为leader。获得票数最多的节点就会被选举为Leader。被选为Leader的这个节点,在一个term时间片里就会保持leader状态。这样,就会保证在同一时间段内,集群中只会有一个Leader。在某些情况下,选票可能会被各个节点瓜分,形成不了多数派,那这个term可能直到结束都没有leader,直到下一个term再重新发起选举,这也就没有了Zookeeper中的脑裂问题。而在每次重新选举的过程中, leader也有可能会退化成为follower。也就是说,在这个集群中, leader节点是会不断变化的。
然后,每次选举的过程中,每个节点都会存储当前term编号,并在节点之间进行交流时,都会带上自己的term编号。如果一个节点发现他的编号比另外一个小,那么他就会将自己的编号更新为较大的那一个。而如果leader或者candidate发现自己的编号不是最新的,他就会自动转成follower。如果接收到的请求term编号小于自己的编号,term将会拒绝执行。
在选举过程中,Raft协议会通过心跳机制发起leader选举。节点都是从follower状态开始的,如果收到了来自leader或者candidate的心跳RPC请求,那他就会保持follower状态,避免争抢成为candidate。而leader会往其他节点发送心跳信号,来确认自己的地位。如果follower一段时间(两个timeout信号)内没有收到Leader的心跳信号,他就会认为leader挂了,发起新一轮选举。
选举开始后,每个follower会增加自己当前的term,并将自己转为candidate。然后向其他节点发起投票请求,请求时会带上自己的编号和term,也就是说都会默认投自己一票。之后candidate状态可能会发生以下三种变化:
赢得选举,成为leader: 如果它在一个term内收到了大多数的选票,将会在接下的剩余term时间内称为leader,然后就可以通过发送心跳确立自己的地位。(每一个server在一个term内只能投一张选票,并且按照先到先得的原则投出)
其他节点成为leader: 在等待投票时,可能会收到其他server发出心跳信号,说明其他leader已经产生了。这时通过比较自己的term编号和RPC过来的term编号,如果比对方大,说明leader的term过期了,就会拒绝该RPC,并继续保持候选人身份; 如果对方编号不比自己小,则承认对方的地位,转为follower。
选票被瓜分,选举失败: 如果没有candidate获取大多数选票, 则没有leader产生, candidate们等待超时后发起另一轮选举. 为了防止下一次选票还被瓜分,必须采取一些额外的措施, raft采用随机election timeout(随机休眠时间)的机制防止选票被持续瓜分。通过将timeout随机设为一段区间上的某个值, 因此很大概率会有某个candidate率先超时然后赢得大部分选票。
所以以三个节点的集群为例,选举过程会是这样的:
集群启动时,三个节点都是follower,发起投票后,三个节点都会给自己投票。这样一轮投票下来,三个节点的term都是1,是一样的,这样是选举不出Leader的。
当一轮投票选举不出Leader后,三个节点会进入随机休眠,例如A休眠1秒,B休眠3秒,C休眠2秒。
一秒后,A节点醒来,会把自己的term加一票,投为2。然后2秒时,C节点醒来,发现A的term已经是2,比自己的1大,就会承认A是Leader,把自己的term也更新为2。实际上这个时候,A已经获得了集群中的多数票,2票,A就会被选举成Leader。这样,一般经过很短的几轮选举,就会选举出一个Leader来。
到3秒时,B节点会醒来,他也同样会承认A的term最大,他是Leader,自己的term也会更新为2。这样集群中的所有Candidate就都确定成了leader和follower.
然后在一个任期内,A会不断发心跳给另外两个节点。当A挂了后,另外的节点没有收到A的心跳,就会都转化成Candidate状态,重新发起选举。
Dledger还会采用Raft协议进行多副本的消息同步:
简单来说,数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。
Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。
接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。
再接下来, Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步。
最后,关于Dledger以及Raft协议的更底层的详细资料,后续会有一个分布式一致性协议的专题,将会结合其他分布式一致性算法做统一讲解,这里就不深入展开了。
6 名字服务(Name Server)名称服务充当路由消息的提供者。Broker Server会在启动时向所有的Name Server注册自己的服务信息,并且后续通过心跳请求的方式保证这个服务信息的实时性。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
这种特性也就意味着NameServer中任意的节点挂了,只要有一台服务节点正常,整个路由服务就不会有影响。当然,这里不考虑节点的负载情况。
7 消息(Message)消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题Topic。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
并且Message上有一个为消息设置的标志,Tag标签。用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
关于Message的更详细字段,在源码的docs/cn/best_practice.md中有详细介绍。
分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。
MQ收到一条消息后,需要向生产者返回一个ACK响应,并将消息存储起来。MQ Push一条消息给消费者后,等待消费者的ACK响应,需要将消息标记为已消费。如果没有标记为消费,MQ会不断的尝试往消费者推送这条消息。MQ需要定期删除一些过期的消息,这样才能保证服务一直可用。2、消息存储介质
RocketMQ采用的是类似于Kafka的文件存储机制,即直接用磁盘文件来保存消息,而不需要借助MySQL这一类索引工具。
2.1磁盘保存文件慢吗?磁盘如果使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。
2.2零拷贝技术加速文件读写Linux *** 作系统分为【用户态】和【内核态】,文件 *** 作、网络 *** 作需要涉及这两种形态的切换,免不了进行数据复制。
一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read;读取本地文件内容;
2)write;将读取的内容通过网络发送出去。
这两个看似简单的 *** 作,实际进行了4 次数据复制,分别是:
从磁盘复制数据到内核态内存;
从内核态内存复 制到用户态内存;
然后从用户态 内存复制到网络驱动的内核态内存;
最后是从网络驱动的内核态内存复 制到网卡中进行传输。
而通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过NIO包中的MappedByteBuffer实现的。RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。
这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了
3 消息存储结构关于零拷贝,JAVA的NIO中提供了两种实现方式,mmap和sendfile,其中mmap适合比较小的文件,而sendfile适合传递比较大的文件。
RocketMQ消息的存储分为三个部分:
CommitLog:存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。
ConsumerQueue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。
IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
整体的消息存储结构如下图:
消息存储结构
还记得我们在搭建集群时都特意指定的文件存储路径吗?现在可以上去看看这些文件都是什么样子。还有哪些落盘的文件?
另外还有几个文件可以了解下。
abort:这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的 *** 作。
checkpoint:数据存盘检查点
4 刷盘机制config/*.json:这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。
RocketMQ需要将消息存储到磁盘上,这样才能保证断电后消息不会丢失。同时这样才可以让存储的消息量可以超出内存的限制。RocketMQ为了提高性能,会尽量保证磁盘的顺序写。消息在写入磁盘时,有两种写磁盘的方式,同步刷盘和异步刷盘
同步刷盘和异步刷盘
同步刷盘:
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。
异步刷盘:
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写 *** 作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
配置方式:
5 消息主从复制刷盘方式是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。
如果Broker以一个集群的方式部署,会有一个master节点和多个slave节点,消息需要从Master复制到Slave上。而消息复制的方式分为同步复制和异步复制。
同步复制异步复制同步复制是等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态。
在同步复制下,如果Master节点故障,Slave上有全部的数据备份,这样容易恢复数据。但是同步复制会增大数据写入的延迟,降低系统的吞吐量。
异步复制是只要master写入消息成功,就反馈给客户端写入成功的状态。然后再异步的将消息复制给Slave节点。
在异步复制下,系统拥有较低的延迟和较高的吞吐量。但是如果master节点故障,而有些数据没有完成复制,就会造成数据丢失。
配置方式:
消息复制方式是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。
6 负载均衡 6.1Producer负载均衡Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker上。
发送者队列轮询
同时生产者在发送消息时,可以指定一个MessageQueueSelector。通过这个对象来将消息发送到自己指定的MessageQueue上。这样可以保证消息局部有序。
6.2 Consumer负载均衡Consumer也是以MessageQueue为单位来进行负载均衡。分为集群模式和广播模式。
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
每次分配时,都会将MessageQueue和消费者ID进行排序后,再用不同的分配算法进行分配。内置的分配的算法共有六种,分别对应AllocateMessageQueueStrategy下的六种实现类,可以在consumer中直接set来指定。默认情况下使用的是最简单的平均分配策略。
AllocateMachineRoomNearby: 将同机房的Consumer和Broker优先分配在一起。
这个策略可以通过一个machineRoomResolver对象来定制Consumer和Broker的机房解析规则。然后还需要引入另外一个分配策略来对同机房的Broker和Consumer进行分配。一般也就用简单的平均分配策略或者轮询分配策略。
感觉这东西挺鸡肋的,直接给个属性指定机房不是挺好的吗。
源码中有测试代码AllocateMachineRoomNearByTest。
在示例中:Broker的机房指定方式: messageQueue.getBrokerName().split("-")[0],而Consumer的机房指定方式:clientID.split("-")[0]
clinetID的构建方式:见ClientConfig.buildMQClientId方法。按他的测试代码应该是要把clientIP指定为IDC1-CID-0这样的形式。
AllocateMessageQueueAveragely:平均分配。将所有MessageQueue平均分给每一个消费者
AllocateMessageQueueAveragelyByCircle: 轮询分配。轮流的给一个消费者分配一个MessageQueue。
AllocateMessageQueueByConfig: 不分配,直接指定一个messageQueue列表。类似于广播模式,直接指定所有队列。
AllocateMessageQueueByMachineRoom:按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置。
AllocateMessageQueueConsistentHash。源码中有测试代码AllocateMessageQueueConsitentHashTest。这个一致性哈希策略只需要指定一个虚拟节点数,是用的一个哈希环的算法,虚拟节点是为了让Hash数据在换上分布更为均匀。
例如平均分配时的分配情况是这样的:
Consumer平均分配
2、广播模式广播模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说。而在实现上,就是在Consumer分配Queue时,所有Consumer都分到所有的Queue。
7、消息重试首先对于广播模式的消息, 是不存在消息重试的机制的,即消息消费失败后,不会再重新进行发送,而只是继续消费新的消息。
而对于普通的消息,当消费者消费消息失败后,你可以通过设置返回状态达到消息重试的结果。
1、如何让消息进行重试
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。可以有三种配置方式:
返回Action.ReconsumeLater-推荐
返回null
抛出异常
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { //处理消息 doConsumeMessage(message); //方式1:返回 Action.ReconsumeLater,消息将重试 return Action.ReconsumeLater; //方式2:返回 null,消息将重试 return null; //方式3:直接抛出异常, 消息将重试 throw new RuntimeException("Consumer Message exceotion"); }}
如果希望消费失败后不重试,可以直接返回Action.CommitMessage。
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { try { doConsumeMessage(message); } catch (Throwable e) { //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage; return Action.CommitMessage; } //消息处理正常,直接返回 Action.CommitMessage; return Action.CommitMessage; }}
2、重试消息如何处理
重试的消息会进入一个 “%RETRY%”+ConsumeGroup 的队列中。
![RocketMQ消息重试.png](https://img-blog.csdnimg.cn/img_convert/43096f31437413faf3f5633481f62066.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=ucc15a51d&margin=[object Object]&name=RocketMQ消息重试.png&originHeight=410&originWidth=1668&originalType=binary&ratio=1&rotation=0&showTitle=false&size=52370&status=done&style=none&taskId=u65609b07-2018-4241-9366-86e5c60dcee&title=)
RocketMQ消息重试
然后RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:
![图片.png](https://img-blog.csdnimg.cn/img_convert/fabf720dd7d20be4174ef961c82f1b26.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=313&id=u99c9043a&margin=[object Object]&name=图片.png&originHeight=313&originWidth=507&originalType=binary&ratio=1&rotation=0&showTitle=false&size=9631&status=done&style=none&taskId=ufe20c412-0fe3-42a2-9154-013d649aee5&title=&width=507)
这个重试时间跟延迟消息的延迟级别是对应的。不过取的是延迟级别的后16级别。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
这个重试时间可以将源码中的org.apache.rocketmq.example.quickstart.Consumer里的消息监听器返回状态改为RECONSUME_LATER测试一下。
重试次数:
如果消息重试16次后仍然失败,消息将不再投递。转为进入死信队列。
另外一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。
然后关于这个重试次数,RocketMQ可以进行定制。例如通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2小时。
关于MessageId:
在老版本的RocketMQ中,一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。
但是在4.7.1版本中,每次重试MessageId都会重建。
配置覆盖:
消息最大重试次数的设置对相同GroupID下的所有Consumer实例有效。并且最后启动的Consumer会覆盖之前启动的Consumer的配置。
8、死信队列
当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。
死信队列的名称是%DLQ%+ConsumGroup
![RocketMQ死信队列.png](https://img-blog.csdnimg.cn/img_convert/cdec5dad23aaf2fe9544b1cae3c729b5.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u38e52f09&margin=[object Object]&name=RocketMQ死信队列.png&originHeight=241&originWidth=1705&originalType=binary&ratio=1&rotation=0&showTitle=false&size=32231&status=done&style=none&taskId=u7775c39b-5212-4449-9af0-cdd43ce2d33&title=)
RocketMQ死信队列
死信队列的特征:
一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
死信队列中的消息不会再被消费者正常消费。
死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。
通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。
9、消息幂等 1、幂等的概念注:默认创建出来的死信队列,他里面的消息是无法读取的,在控制台和消费者中都无法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读,4:禁写,6:可读可写)。需要手动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台)。
在MQ系统中,对于消息幂等有三种实现语义:
at most once 最多一次:每条消息最多只会被消费一次
at least once 至少一次:每条消息至少会被消费一次
exactly once 刚刚好一次:每条消息都只会确定的消费一次
这三种语义都有他适用的业务场景。
其中,at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。
而at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。
而这个exactly once是MQ中最理想也是最难保证的一种语义,需要有非常精细的设计才行。RocketMQ只能保证at least once,保证不了exactly once。所以,使用RocketMQ时,需要由业务系统自行保证消息的幂等性。
关于这个问题,官网上有明确的回答:
- Are messages delivered exactly once?
2、消息幂等的必要性RocketMQ ensures that all messages are delivered at least once. In most cases, the messages are not repeated
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
3、处理方式负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
从上面的分析中,我们知道,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。
而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。
- RocketMQ的官方Git仓库地址:https://github.com/apache/rocketmq 可以用git把项目clone下来或者直接下载代码包。也可以到RocketMQ的官方网站上下载指定版本的源码: http://rocketmq.apache.org/dowloading/releases/![源码下载.png](https://img-blog.csdnimg.cn/img_convert/fdf7d451a66f6c767c8b5f3899b50884.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u733cef29&margin=[object Object]&name=源码下载.png&originHeight=300&originWidth=816&originalType=binary&ratio=1&rotation=0&showTitle=false&size=36278&status=done&style=none&taskId=uc007d46b-f890-434b-aa80-410b8ab1de1&title=)下载后就可以解压导入到IDEA中进行解读了。我们只要注意下是下载的4.7.1版本就行了。
源码下很多的功能模块,很容易让人迷失方向,我们只关注下几个最为重要的模块:
broker: broker 模块(broke 启动进程)
client :消息客户端,包含消息生产者、消息消费者相关类
example: RocketMQ 例代码
namesrv:NameServer实现相关类(NameServer启动进程)
store:消息存储实现相关类
各个模块的功能大都从名字上就能看懂。我们可以在有需要的时候再进去看源码。
但是这些模块有些东西还是要关注的。例如docs文件夹下的文档,以及各个模块下都有非常丰富的junit测试代码,这些都是非常有用的。
RocketMQ的源码中有个非常让人头疼的事情,就是他的代码注释几乎没有。为了帮助大家解读源码,我给大家准备了一个添加了自己注释的源码版本。 在配套资料当中。大家可以把这个版本导入IDEA来进行解读。
源码中对最为重要的注解设定了一个标记K1,相对不那么重要的注解设定了一个标记K2,而普通的注释就没有添加标记。大家可以在IDEA的TODO标签中配置这两个注解标记。
![注解版源码.png](https://img-blog.csdnimg.cn/img_convert/ac38b48e97b8dce081dbd45f26966fab.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u06147914&margin=[object Object]&name=注解版源码.png&originHeight=323&originWidth=594&originalType=binary&ratio=1&rotation=0&showTitle=false&size=38277&status=done&style=none&taskId=u8447c2de-2008-41a8-8d26-833b0eb08a0&title=)
将源码导入IDEA后,需要先对源码进行编译。编译指令 clean install -Dmaven.test.skip=true![源码调试.png](https://img-blog.csdnimg.cn/img_convert/849fc1f1584fd0ffca740e26d49cae6e.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=udf69e097&margin=[object Object]&name=源码调试.png&originHeight=208&originWidth=882&originalType=binary&ratio=1&rotation=0&showTitle=false&size=27079&status=done&style=none&taskId=u5e9f033e-53df-4c6d-b2aa-820a7b1175d&title=)
编译完成后就可以开始调试代码了。调试时需要按照以下步骤:
调试时,先在项目目录下创建一个conf目录,并从distribution拷贝broker.conf和logback_broker.xml和logback_namesrv.xml
![90码6.png](https://img-blog.csdnimg.cn/img_convert/4d7e1f8e35ec884723cdda541d0f371c.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=uc826ab62&margin=[object Object]&name=90码6.png&originHeight=127&originWidth=370&originalType=binary&ratio=1&rotation=0&showTitle=false&size=5026&status=done&style=none&taskId=u98f28244-5239-4e1d-99af-07a2115015f&title=)
注解版源码中已经复制好了。
3.1 启动nameServer
展开namesrv模块,运行NamesrvStartup类即可启动NameServer
启动时,会报错,提示需要配置一个ROCKETMQ_HOME环境变量。这个环境变量我们可以在机器上配置,跟配置JAVA_HOME环境变量一样。也可以在IDEA的运行环境中配置。目录指向源码目录即可。
![90码4.png](https://img-blog.csdnimg.cn/img_convert/c7382088f57b40d0162871bd00606466.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u87a9cdd8&margin=[object Object]&name=90码4.png&originHeight=232&originWidth=375&originalType=binary&ratio=1&rotation=0&showTitle=false&size=12536&status=done&style=none&taskId=u48dc6273-859f-45ce-9664-c1d9d494d10&title=)
![90码5.png](https://img-blog.csdnimg.cn/img_convert/cfa6cc921be302c9e9fd356e3b2dadbf.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=uade3f9e8&margin=[object Object]&name=90码5.png&originHeight=488&originWidth=1317&originalType=binary&ratio=1&rotation=0&showTitle=false&size=56043&status=done&style=none&taskId=u07e7fd9e-d96e-4196-940c-d90e5a0cd09&title=)
配置完成后,再次执行,看到以下日志内容,表示NameServer启动成功
The Name Server boot success. serializeType=JSON
启动Broker之前,我们需要先修改之前复制的broker.conf文件
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH # 自动创建Topic autoCreateTopicEnable=true # nameServ地址 namesrvAddr=127.0.0.1:9876 # 存储路径 storePathRootDir=E:RocketMQdatarocketmqdataDir # commitLog路径 storePathCommitLog=E:RocketMQdatarocketmqdataDircommitlog # 消息队列存储路径 storePathConsumeQueue=E:RocketMQdatarocketmqdataDirconsumequeue # 消息索引存储路径 storePathIndex=E:RocketMQdatarocketmqdataDirindex # checkpoint文件路径 storeCheckpoint=E:RocketMQdatarocketmqdataDircheckpoint # abort文件存储路径 abortFile=E:RocketMQdatarocketmqdataDirabort
然后Broker的启动类是broker模块下的BrokerStartup。
启动Broker时,同样需要ROCETMQ_HOME环境变量,并且还需要配置一个-c 参数,指向broker.conf配置文件。![90码8.png](https://img-blog.csdnimg.cn/img_convert/b62eac8b089043d36def25cc3d9501d6.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u9227fd1f&margin=[object Object]&name=90码8.png&originHeight=292&originWidth=1291&originalType=binary&ratio=1&rotation=0&showTitle=false&size=35701&status=done&style=none&taskId=u0c0bbf34-218e-437c-9e29-834eda4a32c&title=)
然后重新启动,即可启动Broker。
3.3 发送消息在源码的example模块下,提供了非常详细的测试代码。例如我们启动example模块下的org.apache.rocketmq.example.quickstart.Producer类即可发送消息。
但是在测试源码中,需要指定NameServer地址。这个NameServer地址有两种指定方式,一种是配置一个NAMESRV_ADDR的环境变量。另一种是在源码中指定。我们可以在源码中加一行代码指定NameServer
producer.setNamesrvAddr(“127.0.0.1:9876”);
然后就可以发送消息了。
我们可以使用同一模块下的org.apache.rocketmq.example.quickstart.Consumer类来消费消息。运行时同样需要指定NameServer地址
consumer.setNamesrvAddr(“192.168.232.128:9876”);
这样整个调试环境就搭建好了。
下面我们可以一边调试一边讲解源码了。源码中大部分关键的地方都已经添加了注释,文档中就不做过多记录了。我们在看源码的时候,要注意,不要一看源码就一行行代码都逐步看,更不要期望一遍就把代码给看明白。这样会陷入到代码的复杂细节中,瞬间打击到放弃。
看源码时,需要用层层深入的方法。每一次阅读源码时,先了解程序执行的流程性代码,略过服务实现的细节性代码,形成大概的概念框架。然后再回头按同样的方法,逐步深入到之前略过的代码。这样才能从源码中看出一点门道来。
NameServer的启动入口为NamesrvStartup类的main方法,我们可以进行逐步调试。这次看源码,我们不要太过陷入其中的细节,我们的目的是先搞清楚NameServer的大体架构。
1、核心问题从之前的介绍中,我们已经了解到,在RocketMQ中,实际进行消息存储、推送等核心功能的是Broker。那NameServer具体做什么用呢?NameServer的核心作用其实就只有两个,一是维护Broker的服务地址并进行及时的更新。二是给Producer和Consumer提供服务获取Broker列表。
整体的流程:
整个NameServer的核心就是一个NamesrvController对象。这个controller对象就跟java Web开发中的Controller功能类似,都是响应客户端请求的。
在创建NamesrvController对象时,有两个关键的配置文件NamesrvConfig这个是NameServer自己运行需要的配置信息,还一个NettyServerConfig包含Netty服务端的配置参数,固定的占用了9876端口。
比较有意思的是这个9876端口并没有提供覆盖的方法
然后在启动服务时,启动了一个RemotingServer。这个就是用来响应请求的。
在关闭服务时,关闭了四个东西remotingServer,响应请求的服务;remotingExecutor Netty服务线程池; scheduledExecutorService 定时任务;fileWatchService 这个是用来跟踪acl配置的(acl的配置文件是实时热加载的)。
所以整个NameServer的结构是这样:
Broker启动的入口在BrokerStartup这个类,可以从他的main方法开始调试。
启动过程关键点:
重点也是围绕一个BrokerController对象,先创建,然后再启动。
在BrokerStartup.createBrokerController方法中可以看到Broker的几个核心配置:
BrokerConfig
NettyServerConfig:Netty服务端占用了10911端口。又是一个神奇的端口。
NettyClientConfig
MessageStoreConfig
然后在BrokerController.start方法可以看到启动了一大堆Broker的核心服务,我们挑一些重要的
this.messageStore.start();启动核心的消息存储组件
this.remotingServer.start();
this.fastRemotingServer.start(); 启动两个Netty服务
this.brokerOuterAPI.start();启动客户端,往外发请求
BrokerController.this.registerBrokerAll: 向NameServer注册心跳。
this.brokerStatsManager.start();
this.brokerFastFailure.start();这也是一些负责具体业务的功能组件
我们现在不需要了解这些核心组件的具体功能,只要有个大概,Broker中有一大堆的功能组件负责具体的业务。
我们需要抽取出Broker的一个整体结构:
![%uFFFD成.png](https://img-blog.csdnimg.cn/img_convert/982f492d1edccd46337c96496de1caea.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u53ca2053&margin=[object Object]&name=%uFFFD成.png&originHeight=507&originWidth=724&originalType=binary&ratio=1&rotation=0&showTitle=false&size=33050&status=done&style=none&taskId=u2ff7dd8c-a95e-406d-9934-6343c0f9d4f&title=)
BrokerController.this.registerBrokerAll方法会发起向NameServer注册心跳。启动时会立即注册,同时也会启动一个线程池,以10秒延迟,默认30秒的间隔 持续向NameServer发送心跳。
BrokerController.this.registerBrokerAll这个方法就是注册心跳的入口。
![%uFFFD程.png](https://img-blog.csdnimg.cn/img_convert/673a30eac1c2fc33030ec21b4529a9ab.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u039acd8a&margin=[object Object]&name=%uFFFD程.png&originHeight=448&originWidth=1047&originalType=binary&ratio=1&rotation=0&showTitle=false&size=50338&status=done&style=none&taskId=u29a73710-94de-4f2e-b732-2260243e893&title=)
注意Producer有两种。 一个是普通发送者DefaultMQProducer。这个只需要构建一个Netty客户端。
还一个是事务消息发送者: TransactionMQProducer。这个需要构建一个Netty客户端同时也要构建Netty服务端。对于整个Producer的流程,其实还是挺复杂的,大致分两个步骤, start()方法,准备一大堆信息,send发送消息。我们先抽取下主线。
首先,关于Borker路由信息的管理: Producer需要拉取Broker列表,然后跟Broker建立连接等等很多核心的流程,其实都是在发送消息时建立的。因为在启动时,还不知道要拉取哪个Topic的Broker列表呢。所以对于这个问题,我们关注的重点,不应该是start方法,而是send方法。Send方法中,首先需要获得Topic的路由信息。这会从本地缓存中获取,如果本地缓存中没有,就从NameServer中去申请。
路由信息大致的管理流程:
![%uFFFD理.png](https://img-blog.csdnimg.cn/img_convert/8bd1933eee7629f75b19af6438ece608.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u7fc7feec&margin=[object Object]&name=%uFFFD理.png&originHeight=353&originWidth=732&originalType=binary&ratio=1&rotation=0&showTitle=false&size=41602&status=done&style=none&taskId=ufd4cca68-8915-4c82-980c-9e3b1884ef4&title=)
然后 获取路由信息后,会选出一个MessageQueue去发送消息。这个选MessageQueue的方法就是一个索引自增然后取模的方式。
![6%uFFFD%uFFFDQueue.png](https://img-blog.csdnimg.cn/img_convert/cacf4b1e33d12c38be327d87ffcd24da.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u137cb912&margin=[object Object]&name=6%uFFFD%uFFFDQueue.png&originHeight=755&originWidth=1059&originalType=binary&ratio=1&rotation=0&showTitle=false&size=146915&status=done&style=none&taskId=u1c435cd6-26eb-4829-acd9-980834ac6c6&title=)
然后 封装Netty请求发送消息。消息发从到Borker后,会由一个CommitLog类写入到CommitLog文件中。
我们接着上面的流程,来关注下Broker是如何把消息进行存储的。
消息存储的入口在:DefaultMessageStore.putMessage
最终存储的文件有哪些?
commitLog:消息存储目录
config:运行期间一些配置信息
consumerqueue:消息消费队列存储目录
index:消息索引文件存储目录
abort:如果存在改文件寿命Broker非正常关闭
checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerquueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。
6.1-commitLog写入
CommitLog的doAppend方法就是Broker写入消息的实际入口。这个方法最终会把消息追加到MappedFile映射的一块内存里,并没有直接写入磁盘。写入消息的过程是串行的,一次只会允许一个线程写入。
6.2-分发ConsumeQueue和IndexFile当CommitLog写入一条消息后,会有一个后台线程reputMessageService每隔1毫秒就会去拉取CommitLog中最新更新的一批消息,然后分别转发到ComsumeQueue和IndexFile里去,这就是他底层的实现逻辑。
并且,如果服务异常宕机,会造成CommitLog和ConsumeQueue、IndexFile文件不一致,有消息写入CommitLog后,没有分发到索引文件,这样消息就丢失了。DefaultMappedStore的load方法提供了恢复索引文件的方法,入口在load方法。
入口:CommitLog.putMessage -> CommitLog.handleDiskFlush
其中主要涉及到是否开启了对外内存。TransientStorePoolEnable。如果开启了堆外内存,会在启动时申请一个跟CommitLog文件大小一致的堆外内存,这部分内存就可以确保不会被交换到虚拟内存中。
入口: DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()
默认情况下, Broker会启动后台线程,每60秒,检查CommitLog、ConsumeQueue文件。然后对超过72小时的数据进行删除。也就是说,默认情况下, RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配置。注意他删除时,并不会检查消息是否被消费了。
整个文件存储的核心入口入口在DefaultMessageStore的start方法中。
![%uFFFD储.png](https://img-blog.csdnimg.cn/img_convert/33e48ef092356f45988a1bebf095014f.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=uafe74e02&margin=[object Object]&name=%uFFFD储.png&originHeight=372&originWidth=1038&originalType=binary&ratio=1&rotation=0&showTitle=false&size=32431&status=done&style=none&taskId=ubf4fa3e7-d4be-4404-9789-1283a607ad3&title=)
RocketMQ的存储文件包括消息文件(Commitlog)、消息消费队列文件(ConsumerQueue)、Hash索引文件(IndexFile)、监测点文件(checkPoint)、abort(关闭异常文件)。单个消息存储文件、消息消费队列文件、Hash索引文件长度固定以便使用内存映射机制进行文件的读写 *** 作。RocketMQ组织文件以文件的起始偏移量来命令文件,这样根据偏移量能快速定位到真实的物理文件。RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。
CommitLog,消息存储文件,RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为此RocketMQ为了方便消息消费构建了消息消费队列文件,基于主题与队列进行组织,同时RocketMQ为消息实现了Hash索引,可以为消息设置索引键,根据所以能够快速从CommitLog文件中检索消息。
当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息转发给消息消费队列文件与索引文件。为了安全起见,RocketMQ引入abort文件,记录Broker的停机是否是正常关闭还是异常关闭,在重启Broker时为了保证CommitLog文件,消息消费队列文件与Hash索引文件的正确性,分别采用不同策略来恢复文件。
RocketMQ不会永久存储消息文件、消息消费队列文件,而是启动文件过期机制并在磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费。
消费者以消费者组的模式开展。消费者组之间有集群模式和广播模式两种消费模式。然后消费模式有推模式和拉模式。推模式是由拉模式封装组成。
集群模式下,消费队列负载均衡的通用原理:一个消费队列同一时间只能被一个消费者消费,而一个消费者可以同时消费多个队列。
消息顺序:RocketMQ只支持一个队列上的局部消息顺序,不保证全局消息顺序。 要实现顺序消息,可以把有序的消息指定为一个queue,或者给Topic只指定一个Queue,这个不推荐。
DefaultMQPushConsumer.start方法
启动过程不用太过关注,有个概念就行,然后客户端启动的核心是mQClientFactory 主要是启动了一大堆的服务。
这些服务可以结合具体场景再进行深入。例如pullMessageService主要处理拉取消息服务,rebalanceService主要处理客户端的负载均衡。
拉模式: PullMessageService
PullRequest里有messageQueue和processQueue,其中messageQueue负责拉取消息,拉取到后,将消息存入processQueue,进行处理。 存入后就可以清空messageQueue,继续拉取了。
![%uFFFD结.png](https://img-blog.csdnimg.cn/img_convert/0b61f6fdc95dc3633656ae20f626d229.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=ub0919565&margin=[object Object]&name=%uFFFD结.png&originHeight=790&originWidth=1021&originalType=binary&ratio=1&rotation=0&showTitle=false&size=467796&status=done&style=none&taskId=u22efe17d-8a21-43e7-ae7f-f27aec5a9b2&title=)
在Broker的配置中,有一个配置项longPollingEnable可以配置为true开启长轮询模式。我们看下这个是干什么的。消息长轮询的处理入口在Broker端的PullMessageProcessor.processReuquest方法,这是一个非常长的方法。在403行左右。有这一段。
case ResponseCode.PULL_NOT_FOUND:if (brokerAllowSuspend && hasSuspendFlag) { long pollingTimeMills = suspendTimeoutMillisLong; //消息长轮询 if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) { pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills(); } String topic = requestHeader.getTopic(); long offset = requestHeader.getQueueOffset(); int queueId = requestHeader.getQueueId(); PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills, this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter); this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest); response = null; break;}
如果开启了长轮询机制,PullRequestHoldService会每隔5S被环境去尝试检查是否有新的消息到来,并给客户端响应,或者直到超时才给客户端响应。消息的实时性比较差,为了避免这种情况,RocketMQ还有另外一个机制,当消息到达时唤醒挂起的线程再检查一次。
这个机制的入口在DefaultMessageStore的start方法中,会启动一个reputMessageService。然后在commitLog消息分发成功后,会检查如果开启了长轮询,就会唤醒NotifyMessageArrivingListener,进行一起请求线程的检查。
if (dispatchRequest.isSuccess()) { if (size > 0) { //分发CommitLog写入消息 DefaultMessageStore.this.doDispatch(dispatchRequest); //K2 长轮询: 如果有消息到了主节点,并且开启了长轮询。 if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) { //唤醒NotifyMessageArrivingListener的arriving方法,进行一次请求线程的检查 DefaultMessageStore.this.messageArrivingListener .arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); }7.4 客户端负载均衡策略
1>在消费者示例的start方法中,启动RebalanceService,这个是客户端进行负载均衡策略的启动服务。他只负责根据负载均衡策略获取当前客户端分配到的MessageQueue示例。
五种负载策略,可以由Consumer的allocateMessageQueueStrategy属性来选择。
最常用的是AllocateMessageQueueAveragely平均分配和AllocateMessageQueueAveragelyByCircle平均轮询分配。
平均分配是把MessageQueue按组内的消费者个数平均分配。
而平均轮询分配就是把MessageQueue按组内的消费者一个一个轮询分配。
例如,六个队列q1,q2,q3,q4,q5,q6,分配给三个消费者c1,c2,c3
平均分配的结果就是: c1:{q1,q2},c2:{q3,q4},c3{q5,q6}
平均轮询分配的结果就是: c1:{q1,q4},c2:{q2,q5},c3:{q3,q6}
2>消费的过程
消费的过程依然是在DefaultMQPushConsumerImpl的 consumeMessageService中。他有两个子类ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService
消费过程的入口在DefaultMQPushConsumerImpl的pullMessage中定义的PullCallback中。
延迟消息的处理入口在scheduleMessageService这个组件中。 他会在broker启动时也一起加载。
整个延迟消息的实现方式是这样的:![%uFFFD息.png](https://img-blog.csdnimg.cn/img_convert/7eb37e3e8ae52c9d856366ffa9c0ba13.png#clientId=u2fd21818-6291-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u312abd2c&margin=[object Object]&name=%uFFFD息.png&originHeight=403&originWidth=820&originalType=binary&ratio=1&rotation=0&showTitle=false&size=27146&status=done&style=none&taskId=u3f6947b1-5f84-463e-9305-7264e6398d1&title=)
消息写入时,会将延迟消息转为写入到SCHEDULE_TOPIC_XXXX这个Topic中。这个系统内置的Topic有18个队列,对应18个延迟级别。
代码见CommitLog.putMessage方法。
然后ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务,将消息从延迟队列中写入正常Topic中。 代码见ScheduleMessageService中的DeliverDelayedMessageTimerTask.executeOnTimeup方法。
消费者部分小结:
RocketMQ消息消费方式分别为集群模式、广播模式。
消息队列负载由RebalanceService线程默认每隔20s进行一次消息队列负载,根据当前消费者组内消费者个数与主题队列数量按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消费队列,同一个消息消费队列同一个时间只会分配给一个消费者。
消息拉取由PullMessageService线程根据RebalanceService线程创建的拉取任务进行拉取,默认每次拉取32条消息,提交给消费者消费线程后继续下一次消息拉取。如果消息消费过慢产生消息堆积会触发消息消费拉取流控。
并发消息消费指消费线程池中的线程可以并发对同一个消息队列的消息进行消费,消费成功后,取出消息队列中最小的消息偏移量作为消息消费进度偏移量存储在于消息消费进度存储文件中,集群模式消息消费进度存储在Broker(消息服务器),广播模式消息消费进度存储在消费者端。
RocketMQ不支持任意精度的定时调度消息,只支持自定义的消息延迟级别,例如1s、2s、5s等,可通过在broker配置文件中设置messageDelayLevel。
顺序消息一般使用集群模式,是指对消息消费者内的线程池中的线程对消息消费队列只能串行消费。并并发消息消费最本质的区别是消息消费时必须成功锁定消息消费队列,在Broker端会存储消息消费队列的锁占用情况。
这个是在面试时,关于MQ,面试官最喜欢问的问题。这个问题是所有MQ都需要面对的一个共性问题。大致的解决思路都是一致的,但是针对不同的MQ产品又有不同的解决方案。分析这个问题要从以下几个角度入手:
我们考虑一个通用的MQ场景:
![MQ消息可靠性分析.png](https://img-blog.csdnimg.cn/img_convert/0b975dd8c90176529fbbf49525778879.png#clientId=u8601d301-d74b-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=ub8282fa1&margin=[object Object]&name=MQ消息可靠性分析.png&originHeight=609&originWidth=875&originalType=binary&ratio=1&rotation=0&showTitle=false&size=49852&status=done&style=none&taskId=u506c5b98-f1b2-4d0f-a4c2-35cc01dcb13&title=)
其中,1,2,4三个场景都是跨网络的,而跨网络就肯定会有丢消息的可能。
然后关于3这个环节,通常MQ存盘时都会先写入 *** 作系统的缓存page cache中,然后再由 *** 作系统异步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。
这个是MQ场景都会面对的通用的丢消息问题。那我们看看用Rocket时要如何解决这个问题
这个结论比较容易理解,因为RocketMQ的事务消息机制就是为了保证零丢失来设计的,并且经过阿里的验证,肯定是非常靠谱的。
但是如果深入一点的话,我们还是要理解下这个事务消息到底是不是靠谱。我们以最常见的电商订单场景为例,来简单分析下事务消息机制如何保证消息不丢失。我们看下下面这个流程图:
![RocketMQ生产者消息可靠性分析.png](https://img-blog.csdnimg.cn/img_convert/9211c1362076694eedfd8bd16b9ea88b.png#clientId=u8601d301-d74b-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=ua47736ed&margin=[object Object]&name=RocketMQ生产者消息可靠性分析.png&originHeight=361&originWidth=888&originalType=binary&ratio=1&rotation=0&showTitle=false&size=38924&status=done&style=none&taskId=u1d0df9ae-c458-492c-a173-eab00262710&title=)
1、为什么要发送个half消息?有什么用?
这个half消息是在订单系统进行下单 *** 作前发送,并且对下游服务的消费者是不可见的。那这个消息的作用更多的体现在确认RocketMQ的服务是否正常。相当于嗅探下RocketMQ服务是否正常,并且通知RocketMQ,我马上就要发一个很重要的消息了,你做好准备。
2.half消息如果写入失败了怎么办?
如果没有half消息这个流程,那我们通常是会在订单系统中先完成下单,再发送消息给MQ。这时候写入消息到MQ如果失败就会非常尴尬了。而half消息如果写入失败,我们就可以认为MQ的服务是有问题的,这时,就不能通知下游服务了。我们可以在下单时给订单一个状态标记,然后等待MQ服务正常后再进行补偿 *** 作,等MQ服务正常后重新下单通知下游服务。
3.订单系统写数据库失败了怎么办?
这个问题我们同样比较下没有使用事务消息机制时会怎么办?如果没有使用事务消息,我们只能判断下单失败,抛出了异常,那就不往MQ发消息了,这样至少保证不会对下游服务进行错误的通知。但是这样的话,如果过一段时间数据库恢复过来了,这个消息就无法再次发送了。当然,也可以设计另外的补偿机制,例如将订单数据缓存起来,再启动一个线程定时尝试往数据库写。而如果使用事务消息机制,就可以有一种更优雅的方案。
如果下单时,写数据库失败(可能是数据库崩了,需要等一段时间才能恢复)。那我们可以另外找个地方把订单消息先缓存起来(Redis、文本或者其他方式),然后给RocketMQ返回一个UNKNOWN状态。这样RocketMQ就会过一段时间来回查事务状态。我们就可以在回查事务状态时再尝试把订单数据写入数据库,如果数据库这时候已经恢复了,那就能完整正常的下单,再继续后面的业务。这样这个订单的消息就不会因为数据库临时崩了而丢失。
4.half消息写入成功后RocketMQ挂了怎么办?
我们需要注意下,在事务消息的处理机制中,未知状态的事务状态回查是由RocketMQ的Broker主动发起的。也就是说如果出现了这种情况,那RocketMQ就不会回调到事务消息中回查事务状态的服务。这时,我们就可以将订单一直标记为"新下单"的状态。而等RocketMQ恢复后,只要存储的消息没有丢失,RocketMQ就会再次继续状态回查的流程。
5.下单成功后如何优雅的等待支付成功?
在订单场景下,通常会要求下单完成后,客户在一定时间内,例如10分钟,内完成订单支付,支付完成后才会通知下游服务进行进一步的营销补偿。
如果不用事务消息,那通常会怎么办?
最简单的方式是启动一个定时任务,每隔一段时间扫描订单表,比对未支付的订单的下单时间,将超过时间的订单回收。这种方式显然是有很大问题的,需要定时扫描很庞大的一个订单信息,这对系统是个不小的压力。
那更进一步的方案是什么呢?是不是就可以使用RocketMQ提供的延迟消息机制。往MQ发一个延迟1分钟的消息,消费到这个消息后去检查订单的支付状态,如果订单已经支付,就往下游发送下单的通知。而如果没有支付,就再发一个延迟1分钟的消息。最终在第十个消息时把订单回收。这个方案就不用对全部的订单表进行扫描,而只需要每次处理一个单独的订单消息。
那如果使用上了事务消息呢?我们就可以用事务消息的状态回查机制来替代定时的任务。在下单时,给Broker返回一个UNKNOWN的未知状态。而在状态回查的方法中去查询订单的支付状态。这样整个业务逻辑就会简单很多。我们只需要配置RocketMQ中的事务消息回查次数(默认15次)和事务回查间隔时间(messageDelayLevel),就可以更优雅的完成这个支付状态检查的需求。
6、事务消息机制的作用
整体来说,在订单这个场景下,消息不丢失的问题实际上就还是转化成了下单这个业务与下游服务的业务的分布式事务一致性问题。而事务一致性问题一直以来都是一个非常复杂的问题。而RocketMQ的事务消息机制,实际上只保证了整个事务消息的一半,他保证的是订单系统下单和发消息这两个事件的事务一致性,而对下游服务的事务并没有保证。但是即便如此,也是分布式事务的一个很好的降级方案。目前来看,也是业内最好的降级方案。
1、同步刷盘
这个从我们之前的分析,就很好理解了。我们可以简单的把RocketMQ的刷盘方式 flushDiskType配置成同步刷盘就可以保证消息在刷盘过程中不会丢失了。
2、Dledger的文件同步
![Dledger文件同步.png](https://img-blog.csdnimg.cn/img_convert/06c00c5e1ed6643606e1faa7b1ef40a2.png#clientId=u8601d301-d74b-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u38673bb8&margin=[object Object]&name=Dledger文件同步.png&originHeight=456&originWidth=967&originalType=binary&ratio=1&rotation=0&showTitle=false&size=25533&status=done&style=none&taskId=ucce60779-bfbc-4beb-8f61-327e9ea2251&title=)
在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式保证文件在主从之间成功同步。
3》消费者端不要使用异步消费机制简单来说,数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。
Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。再接下来, Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步。
正常情况下,消费者端都是需要先处理本地事务,然后再给MQ一个ACK响应,这时MQ就会修改Offset,将消息标记为已消费,从而不再往其他消费者推送消息。所以在Broker的这种重新推送机制下,消息是不会在传输过程中丢失的。但是也会有下面这种情况会造成服务端消息丢失:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { new Thread(){ public void run(){ //处理业务逻辑 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); } }; return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
这种异步消费的方式,就有可能造成消息状态返回后消费者本地业务逻辑处理失败造成消息丢失的可能。
NameServer在RocketMQ中,是扮演的一个路由中心的角色,提供到Broker的路由功能。但是其实路由中心这样的功能,在所有的MQ中都是需要的。kafka是用zookeeper和一个作为Controller的Broker一起来提供路由服务,整个功能是相当复杂纠结的。而RabbitMQ是由每一个Broker来提供路由服务。而只有RocketMQ把这个路由中心单独抽取了出来,并独立部署。
这个NameServer之前都了解过,集群中任意多的节点挂掉,都不会影响他提供的路由功能。那如果集群中所有的NameServer节点都挂了呢?
有很多人就会认为在生产者和消费者中都会有全部路由信息的缓存副本,那整个服务可以正常工作一段时间。其实这个问题大家可以做一下实验,当NameServer全部挂了后,生产者和消费者是立即就无法工作了的。至于为什么,可以回顾一下我们之前的源码课程去源码中找找答案。
那再回到我们的消息不丢失的问题,在这种情况下,RocketMQ相当于整个服务都不可用了,那他本身肯定无法给我们保证消息不丢失了。我们只能自己设计一个降级方案来处理这个问题了。例如在订单系统中,如果多次尝试发送RocketMQ不成功,那就只能另外找给地方(Redis、文件或者内存等)把订单消息缓存下来,然后起一个线程定时的扫描这些失败的订单消息,尝试往RocketMQ发送。这样等RocketMQ的服务恢复过来后,就能第一时间把这些消息重新发送出去。整个这套降级的机制,在大型互联网项目中,都是必须要有的。
完整分析过后,整个RocketMQ消息零丢失的方案其实挺简单
生产者使用事务消息机制。Broker配置同步刷盘+Dledger主从架构消费者不要使用异步消费。整个MQ挂了之后准备降级方案
那这套方案是不是就很完美呢?其实很明显,这整套的消息零丢失方案,在各个环节都大量的降低了系统的处理性能以及吞吐量。在很多场景下,这套方案带来的性能损失的代价可能远远大于部分消息丢失的代价。所以,我们在设计RocketMQ使用方案时,要根据实际的业务情况来考虑。例如,如果针对所有服务器都在同一个机房的场景,完全可以把Broker配置成异步刷盘来提升吞吐量。而在有些对消息可靠性要求没有那么高的场景,在生产者端就可以采用其他一些更简单的方案来提升吞吐,而采用定时对账、补偿的机制来提高消息的可靠性。而如果消费者不需要进行消息存盘,那使用异步消费的机制带来的性能提升也是非常显著的。
总之,这套消息零丢失方案的总结是为了在设计RocketMQ使用方案时的一个很好的参考。
这个也是面试时最常见的问题,需要对MQ场景有一定的深入理解。例如如果我们有个大数据系统,需要对业务系统的日志进行收集分析,这时候为了减少对业务系统的影响,通常都会通过MQ来做消息中转。而这时候,对消息的顺序就有一定的要求了。例如我们考虑下面这一系列的 *** 作。
- 用户的积分默认是0分,而新注册用户设置为默认的10分。用户有奖励行为,积分+2分。用户有不正当行为,积分-3分。
这样一组 *** 作,正常用户积分要变成9分。但是如果顺序乱了,这个结果就全部对不了。这时,就需要对这一组 *** 作,保证消息都是有序的。
MQ的顺序问题分为全局有序和局部有序。
全局有序:整个MQ系统的所有消息严格按照队列先入先出顺序进行消费。局部有序:只保证一部分关键消息的消费顺序。
首先 我们需要分析下这个问题,在通常的业务场景中,全局有序和局部有序哪个更重要?其实在大部分的MQ业务场景,我们只需要能够保证局部有序就可以了。例如我们用QQ聊天,只需要保证一个聊天窗口里的消息有序就可以了。而对于电商订单场景,也只要保证一个订单的所有消息是有序的就可以了。至于全局消息的顺序,并不会太关心。而通常意义下,全局有序都可以压缩成局部有序的问题。例如以前我们常用的聊天室,就是个典型的需要保证消息全局有序的场景。但是这种场景,通常可以压缩成只有一个聊天窗口的QQ来理解。即整个系统只有一个聊天通道,这样就可以用QQ那种保证一个聊天窗口消息有序的方式来保证整个系统的全局消息有序。
然后 落地到RocketMQ。通常情况下,发送者发送消息时,会通过MessageQueue轮询的方式保证消息尽量均匀的分布到所有的MessageQueue上,而消费者也就同样需要从多个MessageQueue上消费消息。而MessageQueue是RocketMQ存储消息的最小单元,他们之间的消息都是互相隔离的,在这种情况下,是无法保证消息全局有序的。
而对于局部有序的要求,只需要将有序的一组消息都存入同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。RocketMQ中,可以在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发入哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里。
另外,通常所谓的保证Topic全局消息有序的方式,就是将Topic配置成只有一个MessageQueue队列(默认是4个)。这样天生就能保证消息全局有序了。这个说法其实就是我们将聊天室场景压缩成只有一个聊天窗口的QQ一样的理解方式。而这种方式对整个Topic的消息吞吐影响是非常大的,如果这样用,基本上就没有用MQ的必要了。
在正常情况下,使用MQ都会要尽量保证他的消息生产速度和消费速度整体上是平衡的,但是如果部分消费者系统出现故障,就会造成大量的消息积累。这类问题通常在实际工作中会出现得比较隐蔽。例如某一天一个数据库突然挂了,大家大概率就会集中处理数据库的问题。等好不容易把数据库恢复过来了,这时基于这个数据库服务的消费者程序就会积累大量的消息。或者网络波动等情况,也会导致消息大量的积累。这在一些大型的互联网项目中,消息积压的速度是相当恐怖的。所以消息积压是个需要时时关注的问题。
对于消息积压,如果是RocketMQ或者kafka还好,他们的消息积压不会对性能造成很大的影响。而如果是RabbitMQ的话,那就惨了,大量的消息积压可以瞬间造成性能直线下滑。
对于RocketMQ来说,有个最简单的方式来确定消息是否有积压。那就是使用web控制台,就能直接看到消息的积压情况。
在Web控制台的主题页面,可以通过 Consumer管理 按钮实时看到消息的积压情况。
![RocketMQ消息积压.png](https://img-blog.csdnimg.cn/img_convert/104f4aaf45392c6026d1610bfbd75f2f.png#clientId=u8601d301-d74b-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=uc0629fe7&margin=[object Object]&name=RocketMQ消息积压.png&originHeight=464&originWidth=1160&originalType=binary&ratio=1&rotation=0&showTitle=false&size=66371&status=done&style=none&taskId=uedffa081-0ab4-4248-9a31-a7fc77965df&title=)
另外,也可以通过mqadmin指令在后台检查各个Topic的消息延迟情况。
还有RocketMQ也会在他的 ${storePathRootDir}/config 目录下落地一系列的json文件,也可以用来跟踪消息积压情况。
其实我们回顾下RocketMQ的负载均衡的内容就不难想到解决方案。
如果Topic下的MessageQueue配置得是足够多的,那每个Consumer实际上会分配多个MessageQueue来进行消费。这个时候,就可以简单的通过增加Consumer的服务节点数量来加快消息的消费,等积压消息消费完了,再恢复成正常情况。最极限的情况是把Consumer的节点个数设置成跟MessageQueue的个数相同。但是如果此时再继续增加Consumer的服务节点就没有用了。
而如果Topic下的MessageQueue配置得不够多的话,那就不能用上面这种增加Consumer节点个数的方法了。这时怎么办呢? 这时如果要快速处理积压的消息,可以创建一个新的Topic,配置足够多的MessageQueue。然后把所有消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转储到新的Topic中,这个速度是可以很快的。然后在新的Topic上,就可以通过增加消费者个数来提高消费速度了。之后再根据情况恢复成正常情况。
四、RocketMQ的消息轨迹在官网中,还分析了一个特殊的情况。就是如果RocketMQ原本是采用的普通方式搭建主从架构,而现在想要中途改为使用Dledger高可用集群,这时候如果不想历史消息丢失,就需要先将消息进行对齐,也就是要消费者把所有的消息都消费完,再来切换主从架构。因为Dledger集群会接管RocketMQ原有的CommitLog日志,所以切换主从架构时,如果有消息没有消费完,这些消息是存在旧的CommitLog中的,就无法再进行消费了。这个场景下也是需要尽快的处理掉积压的消息。
RocketMQ默认提供了消息轨迹的功能:
打开消息轨迹功能,需要在broker.conf中打开一个关键配置:
traceTopicEnable=true
这个配置的默认值是false。也就是说默认是关闭的。
默认情况下,消息轨迹数据是存于一个系统级别的Topic ,RMQ_SYS_TRACE_TOPIC。这个Topic在Broker节点启动时,会自动创建出来。
![RocketMQ系统Topic.png](https://img-blog.csdnimg.cn/img_convert/2fb3ea22fed842f20907661bd2df76ec.png#clientId=u8601d301-d74b-4&crop=0&crop=0&crop=1&crop=1&from=ui&id=u65e1960e&margin=[object Object]&name=RocketMQ系统Topic.png&originHeight=845&originWidth=1201&originalType=binary&ratio=1&rotation=0&showTitle=false&size=99124&status=done&style=none&taskId=u11d8672f-f462-44f0-a19b-071527fe401&title=)
另外,也支持客户端自定义轨迹数据存储的Topic。
在客户端的两个核心对象 DefaultMQProducer和DefaultMQPushConsumer,他们的构造函数中,都有两个可选的参数来打开消息轨迹存储
enableMsgTrace:是否打开消息轨迹。默认是false。customizedTraceTopic:配置将消息轨迹数据存储到用户指定的Topic 。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)