目录
一、RocketMQ介绍
二、名词解释
三、安装
安装方式
准备工作
传统方式搭建RocketMQ集群
四、整合Springboot
一、RocketMQ介绍
RocketMQ是一款分布式消息中间件,最初是由阿里巴巴消息中间件团队研发并大规模应用于生产系统,满足线上海量消息堆积的需求, 在2016年底捐赠给Apache开源基金会成为孵化项目,经过不到一年时间正式成为了Apache顶级项目;早期阿里曾经基于ActiveMQ研发消息系统, 随着业务消息的规模增大,瓶颈逐渐显现,后来也考虑过Kafka,但因为在低延迟和高可靠性方面没有选择,最后才自主研发了RocketMQ, 各方面的性能都比目前已有的消息队列要好,RocketMQ和Kafka在概念和原理上都非常相似,所以也经常被拿来对比;RocketMQ默认采用长轮询的拉模式, 单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。
二、名词解释消息模型:RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
消息生产者(Producer):负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
消息消费者(Consumer):负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
主题(Topic):表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
代理服务器(Broker Server):消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
名字服务(Name Server):名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
拉取式消费(Pull Consumer):Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
推动式消费(Push Consumer):Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
生产者组(Producer Group):同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
消费者组(Consumer Group):同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
集群消费(Clustering):集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
广播消费(Broadcasting):广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
普通顺序消息(Normal Ordered Message):普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
严格顺序消息(Strictly Ordered Message):严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
消息(Message):消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
标签(Tag):为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
摘自官网 https://github.com/apache/rocketmq/blob/master/docs/cn/concept.md
三、安装 安装方式单Master模式:这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。
多Master模式:
一个集群无Slave,全是Master。
-
优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
-
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
多Master多Slave模式-异步复制:
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级)。
-
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
-
缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
多Master多Slave模式-同步双写:
每个Master配置一个Slave,有多对Master-Slave,HA(双机集群)采用同步双写方式,即只有主备都写成功,才向应用返回成功。:
-
优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
-
缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT(响应时间)会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
本文以多Master多Slave模式-异步复制这种方式安装
准备工作1、由于rocketmq是使用java语言编写,所以首先需要安装jdk环境,本文使用jdk17 安装参考 Linux系统下安装jdk17&jdk8安装_熟透的蜗牛的博客-CSDN博客_linux安装jdk17
2、集群示意图
两台服务器互为主从
传统方式搭建RocketMQ集群1、下载安装包 版本4.9.2
Apache Downloads
2、上传到服务器,并解压,如果没有安装unzipan,安装指令
[root@localhost ~]# yum install unzip zip
3、解压文件
#解压 [root@localhost ~]# unzip rocketmq-all-4.9.2-bin-release.zip #移动到/usr/local 并重命名为rocketmq [root@localhost ~]# mv rocketmq-4.9.2 /usr/local/rocketmq
4、修改启动脚本
注意:因为我的jdk是17,所以需要修改启动脚本,jdk8不需要修改,修改的地方为垃圾回收器,和启动参数(-Xms256m -Xmx256m -Xmn128m)
runbroker.sh
#!/bin/sh # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR ConDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. #=========================================================================================== # Java Environment Setting #=========================================================================================== error_exit () { echo "ERROR: $1 !!" exit 1 } [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java [ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOMEvariable in your environment, We need java(x64)!" export JAVA_HOME export JAVA="$JAVA_HOME/bin/java" export base_DIR=$(dirname $0)/.. export CLASSPATH=.${JAVA_HOME}/jre/lib/ext:${base_DIR}/lib @Service @Slf4j public class MqProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendMq() { for (int i = 0; i < 10; i++) { rocketMQTemplate.convertAndSend("xiaojie-test", "测试发送消息》》》》》》》》》" + i); } } public void sync() { SendResult sendResult = rocketMQTemplate.syncSend("xiaojie-test", "sync发送消息。。。。。。。。。。"); log.info("发送结果{}", sendResult); } public void async() { String msg = "异步发送消息。。。。。。。。。。"; log.info(">msg:<<" + msg); rocketMQTemplate.asyncSend("xiaojie-test", msg, new SendCallback() { @Override public void onSuccess(SendResult var1) { log.info("异步发送成功{}", var1); } @Override public void onException(Throwable var1) { //发送失败可以执行重试 log.info("异步发送失败{}", var1); } }); } }
消费者
package com.xiaojie.rocket.rocket.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @RocketMQMessageListener(consumerGroup = "test-group", topic = "xiaojie-test") @Slf4j @Component public class MqConsumer implements RocketMQListener{ @Override public void onMessage(String message) { log.info("接收到的数据是:{}", message); } }
参考:https://blog.csdn.net/javahongxi/article/details/84931747
完整代码参考: spring-boot: Springboot整合redis、消息中间件等相关代码
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)