RabbitMQ-高级

RabbitMQ-高级,第1张

RabbitMQ-高级 rabbitMq学习笔记

提示:边看视频边做的笔记,可能有错别字,请见谅。视频地址:https://www.bilibili.com/video/BV1dX4y1V73G?p=38&spm_id_from=pageDriver


文章目录

rabbitMq学习笔记

21、RabbitMQ-高级-集群

1、RabbitMQ 集群2、集群搭建3、单机多实例搭建

3.1、**第一步**:启动第一个节点rabbit-13.2、启动第二个节点rabbit-23.3、验证启动 “ps aux|grep rabbitmq”3.4、rabbit-1 *** 作作为主节点3.5、rabbit2 *** 作为从节点3.6、验证集群状态3.7、多主集群转发部署模式3.8、Web监控 4、集群搭建之docker5、多机部署 21、RabbitMQ-高级-分布式事务

1、简述2、、分布式事务的方式

2.1、两阶段提交(2PC)需要数据库产商的支持,java组件有atomikos等。2.2、补偿事务(TCC) 严选,阿里,蚂蚁金服。2.3、本地消息表(异步确保)比如:支付宝、微信支付主动查询支付状态,对账单的形式2.4、MQ 事务消息 异步场景,通用性较强,拓展性较高。 3、总结4、具体实现

4.1、系统与系统之间的分布式事务问题4.2、系统间调用过程中事务回滚问题(需要按上面 *** 作进行了rabbitMq集群搭建)4.3、基于MQ的分布式事务整体设计思路4.4、基于MQ的分布式事务消息的可靠生产问题4.5、基于MQ的分布式事务消息的可靠生产问题-定时重发4.6、基于MQ的分布式事务消息的可靠消费4.7、基于MQ的分布式事务消息的消息重发4.8、基于MQ的分布式事务消息的死信队列消息转移 + 人工处理 5、总结

5.1、基于MQ的分布式事务解决方案优点:5.2、基于MQ的分布式事务解决方案缺点:5.3、建议 22、Springboot整合rabbitmq集群配置详解23、RabbitMQ-集群监控-参考24、RabbitMQ面试题分析


提示:rabbitmq初级知识笔记链接地址:https://blog.csdn.net/weixin_43947102/article/details/122533426

21、RabbitMQ-高级-集群 1、RabbitMQ 集群

RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。在实际使用过程中多采取多机多实例部署方式,为了便于同学们练习搭建,有时候你不得不在一台机器上去搭建一个rabbitmq集群,本章主要针对单机多实例这种方式来进行开展。

主要参考官方文档:https://www.rabbitmq.com/clustering.html

2、集群搭建

配置的前提是你的rabbitmq可以运行起来,比如”ps aux|grep rabbitmq”你能看到相关进程,又比如运行“rabbitmqctl status”你可以看到类似如下信息,而不报错:

执行下面命令进行查看:

ps aux|grep rabbitmq

​ 或者

systemctl status rabbitmq-server

注意:确保RabbitMQ可以运行的,确保完成之后,把单机版的RabbitMQ服务停止,后台看不到RabbitMQ的进程为止

3、单机多实例搭建

**场景:**假设有两个rabbitmq节点,分别为rabbit-1, rabbit-2,rabbit-1作为主节点,rabbit-2作为从节点。启动命令:RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server -detached结束命令:rabbitmqctl -n rabbit-1 stop 3.1、第一步:启动第一个节点rabbit-1

RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start 

至此节点rabbit-1启动完成。

3.2、启动第二个节点rabbit-2
RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start 

注意:web管理插件端口占用,所以还要指定其web插件占用的端口号
RABBITMQ_SERVER_START_ARGS=”-rabbitmq_management listener [{port,15673}]”

至此节点rabbit-2启动完成

3.3、验证启动 “ps aux|grep rabbitmq”

3.4、rabbit-1 *** 作作为主节点
#停止应用
> sudo rabbitmqctl -n rabbit-1 stop_app
#目的是清除节点上的历史数据(如果不清除,无法将节点加入到集群)
> sudo rabbitmqctl -n rabbit-1 reset
#启动应用
> sudo rabbitmqctl -n rabbit-1 start_app

3.5、rabbit2 *** 作为从节点
# 停止应用
> sudo rabbitmqctl -n rabbit-2 stop_app
# 目的是清除节点上的历史数据(如果不清除,无法将节点加入到集群)
> sudo rabbitmqctl -n rabbit-2 reset
# 将rabbit2节点加入到rabbit1(主节点)集群当中【Server-node服务器的主机名】  可能出现以下图片上的问题
> sudo rabbitmqctl -n rabbit-2 join_cluster rabbit-1@'Server-node'
# 上述如果抛出问题执行修改文件的权限
chmod 600 /var/lib/rabbitmq/.erlang.cookie
# 启动应用
> sudo rabbitmqctl -n rabbit-2 start_app

下面问题,也能用上面的方式解决:

3.6、验证集群状态
rabbitmqctl cluster_status -n rabbit-1

3.7、多主集群转发部署模式

3.8、Web监控

注意在访问的时候:web结面的管理需要给15672 node-1 和15673的node-2 设置用户名和密码。如下:

rabbitmqctl -n rabbit-1 add_user admin admin
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"

通过web界面进行查看

进行主节点的测试,在主节点的web页面创建交换机,队列,绑定关系能否在从节点中查看到信息。

进行从节点的测试,在从节点的web页面创建交换机,队列,绑定关系能否在主节点中查看到信息。

尝试关闭从节点,发现数据不会丢失,消息也不会丢失,主节点还能够继续访问 *** 作;

关闭命令:

rabbitmqctl -n rabbit-1 stop_app

当关闭了主节点,发现数据不会丢失,消息也不会丢失,其余从节点无法继续 *** 作。

启动命令:

RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start
4、集群搭建之docker

两台虚拟机安装rabbitMq(安装参考上文:安装rabbitMq),如果是docker启动多台rabbitMq服务,就按下面的来。

配置的前提是你的rabbitmq可以运行起来,比如”docker ps ”你能看到相关进程。

**场景:**假设有两个rabbitmq节点,分别为myrabbit, myrabbit_1,myrabbit作为主节点,myrabbit_1作为从节点。

5、多机部署

Tips:
如果采用多机部署方式,需读取其中一个节点的cookie, 并复制到其他节点(节点之间通过cookie确定相互是否可通信)。cookie存放在/var/lib/rabbitmq/.erlang.cookie。
例如:主机名分别为rabbit-1、rabbit-2
1、逐个启动各节点
2、配置各节点的hosts文件( vim /etc/hosts)
​ ip1:rabbit-1
​ ip2:rabbit-2
其它步骤雷同单机部署方式

21、RabbitMQ-高级-分布式事务 1、简述

分布式事务指事务的 *** 作位于不同的节点上,需要保证事务的 AICD 特性。例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务。 2、、分布式事务的方式

在分布式系统中,要实现分布式事务,无外乎那几种解决方案。

2.1、两阶段提交(2PC)需要数据库产商的支持,java组件有atomikos等。

    两阶段提交(Two-phase Commit,2PC),通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。

    准备阶段

    协调者询问参与者事务是否执行成功,参与者发回事务执行结果。

    提交阶段

    如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。

    需要注意的是,在准备阶段,参与者执行了事务,但是还未提交。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚。

    存在的问题

    同步阻塞 所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它 *** 作。单点问题 协调者在 2PC 中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者会一直等待状态,无法完成其它 *** 作。数据不一致 在阶段二,如果协调者只发送了部分 Commit 消息,此时网络发生异常,那么只有部分参与者接收到 Commit 消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。太过保守 任意一个节点失败就会导致整个事务失败,没有完善的容错机制。

2.2、补偿事务(TCC) 严选,阿里,蚂蚁金服。
    TCC 其实就是采用的补偿机制,其核心思想是:针对每个 *** 作,都要注册一个与其对应的确认和补偿(撤销) *** 作。它分为三个阶段:

    Try 阶段主要是对业务系统做检测及资源预留Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 /confirm/i阶段时,默认 - - - /confirm/i阶段是不会出错的。即:只要Try成功,/confirm/i一定成功。Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

举个例子,假入 Bob 要向 Smith 转账,思路大概是: 我们有一个本地方法,里面依次调用
1:首先在 Try 阶段,要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。
2:在 Confirm 阶段,执行远程调用的转账的 *** 作,转账成功进行解冻。
3:如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法 (Cancel)。

    优点: 跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些缺点: 缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。
2.3、本地消息表(异步确保)比如:支付宝、微信支付主动查询支付状态,对账单的形式

    本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的 *** 作满足事务特性,并且使用了消息队列来保证最终一致性。

    在分布式事务 *** 作的一方完成写业务数据的 *** 作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中。

    之后将本地消息表中的消息转发到 Kafka 等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发。

    在分布式事务 *** 作的另一方从消息队列中读取一个消息,并执行消息中的 *** 作。

    优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。

    缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

2.4、MQ 事务消息 异步场景,通用性较强,拓展性较高。

    有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 Kafka 不支持。

    以阿里的 RabbitMQ 中间件为例,其思路大致为:

    第一阶段Prepared消息,会拿到消息的地址。 第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。

    也就是说在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RabbitMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RabbitMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

    优点: 实现了最终一致性,不需要依赖本地数据库事务。

    缺点: 实现难度大,主流MQ不支持,RocketMQ事务消息部分代码也未开源。

3、总结

总结并对比了几种分布式分解方案的优缺点,分布式事务本身是一个技术难题,是没有一种完美的方案应对所有场景的,具体还是要根据业务场景去抉择吧。阿里RocketMQ去实现的分布式事务,现在也有除了很多分布式事务的协调器,比如LCN等,大家可以多去尝试。 4、具体实现

分布式事务的完整架构图

美团架构图:

4.1、系统与系统之间的分布式事务问题

4.2、系统间调用过程中事务回滚问题(需要按上面 *** 作进行了rabbitMq集群搭建)

    创建两个springboot工程,raabbitmq-order-service(订单服务)、raabbitmq-dispatcher-service(订单配送服务),都导入以下依

    
    
        4.0.0
    
        rabbitmq.order
        orderService
        1.0-SNAPSHOT
    
        
            org.springframework.boot
            spring-boot-starter-parent
            2.5.3
             
        
    
    
        
    
            
                org.springframework.boot
                spring-boot-starter-amqp
            
    
            
                org.springframework.boot
                spring-boot-starter-web
            
    
            
                org.springframework.boot
                spring-boot-starter-jdbc
            
    
            
                org.projectlombok
                lombok
            
    
            
            
                org.springframework.boot
                spring-boot-devtools
                true
            
    
            
                org.springframework.boot
                spring-boot-starter-test
            
    
            
            
                org.springframework.boot
                spring-boot-configuration-processor
                true
            
            
            
                org.codehaus.jackson
                jackson-mapper-asl
                1.9.13
            
    
            
                com.fasterxml.jackson.dataformat
                jackson-dataformat-avro
            
    
            
                org.apache.commons
                commons-lang3
                3.6
            
    
            
                mysql
                mysql-connector-java
                5.1.47
            
        
    
    

    创建数据库,数据库表字段如下

    编写raabbitmq-order-service(订单服务)、raabbitmq-dispatcher-service(订单配送服务)配置yaml

    # raabbitmq-order-service(订单服务)配置
    server:
      port: 8090
    spring:
      datasource:
        url: jdbc:mysql://localhost:3306/test2
        username: root
        password: root
        driver-class-name: com.mysql.jdbc.Driver
      #rabbitmq的设置
      rabbitmq:
        #单击的连接方式
    #    host: 169.254.118.178
    #    port: 15672
        username: admin
        password: admin
        virtual-host: /
        #这里是开启手动ack,让程序去控制mq的消息的重发和删除和转移
        listener:
          simple:
            acknowledge-mode: manual
            retry:
              enabled: true #开启重试
              max-attempts: 10 #最大重试次数
              initial-interval: 200000ms #重试间隔时间
        #集群的连接方式  多个的话就算逗号隔开 ip:端口的机制
        addresses: 169.254.118.178:5672
    logging:
      level:
        root: debug
        
    #########################################################
    #raabbitmq-dispatcher-service(订单配送服务)配置
    server:
      port: 9000
    spring:
      datasource:
        #与订单服务的 *** 作的数据库不同
        url: jdbc:mysql://localhost:3306/test
        username: root
        password: root
        driver-class-name: com.mysql.jdbc.Driver
      #rabbitmq的设置
      rabbitmq:
        #单击的连接方式
    #    host: 169.254.118.178
    #    port: 15672
        username: admin
        password: admin
        virtual-host: /
        #这里是开启手动ack,让程序去控制mq的消息的重发和删除和转移
        listener:
          simple:
            acknowledge-mode: manual
            retry:
              enabled: true #开启重试
              max-attempts: 10 #最大重试次数
              initial-interval: 2000ms #重试间隔时间
        #集群的连接方式  多个的话就算逗号隔开 ip:端口的机制
        addresses: 169.254.118.178:5672
    #logging:
    #  level:
    #    root: debug
    

    在raabbitmq-order-service(订单服务)编写order实体类

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class Order {
        public String orderId;
        public Integer userId;
        public String orderContent;
        public Date createTime;
    
    }
    

    在raabbitmq-order-service(订单服务),编写业务代码

    @Component
    @Transactional(rollbackFor = Exception.class)
    public class OrderDatabaseService {
    
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        
        public void saveOder(Order order) throws Exception {
            String sql = "insert into rabbitmq_order(order_id,user_id,order_content) values(?,?,?)";
            int count = jdbcTemplate.update(sql, order.getOrderId(), order.getUserId(), order.getOrderContent());
            if (count!=1){
                throw new Exception("订单创建失败~");
            }
            //因为下订单可能会rabbit会出现宕机,就会引发消息是没有放入mq,为了消息可靠生产,对消息做一次冗余
        }
    }
    
    @Service
    public class OrderService {
    
        @Autowired
        private OrderDatabaseService orderDatabaseService;
    
        @Transactional(rollbackFor = Exception.class) //创建订单整个方法添加事务
        public void createOder(Order order) throws Exception {
            //加了事务 插入成功不会立即放入数据库中  要等下面待执行完成后,且没有异常后在插入  要么一起成功,要么一起失败,保证数据了的一致性
            orderDatabaseService.saveOder(order);
    
            //通过http接口发送订单信息到运单系统  但是当前事务不能保证远程调用的接口的事务  这里会超时报错,会进行回滚,但是对于配送的订单的保存,不是在当前服务中,在另一个服务上,只睡了3秒钟而已,数据还是会进行保存
            String result = dispatchHttpApi(order.getOrderId());
            if (!"success".equals(result)){
                throw new Exception("订单创建失败,原因是运单配送接口调用失败!");
            }
        }
    
        public String dispatchHttpApi(String orderId){
            SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
            //连接超时 > 3  为了演示分布式事务的问题,远程调用的接口睡眠了3秒钟
            factory.setConnectTimeout(3000);
            //处理超时 > 2
            factory.setReadTimeout(2000);
            //发送http请求
            String url = "http://localhost:9000/dispatch/order?orderId="+orderId;
            RestTemplate restTemplate = new RestTemplate(factory);
            String result = restTemplate.getForObject(url, String.class);
            return result;
        }
    }
    

    raabbitmq-dispatcher-service(订单配送服务)的业务编写

    @Service
    public class DispatchService {
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        
        public void dispatch(String orderId) throws Exception {
            String sql = "insert into rabbitmq_dispather_order(order_id,dispatch_id,status,order_content,user_id) values (?,?,?,?,?)";
            if (orderId.equals("1000001")){
                try {
                    //模拟业务需要处理3秒钟,但是前面远程调用只允许3秒钟的连接,会抛出异常
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            int count = jdbcTemplate.update(sql, orderId, UUID.randomUUID().toString(), 0, "木子",1);
            if (count != 1){
                throw new Exception("运单接收失败");
            }
        }
    }
    
    @RestController
    @RequestMapping(value = "dispatch")
    public class DispatchController {
        @Autowired
        private DispatchService dispatchService;
    
        //添加订单后,添加配送信息
        @GetMapping("/order")
        public String lock(String orderId) throws Exception {
            dispatchService.dispatch(orderId);
            return "success";
        }
    }
    
    //启动类
    @SpringBootApplication
    public class RabbitmqDispatcherAffair {
        public static void main(String[] args) {
            SpringApplication.run(RabbitmqDispatcherAffair.class,args);
        }
    }
    

    在raabbitmq-order-service(订单服务),编写测试代码

    @SpringBootTest
    public class RabbitmqAffairTest {
    
        @Autowired
        private OrderService orderService;
        @Test
        public void orderCreated() throws Exception {
            String orderId = "1000001";
            Order order = new Order();
            order.setOrderId(orderId);
            order.setUserId(1);
            order.setOrderContent("买了一个方便面");
            orderService.createOder(order);
            System.out.println("订单创建成功");
        }
    }
    

    启动raabbitmq-dispatcher-service(订单配送服务)的启动类,运行raabbitmq-order-service(订单服务)的测试类,查看结果。

    // 抛出如下异常  连接超时 查看数据库的数据
    org.springframework.web.client.ResourceAccessException: I/O error on GET request for "http://localhost:9000/dispatch/order": Read timed out; nested exception is java.net.SocketTimeoutException: Read timed out
    

    结论:不同服务间的事务不能一起回滚,存在数据不一致问题,导致配送服务有数据,订单服务没数据。

4.3、基于MQ的分布式事务整体设计思路

4.4、基于MQ的分布式事务消息的可靠生产问题

    创建一个冗余表,字段如下

    在raabbitmq-order-service(订单服务)下,编写消息发送到队列和监听队列收到消息的更改状态的业务类

    @Service
    public class OrderMQService {
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        
        @PostConstruct
        public void regCallback(){
            //消息发送成功以后,基于生产者的消息回执,来确保生产者的可靠性
            rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
                @Override
                public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("cause:"+cause);
                    //如果ack为true代表消息已经收到  获取消息
                    String orderId = correlationData.getId();
                    if (!ack){
                        System.out.println("MQ队列应答失败,orderId是:"+orderId);
                        return;
                    }
                    try {
                        String sql = "update rabbitmq_order_message set status = 1 where order_id = ?";
                        int update = jdbcTemplate.update(sql, orderId);
                        if (update == 1){
                            System.out.println("本地消息状态修改成功,消息成功投递到消息队列中...");
                        }
                    }catch (Exception e) {
                        System.out.println("本地消息出现异常:"+e.getMessage());
                    }
                }
            });
        }
        public void sendMessage(Order order) {
            //通过MQ发送消息 交换机要提前创建好,在创建order.queue队列,绑定关系 以OrderId为数据 fanout模式不需要路由key
            rabbitTemplate.convertAndSend("order_fanout_exchange","", JsonUtil.obj2String(order), new CorrelationData(order.getOrderId()));
        }
    }
    

    jsonUtlis工具类

    public class JsonUtil {
    
        private static ObjectMapper objectMapper = new ObjectMapper();
    
        private static final String  STANDARD_FORMAT = "yyyy-MM-dd HH:mm:ss";
    
        static {
            // 对象字段全部列入
            objectMapper.setSerializationInclusion(Inclusion.NON_DEFAULT);
    
            // 取消默认转换timestamps形式
            objectMapper.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS,false);
    
            // 忽略空bean转json的错误
            objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS,false);
    
            // 统一日期格式yyyy-MM-dd HH:mm:ss
            objectMapper.setDateFormat(new SimpleDateFormat(STANDARD_FORMAT));
    
            // 忽略在json字符串中存在,但是在java对象中不存在对应属性的情况
            objectMapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES,false);
        }
    
        
        public static  String obj2String(T obj){
            if (obj == null){
                return null;
            }
            try {
                return obj instanceof String ? (String) obj : objectMapper.writevalueAsString(obj);
            } catch (Exception e) {
                System.out.println("Parse object to String error");
                e.printStackTrace();
                return null;
            }
        }
    
        
        public static  String obj2StringPretty(T obj){
            if (obj == null){
                return null;
            }
            try {
                return obj instanceof String ? (String) obj : objectMapper.writerWithDefaultPrettyPrinter().writevalueAsString(obj);
            } catch (Exception e) {
                System.out.println("Parse object to String error");
                e.printStackTrace();
                return null;
            }
        }
    
        
        public static  T string2Obj(String str,Class clazz){
            if (StringUtils.isEmpty(str) || clazz == null){
                return null;
            }
            try {
                return clazz.equals(String.class)? (T) str :objectMapper.readValue(str,clazz);
            } catch (IOException e) {
                System.out.println("Parse String to Object error");
                e.printStackTrace();
                return null;
            }
        }
    
        
        public static  T string2Obj(String str, TypeReference typeReference){
            if (StringUtils.isEmpty(str) || typeReference == null){
                return null;
            }
            try {
                return (T)(typeReference.getType().equals(String.class)? str :objectMapper.readValue(str,typeReference));
            } catch (IOException e) {
                System.out.println("Parse String to Object error");
                e.printStackTrace();
                return null;
            }
        }
    
        
        public static  T string2Obj(String str,Class collectionClass,Class... elementClasses){
            JavaType javaType = objectMapper.getTypeFactory().constructParametricType(collectionClass,elementClasses);
            try {
                return objectMapper.readValue(str,javaType);
            } catch (IOException e) {
                System.out.println("Parse String to Object error");
                e.printStackTrace();
                return null;
            }
        }
    }
    

    在raabbitmq-order-service(订单服务)下,编写业务类

    //OrderDatabaseService 下添加如下方法
    public void saveLocalMessage(Order order) throws Exception {
        String sql = "insert into rabbitmq_order_message(order_id,order_content,status,unique_id) values(?,?,?,?)";
        int count = jdbcTemplate.update(sql, order.getOrderId(), order.getOrderContent(), 0, 1);
        if (count != 1){
            throw new Exception("出现异常,原因数据库 *** 作失败");
        }
    }
    
    @Service
    public class MQOrderService {
        @Autowired
        private OrderDatabaseService orderDatabaseService;
    
        @Autowired
        private OrderMQService orderMQService;
    
        
        public void createOrder(Order order) throws Exception {
            //1、订单信息--插入订单系统,订单数据库事务
            orderDatabaseService.saveOder(order);
            //2、通过http接口发送订单信息到运单系统
            orderMQService.sendMessage(order);
        }
    }
    

    编写测试类

    @Autowired
    private MQOrderService mqOrderService;
    //消息队列的方式进行派单发送
    @Test
    public void orderMQCreated() throws Exception {
        String orderId = "1000001";
        Order order = new Order();
        order.setOrderId(orderId);
        order.setUserId(1);
        order.setOrderContent("买了一个方便面");
        mqOrderService.createOrder(order);
        System.out.println("订单创建成功");
    }
    

    运行测试类查看效果。

    结论:消息可靠生产,就看数据冗余表中的状态是否为1,如果没有可靠生产,就需要进行重新发送。

    如果这个时候MQ服务器出现了异常和故障,那么消息是无法获取到回执信息。怎么解决呢?

    使用定时重发。

4.5、基于MQ的分布式事务消息的可靠生产问题-定时重发

    在raabbitmq-order-service(订单服务)下,编写定时任务发送消息

    @Configuration
    @EnableScheduling
    public class MQTimingTask {
    
        @Autowired
        private OrderMQService orderMQService;
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        // 参考连接:https://www.cnblogs.com/mmzs/p/10161936.html 23点执行一次:0 0 23 * * ?
        @Scheduled(cron = "0 0 23 * * ?")
        private void mqConfigureTasks(){
            System.out.println("定时任务开始重发");
            //查询冗余表中 状态为0的任务
            String sql = "select * from rabbitmq_order_message where status != 1";
            List orders = jdbcTemplate.queryForList(sql, Order.class);
            orders.forEach((order)->{
                //进行重新发送 消息到队列中
                orderMQService.sendMessage(order);
            });
        }
    }
    
4.6、基于MQ的分布式事务消息的可靠消费

    在raabbitmq-dispatcher-service(订单配送服务)下,关闭rabbitmq手动应答ack配置和重试次数。

    在raabbitmq-dispatcher-service(订单配送服务)下,编写消息接收

    @Component
    public class OrderMqConsumer {
    
        @Autowired
        private DispatchService dispatchService;
        private int count = 1;
    
        @RabbitListener(queues = {"order.queue"})
        public void messageConsumer(String ordermsg , Channel channel,
                                    CorrelationData correlationData,
                                    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
                //1、获取消息队列的消息
                System.out.println("收到的MQ的消息"+ordermsg+",count = "+count++);
                //工具类解析消息内容
                Order order = JsonUtil.string2Obj(ordermsg, Order.class);
                //获取订单id
                String orderId = order.getOrderId();
                //进行订单运单保存
                dispatchService.dispatch(orderId);
        }
    }
    

    启动 raabbitmq-dispatcher-service(订单配送服务),断点到消息监听,队列的消息都会被依次消费。

    结论:生成者,生成一个订单,冗余表生成一个冗余数据,在发送消息到队列中,由消费者监听,形成了一个完整的闭环。

    如果消费者中出现了异常,会导致死循环。

    @Component
    public class OrderMqConsumer {
    
        @Autowired
        private DispatchService dispatchService;
        //记录循环了多少次
        private int count = 1;
    
        @RabbitListener(queues = {"order.queue"})
        public void messageConsumer(String ordermsg , Channel channel,
                                    CorrelationData correlationData,
                                    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
                //1、获取消息队列的消息
                System.out.println("收到的MQ的消息"+ordermsg+",count = "+count++);
                //工具类解析消息内容
                Order order = JsonUtil.string2Obj(ordermsg, Order.class);
                //获取订单id
                String orderId = order.getOrderId();
                
                  System.out.println(1/0);
                //进行订单运单保存
                dispatchService.dispatch(orderId);
        }
    }
    

    启动raabbitmq-dispatcher-service(订单配送服务),使用raabbitmq-order-service(订单服务)的orderMQCreated()测试方法进行测试,查看控制台。

4.7、基于MQ的分布式事务消息的消息重发

    手动控制重发次数(当大于重发次数后,队列中的消息会进行移除,造成下消息的丢失问题),raabbitmq-dispatcher-service(配送服务)配置文件修改如下,在进行重启查看次数

    因为开启了 acknowledge-mode: manual :这里是开启手动ack,消息不会被删除,而是成为了未确认的消息。默认值为none,代表自动应答ack,抛出异常,会自动移除消息。

    try+catch+手动ack(配置的重置次数就毫无意义了),raabbitmq-dispatcher-service(配送服务)修改队列消费者,重启raabbitmq-dispatcher-service(配送服务),查看结果

    @Component
    public class OrderMqConsumer {
    
        @Autowired
        private DispatchService dispatchService;
        private int count = 1;
    
        @RabbitListener(queues = {"order.queue"})
        public void messageConsumer(String ordermsg , Channel channel,
                                    CorrelationData correlationData,
                                    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
            try {
                //1、获取消息队列的消息
                System.out.println("收到的MQ的消息"+ordermsg+",count = "+count++);
                //工具类解析消息内容
                Order order = JsonUtil.string2Obj(ordermsg, Order.class);
                //获取订单id
                String orderId = order.getOrderId();
                
                //进行订单运单保存
                dispatchService.dispatch(orderId);
                System.out.println(1/0);
                //tag是队列的标号 唯一标识 手动ack告诉mq消息已经正常消费
                channel.basicAck(tag,false);
            }catch (Exception e){
                
                channel.basicNack(tag, false, false);
                
            }
        }
    }
    

4.8、基于MQ的分布式事务消息的死信队列消息转移 + 人工处理

    删除原有的交换机和队列。

    在raabbitmq-order-service(订单服务)下,编写rabbitmq的配置文件,其余同上

    @Configuration
    public class RabbitMQConfiguration {
    
        
        @Bean
        public FanoutExchange deadExchange(){
            return new FanoutExchange("dead_order_fanout_exchange",true,false);
        }
    
        
        @Bean
        public Queue deadOrderQueue(){
            return new Queue("dead.order.queue", true);
        }
    
        
        @Bean
        public Binding bindingDeadOrder() {
            return BindingBuilder.bind(deadOrderQueue()).to(deadExchange());
    
        }
    
        
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("order_fanout_exchange",true,false);
        }
    
        
        @Bean
        public Queue orderQueue(){
            Map map = new HashMap<>();
            map.put("x-dead-letter-exchange", "dead_order_fanout_exchange");
            return new Queue("order.queue", true,false,false,map);
        }
    
        
        @Bean
        public Binding bindingOrder(){
            return BindingBuilder.bind(orderQueue()).to(fanoutExchange());
        }
    
    }
    

    使用raabbitmq-order-service(订单服务)的orderMQCreated()测试方法进行测试,重启raabbitmq-dispatcher-service(配送服务),查看控制台和web界面。

    监听死信队列,对死信队列中的消息进行处理,raabbitmq-dispatcher-service(配送服务)编写死信队列的消费者

    @Component
    public class DeadOrderMqConsumer {
    
        @Autowired
        private DispatchService dispatchService;
    
        @RabbitListener(queues = {"dead.order.queue"})
        public void messageConsumer(String ordermsg , Channel channel,
                                    CorrelationData correlationData,
                                    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
            try {
                //1、获取消息队列的消息
                System.out.println("收到的MQ的消息"+ordermsg);
                //工具类解析消息内容
                Order order = JsonUtil.string2Obj(ordermsg, Order.class);
                //获取订单id
                String orderId = order.getOrderId();
                //进行订单运单保存   这里会出现幂等性问题,数据库的数据会出现相同数据,OrderMqConsumer之前有存储过可以通过数据唯一值、或者查询当前订单是否存在,在决定是更新还是插入
                dispatchService.dispatch(orderId);
                //手动ack告诉mq消息已经正常消费
                channel.basicAck(tag,false);
            }catch (Exception e){
                System.out.println("进行通知");
                System.out.println("短信通知");
                System.out.println("邮件通知");
                channel.basicNack(tag, false, false);
            }
        }
    }
    

    raabbitmq-order-service(订单服务)的orderMQCreated()测试方法进行测试,重启raabbitmq-dispatcher-service(配送服务),查看控制台和web界面和数据库变化。

    总体流程:由raabbitmq-order-service(订单服务)产生了一条订单,订单也会存储到冗余表中(记录队列是否接收消息成功,默认未成功),在发送消息到队列,队列监听到消息后,将冗余表的状态改为成功接收到消息,如果rabbitmq中途出现问题,未接收到消息,会有定时任务将状态为0的消息进行再次发送,消息投递成功后,由raabbitmq-dispatcher-service(配送服务)监听消费消息,但当前服务会出现错误,会将消息放到死信队列中,在由监听死信队列的消费者进行最终的消费(需要注意幂等性问题)。

    如果监听的死信队列都出错,只能进行通知,或者存到数据库,进行人工干预处理。

5、总结 5.1、基于MQ的分布式事务解决方案优点:
    通用性强拓展方便耦合度低,方案也比较成熟
5.2、基于MQ的分布式事务解决方案缺点:
    基于消息中间件,只适合异步场景消息会延迟处理,需要业务上能够容忍(有中间件的 *** 作,有网络延迟)
5.3、建议
    尽量去避免分布式事务尽量将非核心业务做成异步
22、Springboot整合rabbitmq集群配置详解

    详细配置如下

     rabbitmq:
        addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
    #    port:
        ##集群配置 addresses之间用逗号隔开
        # addresses: ip:port,ip:port
        password: admin
        username: 123456
        virtual-host: / # 连接到rabbitMQ的vhost
        requested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60s
        publisher-/confirm/is: #是否启用 发布确认
        publisher-reurns: # 是否启用发布返回
        connection-timeout: #连接超时,单位毫秒,0表示无穷大,不超时
        cache:
          channel.size: # 缓存中保持的channel数量
          channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
          connection.size: # 缓存的连接数,只有是CONNECTION模式时生效
          connection.mode: # 连接工厂缓存模式:CHANNEL 和 CONNECTION
        listener:
          simple.auto-startup: # 是否启动时自动启动容器
          simple.acknowledge-mode: # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
          simple.concurrency: # 最小的消费者数量
          simple.max-concurrency: # 最大的消费者数量
          simple.prefetch: # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
          simple.transaction-size: # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
          simple.default-requeue-rejected: # 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
          simple.idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒
          simple.retry.enabled: # 监听重试是否可用
          simple.retry.max-attempts: # 最大重试次数
          simple.retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
          simple.retry.multiplier: # 应用于上一重试间隔的乘数
          simple.retry.max-interval: # 最大重试时间间隔
          simple.retry.stateless: # 重试是有状态or无状态
        template:
          mandatory: # 启用强制信息;默认false
          receive-timeout: # receive()  *** 作的超时时间
          reply-timeout: # sendAndReceive()  *** 作的超时时间
          retry.enabled: # 发送重试是否可用
          retry.max-attempts: # 最大重试次数
          retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
          retry.multiplier: # 应用于上一重试间隔的乘数
          retry.max-interval: #最大重试时间间隔
    

    对于发送方而言,需要做以下配置:

    配置CachingConnectionFactory配置Exchange/Queue/Binding配置RabbitAdmin创建上一步的Exchange/Queue/Binding配置RabbitTemplate用于发送消息,RabbitTemplate通过CachingConnectionFactory获取到Connection,然后想指定Exchange发送

    对于消费方而言,需要做以下配置:

    配置CachingConnectionFactory配置Exchange/Queue/Binding配置RabbitAdmin创建上一步的Exchange/Queue/Binding配置RabbitListenerContainerFactory配置@RabbitListener/@RabbitHandler用于接收消息

    在默认情况下主要的配置如下:

    Spring AMQP的主要对象

注:如果不了解AMQP请前往官网了解.

    使用:

    通过配置类加载的方式:

    package com.yd.demo.config;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import java.util.HashMap;
    import java.util.Map;
    @Configuration
    public class RabbitConfig {
        private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
        public static final String RECEIVEDLXEXCHANGE="spring-ex";
        public static final String RECEIVEDLXQUEUE="spring-qu1";
        public static final String RECEIVEDLXROUTINGKEY="aa";
        public static final String DIRECTEXCHANGE="spring-ex";
        public static final String MDMQUEUE="mdmQueue";
        public static final String TOPICEXCHANGE="spring-top";
        @Value("${spring.rabbitmq.addresses}")
        private String hosts;
        @Value("${spring.rabbitmq.username}")
        private String userName;
        @Value("${spring.rabbitmq.password}")
        private String password;
        @Value("${spring.rabbitmq.virtual-host}")
        private String virtualHost;
     
    //    @Value("${rabbit.port}")
    //    private int port;
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
            cachingConnectionFactory.setAddresses(hosts);
            cachingConnectionFactory.setUsername(userName);
            cachingConnectionFactory.setPassword(password);
    //        cachingConnectionFactory.setChannelCacheSize(channelCacheSize);
            //cachingConnectionFactory.setPort(port);
            cachingConnectionFactory.setVirtualHost(virtualHost);
            //设置连接工厂缓存模式:
            cachingConnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
            //缓存连接数
            cachingConnectionFactory.setConnectionCacheSize(3);
            //设置连接限制
            cachingConnectionFactory.setConnectionLimit(6);
            logger.info("连接工厂设置完成,连接地址{}"+hosts);
            logger.info("连接工厂设置完成,连接用户{}"+userName);
            return cachingConnectionFactory;
        }
        @Bean
        public RabbitAdmin rabbitAdmin(){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
            rabbitAdmin.setAutoStartup(true);
            rabbitAdmin.setIgnoreDeclarationExceptions(true);
            rabbitAdmin.declareBinding(bindingMdmQueue());
            //声明topic交换器
            rabbitAdmin.declareExchange(directExchange());
            logger.info("管理员设置完成");
            return rabbitAdmin;
        }
        @Bean
        public RabbitListenerContainerFactory listenerContainerFactory() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory());
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            //最小消费者数量
            factory.setConcurrentConsumers(10);
            //最大消费者数量
            factory.setMaxConcurrentConsumers(10);
            //一个请求最大处理的消息数量
            factory.setPrefetchCount(10);
            //
            factory.setChannelTransacted(true);
            //默认不排队
            factory.setDefaultRequeueRejected(true);
            //手动确认接收到了消息
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            logger.info("监听者设置完成");
            return factory;
        }
        @Bean
        public DirectExchange directExchange(){
            return new DirectExchange(DIRECTEXCHANGE,true,false);
        }
        @Bean
        public Queue mdmQueue(){
            Map arguments = new HashMap<>();
            // 绑定该队列到私信交换机
            arguments.put("x-dead-letter-exchange",RECEIVEDLXEXCHANGE);
            arguments.put("x-dead-letter-routing-key",RECEIVEDLXROUTINGKEY);
            logger.info("队列交换机绑定完成");
            return new Queue(RECEIVEDLXQUEUE,true,false,false,arguments);
        }
        @Bean
        Binding bindingMdmQueue() {
            return BindingBuilder.bind(mdmQueue()).to(directExchange()).with("");
        }
        @Bean
        public RabbitTemplate rabbitTemplate(){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
            rabbitTemplate.setMandatory(true);
            //发布确认
    //        rabbitTemplate.setConfirmCallback(/confirm/iCallBackListener);
            // 启用发布返回
    //        rabbitTemplate.setReturnCallback(returnCallBackListener);
            logger.info("连接模板设置完成");
            return rabbitTemplate;
        }
      
      /**
         * @return DirectExchange
         *//*
        @Bean
        public DirectExchange dlxExchange() {
            return new DirectExchange(RECEIVEDLXEXCHANGE,true,false);
        }
    *//*
    *
         * @return Queue
    *//*
        @Bean
        public Queue dlxQueue() {
            return new Queue(RECEIVEDLXQUEUE,true);
        }
    *//*
         * @return Binding
         *//*
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(RECEIVEDLXROUTINGKEY);
        }*/
    }
    

    通过两种方式加载

    通过配置文件通过配置类说明:上面是通过配置文件与配置类的方式去加载,常用的配置如上所示。实际使用中要生产方与消费方要分开配置,相关配置也会有小变动,大体配置不变。更多信息可查看官网配置。

23、RabbitMQ-集群监控-参考

https://www.kuangstudy.com/zl/rabbitmq#1368199762003718146

24、RabbitMQ面试题分析

面试题:1、Rabbitmq 为什么需要信道,为什么不是TCP直接通信

1、TCP的创建和销毁,开销大,创建要三次握手,销毁要4次分手。

2、如果不用信道,那应用程序就会TCP连接到Rabbit服务器,高峰时每秒成千上万连接就会造成资源的巨大浪费,而且==底层 *** 作系统每秒处理tcp连接数也是有限制的,==必定造成性能瓶颈。

3、信道的原理是一条线程一条信道,多条线程多条信道同用一条TCP连接,一条TCP连接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能瓶颈。

面试题:2、queue队列到底在消费者创建还是生产者创建?

1、 一般建议是在rabbitmq *** 作面板创建。这是一种稳妥的做法。
2、按照常理来说,确实应该消费者这边创建是最好,消息的消费是在这边。这样你承受一个后果,可能我生产在生产消息可能会丢失消息。
3、在生产者创建队列也是可以,这样稳妥的方法,消息是不会出现丢失。
4、如果你生产者和消费都创建的队列,谁先启动谁先创建,后面启动就覆盖前面的

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5708836.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存