RocketMq

RocketMq,第1张

RocketMq 一,MQ的作用

一,MQ的作用
  • 限流削峰
  • 异步解耦

  • 数据解耦
1,各种中间件比较:

 注意:spring cloud alibaba 提倡使用rocketMq

STOMP:面向文件流的信息协议

RocketMQ 是什么

Github 上关于 RocketMQ 的介绍:
RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性:

  1. 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
  2. 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
  3. 支持拉(pull)和推(push)两种消息模式
  4. 单一队列百万消息的堆积能力
  5. 支持多种消息协议,如 JMS、MQTT 等
  6. 分布式高可用的部署架构,满足至少一次消息传递语义
  7. 提供 docker 镜像用于隔离测试和云集群部署
  8. 提供配置、指标和监控等功能丰富的 Dashboard

概览
Apache RocketMQ是一个分布式消息和流平台,它的特性包括低延迟,高性能,高可靠性,万亿级容量和d性扩展。它的架构主要包括四部分:命名服务(name servers),代理服务(brokers),生产者(Producers),消费者(Consumers)。其中每个部分都是可以水平扩展从而避免单点故障。如图所示:

一,基本概念

1,消息 message

发送内容的载体

2,主题 Topic       topic:message    1:n

  • 对消息进行分类,一个消息只能有一个topic
  • topic表示一类消息,每个主题有若干条消息
  • 是消息进行订阅的基本单位
  • 一个consumer只能订阅一个topic   1:1
  • 一个producer可以生产多种topic     1:n

3,队列 queue

  • 存储消息的实体
  • 分区也叫队列
  • 一个topic可以多个队列
  • topic中的一个queue只能被同一个consumer组中的一个消费者所消费(同一个分区不能被一个消费者组重复消费)
  • 注意,分片不同于分区

4,消息标识(messageID/key)

 二,系统架构

 命名服务集群


命名服务提供了轻量的服务用于发现和路由。每个命名服务都记录了全量的路由信息,提供了读写一致性的服务,并且支持快速存储扩展。

代理服务集群


代理服务利用轻量的话题(TOPIC)和队列(QUEUE)机制来存储消息。代理服务能够支持推(Push)和拉(Pull)的模式,利用多拷贝来支持容错(2份或者3分拷贝),提供强大的峰值填充和以原始时间顺序堆积万亿数量级消息。并且代理服务还具备一些传统消息系统没有的特性,比如灾难恢复,多维度统计,预警机制等。

生产者集群


生产者支持分布式部署。分布式生产者通过负载均衡将信息发送到代理服务集群,发送进程支持快速失败(fail fast)以及低延迟。同一个生产者组可以生产不同的topic的消息

消费者集群


消费者也能够在推拉模式中也能够支持分布式部署,并且支持集群消费以及消息广播。这种方式也提供了实时消息订阅机制,能够满足大部分消费者的需求。RocketMQ的官网也提供了相关的网页给有兴趣的用户。同一个消费者组只能消费同一个topic的消息

 

当消费者组的consumer数量应该小于topic的queue的数量

命名服务


命名服务是纯粹的功能性服务器,主要包含2个特性:

代理管理(broker),命名服务接受代理服务集群的注册,同时通过心跳机制来检查单个代理的有效性
路由管理,每个路由服务都保存了代理集群完整的路由信息以及客户端查询所需要的队列信息。RocketMQ的客户端(生产者/消费者)会从命名服务获取队列路由信息,但是客户端如何获取命名

路由注册

Nameserver,无状态,各节点不通讯

 

服务的地址?
下面有4个方法将命名服务地址推流至客户端:

硬编码,如 producer.setNamesAddr("ip:port").
Java运行参数,使用 rocketmq.namesrv.addr。
环境变量,使用 NAMESRV_ADDR。
HTTP的终点(Endpoint)。
点击这里查看更多关于命名服务的细节文档。

代理服务


代理服务负责消息存储和分发,消息查询,高可用等等。
如下图所示,代理服务包含几个重要的子模块:

  1. 远程模块,代理服务入口,处理客户端发送的请求。
  2. 客户端管理,管理客户端(生产者/消费者)并且维护消费者订阅。
  3. 存储服务,为存储在物理硬盘上的消息提供简单的保存或查询接口。
  4. 高可用服务,在主代理和从代理直接提供数据同步功能。
  5. 索引服务,根据特定的key对消息进行索引构建,用于提供消息的快速检索。

原文链接:https://blog.csdn.net/pluto4596/article/details/88915116

消息同其他关系:

  • message   ------>Topic      : 多个message可以放入一个topic
  • topic   ------>  group           :一个topic可以有多个group,  一个group可以有多个topic          
  • topic   -------> queue       :一个topic可以有多个队列
  • queue -------> offset       :一个queue只有一个offset      

 

consumer和queue的数量不等

  • consumer > queue:会有对应的consumer空闲
  • consumer = queue :  一对一消费
  • consumer <  queue : 一个consumer消费多个queue

docker下载Rocketmq控制台
命令:

docker pull styletang/rocketmq-console-ng
 

启动服务
RocketMQ有两个服务需要启动: namesrv和broker

1.4.1 启动namesrv服务
     

   docker run -d -p 9876:9876 
        -v /opt/rocketmq/data/namesrv/logs:/root/logs 
        -v /opt/rocketmq/data/namesrv/store:/root/store 
        --name rmqnamesrv -e "MAX_POSSIBLE_HEAP=100000000" 
        rocketmqinc/rocketmq:latest sh mqnamesrv


-p 指定端口,前面为外部访问端口,后面为宿主机访问端口
-v 挂载目录,将docker内目录和linux目录进行挂载
–name 指定当前容器名称
-e 指定配置
sh mqnamesrv 启动容器内部指令
1.4.2 启动broker服务
获取配置文件
- 可先直接启动broker,然后将其中的配置文件复制出来

- docker cp 容器名:要拷贝的文件在容器里面的路径 要拷贝到宿主机的相应路径

- 复制到/usr/broker.conf位置
修改配置文件broker.conf
- 添加一下配置(更多配置内容见文章末尾)
  

  brokerIP1=宿主机IP
    namesrvAddr=宿主机IP:9876


启动broker服务
 

docker run -d -p 10911:10911 -p 10909:10909 
-v /opt/rocketmq/data/broker/logs:/root/logs 
-v /opt/rocketmq/data/broker/store:/root/store 
-v /usr/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf 
--name rmqbroker --link rmqnamesrv:namesrv 
-e "NAMESRV_ADDR=namesrv:9876" 
-e "MAX_POSSIBLE_HEAP=200000000" 
rocketmqinc/rocketmq:latest 
sh mqbroker -n 192.168.126.130:9876 
-c /opt/rocketmq-4.4.0/conf/broker.conf autoCreateTopicEnable=true


挂载配置文件:
-v /usr/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
指定宿主机ip
-n 192.168.126.130:9876
启用配置文件
-c /opt/rocketmq-4.4.0/conf/broker.conf

1.4.3 启动RocketMQ控制台
 

docker run --name rocketmq-console 
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.21.152:9876 
-Dcom.rocketmq.sendMessageWithVIPChannel=false" 
-p 8081:8080 -t styletang/rocketmq-console-ng

访问控制台: http://虚拟机IP:8081
如果有一个集群说明启动成功, 地址需要显示宿主机IP地 


原文链接:https://blog.csdn.net/qq_15740267/article/details/118523509

Message
Message(消息)就是要传输的信息。

一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址。

一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题。

Topic
Topic(主题)可以看做消息的规类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。

Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。

一个 Topic 也可以被 0个、1个、多个消费者订阅。

Tag
Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。

标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。

Group
分组,一个组可以订阅多个Topic。

分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为Group,同一个Group一般来说发送和消费的消息都是一样的

Queue
在Kafka中叫Partition,每个Queue内部是有序的,在RocketMQ中分为读和写两种队列,一般来说读写队列数量一致,如果不一致就会出现很多问题。

Message Queue
Message Queue(消息队列),主题被划分为一个或多个子主题,即消息队列。

一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去。

消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力。

Offset
在RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset 来访问,Offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限。

也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标。

消息消费模式
消息消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)。

默认情况下就是集群消费,该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。

而广播消费消息会发给消费者组中的每一个消费者进行消费。

Message Order
Message Order(消息顺序)有两种:Orderly(顺序消费)和Concurrently(并行消费)。

顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列。 

并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。

一次完整的通信流程是怎样的?
Producer 与 NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。

Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。
 


原文链接:https://blog.csdn.net/qq_35190492/article/details/103341634

producer 

1,核心参数

  • producerGroup:组名唯一
  • createTopicKey:创建主题时需要的密钥
  • defaultTopicQueueNums:在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数,默认4
  • sendMsgTimeout:发送消息超时时间
  • compressMsgBodyOverHowmuch:消息压缩字节,默认4096字节,超过该值,rocketmq就会对消息进行压缩
  • retryTimesWhenSendFailed:重发策略,同理存在异步的(retryTimesWhenSendAsyncFailed)
  • retryAnotherBrokerWhenNotStoreOk:默认false
  • maxMessagerSize:消息最大容量,默认128k

consumer 

 1、消费模式

  • 集群消费(默认)同一个comsumerGroup只消费一次
  • 广播消费  每个consumer都会对消息进行消费

2,消息类型

通过实现MessageListenerConcurrently对应的consumeMessage方法,返回消息类型

3,定时消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5688956.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存