RocketMQ数据存储&集群原理&顺序消费

RocketMQ数据存储&集群原理&顺序消费,第1张

RocketMQ数据存储&集群原理&顺序消费

目录

一、RocketMQ数据存储原理

二、集群原理

同步刷盘和异步刷盘

同步复制和异步复制

集群原理

主从broker如何保证消息消费一致性

三、RocktMQ顺序消费


一、RocketMQ数据存储原理

生产者投递消息

  1. 生产者在投递消息到mq服务器端,会将该消息存放在commitlog日志文件中(顺序写)。
  2. Mq后台就会开启一个异步的线程将该commitlogoffset实现分配存放到不同队列中。

消费者消费消息:

  1. 消费者消费消息的时候订阅到队列(consumerqueue),根据queueoffset 获取到该commitlogoffset
  2. 在根据commitlogoffset 去commitlog日志文件中查找到该消息主体返回给客户端。

总结 

  • 生产者将消息投递到broker时,会将所有的消息以顺序写的方式追加到Commitlog文件中,MQ开启异步线程将消息分配到相应的队列中(包含commitlogOffset值、msgSize、Tag等信息)。消费者订阅相应的队列,通过consumerQueueOffset的值去获取到commitlogOffset值,然后根据commitlogOffset的值获取到消息体,然后进行消费。
  • commitlog文件每个文件的大小默认1G ,commitlog的文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824,以此类推,第三个文件名字为00000000002147483648,起始偏移量为2147483648 ,消息存储的时候会顺序写入文件,当文件满了,写入下一个文件。
  • 理想状态下一个消费者对应一个队列,如果消费者数量多于队列数量,那么多余的消费者消费不到消息。因此在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。
  • 在集群消费(Clustering)模式下每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给Consumer Group2 消费。 消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再消费这条消息。
  • RocketMQ和kafka一样,消息消费之后并不会立即删除消息,而是通过删除策略删除消息
二、集群原理 同步刷盘和异步刷盘

 RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种

写磁盘方式:

  异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写 *** 作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘 *** 作,快速写入 

           优点:性能高

           缺点:Master宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致

  同步刷盘方式:在返回应用写成功状态前,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。

          优点:可以保持MQ的消息状态和生产者/消费者的消息状态一致

          缺点:性能比异步的低

配置方式在broker.conf中配置

  • ASYNC_FLUSH 异步刷盘
  • SYNC_FLUSH 同步刷盘
  • flushDiskType=ASYNC_FLUSH
同步复制和异步复制

  如果一个broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。

       同步复制方式:等Master和Slave均写成功后才反馈给客户端写成功状态

            优点:如果Master出故障,Slave上有全部的备份数据,容易恢复,消费者仍可以从Slave消费, 消息不丢失

            缺点:增大数据写入延迟,降低系统吞吐量,性能比异步复制模式略低,大约低10%左右,发送单个Master的响应时间会略高

      异步复制方式:只要Master写成功即可反馈给客户端写成功状态

            优点:系统拥有较低的延迟和较高的吞吐量. Master宕机之后,消费者仍可以从Slave消费,此过程对应用透明,不需要人工干预,性能同多个Master模式几乎一样

            缺点:如果Master出了故障,有些数据因为没有被写入Slave,而丢失少量消息。

配置方式在broker.conf中配置

brokerRole参数

  • ASYNC_MASTER 同步
  • SYNC_MASTER 异步
  • SLAVE 从节点
集群原理

        nameServer:多个Namesrv实例组成集群,但相互独立,没有信息交换。nameserver类似ZK和nacos等注册中心的功能。broker在启动时会将自己的ip和端口号注册到每一个nameserver中,然后与nameserver建立长连接。nameserver每隔30秒会发送一个心跳包,告诉broker自己还存活。而nameServer 定时器每隔10s的时间检测 故障Broker ,如果发生故障Broker 会直接剔除。生产者投递消息时会从nameserver中获取到broker的地址列表,然后进行消息投递。如果生产者在获取到服务列表之后,恰好当前broker宕机,那么生产者默认会有3次重试,如果依然失败,则重新从nameserver获取broker列表,进行消息投递。

集群搭建 参考 RocketMq集群安装&整合Springboot_熟透的蜗牛的博客-CSDN博客

主从broker如何保证消息消费一致性

在/data/rocketmq/store-a/config/consumerOffset.json文件中有如下结构

{
        "offsetTable":{
                "%RETRY%test-group@test-group":{0:0
                },
                "%RETRY%order-consumer@order-consumer":{0:0
                },
                "xiaojie-test@test-group":{0:4,1:3,2:4,3:28
                }
        }
}

 在"xiaojie-test@test-group":{0:4,1:3,2:4,3:28}

其中 xiaojie-test为topic名称

test-group为消费组的名称

0:4 表示queueId 为0, consumeroffset为4,也就是说队列id为0的消息消费到偏移量为4的位置。

当master节点消费消息时,主节点会将自己commitLog和consumerOffset.json文件异步的同步到salve节点上。

当主节点宕机之后,从节点不能支持写 *** 作,但是可以执行读的 *** 作。但此时主节点的consumerOffset.json中consumeroffset值滞后于主节点,当主节点恢复之后,如何消费呢?答案是主节点恢复之后,会首先同步从节点的consumerOffset.json文件,然后再进行消费。

三、RocktMQ顺序消费

 

        消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

        但正如上图所示,消费者是采用多线程的方式消费的,此时即使投递消息时的队列一致,也不能保证消费的时候就严格按照顺序消费。

官网顺序消费demo

package org.apache.rocketmq.example.order2;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;


public class Producer {

   public static void main(String[] args) throws Exception {
       DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

       producer.setNamesrvAddr("127.0.0.1:9876");

       producer.start();

       String[] tags = new String[]{"TagA", "TagC", "TagD"};

       // 订单列表
       List orderList = new Producer().buildOrders();

       Date date = new Date();
       SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
       String dateStr = sdf.format(date);
       for (int i = 0; i < 10; i++) {
           // 加个时间前缀
           String body = dateStr + " Hello RocketMQ " + orderList.get(i);
           Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

           SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
               @Override
               public MessageQueue select(List mqs, Message msg, Object arg) {
                   Long id = (Long) arg;  //根据订单id选择发送queue
                   long index = id % mqs.size();
                   return mqs.get((int) index);
               }
           }, orderList.get(i).getOrderId());//订单id

           System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
               sendResult.getSendStatus(),
               sendResult.getMessageQueue().getQueueId(),
               body));
       }

       producer.shutdown();
   }

   
   private static class OrderStep {
       private long orderId;
       private String desc;

       public long getOrderId() {
           return orderId;
       }

       public void setOrderId(long orderId) {
           this.orderId = orderId;
       }

       public String getDesc() {
           return desc;
       }

       public void setDesc(String desc) {
           this.desc = desc;
       }

       @Override
       public String toString() {
           return "OrderStep{" +
               "orderId=" + orderId +
               ", desc='" + desc + ''' +
               '}';
       }
   }

   
   private List buildOrders() {
       List orderList = new ArrayList();

       OrderStep orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("创建");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("付款");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("推送");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("完成");
       orderList.add(orderDemo);

       return orderList;
   }
}

消费者代码

package org.apache.rocketmq.example.order2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;


public class ConsumerInOrder {

   public static void main(String[] args) throws Exception {
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
       consumer.setNamesrvAddr("127.0.0.1:9876");
       
       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

       consumer.subscribe("TopicTest", "TagA || TagC || TagD");

       consumer.registerMessageListener(new MessageListenerOrderly() {

           Random random = new Random();

           @Override
           public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
               context.setAutoCommit(true);
               for (MessageExt msg : msgs) {
                   // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                   System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
               }

               try {
                   //模拟业务逻辑处理中...
                   TimeUnit.SECONDS.sleep(random.nextInt(10));
               } catch (Exception e) {
                   e.printStackTrace();
               }
               return ConsumeOrderlyStatus.SUCCESS;
           }
       });

       consumer.start();

       System.out.println("Consumer Started.");
   }
}

Springboot整合顺序消费 

package com.xiaojie.rocket.rocket.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;


@Component
@Slf4j
public class OrderProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void orderSend() {
        String msg = "这是测试顺序发送消息的内容-------insert";
        String msg1 = "这是测试顺序发送消息的内容-------update";
        String msg2 = "这是测试顺序发送消息的内容-------delete";
        String orderId = UUID.randomUUID().toString();
        SendResult sendResult1 = rocketMQTemplate.syncSendOrderly("test-orderly", msg, orderId);
        log.info(">>>>>>>>>>>>>>>result1{}", sendResult1);
        SendResult sendResult2 = rocketMQTemplate.syncSendOrderly("test-orderly", msg1, orderId);
        log.info(">>>>>>>>>>>>>>>result2{}", sendResult2);
        SendResult sendResult3 = rocketMQTemplate.syncSendOrderly("test-orderly", msg2, orderId);
        log.info(">>>>>>>>>>>>>>>result3{}", sendResult2);

    }
}
package com.xiaojie.rocket.rocket.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.util.Random;


@Component
@RocketMQMessageListener(topic = "test-orderly", consumerGroup = "orderly-1", consumeMode = ConsumeMode.ORDERLY)
@Slf4j
public class OrderConsumer implements RocketMQListener {
    @Override
    public void onMessage(Object message) {
        try {
            Random r = new Random(100);
            int i = r.nextInt(500);
            Thread.sleep(i);
        } catch (Exception e) {
        }
        log.info("消费者监听到消息:", message);
    }
}

参考 :https://blog.csdn.net/guyue35/article/details/105674044

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5596654.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存