RocketMQ应用

RocketMQ应用,第1张

RocketMQ应用

目录

一:普通消息

二:顺序消息

三:延时消息

四:事务消息

五:批量消息

六:消息过滤

七:消息发送重试机制

八:消息消费重试机制

九:死信队列


一:普通消息

消息发送分类

Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。

1.同步发送

 

同步发送消息是指,Producer发出一条消息后,会在收到MQ返回的ACK之后才发一条消息。该方式的消息可靠性最高,但消息发送效率太低。

2.异步发送

 

异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下一条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。

3.单向发送

单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但消息可靠性较差。

代码举例

1.定义同步消息发送生产者

public static void main(String[] args) throws Exception {
        // 创建一个在名为pg组的生产者
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 制定nameServer的地址
        producer.setNamesrvAddr("MQ:9876");
        // 设置生产者发送消息失败是重新发送消息的次数,默认为2次
        producer.setRetryTimesWhenSendFailed(3);
        // 设置消息发送超时的时限,默认为3s
        producer.setSendMsgTimeout(5000);

        // 开启生产者
        producer.start();

        // 循环发送一百条消息
        for (int i = 0; i < 100; i++) {
            // 消息发送结果
            SendResult sendResult = new SendResult();
            // 消息主体
            byte[] body = ("Hi" + i).getBytes();
            Message message = new Message("someTopic", "Sometag", body);
            // 为消息指定key
            message.setKeys("key-" + i);
            // 同步发送消息
            sendResult = producer.send(message);
            System.out.println(sendResult);
        }
        // 关闭producer
        producer.shutdown();
    }

2.定义异步消息发送生产者

{
        // 创建生产者并且为其设置相关的属性
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("MQ:9876");
        // 指定异步发送失败后不进行重试发送
        producer.setRetryTimesWhenSendAsyncFailed(0);
        // 指定新创建的Topic的Queue数量为2,默认为4
        producer.setDefaultTopicQueueNums(2);
//        producer.setRetryTimesWhenSendFailed(5);
//        producer.setSendMsgTimeout(5000);

        producer.start();

        // 异步发送100条消息
        for (int i = 0; i < 100; i++) {
            byte[] bytes = ("Hi"+ i).getBytes();
            Message message = new Message("MyTopicA", "MyTag", bytes);
            message.setKeys("key-"+ i);
            // 使用异步发送消息,并且指定回调函数
            producer.send(message, new SendCallback() {
                // 当producer接收到MQ发送过来的ACK之后会回调此函数
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                @Override
                public void onException(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });
        }
        // sleep一会儿
        // 由于采用的是异步发送,所以若这里不sleep,
        // 则消息还未发送就会将producer给关闭,报错
        TimeUnit.SECONDS.sleep(3);
        producer.shutdown();
    }

3.定义单向消息发送生产者

public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("MQ:9876");
        producer.start();

        for (int i = 0; i < 10; i++) {
            byte[] bytes = ("Hi," + i).getBytes();
            Message message = new Message("single", "sometag", bytes);
            message.setKeys("key-" + i);
            producer.sendoneway(message);
        }
        producer.shutdown();
        System.out.println("producer shutdown");
    }

4.定义消息消费者

public static void main(String[] args) throws Exception {

        // 定义一个pull的消费者
        DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer("cg");
        // 定义一个push的消费者
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("cg");
        // 指定nameServer
        pushConsumer.setNamesrvAddr("MQ:9876");
        // 指定从第一条消息开始消费
        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 指定消费topic与tag
        pushConsumer.subscribe("someTopic", "*");
        // 指定采用“广播模式”进行消费,默认为“集群模式”
        // consumer.setMessageModel(MessageModel.BROADCASTING);

        // 注册消息监听器
        pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext context) {
                // 逐条消费消息
                for (MessageExt message : list) {
                    System.out.println(message);
                }
                // 返回消息消费成功消息
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 开启消费者消费消息
        pushConsumer.start();
        System.out.println("Consumer start");

    }
二:顺序消息

顺序消息指的是,严格按照消息的发送顺序进行消费的消息(FIFO)。
默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。

三:延时消息

当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。
采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票的场景。

在电商平台中,订单创建时会发送一条延迟消息。这条消息将会在30分钟后投递给后台业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完成,则取消订单,将商品再次放回到库存;如果完成支付,则忽略。
在12306平台中,车票预订成功后就会发送一条延迟消息。这条消息将会在45分钟后投递给后台业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成付。如果未完成,则取消预订,将车票再次放回到票池;如果完成支付,则忽略。

四:事务消息

1.问题引入

这里的一个需求场景是:工行用户A向建行用户B转账1万元。
我们可以使用同步消息来处理该需求场景:

1. 工行系统发送一个给B增款1万元的同步消息M给Broker
2. 消息被Broker成功接收后,向工行系统发送成功ACK
3. 工行系统收到成功ACK后从用户A中扣款1万元
4. 建行系统从Broker中获取到消息M
5. 建行系统消费消息M,即向用户B中增加1万元 

问题引入:若第3步中的扣款 *** 作失败,但消息已经成功发送到了Broker。对于MQ来
说,只要消息写入成功,那么这个消息就可以被消费。此时建行系统中用户B增加了1万元,而工行中用户并没有减少一万元,因此出现了数据不一致问题。

2.思路解决       

解决思路是,让第1、2、3步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息。而该思路即使用事务消息。这里要使用分布式事务解决方案。

1. 事务管理器TM向事务协调器TC发起指令,开启全局事务
2. 工行系统发一个给B增款1万元的事务消息M给TC
3. TC会向Broker发送半事务消息prepareHalf,将消息M预提交到Broker。此时的建行系统是看不到Broker中的消息M的
4. Broker会将预提交执行结果Report给TC。
5. 如果预提交失败,则TC会向TM上报预提交失败的响应,全局事务结束;如果预提交成功,TC会调用工行系统的回调 *** 作,去完成工行用户A的预扣款1万元的 *** 作
6. 工行系统会向TC发送预扣款执行结果,即本地事务的执行状态
7. TC收到预扣款执行结果后,会将结果上报给TM。 

预扣款 *** 作存在三种可能性:

public enum LocalTransactionState {

        COMMIT_MESSAGE,  // 本地事务执行成功
        ROLLBACK_MESSAGE,  // 本地事务执行失败
        UNKNOW,  // 不确定,表示需要进行回查以确定本地事务的执行结果

}

8. TM会根据上报结果向TC发出不同的确认指令

  • 若预扣款成功(本地事务状态为COMMIT_MESSAGE),则TM向TC发送Global Commit指令
  • 若预扣款失败(本地事务状态为ROLLBACK_MESSAGE),则TM向TC发送Global Rollback指令
  • 若现未知状态(本地事务状态为UNKNOW),则会触发工行系统的本地事务状态回查 *** 作。回查 *** 作会将回查结果,即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report给TC。TC将结果上报给TM,TM会再向TC发送最终确认指令Global Commit或Global Rollback

9. TC在接收到指令后会向Broker与工行系统发出确认指令

  • TC接收的若是Global Commit指令,则向Broker与工行系统发送Branch Commit指令。此时Broker中的消息M才可被建行系统看到;此时的工行用户A中的扣款 *** 作才真正被确认
  • TC接收到的若是Global Rollback指令,则向Broker与工行系统发送Branch Rollback指令。此时Broker中的消息M将被撤销;工行用户A中的扣款 *** 作将被回滚

以上方案就是为了确保消息投递与扣款 *** 作能够在一个事务中,要成功都成功,有一个失败,则全部回滚。
以上方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的,而事务消息方案中的消息预提交与预扣款 *** 作间是同步的。

3.分布式事务

对于分布式事务,通俗地说就是,一次 *** 作由若干分支 *** 作组成,这些分支 *** 作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支 *** 作要么全部成功,要么全部失败。分布式事务与普通事务一样,就是为了保证 *** 作结果的一致性。

4.事务消息

RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式。

5.半事务消息

暂不能投递的消息,发送方已经成功地将消息发送到了Broker,但是Broker未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到。处于该种状态下的消息即半事务消息。

6.本地事务状态

Producer回调 *** 作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认指令。

7.消息回查

消息回查,即重新查询本地事务的执行状态。本例就是重新到DB中查看预扣款 *** 作是否执行成功。

8.XA模式三剑客    

XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,是基于XA协议的。XA协议由Tuxedo(Transaction for Unix has been Extended for Distributed Operation,分布式 *** 作扩展之后的Unix事务系统)首先提出的,并交给X/Open组织,作为资源管理器与事务管理器的接口标准。XA模式中有三个重要组件:TC、TM、RM。

1)TC

Transaction Coordinator,事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。

RocketMQ中Broker充当TC

2)TR

Transaction Manager,事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。

RocketMQ中事务消息的Producer充当TM

3)RM

Resource Manager,资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

RocketMQ中事务消息的Producer及Broker均是RM

9.代码举例

1)定义工行事务监听器

package com.lwz.rocketmq.transcation;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class ICBCTransactionListener implements TransactionListener {

    
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("预提交消息成功:" + message);
        // 假设接收到TAGA的消息就表示扣款 *** 作成功,TAGB的消息表示扣款失败,
        // TAGC表示扣款结果不清楚,需要执行消息回查
        if (StringUtils.equals("TAGA", message.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.equals("TAGB", message.getTags())) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if (StringUtils.equals("TAGC", message.getTags())) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.UNKNOW;
    }

    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        System.out.println("执行消息回查" + messageExt);
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

2)定义事务消息生产者

package com.lwz.rocketmq.transcation;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.*;

public class TransactionProducer {

    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("tpg");
        producer.setNamesrvAddr("106.52.41.171:9876");

        
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        // 为生产者指定一个线程池
        producer.setExecutorService(executorService);
        // 为生产者添加事务监听器
        producer.setTransactionListener(new ICBCTransactionListener());

        producer.start();

        String[] tags = {"TAGA","TAGB","TAGC"};
        for (int i = 0; i < 3; i++) {
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("TTopic", tags[i], body);
            // 发送事务消息
            // 第二个参数用于指定在执行本地事务时要使用的业务参数(下面的代码相当于开启了全局事务)
            SendResult sendResult = producer.sendMessageInTransaction(msg,null);
            System.out.println("发送结果为:" + sendResult.getSendStatus());
        }
    }

}

3)定义消费者

package com.lwz.rocketmq.transcation;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class SomeConsumer {

    public static void main(String[] args) throws Exception {

        // 定义一个pull的消费者
        DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer("cg");
        // 定义一个push的消费者
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("cg");
        // 指定nameServer
        pushConsumer.setNamesrvAddr("106.52.41.171:9876");
        // 指定从第一条消息开始消费
        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 指定消费topic与tag
        pushConsumer.subscribe("TTopic", "*");
        // 指定采用“广播模式”进行消费,默认为“集群模式”
        // consumer.setMessageModel(MessageModel.BROADCASTING);

        // 注册消息监听器
        pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext context) {
                // 逐条消费消息
                for (MessageExt message : list) {
                    System.out.println(message);
                }
                // 返回消息消费成功消息
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 开启消费者消费消息
        pushConsumer.start();
        System.out.println("Consumer start");

    }

}
五:批量消息

生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率。不过需要注意以下几点:

  1. 批量发送的消息必须具有相同的Topic
  2. 批量发送的消息必须具有相同的刷盘策略
  3. 批量发送的消息不能是延时消息与事务消息
六:消息过滤

消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤。

七:消息发送重试机制

Producer对发送失败的消息进行重新发送的机制,称为消息发送重试机制,也称为消息重投机制。消息发送重试有三种策略可以选择:同步发送失败策略、异步发送失败策略、消息刷盘失败策略

八:消息消费重试机制

Producer对消费失败的消息进行重新消费的机制,称为消息消费重试机制。消息消费重试机制有顺序消息的消费重试、无序消息的消费重试

九:死信队列

当一条消息初次消费失败,消息队列会自动进行消费重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。这个队列就是死信队列(Dead-Letter Queue,DLQ),而其中的消息则称为死信消息(Dead-Letter Message,DLM)。

死信队列是用于处理无法被正常消费的消息的。

死信队列中的消息不会再被消费者正常消费,即DLQ对于消费者是不可见的
死信存储有效期与正常消息相同,均为 3 天(commitlog文件的过期时间),3 天后会被自动删除
死信队列就是一个特殊的Topic,名称为%DLQ%consumerGroup@consumerGroup,即每个消费者组都有一个死信队列
如果一个消费者组未产生死信消息,则不会为其创建相应的死信队列

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/4012630.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-10-22
下一篇 2022-10-22

发表评论

登录后才能评论

评论列表(0条)

保存