RocketMQ

RocketMQ,第1张

RocketMQ 1、基础准备

 下载maven :wget http://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz

 编译: mvn clean package -Dmaven.test.skip=true

 启动NameServer
   nohup sh bin/mqnamesrv -n "公网IP:9876" &   #后台运行 nameserver
   tail -f ~/logs/rocketmqlogs/namesrv.log #监听nameserver日志文件

 启动broker
   nohup sh bin/mqbroker -n 公网IP:9876 -c conf/broker.conf autoCreateTopicEnable=true &
   tail -f ~/logs/rocketmqlogs/broker.log

 发送消息
   export NAMESRV_ADDR=localhost:9876 #设置环境变量
   sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

 接收消息
   sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

 关闭Server
   sh bin/mqshutdown broker
   sh bin/mqshutdown namesrv

在阿里云服务器上部署时遇到的坑,修改conf/broker.conf文件

  namesrvAddr = 自己云服务器的公网IP:9876

  brokerIP1 = 自己云服务器的公网IP

2、Rocket角色

  broker

  • Brocker面向producer和consumer接收和发送消息
  • 向nameserver提交自己的信息
  • 是消息中间件的消息存储、收发服务器
  • 每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报

 broker集群

  • Broker高可用,可以配置Master / Slave结构,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave
    • 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
    • Master与Slave的对应关系通过制定相同的BrokerName,不同的BrokerId来定义为0表示Master,非0表示Slave
  • Master多机负载,可以部署多个broker
    • 每个Broker与nameserver集群中的所有节点建立长链接,定时注册Topic信息到nameserver

 producer

  • 消息的生产者
  • 通过nameserver集群中的其中一个节点(随机选择)建立长连接,获取Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等
  • 接下来向提供Topic服务的Master建立长连接,且定时向Master发送心跳

 consumer

  • 消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接

 nameserver

  • 底层有netty实现,提供了路由管理、服务注册、服务发现,是一个无状态节点
  • nameserver是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时向nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把他从列表中剔除
  • nameserver可以部署多个,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用
  • nameserver集群键互不通信,没有主备的概念
  • nameserver内存式存储,nameserver中的broker、topic等信息默认不会持久化
  • 为什么不用zookeeper? rockermq希望为了提高性能,CAP定理,客户端负载均衡

  对比JSM中的Topic和Queue

   Topic是一个逻辑上的概念,实际上Message是在每个Broker上以Queue的形式记录。

      

3、发送方式  同步发送

  消息发送中进入同步等待状态,可以保证消息投递一定到达

 异步消息

  想要快速发送消息,又不想丢失的时候可以使用异步消息

producer.send(message, new SendCallback() {
   public void onSuccess(SendResult sendResult) {
      System.out.println("消息发送成功");
      System.out.println("sendResult" + sendResult);
   }

   public void onException(Throwable throwable) {
      //如果发生异常 case 异常,尝试重投  或者调整业务逻辑
      throwable.printStackTrace();
      System.out.println("发送异常");
    }
});
单向消息

  只发送消息,不等待服务器响应,只发送请求不等待应答,此方式发送消息的过程耗时非常短,一般在微秒级别   peoducer.sendoneway(message);

延迟消息

4、消息  消息消费模式

  消息消费模式由消费者决定,可以由消费者设置MessageModel来决定消息模式

 消息模式默认为集群消费模式

  consumer.setMessageModel(MessageModel.BROADCASTING);

  consumer.setMessageModel(MessageModel.CLUSTERING);

 集群消息

    

  集群消息是指集群化部署消费者

    当使用集群消费模式时,MQ认为任意一条消息只需要被集群内的任意一个消费者处理即可

  特点

  •   每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者
  • 在消息重投时不能保证路由到同一台机器上
  • 消费状态由broker维护
 广播消息

  

    当使用广播消费模式时,MQ会将每条消息推送给集群内所有注册过的客户端,保证至少被每台机器消费一次

  特点

  • 消费进度由consumer维护
  • 保证每个消费者消费一次消息
  • 消费失败的消息不会重投
4、过滤

 一个group中消费者的tag selector 都不能随便变,要保持统一

 TAG

   在Producer中使用Tag: Message msg = new Message("Topic001","TAG-A","KEY-A","tag".getBytes());

   在Consumer中订阅Tag: consumer.subscribe("Topic001", "TAG-A");

 SQL表达式过滤

 实例:MessageSelector selector = MessageSelector.bySql("order > 5");

            consumer.subscribe("xxxxxx",selector);

 语法:

  RocketMQ只定义了一些基本的语法来支持这个功能,也可以扩展

  1. 数字比较  >  ,  >= , < , <= , between , =

  2.字符比较  = ,<> ,  IN

  3. is null 或者 is not null

  4.逻辑运算  and  , or , not

 常量类型是:数字,字符串('abc',必须使用单引号),  NULL , TRUE , FALSE

5、消息事务

   

  事务执行流程  

 RocketMQ实现方式

  Half Message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中

  检查事务状态:Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调

  超时:如果超过回查次数,默认回滚消息

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5672685.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存