RabbitMQ(十一)【高级 - 分布式事务】

RabbitMQ(十一)【高级 - 分布式事务】,第1张

十一、RabbitMQ - 高级 - 分布式事务

上一篇文章《高级 - 集群》

简述

分布式事务指事务 *** 作位于不同的节点上,需要保证事物的ACID特性

例如再下单场景下,库存和订单如果不在同一节点上,就涉及分布式事务

11.1 分布式事务的方式

在分布式系统中,要实现分布式事务,无外乎那几种解决方案

两阶段提交(2PC)需要数据库厂商的支持,java组件有atomikos等

两阶段提交(Two-phase Commit,2PC),通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务

1. 准备阶段

协调者询问参与者事务是否执行成功,参与者发回事务执行结果

2. 提交阶段

如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务

需要注意的是,在准备阶段,参与者执行了事务,但还未提交。只有在提交阶段接收到协调者发来的通知之后,才进行提交或回滚

存在问题

同步阻塞:所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它 *** 作单点问题:协调者在 2PC 中起到非常大的作用,发生故障会造成很大影响。特别是在阶段二发生故障,所有参与者会一直处于等待状态,无法完成其它 *** 作数据不一致:在阶段二,如果协调者只发送了部分commit消息,此时网络发生异常,那么只有部分参与者接收到commit消息,也就是说只有部分参与者提交了事务,使得系统数据不一致太过于保守:任意一个节点失败就会导致整个事务失败,没用完善的容错机制

补偿事务(TCC)严选,阿里,蚂蚁金服

TCC 其实就是采用补偿机制,其核心思想是:针对每个 *** 作,都要注册一个与其对应的确认和补偿(撤销) *** 作。它分为三个阶段

Try:阶段主要是对业务系统做检测及资源预留Confirm:阶段主要是对业务系统做确认提交,Try 阶段执行成功并开始执行 Confirm 阶段时,默认 Confirm 阶段是不会出错的。即:只要 Try 成功,Confirm 一定成功Cancel:阶段主要是在业务执行异常时,需要回滚的状态下执行业务取消,预留资源释放

本地消息表(异步确保)比如:支付宝、微信支付主动查询支付状态,对账单的形式

本地消息表对业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的 *** 作满足事务特性,并且使用了消息队列来保证最终一致性

在分布式事务 *** 作的一方完成写业务数据的 *** 作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中之后将本地消息表中的消息转发到 Kafka 等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发在分布式事务 *** 作的另一方,从消息队列中读取一个消息,并执行消息中的 *** 作

优点:一种非常经典的实现,避免了分布式事务,实现了最终一致性

缺点:消息表会耦合到业务系统中,如果没用封装好的解决方案,会有很多杂务需要处理

MQ 事务消息 异步场景,通用性较强,拓展性较高

有一些第三方的MQ是支持事务消息的,比如 RocketMQ,它们支持事务消息的方式也是类似于采用二阶段提交,但是市面上一些主流MQ都是不支持事务消息的,比如 Kafka 不支持

以及阿里的 RabbitMQ 中间件为例,其思路大致为

第一阶段 Prepared 消息,会拿到消息的地址。第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息,如果确认消息发送失败了,RabbitMQ 会定期扫描消息集群中的事务消息,这时候发现了 Prepared 消息,它会向消息发送者确认,所以生产方需要需要实现一个 ckeck 接口,RabbitMQ 会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败

优点:实现了最终一致性,不需要以来本地数据库事务

缺点:实现难度大,主流MQ不支持,RockerMQ 事务消息部分代码为开源

11.2 RabbitMQ - 高级 - 分布式案例

场景测试

建数据库rabbitmq_dispatchrabbitmq_order
# 数据库 rabbitmq_dispatch
CREATE DATABASE `rabbitmq_dispatch` /*!40100 DEFAULT CHARACTER SET utf8 */
# 数据库 rabbitmq_order
CREATE DATABASE `rabbitmq_dispatch` /*!40100 DEFAULT CHARACTER SET utf8 */
# 在数据库 rabbitmq_dispatch 中创建表order
CREATE TABLE `order` (
  `dispatch_id` varchar(100) DEFAULT NULL,
  `order_id` int(11) DEFAULT NULL,
  `status` varchar(20) DEFAULT NULL,
  `order_content` varchar(100) DEFAULT NULL,
  `create_time` date DEFAULT NULL,
  `user_id` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

# 在数据库 rabbitmq_order 中创建表order
CREATE TABLE `order` (
  `order_id` int(11) DEFAULT NULL,
  `user_id` int(11) DEFAULT NULL,
  `order_content` varchar(100) DEFAULT NULL,
  `create_time` date DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
创建一个springboot工程rabbitmq-dispatcher-service

1)导入依赖

<dependencies>
    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>

    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-jdbcartifactId>
    dependency>
    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-webartifactId>
    dependency>

    
    <dependency>
        <groupId>org.projectlombokgroupId>
        <artifactId>lombokartifactId>
    dependency>

    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-devtoolsartifactId>
        <scope>runtimescope>
        <optional>trueoptional>
    dependency>
    
    <dependency>
        <groupId>mysqlgroupId>
        <artifactId>mysql-connector-javaartifactId>
        <version>5.1.47version>
    dependency>

    
    <dependency>
        <groupId>com.fasterxml.jackson.dataformatgroupId>
        <artifactId>jackson-dataformat-avroartifactId>
    dependency>
    <dependency>
        <groupId>org.apache.commonsgroupId>
        <artifactId>commons-lang3artifactId>
        <version>3.10version>
    dependency>

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-testartifactId>
        <scope>testscope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintagegroupId>
                <artifactId>junit-vintage-engineartifactId>
            exclusion>
        exclusions>
    dependency>
    <dependency>
        <groupId>org.springframework.amqpgroupId>
        <artifactId>spring-rabbit-testartifactId>
        <scope>testscope>
    dependency>
dependencies>

2)application.yml配置文件

server:
  port: 9000

spring:
  # 服务应用名称
  application:
    name: rabbitmq_dispatch
  # 配置数据源
  datasource:
    url: jdbc:mysql://localhost:3306/rabbitmq_dispatch?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
  rabbitmq:
#    单机
#    port: 5673
#    host: 192.168.159.100
    # 集群部署
    addresses: 192.168.159.100:5673,192.168.159.100:5674,192.168.159.100:5675
    username: admin
    password: admin
    virtual-host: /
    # 手动开启ack,让程序去控制MQ的消息和重发、删除、转移
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          # 开启重试模式
          enabled: true
          # 最大重试数
          max-attempts: 10

3)service包下的DispatchService.java类

package com.vinjcent.rabbitmq.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;


import java.util.UUID;

@SuppressWarnings("all")
@Service
@Transactional(rollbackFor = Exception.class)
public class DispatchService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // 运单接收
    public void dispatch(Integer orderId) throws Exception {
        // 定义保存sql
        String sql = "insert into rabbitmq_dispatch.order(order_id,dispatch_id,status,order_content,user_id) values (?,?,?,?,?)";
        // 添加运动记录
        int count = jdbcTemplate.update(sql, orderId, UUID.randomUUID().toString(), "0","今晚吃鸡!",1);

        if (count != 1){
            throw new Exception("订单创建失败,原因[数据库 *** 作失败]");
        }
    }

}

4)controller包下的DispatchController.java类

package com.vinjcent.rabbitmq.controller;

import com.vinjcent.rabbitmq.service.DispatchService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@SuppressWarnings("all")
@RestController
@RequestMapping("/dispatch")
public class DispatchController {

    @Autowired
    DispatchService dispatchService;

    // 添加订单后,添加调度信息
    @GetMapping("/order")
    public String lock(Integer orderId) throws Exception {
        if (orderId == 15){
            System.out.println("1111");
            Thread.sleep(3000L);    // 模拟业务耗时,接口调用者会认为超时
        }
        dispatchService.dispatch(orderId);  // 分配运单
        return "success";
    }

}
创建一个springboot工程rabbitmq-order-service

1)导入依赖

<dependencies>
    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>

    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-jdbcartifactId>
    dependency>
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-webartifactId>
    dependency>

    
    <dependency>
        <groupId>org.projectlombokgroupId>
        <artifactId>lombokartifactId>
    dependency>

    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-devtoolsartifactId>
        <scope>runtimescope>
        <optional>trueoptional>
    dependency>
    
    <dependency>
        <groupId>mysqlgroupId>
        <artifactId>mysql-connector-javaartifactId>
        <version>5.1.47version>
    dependency>

    
    <dependency>
        <groupId>com.fasterxml.jackson.dataformatgroupId>
        <artifactId>jackson-dataformat-avroartifactId>
    dependency>
    <dependency>
        <groupId>org.apache.commonsgroupId>
        <artifactId>commons-lang3artifactId>
        <version>3.10version>
    dependency>

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-testartifactId>
        <scope>testscope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintagegroupId>
                <artifactId>junit-vintage-engineartifactId>
            exclusion>
        exclusions>
    dependency>
    <dependency>
        <groupId>org.springframework.amqpgroupId>
        <artifactId>spring-rabbit-testartifactId>
        <scope>testscope>
    dependency>
dependencies>

2)application.yml配置文件

server:
  port: 8000

spring:
  # 服务应用名称
  application:
    name: rabbitmq_order
  # 配置数据源
  datasource:
    url: jdbc:mysql://localhost:3306/rabbitmq_order?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
  rabbitmq:
    #    单机
    #    port: 5673
    #    host: 192.168.159.100
    # 集群部署
    addresses: 192.168.159.100:5673,192.168.159.100:5674,192.168.159.100:5675
    username: admin
    password: admin
    virtual-host: /
    # 手动开启ack,让程序去控制MQ的消息和重发、删除、转移
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          # 开启重试模式
          enabled: true
          # 最大重试数
          max-attempts: 10

3)实体类Order.java

package com.vinjcent.rabbitmq.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order implements Serializable {

    private Integer orderId;
    private Integer userId;
    private String orderContent;
    private Date createTime;
}

4)service包下的类

OrderDatabaseService.java

package com.vinjcent.rabbitmq.service;

import com.vinjcent.rabbitmq.pojo.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@SuppressWarnings("all")
@Service
@Transactional(rollbackFor = Exception.class)
public class OrderDatabaseService {

    @Autowired
    private JdbcTemplate jdbcTemplate;


    /*
     * 保存订单记录
     */
    public void saveOrder(Order order) throws Exception {
        // 定义保存sql
        String sql = "insert into rabbitmq_order.order(order_id,user_id,order_content) values(?,?,?)";

        // 添加记录
        int count = jdbcTemplate.update(sql, order.getOrderId(), order.getUserId(), order.getOrderContent());

        if (count != 1){
            throw new Exception("订单创建失败,原因[数据库 *** 作失败]");
        }

        // 因为下单可能会使rabbitmq服务器出现宕机,从而引发消息没用放入MQ,为了消息可靠生产,对消息做一次冗余
        //saveLocalMessage(order);
    }

    /*
     * 保存信息到本地
     */
    public void saveLocalMessage(Order order) {
        // 定义sql
        String sql = "insert into message(order_id,order_content,status,unique id) values (?,?,?,?)";
    }


}

OrderService.java

package com.vinjcent.rabbitmq.service;

import com.vinjcent.rabbitmq.pojo.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;

@SuppressWarnings("all")
@Service
public class OrderService {

    @Autowired
    private OrderDatabaseService orderDatabaseService;

    @Transactional(rollbackFor = Exception.class)
    public void createOrder(Order order) throws Exception {

        // 1.创建订单
        orderDatabaseService.saveOrder(order);

        // 2.通过http接口发送订单信息到运单系统
        String result = dispatchHttpApi(order.getOrderId());
        if (!"success".equals(result)){
            throw new Exception("订单创建失败,原因是运单接口调用失败!");
        }

    }

    public String dispatchHttpApi(Integer orderId) {
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        // 连接超时 > 3s
        factory.setConnectTimeout(3000);
        // 处理超时 > 2s
        factory.setReadTimeout(2000);
        // 发送http请求
        String url = "http://localhost:9000/dispatch/order?orderId="+orderId;
        RestTemplate restTemplate = new RestTemplate(factory);
        String result = restTemplate.getForObject(url, String.class);
        return result;
    }

}
启动工程rabbitmq-dispatcher-service运行工程rabbitmq-order-service测试类下的函数
@SpringBootTest
class RabbitmqOrderServiceApplicationTests {

    @Autowired
    OrderService orderService;

    @Test
    void contextLoads() {
    }

    @Test
    void testCreateOrder() throws Exception {
        Order order = new Order(15,1,"今晚吃鸡!",new Date());
        orderService.createOrder(order);
        System.out.println("订单创建成功......");
    }
}
观察rabbitmq-order-service工程的控制台

分析:

在rabbitmq-order-service工程下,运行测试创建订单时,设置了分布式请求,添加调度信息调用另外一个微服务的接口;在rabbitmq-dispatcher-service工程下的/dispatch/order接口设置睡眠时间模拟用户读取超时,从而造成创建订单超时的异常(在rabbitmq-order-service工程下抛出)

如果没有设置全局事务(如@GlobalTransactional),不同微服务中的事务并不会直接干涉到。在OrderService.java类中,给createOrder()函数声明了事务的回滚,同样在http请求另外一个微服务接口的过程中同样也声明了事务回滚,但是在rabbitmq-order-service工程下出现了异常,也即是说在创建订单时出现了异常,回滚了订单,但是在rabbitmq-dispatcher-service工程下的转发过程并没有异常的抛出,所以数据不会回滚

为解决该类问题,使用RabbitMQ来解决

11.3 RabbitMQ - 高级 - 可靠生产和消息推送

基于MQ的分布式事务整体设计思路

分布式系统分布式事务问题 - 可靠生产问题

可靠生产和可靠消费的示意图

测试用例

在rabbitmq-order-service工程

在rabbitmq_order数据库创建表message
CREATE TABLE `message` (
  `order_id` int(11) DEFAULT NULL,
  `status` varchar(20) DEFAULT NULL,
  `order_content` varchar(100) DEFAULT NULL,
  `unique_id` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
service包下的类

MQOrderService.java

package com.vinjcent.rabbitmq.service;

import com.vinjcent.rabbitmq.mq.OrderMqService;
import com.vinjcent.rabbitmq.pojo.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@SuppressWarnings("all")
@Service
public class MqOrderService {

    @Autowired
    private OrderDatabaseService orderService;

    @Autowired
    private OrderMqService orderMqService;

    public void createOrder(Order order) throws Exception {

        // 1.订单信息,插入订单
        orderService.saveOrder(order);
        // 2.通过http接口发送订单信息到运单系统
        orderMqService.sendMessage(order);
    }

}
mq包下的类

OrderMqService.java

package com.vinjcent.rabbitmq.mq;


import com.vinjcent.rabbitmq.pojo.Order;
import com.vinjcent.rabbitmq.utils.JsonUtil;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

@SuppressWarnings("all")
@Service
public class OrderMqService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;



    public void sendMessage(Order order) {
        rabbitTemplate.convertAndSend("order_fanout_exchange","", JsonUtil.obj2String(order),new CorrelationData(order.getOrderId().toString()));
    }


    // 该注解被用来修饰一个非静态的void方法,被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,
    // 并且只会被服务器执行一次,PostConstruct在构造函数之后执行,init()方法之前执行
    @PostConstruct
    public void regCallback() {
        // 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){

            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("cause" + cause);
                // 如果ack为true代表消息已经收到
                String orderId = correlationData.getId();

                if (!ack){
                    // 这里可能要进行其它的方式存储
                    System.out.println("MQ队列答应失败,orderId是: " + orderId);
                    return;
                }

                String sql = "update message set status = 1 where order_id = ?";
                int count = jdbcTemplate.update(sql, orderId);
                try {
                    if (count == 1){
                        System.out.println("本地消息状态修改成功,消息成功投递到消息队列中...");
                    }
                } catch (Exception e) {
                    System.out.println("本地消息状态修改失败,出现异常: " + e.getMessage());
                }
            }
        });
    }
}
utils包的类

JsonUtil.java

package com.vinjcent.rabbitmq.utils;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;


import java.text.SimpleDateFormat;

@Slf4j
public class JsonUtil {
    private static ObjectMapper objectMapper = new ObjectMapper();
    // 日起格式化
    private static final String STANDARD_FORMAT = "yyyy-MM-dd HH:mm:ss";
    static {
        //对象的所有字段全部列入
        objectMapper.setSerializationInclusion(JsonInclude.Include.ALWAYS);
        //取消默认转换timestamps形式
        objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS,false);
        //忽略空Bean转json的错误
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,false);
        //所有的日期格式都统一为以下的样式,即yyyy-MM-dd HH:mm:ss
        objectMapper.setDateFormat(new SimpleDateFormat(STANDARD_FORMAT));
        //忽略 在json字符串中存在,但是在java对象中不存在对应属性的情况。防止错误
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);
    }

    /**
     * 对象转Json格式字符串
     * @param obj 对象
     * @return Json格式字符串
     */
    public static <T> String obj2String(T obj) {
        if (obj == null) {
            return null;
        }
        try {
            return obj instanceof String ? (String) obj : objectMapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            log.warn("Parse Object to String error : {}", e.getMessage());
            return null;
        }
    }

    /**
     * 对象转Json格式字符串(格式化的Json字符串)
     * @param obj 对象
     * @return 美化的Json格式字符串
     */
    public static <T> String obj2StringPretty(T obj) {
        if (obj == null) {
            return null;
        }
        try {
            return obj instanceof String ? (String) obj : objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            log.warn("Parse Object to String error : {}", e.getMessage());
            return null;
        }
    }

    /**
     * 字符串转换为自定义对象
     * @param str 要转换的字符串
     * @param clazz 自定义对象的class对象
     * @return 自定义对象
     */
    public static <T> T string2Obj(String str, Class<T> clazz){
        if(StringUtils.isEmpty(str) || clazz == null){
            return null;
        }
        try {
            return clazz.equals(String.class) ? (T) str : objectMapper.readValue(str, clazz);
        } catch (Exception e) {
            log.warn("Parse String to Object error : {}", e.getMessage());
            return null;
        }
    }

}

测试运行用例
@SpringBootTest
class RabbitmqOrderServiceApplicationTests {

    @Autowired
    OrderService orderService;

    @Autowired
    MQOrderService mqOrderService;

    @Test
    void contextLoads() {
    }

    @Test
    void testMQCreateOrder() throws Exception {
        Order order = new Order(15,1,"今晚吃鸡!",new Date());
        mqOrderService.createOrder(order);
        System.out.println("订单创建成功......");
    }

}
调式

查看数据库数据

order表

message表

由于发送消息给rabbitmq服务还没有响应回执,所以此时的订单状态是"0"

此时的状态修改成了"1",完成了订单消息的投递

11.4 RabbitMQ - 高级 - 可靠消费

测试用例

在rabbitmq-dispatcher-service工程下

utils包下的类
package com.vinjcent.rabbitmq.utils;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.text.SimpleDateFormat;

@Slf4j
public class JsonUtil {
    private static ObjectMapper objectMapper = new ObjectMapper();
    // 日起格式化
    private static final String STANDARD_FORMAT = "yyyy-MM-dd HH:mm:ss";
    static {
        //对象的所有字段全部列入
        objectMapper.setSerializationInclusion(JsonInclude.Include.ALWAYS);
        //取消默认转换timestamps形式
        objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS,false);
        //忽略空Bean转json的错误
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,false);
        //所有的日期格式都统一为以下的样式,即yyyy-MM-dd HH:mm:ss
        objectMapper.setDateFormat(new SimpleDateFormat(STANDARD_FORMAT));
        //忽略 在json字符串中存在,但是在java对象中不存在对应属性的情况。防止错误
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);
    }

    /**
     * 对象转Json格式字符串
     * @param obj 对象
     * @return Json格式字符串
     */
    public static <T> String obj2String(T obj) {
        if (obj == null) {
            return null;
        }
        try {
            return obj instanceof String ? (String) obj : objectMapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            log.warn("Parse Object to String error : {}", e.getMessage());
            return null;
        }
    }

    /**
     * 对象转Json格式字符串(格式化的Json字符串)
     * @param obj 对象
     * @return 美化的Json格式字符串
     */
    public static <T> String obj2StringPretty(T obj) {
        if (obj == null) {
            return null;
        }
        try {
            return obj instanceof String ? (String) obj : objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            log.warn("Parse Object to String error : {}", e.getMessage());
            return null;
        }
    }

    /**
     * 字符串转换为自定义对象
     * @param str 要转换的字符串
     * @param clazz 自定义对象的class对象
     * @return 自定义对象
     */
    public static <T> T string2Obj(String str, Class<T> clazz){
        if(StringUtils.isEmpty(str) || clazz == null){
            return null;
        }
        try {
            return clazz.equals(String.class) ? (T) str : objectMapper.readValue(str, clazz);
        } catch (Exception e) {
            log.warn("Parse String to Object error : {}", e.getMessage());
            return null;
        }
    }

}

mq包下的类
package com.vinjcent.rabbitmq.mq;

import com.rabbitmq.client.Channel;
import com.vinjcent.rabbitmq.pojo.Order;
import com.vinjcent.rabbitmq.service.DispatchService;
import com.vinjcent.rabbitmq.utils.JsonUtil;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@SuppressWarnings("all")
@Service
public class OrderMqConsumer {

    @Autowired
    private DispatchService dispatchService;

    private int count = 1;

    @RabbitListener(queues = {"order.queue"})
    public void messageConsumer(String ordermsg, Channel channel,
                                CorrelationData correlationData,
                                @Header (AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception {
        // 1.获取消息队列的消息
        System.out.println("收到的MQ消息是: " + ordermsg + ",count = " + count++);
        // 2.获取订单服务的信息
        Order order = JsonUtil.string2Obj(ordermsg, Order.class);
        // 3.获取订单但id
        Integer orderId = order.getOrderId();
        // 4.保存订单
        dispatchService.dispatch(orderId);

    }

}

运行工程rabbitmq-dispatcher-service

查看控制台从消息队列中获取的消息

拓展

由于在创建订单记录又或者在创建消息记录时,已经将消息推送到队列中了,但是数据库中的数据状态可能没有修改,这时需要创建一个定时任务,每个几分钟或几秒查询数据库的消息记录状态,确保已经推送到队列中的消息在数据库中完成了修改

在rabbitmq-order-service工程下

添加实体类Message.java
package com.vinjcent.rabbitmq.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {

    private Integer orderId;
    private String status;
    private String orderContent;
    private Integer uniqueId;

}
添加定时任务类TaskSchedule.java
package com.vinjcent.rabbitmq.task;

import com.vinjcent.rabbitmq.pojo.Message;
import com.vinjcent.rabbitmq.utils.JsonUtil;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.List;

@SuppressWarnings("all")
@EnableScheduling
public class TaskSchedule {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 每个5分钟将消息为"0"状态的消息记录,进行查询,然后投递到消息队列中
    @Scheduled(cron = "* 0/5 * * * ?")
    public void sendMessage() {

        // 查询所有为"0"状态的记录
        String sql = "select * from message where status = 0";
        List<Message> messages = jdbcTemplate.queryForList(sql, Message.class);

        // 循环将消息推送到消息队列中
        messages.forEach(message -> {
//            rabbitTemplate.convertAndSend("order_fanout_exchange","", JsonUtil.obj2String(order),new CorrelationData(order.getOrderId().toString()));

        });

    }

}
11.5 RabbitMQ - 高级 - 可靠消费重试机制

由于在接受消息的时候出现了故障,会进入死循环,并触发 RabbitMQ 的重试机制,队列里的消息一直无法应答

解决消息重试的几种方案

控制重发的次数try + catch + 手动acktry + catch + 手动ack + 死信队列处理

控制重发的次数

在rabbitmq-dispatcher-service工程下

修改application.yml文件

server:
  port: 9000

spring:
  # 服务应用名称
  application:
    name: rabbitmq_dispatch
  # 配置数据源
  datasource:
    url: jdbc:mysql://localhost:3306/rabbitmq_dispatch?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
  rabbitmq:
#    单机
#    port: 5673
#    host: 192.168.159.100
    # 集群部署
    addresses: 192.168.159.100:5673,192.168.159.100:5674,192.168.159.100:5675
    username: admin
    password: admin
    virtual-host: /
    # 手动开启ack,让程序去控制MQ的消息和重发、删除、转移
    listener:
      simple:
        # 手动ack机制,默认是none
        acknowledge-mode: manual
        retry:
          # 开启重试模式,默认为false
          enabled: true
          # 最大重试数,默认时3次
          max-attempts: 10
          # 重试间隔时间
          initial-interval: 2000ms
    # 消息确认机制
    publisher-confirm-type: correlated

try + catch + 手动ack

测试用例

在rabbitmq-dispatcher-service工程下

mq包下的类

OrderMqConsumer.java模拟消费信息时出现异常

package com.vinjcent.rabbitmq.mq;

import com.rabbitmq.client.Channel;
import com.vinjcent.rabbitmq.pojo.Order;
import com.vinjcent.rabbitmq.service.DispatchService;
import com.vinjcent.rabbitmq.utils.JsonUtil;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@SuppressWarnings("all")
@Service
public class OrderMqConsumer {

    @Autowired
    private DispatchService dispatchService;

    private int count = 1;

    @RabbitListener(queues = {"order.queue"})
    public void messageConsumer(String ordermsg, Channel channel,
                                CorrelationData correlationData,
                                @Header (AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception {
        try {
            // 1.获取消息队列的消息
            System.out.println("收到的MQ消息是: " + ordermsg + ",count = " + count++);
            // 2.获取订单服务的信息
            Order order = JsonUtil.string2Obj(ordermsg, Order.class);
            // 3.获取订单id
            Integer orderId = order.getOrderId();
            // 4.保存订单
            dispatchService.dispatch(orderId);
            System.out.println(1 / 0);  // 模拟异常
            // 关闭ack机制
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 如果出现异常的情况下,则判断为无应答,根据实际的情况去进行重发
            // 参数1: 消息的tag   参数2: false 多条处理   参数3: requeue 重发
            // false 不会重发,false的话会扔掉消息,将消息转移到死信队列,
            // true 会死循环重发,建议如果使用true的话,不要加try/catch,否则会造成死循环,并且在配置文件中设置的重复次数失效
            channel.basicNack(deliveryTag, false, false);
        }

    }

}
在rabbitmq-order-service工程下向队列发送一条消息
@SpringBootTest
class RabbitmqOrderServiceApplicationTests {

    @Autowired
    OrderService orderService;

    @Autowired
    MqOrderService mqOrderService;

    @Test
    void contextLoads() {
    }

    @Test
    void testMQCreateOrder() throws Exception {
        Order order = new Order(15,1,"今晚吃鸡!",new Date());
        mqOrderService.createOrder(order);
        System.out.println("订单创建成功......");
    }

}

原初始队列的数据

发送消息后的记录

运行rabbitmq-dispatcher-service工程

虽然在接收完消息之后有算数异常,但还是进行了一次队列里的信息消费,不会遇到异常回滚。如果在接收完消息之后,对数据进行 *** 作,可能会导致数据的不一致性、不准确性

try + catch + 手动ack + 死信队列处理

用例测试

在rabbitmq-order-service工程下的mqConfig包添加配置类
package com.vinjcent.rabbitmq.mqConfig;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

@Configuration
public class RabbitMQConfiguration {

    // 死信队列
    @Bean
    public FanoutExchange deadExchange(){
        return new FanoutExchange("dead_order_fanout_exchange",true,false);
    }

    @Bean
    public Queue deadOrderQueue(){
        return new Queue("dead.order.queue", true);
    }

    @Bean
    public Binding bindingDeadOrder() {
        return BindingBuilder.bind(deadOrderQueue()).to(deadExchange());
    }

    // 将订单队列与死信队列绑定
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("order_fanout_exchange",true,false);
    }

    @Bean
    public Queue orderQueue(){
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "dead_order_fanout_exchange");
        return new Queue("order.queue", true, false, false, args);
    }

    @Bean
    public Binding bindingOrder() {
        return BindingBuilder.bind(orderQueue()).to(fanoutExchange());
    }

}

在rabbitmq-dispatcher-service工程下

处理死信队列里的消息
package com.vinjcent.rabbitmq.mq;

import com.rabbitmq.client.Channel;
import com.vinjcent.rabbitmq.pojo.Order;
import com.vinjcent.rabbitmq.service.DispatchService;
import com.vinjcent.rabbitmq.utils.JsonUtil;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@SuppressWarnings("all")
@Service
public class DeadMqConsumer {

    @Autowired
    private DispatchService dispatchService;

    private int count = 1;

    @RabbitListener(queues = {"dead.order.queue"})
    public void messageConsumer(String ordermsg, Channel channel,
                                CorrelationData correlationData,
                                @Header (AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception {
        try {
            // 1.获取消息队列的消息
            System.out.println("收到的MQ消息是: " + ordermsg);
            // 2.获取订单服务的信息
            Order order = JsonUtil.string2Obj(ordermsg, Order.class);
            // 3.获取订单id
            Integer orderId = order.getOrderId();
            // 4.这里原本已经将消息存入数据库当中,应该是执行更新 *** 作!需要换成update
            dispatchService.dispatch(orderId);
            // 关闭ack机制
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            System.out.println("人工干预");
            System.out.println("发短信预警");
            System.out.println("同时把消息转移到别的存储DB");
            // 移除死信队列中的消息
            channel.basicNack(deliveryTag,false, false);

        }

    }

}

在rabbitmq-order-service工程下向队列发送一条消息

debug测试运行过程

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/2990099.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-09-23
下一篇 2022-09-23

发表评论

登录后才能评论

评论列表(0条)

保存