RocketMQ原理和实战

RocketMQ原理和实战,第1张

RocketMQ原理和实战 1、架构图

2.系统角色

  Producer:

       充当消息发布的角色,支持分布式集群方式部署。producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递。投递的过程支持快速失败并且低延迟。

  Consumer:

       充当消息消费者的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播形式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

  NameServer:

      NameServer是一个功能齐全的服务器,其角色类似dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:

  1.  Broker管理。NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活。
  2.  路由信息管理。每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

       NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Produce,Consumer仍然可以动态感知Broker的路由的信息。

  配置NameServer:

      代码方式:producer.setNamesrvAddr("ip:port"); consumer.setNamesrvAddr("ip:port");多个实例的NameServer以";'分割。

      java 配置:rocketmq.namesrv.addr

      *** 作系统环境变量: NAMESRV_ADDR

      HTTP Endpoint.

  Broker Server:

     broker主要负责消息的存储、投递和查询以及服务高可用保证。为了实现这些功能broker包含了以下几个重要子模块。

     Remoting Module:整个broker的实体,负责处理来自clients端的请求。

     Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的topic订阅信息

     Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。

     HA Service:高可用服务,提供master broker 和 slave broker之间的数据同步功能。

     Index Service:根据特定的Message key对投递到broker的消息进行索引服务,以提供消息的快速查询。

3.消息的组成

1、Message
消息载体。Message发送或者消费的时候必须指定Topic。Message有一个可选的Tag项用于过滤消息,还可以添加额外的键值对。
2、topic
消息的逻辑分类,发消息之前必须要指定一个topic才能发,就是将这条消息发送到这个topic上。消费消息的时候指定这个topic进行消费。就是逻辑分类。
3、queue
1个Topic会被分为N个Queue,数量是可配置的。message本身其实是存储到queue上的,消费者消费的也是queue上的消息。多说一嘴,比如1个topic4个queue,有5个Consumer都在消费这个topic,那么会有一个consumer浪费掉了,因为负载均衡策略,每个consumer消费1个queue,5>4,溢出1个,这个会不工作。
4、Tag
Tag 是 Topic 的进一步细分,顾名思义,标签。每个发送的时候消息都能打tag,消费的时候可以根据tag进行过滤,选择性消费。
5、Message Model
消息模型:集群(Clustering)和广播(Broadcasting)
6、Message Order
消息顺序:顺序(Orderly)和并发(Concurrently)
7、Producer Group
消息生产者组
8、Consumer Group
消息消费者组

4.消费模式

1、集群模式(Clustering)

  • 每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者

  • 在消息重投时,不能保证路由到同一台机器上

  • 消费状态由broker维护

广播模式(Broadcasting)

  • 消费进度由consumer维护

  • 保证每个消费者都消费一次消息

  • 消费失败的消息不会重投

5.消息持久化

Broker单个实例下所有的队列共用一个日志数据文件CommitLog来存储

Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据

ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值

IndexFile(索引文件)则只是为了消息查询提供了一种通过key或时间区间来查询消息的方法

6.SpringBoot环境中使用RocketMQ

引入依赖


	org.apache.rocketmq
	rocketmq-common
	4.6.1


	org.apache.rocketmq
	rocketmq-client
	4.6.1

1.生产者

public class RmqProducerFactory {

    @Value("${rocketmq.producer.groupName}")
    private String groupName;

    @Value("${rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @Bean(name="testProducer",initMethod = "init", destroyMethod = "shutdown")
    public RmqProducer testProducer() throws RmqException {
        RmqProducer producer = new RmqProducer();
        producer.setGroupName(groupName);
        producer.setNamesrvAddr(namesrvAddr);
        return producer;
    }
}

public class RmqProducer {

    private DefaultMQProducer producer;

    private String groupName;

    private String namesrvAddr;

    private int maxMessageSize = 1024 * 1024 * 4;//4M

    private int sendMsgTimeout = 10000;

    public void init() throws RmqException {
        if(StringUtils.isBlank(this.groupName)){
            throw new RmqException("groupName is black!");
        }
        if(StringUtils.isBlank(this.namesrvAddr)){
            throw new RmqException("namesrvAddr is black!");
        }

        this.producer = new DefaultMQProducer(this.groupName);
        this.producer.setNamesrvAddr(namesrvAddr);
        this.producer.setMaxMessageSize(maxMessageSize);
        this.producer.setSendMsgTimeout(sendMsgTimeout);

        try {
            this.producer.start();
            log.info(String.format("producer is start!groupName[%s],namesrvAddr:[%s]",this.groupName,this.namesrvAddr));
        } catch (MQClientException e) {
            log.error(String.format("producer start error!groupName[%s],namesrvAddr:[%s]",this.groupName,this.namesrvAddr));
            throw new RmqException();
        }

    }

    public SendResult sendMessage(String topic, String tags, String keys, String messageText) throws RmqException{
        if(StringUtils.isBlank(topic)){
            throw new RmqException("topic is black!");
        }
        if(StringUtils.isBlank(messageText)){
            throw new RmqException("messageText is black!");
        }
        Message message = new Message(topic,tags,keys,messageText.getBytes());

        try {
            SendResult result = producer.send(message);
            return result;
        } catch (Exception e) {
            log.error("sendMessage error!{}",e.getMessage(),e);
            throw new RmqException(e);
        }
    }

    public void sendMessage(String topic, String tags, String keys, String messageText, SendCallback sendCallback) throws RmqException{
        if(StringUtils.isBlank(topic)){
            throw new RmqException("topic is black!");
        }
        if(StringUtils.isBlank(messageText)){
            throw new RmqException("messageText is black!");
        }
        Message message = new Message(topic,tags,keys,messageText.getBytes());

        try {
            producer.send(message,sendCallback);
        } catch (Exception e) {
            log.error("sendMessage error!{}",e.getMessage(),e);
            throw new RmqException(e);
        }
    }

    public  void sendMessage(MQEnums.TopicEnum tenum, RmqMsgReq req, SendCallback sendCallback) throws RmqException{
        sendMessage( tenum.getTopic(),tenum.getTag(),  req.getKeys(),  JSON.toJSonString(req),  sendCallback);
    }
    public  SendResult sendMessagendMessage(MQEnums.TopicEnum tenum, RmqMsgReq req) throws RmqException {
        return this.sendMessage(tenum.getTopic(),tenum.getTag(),req.getKeys(), JSON.toJSonString(req));
    }

    public void shutdown(){
        if(producer != null){
            producer.shutdown();
        }
    }

}

2.消费者

public class RmqConsumerFactory {


    @Value("${rocketmq.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;

    @Autowired
    @Qualifier("testProcessor")
    ImessageProcessor testProcessor;

    @Bean(name="testConsumer",initMethod = "init", destroyMethod = "shutdown")
    public RmqConsumer testConsumer() throws RmqException {

        RmqConsumer consumer = new RmqConsumer();
        consumer.setGroupName(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setTopic(MQEnums.TopicEnum.TEST.getTopic());
        consumer.setTag(MQEnums.TopicEnum.TEST.getTag());
        consumer.setProcessor(testProcessor);
        return consumer;

    }
}


public class RmqConsumer {

    private DefaultMQPushConsumer consumer;

    private String groupName;

    private String namesrvAddr;

    private String topic;

    private String tag = "*";//多个以|| 分割

    private int consumerThreadMin = 20;

    private int consumerThreadMax = 64;

    private ImessageProcessor processor;


    public void init() throws RmqException {
        if(StringUtils.isBlank(this.topic)){
            throw new RmqException("topic is black!");
        }
        if(StringUtils.isBlank(this.groupName)){
            throw new RmqException("groupName is black!");
        }
        if(StringUtils.isBlank(this.namesrvAddr)){
            throw new RmqException("namesrvAddr is black!");
        }
        try {
            this.consumer = new DefaultMQPushConsumer(this.groupName);
            this.consumer.setNamesrvAddr(namesrvAddr);
            this.consumer.setConsumeThreadMax(consumerThreadMax);
            this.consumer.setConsumeThreadMin(consumerThreadMin);
            this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            this.consumer.subscribe(topic,tag);

            RmqListener rmqListener = new RmqListener();
            rmqListener.setMessageProcessor(processor);
            consumer.registerMessageListener(rmqListener);

            this.consumer.start();
            log.info(String.format("consumer is start!groupName[%s],namesrvAddr:[%s]",this.groupName,this.namesrvAddr));
        } catch (MQClientException e) {
            log.error(String.format("consumer start error!groupName[%s],namesrvAddr:[%s]",this.groupName,this.namesrvAddr));
            throw new RmqException();
        }

    }

    public void shutdown(){
        if(consumer != null){
            consumer.shutdown();
        }
    }

}


public class RmqListener implements MessageListenerConcurrently {

    private ImessageProcessor messageProcessor;

    public void setMessageProcessor(ImessageProcessor messageProcessor){
        this.messageProcessor = messageProcessor;
    }

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
        for(MessageExt msg : msgs){
            String body = new String(msg.getBody());
            T tMsg = JSONObject.parseObject(body,messageProcessor.getMsgType());
            boolean result = messageProcessor.handleMessage(tMsg,msg.getMsgId(),msg.getReconsumeTimes());
            if(!result){
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
7.事务
public class OrderTransactionListener implements TransactionListener {

    @Autowired
    private OrderService orderService;

    
    @Override
    @Transactional
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        String orderId = (String) arg;

        // 根据本地事务执行成与否判断 事务消息是否需要commit与 rollback
        LocalTransactionState state = LocalTransactionState.UNKNOW;

        YzyOrderDO record = JSONObject.parseObject(msg.getBody(), YzyOrderDO.class);

        //MQ已经收到了TransactionProducer send方法发送的事务消息,下面执行本地的事务
        //本地记录订单信息
        int ret = orderService.updatePayStatusByOrderId(orderId);

        if (ret < 1) {
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            state = LocalTransactionState.COMMIT_MESSAGE;
        }

        //如果事务很长,可以异步去执行,最后返回 LocalTransactionState.UNKNOW;  在checkLocalTransaction查状态
        System.out.println("executeLocalTransaction------" + state.toString());

        return state;
    }

    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {

        LocalTransactionState state = LocalTransactionState.UNKNOW;

        YzyOrderDO record = JSONObject.parseObject(msg.getBody(), YzyOrderDO.class);

        //根据是否有order_id对应转账记录 来判断事务是否执行成功
        boolean isLocalSuccess = orderService.checkOrderPaySuccess(record.getOrderId());

        if (isLocalSuccess) {
            state = LocalTransactionState.COMMIT_MESSAGE;
        } else {
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }

        //如果有中间状态,可返回LocalTransactionState.UNKNOW  等再次确认

        //优化方向
        //设计一张Transaction表,将业务表和Transaction绑定在同一个本地事务中,
        // 如果扣款本地事务成功时,Transaction中应当已经记录该TransactionId的状态为「已完成」。
        // 当RocketMq回查时,只需要检查对应的TransactionId的状态是否是「已完成」就好,而不用关心具体的业务数据。

        System.out.println("checkLocalTransaction------" + state.toString());

        return state;
    }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5638941.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存