RabbitMQ 消息可靠篇06

RabbitMQ 消息可靠篇06,第1张

RabbitMQ 消息可靠篇06 功能

1.ConfirmCallback、ReturnsCallback回调
2.事务
3.confirm确认模式

1 github:源码地址 2 rabbitmq05 子工程


    4.0.0

    
        com.yzm
        rabbitmq
        0.0.1-SNAPSHOT
        ../pom.xml 
    

    rabbitmq05
    0.0.1-SNAPSHOT
    jar
    rabbitmq05
    Demo project for Spring Boot

    

    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    


项目结构

application.yml

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true
3 消息回调

配置类

package com.yzm.rabbitmq05.config;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class RabbitConfig {

    
    @Bean("rabbit")
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        //数据转换为json存入消息队列
        template.setMessageConverter(new Jackson2JsonMessageConverter());

        
        template.set/confirm/iCallback(/confirm/iCallback());
        template.setReturnsCallback(returnCallback());
        
        template.setMandatory(true);
        return template;
    }

    public static RabbitTemplate./confirm/iCallback /confirm/iCallback() {
        return new RabbitTemplate./confirm/iCallback() {
            
            @Override
            public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
                // ack判断消息发送到交换机是否成功
                System.out.println("回调id:" + correlationData.getId());
                if (ack) {
                    // 消息发送成功到达交换机
                    // ...
                    System.out.println("消息成功到达交换机");
                } else {
                    System.out.println("消息到达交换机失败");
                    System.out.println("错误信息:" + cause);
                }
            }
        };
    }

    public static RabbitTemplate.ReturnsCallback returnCallback() {
        return new RabbitTemplate.ReturnsCallback() {
            
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                // 该交换机没有路由键匹配对应的消息队列
                // 如果信息走了该回调,就不会走/confirm/i回调了
                System.out.println("交换机:" + returnedMessage.getExchange());
                System.out.println("路由键:" + returnedMessage.getRoutingKey());
                System.out.println("消息主体 : " + returnedMessage.getMessage());
                System.out.println("回复代码 : " + returnedMessage.getReplyCode());
                System.out.println("描述:" + returnedMessage.getReplyText());
            }
        };
    }
}

生产者

package com.yzm.rabbitmq05.service;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

/** Scheduled demo producer that publishes one message through the {@code rabbit} template. */
@Component
public class SenderService {

    @Resource(name = "rabbit")
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a fixed text message to {@code callback.exchange} with routing key
     * {@code callback.a.yzm}, tagged with a freshly generated correlation id.
     */
    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void sendA() {
        final String message = "Hello world! @yzm";
        // A random UUID serves as the globally unique correlation id.
        final String correlationId = UUID.randomUUID().toString();
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        rabbitTemplate.convertAndSend(
                "callback.exchange", "callback.a.yzm", message, new CorrelationData(correlationId));
    }
}

消费者

package com.yzm.rabbitmq05.service;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** Demo consumer bound to the topic exchange {@code callback.exchange}. */
@Component
public class ReceiverService {

    /**
     * Prints every message delivered to queue {@code callback-a}.
     *
     * @param message the raw AMQP message; its body is decoded as UTF-8 text
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "callback-a"),
            exchange = @Exchange(value = "callback.exchange", type = ExchangeTypes.TOPIC),
            key = {"callback.a.*", "callback.*.a"}
    ))
    public void receiveA(Message message) {
        // Decode with an explicit charset rather than the platform default.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(" [ 消费者@A号 ] Received ==> '" + body + "'");
    }
}

运行结果:

  • 消息正确到达交换机触发回调

修改生产者、指定一个不存在的交换机

	@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    // Variant of sendA that publishes to "yzm.callback.exchange", an exchange that does not
    // exist, to show what the callbacks report in that case.
    public void sendA() {
        final String message = "Hello world! @yzm";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        // A random UUID serves as the globally unique correlation id.
        final CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(
                "yzm.callback.exchange", "callback.a.yzm", message, correlationData);
    }

运行结果:

  • 消息找不到交换机触发回调

修改生产者,指定一个不存在的路由键

    /**
     * Variant of sendA that uses routing key {@code callback.yzm.yzm}, which none of the
     * consumer's binding keys match, to show what happens when routing fails.
     */
    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void sendA() {
        final String message = "Hello world! @yzm";
        // A random UUID serves as the globally unique correlation id.
        final String correlationId = UUID.randomUUID().toString();
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        rabbitTemplate.convertAndSend(
                "callback.exchange", "callback.yzm.yzm", message, new CorrelationData(correlationId));
    }

运行结果:

  • 消息路由失败触发回调

路由失败还可以通过添加监听器处理
修改配置类

@Configuration
@EnableScheduling
public class RabbitConfig {

    /**
     * Exposes a raw AMQP channel as the bean named {@code channel}.
     *
     * @param connectionFactory factory used to open the connection the channel lives on
     * @return a non-transactional channel
     */
    @Bean(name = "channel")
    public Channel channel(ConnectionFactory connectionFactory) {
        // Passing true would enable transactions on the channel; this variant passes false.
        final boolean transactional = false;
        return connectionFactory.createConnection().createChannel(transactional);
    }
	...
}

生产者

package com.yzm.rabbitmq05.service;

import com.rabbitmq.client.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.UUID;

/**
 * Demo producer that publishes either through the {@code rabbit} template or directly on the
 * raw {@code channel} bean.
 */
@Component
public class SenderService {

    @Resource(name = "rabbit")
    private RabbitTemplate rabbitTemplate;
    @Resource(name = "channel")
    private Channel channel;

    /** Set once the return listener has been attached to {@link #channel}. */
    private boolean returnListenerRegistered;

    /**
     * Publishes through the template with a routing key that matches no binding.
     * Scheduling is switched off for this method in this variant of the class:
     * {@code @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)}.
     */
    public void sendA() {
        // A random UUID serves as the globally unique correlation id.
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        String message = "Hello world! @yzm";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        rabbitTemplate.convertAndSend("callback.exchange", "callback.yzm.yzm", message, correlationData);
    }

    /**
     * Publishes directly on the channel with the mandatory flag set, so that an unroutable
     * message is handed back to the registered return listener.
     *
     * @throws IOException if publishing on the channel fails
     */
    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void sendB() throws IOException {
        registerReturnListenerOnce();

        String message = "Hello world! @yzm";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");

        // Third argument is the mandatory flag.
        channel.basicPublish("callback.exchange", "callback.yzm.yzm", true,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Attaches the return listener to the channel on first use only. Adding it on every
     * scheduled run would stack up listeners on the same channel and print each returned
     * message several times.
     */
    private synchronized void registerReturnListenerOnce() {
        if (returnListenerRegistered) {
            return;
        }
        // Invoked when a mandatory message cannot be routed.
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText,
                                     String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("replyCode:" + replyCode);
                System.out.println("replyText:" + replyText);
                System.out.println("exchange:" + exchange);
                System.out.println("routingKey:" + routingKey);
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        });
        returnListenerRegistered = true;
    }
}

运行结果:

4 事务

事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
channel.txSelect()声明启动事务模式;
channel.txCommit()提交事务;
channel.txRollback()回滚事务;

事务createChannel(true)跟publisher-confirm-type: correlated不能同时设置
修改配置

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    publisher-confirm-type: none
    publisher-returns: true

	@Bean(name = "channel")
    // Exposes a raw AMQP channel as the bean named "channel".
    public Channel channel(ConnectionFactory connectionFactory) {
        // true enables transactions on the created channel.
        final boolean transactional = true;
        return connectionFactory.createConnection().createChannel(transactional);
    }

生产者

	@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    // Publishes one message inside an AMQP transaction: the message is committed on success
    // and rolled back if anything fails before the commit. Throws IOException if the
    // rollback itself fails.
    public void sendC() throws IOException {
        String message = "Hello world! @yzm";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        try {
            // Put the channel into transaction mode.
            channel.txSelect();
            // Publish the message.
            channel.basicPublish("tx.exchange", "tx.yzm", MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // Uncomment to force a failure and observe the rollback:
            //int i = 1/0;
            // Commit the transaction.
            channel.txCommit();
        } catch (Exception e) {
            // Report the failure instead of rolling back silently, then roll back.
            System.err.println("消息发送失败,事务回滚:" + e);
            channel.txRollback();
        }
    }

消费者

    /**
     * Prints every message delivered to queue {@code tx-a}, which is bound to the direct
     * exchange {@code tx.exchange} with key {@code tx.yzm}.
     *
     * @param message the raw AMQP message; its body is decoded as UTF-8 text
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "tx-a"),
            exchange = @Exchange(value = "tx.exchange", type = ExchangeTypes.DIRECT),
            key = {"tx.yzm"}
    ))
    public void receiveC(Message message) {
        // Decode with an explicit charset rather than the platform default.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(" [ 消费者@C号 ] Received ==> '" + body + "'");
    }

无异常

制造异常

消息回滚,消费者没有消息可消费

5 confirm确认模式

confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。
confirm的三种实现方式:
channel.waitForConfirms()普通发送方确认模式;
channel.waitForConfirmsOrDie()批量确认模式;
channel.addConfirmListener()异步监听发送方确认模式;

修改配置

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true

	@Bean(name = "channel")
    // Exposes a raw AMQP channel as the bean named "channel".
    public Channel channel(ConnectionFactory connectionFactory) {
        // Passing true would enable transactions on the channel; this variant passes false.
        final boolean transactional = false;
        return connectionFactory.createConnection().createChannel(transactional);
    }

生产者、普通confirm模式

    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void sendD() {
        try {
            String message = "Hello world! @yzm";
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
            // 开启/confirm/i确认模式
            channel./confirm/iSelect();
            // 发送消息
            channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

            // 等待消息被确认
            if (channel.waitFor/confirm/is()) {
                System.out.println("消息发送成功");
            } else {
                // 返回false可以进行补发。
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // channel.waitForConfirms 可能返回超时异常
            // 可以进行补发。
        }

    }

消费者

    /**
     * Prints every message delivered to queue {@code confirm-a}, which is bound to the fanout
     * exchange {@code confirm.exchange}.
     *
     * @param message the raw AMQP message; its body is decoded as UTF-8 text
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "confirm-a"),
            exchange = @Exchange(value = "confirm.exchange", type = ExchangeTypes.FANOUT)
    ))
    public void receiveD(Message message) {
        // Decode with an explicit charset rather than the platform default.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(" [ 消费者@D号 ] Received ==> '" + body + "'");
    }

运行结果:

生产者发送消息,Broker 接收并处理后向生产者返回确认,生产者收到确认(发送方确认由 Broker 发出,与消费者是否消费无关)

生产者、批量confirm模式;消费者不变

    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void sendE() {
        try {
            StringBuilder message = new StringBuilder("Hello world! @yzm");
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
            // 开启/confirm/i确认模式
            channel./confirm/iSelect();
            // 发送消息
            for (int i = 1; i <= 5; i++) {
                message.append("_").append(i);
                channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.toString().getBytes());
            }
            
            // 阻塞线程,等待消息被确认。该方法可以指定一个等待时间。该方法无返回值,只能根据抛出的异常进行判断。
            channel.waitFor/confirm/isOrDie();
        } catch (InterruptedException e) {
            // 可以进行补发。
        } catch (IOException e) {
            //
        }
        
        System.out.println("全部执行完成");
    }

运行结果:

生产者、异步confirm模式;消费者不变

    @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000)
    public void sendF() {
        try {
            StringBuilder message = new StringBuilder("Hello world! @yzm");
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
            // 开启/confirm/i确认模式
            channel./confirm/iSelect();
            // 发送消息
            for (int i = 1; i <= 5; i++) {
                message.append("_").append(i);
                channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.toString().getBytes());
            }

            //异步监听确认和未确认的消息
            channel.add/confirm/iListener(new /confirm/iListener() {
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("未确认消息,标识:" + deliveryTag + "是否批量处理:" + multiple);
                }

                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("已确认消息,标识:" + deliveryTag + "是否批量处理:" + multiple);
                }
            });

        } catch (IOException e) {
            //
        }
    }

运行结果:

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5432810.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-11
下一篇 2022-12-11

发表评论

登录后才能评论

评论列表(0条)

保存