RabbitMQ整合到SpringBoot

RabbitMQ整合到SpringBoot,第1张

RabbitMQ整合到SpringBoot

SpringBoot整合RabbitMQ

创建SpringBoot工程的时候选择Web依赖

导入mq依赖


    org.springframework.boot
    spring-boot-starter-amqp
 

yml配置文件配置mq五大参数,除了hostip和端口,补上用户名密码和虚拟主机/test

spring:
  rabbitmq:
    host: 192.168.200.129
    port: 5672
    username: test
    password: test
    virtual-host: /test

Java配置类,声明exchange、queue,并且绑定它们

@Configuration
public class RabbitMQConfig {
    //1. 创建exchange - topic
    @Bean
    public TopicExchange getTopicExchange(){
        return new TopicExchange("boot-topic-exchange",true,false);
    }

    //2. 创建queue
    @Bean
    public Queue getQueue(){
        return new Queue("boot-queue",true,false,false,null);//建议交换机和队列构造都选最长的,持久化赋值true,不认识给默认值
    }

    //3. 绑定在一起
    @Bean
    public Binding getBinding(TopicExchange topicExchange, Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
    }
    //4. 上面要注意的是交换机的名字和队列的名字不能是topic-exchange和topic-queue否则跟内部冲突启动失败
}

写一个测试类来模拟生产者发布消息到RabbitMQ,以后这个单元测试类是一个另一个发送消息的工程代码↓

@SpringBootTest
class SpringbootRabbitmqApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
    }
}

创建消费者监听消息

@Component
public class Consumer {

    @RabbitListener(queues = "boot-queue")
    public void getMessage(Object message){
        System.out.println("接收到消息:" + message);
    }

}

先启动应用程序监听消息,然后启动单元测试类发消息,控制台输出监听接收到的消息,说明整合成功

手动Ack,yml配置增加和java监听增加

执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。

但是,我们不想丢失任何任务,如果有一个消费者挂掉了,那么我们应该将分发给它的任务交付给另一个消费者去处理。

为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。

如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完成,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。

没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会重新投递。即使处理一条消息会花费很长的时间。

消息应答是默认打开的。我们通过显示的设置autoAsk=true可关闭这种机制。现即自动应答开,一旦我们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听队列的下一个消费者。

yml文件配置手动ack,而不是默认的自动ack↓

spring:
  rabbitmq:
    host: 192.168.200.129
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        acknowledge-mode: manual

代码手动ack,修改消费者的监听方法,增加client包的Channel接口和和amqp包的Message↓

@Component
public class Consumer {

    @RabbitListener(queues = "boot-queue")
    public void getMessage(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到消息:" + msg);
        //int i = 1 / 0;
        //手动ack
        //channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

为什么要改为手动ack?因为如果是自动ack,消费者即使出了异常,没有正常完成任务,mq也收到自动应答表示完成了,就删除mq中的消息,但是如果改为手动ack配置,当消费者出现异常就中断了,没有走后面手动ack的代码,就没有正确应答,mq不会把消息删除,如果消费者没有出现异常,即调用手动ack代码,给mq应答正常,删除消费消息

消息的可靠性

RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的 *** 作,效率太低,加了事务 *** 作后,比平时的 *** 作效率至少要慢100倍。

RabbitMQ除了事务,还提供了/confirm/i的确认机制,这个效率比事务高很多,所以先学习这个确认机制。

这个机制三种方式:1普通/confirm/i方式 2批量/confirm/i方式 3异步/confirm/i方式,接下来分别介绍一下↓

消息传递可靠性

 普通/confirm/i方式↓

//3.1 开启confirm
channel.confirmSelect();
//3.2 发送消息
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",null,msg.getBytes());//""这是默认交换机,改为xxx交换机测试即可
//3.3 判断消息发送是否成功
if(channel.waitForConfirms()){
    System.out.println("消息发送成功");
}else{
    System.out.println("发送消息失败");
}

 

批量/confirm/i方式↓

//3.1 开启confirm
channel.confirmSelect();
//3.2 批量发送消息
for (int i = 0; i < 1000; i++) {
    String msg = "Hello-World!" + i;
    channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 确定批量操作是否成功
channel.waitForConfirmsOrDie(); // 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException

异步/confirm/i方式↓

//3.1 开启confirm
channel.confirmSelect();
//3.2 批量发送消息
for (int i = 0; i < 1000; i++) {
    String msg = "Hello-World!" + i;
    channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 开启异步回调
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
    }
});

Return机制,监听消息是否从exchange送到了指定的queue↓

/confirm/i只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

而且exchange是不能持久化消息的,queue是可以持久化消息的。

可以采用Return机制来监听消息是否从exchange送到了指定的queue中

 开启Return机制,并在发送消息时,指定mandatory为true

// 开启return机制
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 当消息没有送达到queue时,才会回调执行。
        System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");
    }
});

// 在发送消息时,换另外一个,方法重载,指定mandatory参数为true
channel.basicPublish("","HelloWorld",true,null,"msg".getBytes());

System.in.read();

上个例子的基础上,增加返回机制代码,故意设置一个不存在的路由键找不到队列,来进行消息没有到达队列的回调

SpringBoot实现/confirm/i确认机制和return返回机制

编写配置文件,增加/confirm/i和return

spring:
  rabbitmq:
    host: 192.168.200.129
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        acknowledge-mode: manual
    publisher-/confirm/i-type: simple
    publisher-returns: true

开启/confirm/i和Return的Java代码配置,核心用到了RabbitTemplate来搞定

@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct//构造本类对象后,调用初始化方法,设置回调来触发下面确认方法和返回方法的调用↓
    public void initMethod(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            System.out.println("消息已经送达到Exchange");
        }else{
            System.out.println("消息没有送达到Exchange");
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息没有送达到Queue");
    }
}
避免消息重复消费

重复消费消息的原因是,消费者没有给RabbitMQ一个ack↓

重复消费

 重复消费消息,会对非幂等行 *** 作造成问题↓

幂等性 *** 作:比如数据库删除, *** 作一次和多次效果是一样的。

非幂等性 *** 作:比如添加,而且主键是自增的。

为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,先将消息的id放到Redis中,比如

id为0表示(正在执行业务)

id为1表示(执行业务成功)

如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

 

生产者,发送消息时,指定messageId↓

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
    .deliveryMode(1)     //指定消息是否需要持久化 1 - 需要持久化  2 - 不需要持久化
    .messageId(UUID.randomUUID().toString())
    .build();

String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());

消费者,在消费消息时,根据具体业务逻辑去 *** 作redis↓

DefaultConsumer consume = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        Jedis jedis = new Jedis("192.168.200.129",6379);//记得修改为自己的Linux服务器ip
        String messageId = properties.getMessageId();
        
        //1. setnx到Redis中,默认指定value-0
        String result = jedis.set(messageId, "0", "NX", "EX", 10);
        
        if(result != null && result.equalsIgnoreCase("OK")) {
            System.out.println("接收到消息:" + new String(body, "UTF-8"));
            //消费消息了,打印消息
            
            //2. 消费成功,set messageId 1
            jedis.set(messageId,"1");
            channel.basicAck(envelope.getDeliveryTag(),false);
        }else {
            //3. 如果1中的setnx失败,获取key对应的value,如果是0,return,如果是1则手动ack
            String s = jedis.get(messageId);
            if("1".equalsIgnoreCase(s)){
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        }
    }
};
public class Publisher {
    @Test
    public void publish() throws Exception {
        //1. 获取Connection
        Connection connection = RabbitMQClient.getConnection();

        //2. 创建Channel
        Channel channel = connection.createChannel();

        //3. 发布消息到exchange,同时指定路由的规则
        //修改的部分↓
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(1)     //指定消息是否需要持久化 1 - 需要持久化  2 - 不需要持久化
                .messageId(UUID.randomUUID().toString())
                .build();

        String msg = "Hello-World!";
        channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());
        //修改的部分↑

        System.out.println("生产者发布消息成功!");

        //4. 释放资源
        channel.close();
        connection.close();
    }
}

3消费者根据jedis调用的api结果,做响应的业务逻辑↓

public class Consumer {//其他消费者
    @Test
    public void consume() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();

        //2. 创建channel
        final Channel channel = connection.createChannel();

        //3. 声明队列-HelloWorld
        channel.queueDeclare("HelloWorld",true,false,false,null);

        //4. 开启监听Queue
        //修改的部分↓
        DefaultConsumer consume = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                Jedis jedis = new Jedis("192.168.200.129",6379);//记得修改为自己的Linux服务器ip
                String messageId = properties.getMessageId();

                //1. setnx到Redis中,默认指定value为0
                String result = jedis.set(messageId, "0", "NX", "EX", 10);
                
                if(result != null && result.equalsIgnoreCase("OK")) {
                    System.out.println("接收到消息:" + new String(body, "UTF-8"));
                    
                    //2. 消费成功,set messageId 1
                    jedis.set(messageId,"1");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }else {
                    //3. 如果1中的setnx失败,获取key对应的value,如果是0啥都不做,如果是1则手动ack
                    String s = jedis.get(messageId);
                    
                    if("1".equalsIgnoreCase(s)){
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                }
            }
        };
        //修改的部分↑

        channel.basicConsume("HelloWorld",true,consume);

        System.out.println("消费者开始监听队列!");
        // System.in.read();
        System.in.read();//让程序不停止,只有键盘录入数据才会往下走

        //5. 释放资源
        channel.close();
        connection.close();
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5717715.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存