RabbitMQ之事务以及Confirm确认模式

RabbitMQ之事务以及Confirm确认模式,第1张

RabbitMQ之事务以及Confirm确认模式

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
事务

事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
channel.txSelect()声明启动事务模式
channel.txComment()提交事务;
channel.txRollback()回滚事务;

package com.yzm.rabbitmq_10.config;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean(name = "channel")
    public Channel channel(ConnectionFactory connectionFactory) {
        // true,启动事务;
        return connectionFactory.createConnection().createChannel(true);
    }
}
package com.yzm.rabbitmq_10.sender;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client./confirm/iListener;
import com.rabbitmq.client.MessageProperties;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.IOException;

@RestController
public class Sender {

    @Resource(name = "channel")
    private Channel channel;

    @GetMapping("/tx")
    public void tx(int normal) throws IOException {
        String message = "Hello world!";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        try {
            // 声明事务
            channel.txSelect();
            // 发送消息
            channel.basicPublish("tx.exchange", "tx.yzm", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            int i = 1;
            if (normal == 1) i = 1 / 0;
            log.info("提交事务");
            channel.txCommit();
        } catch (Exception e) {
            log.info("事务回滚");
            channel.txRollback();
        }
    }	
}
package com.yzm.rabbitmq_10.receiver;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "tx_queue"),
            exchange = @Exchange(value = "tx.exchange", type = ExchangeTypes.DIRECT),
            key = {"tx.yzm"}
    ))
    public void receiveC(Message message) {
        System.out.println(" [ 消费者@tx号 ] Received ==> '" + new String(message.getBody()) + "'");
    }

}

无异常

制造异常

消息回滚,消费者没有消息可消费

/confirm/i确认模式

/confirm/i发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。
/confirm/i的三种实现方式:
channel.waitForConfirms()普通发送方确认模式;
channel.waitForConfirmsOrDie()批量确认模式;
channel.addConfirmListener()异步监听发送方确认模式;

事务跟/confirm/i不能同时使用,关闭事务

	@Bean(name = "channel")
    public Channel channel(ConnectionFactory connectionFactory) {
        // true,启动事务;
        return connectionFactory.createConnection().createChannel(false);
    }
package com.yzm.rabbitmq_10.sender;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client./confirm/iListener;
import com.rabbitmq.client.MessageProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.IOException;

@Slf4j
@RestController
public class Sender {

    @Resource(name = "channel")
    private Channel channel;

    @GetMapping("/tx")
    public void tx(int normal) throws IOException {
        String message = "Hello world!";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        try {
            // 声明事务
            channel.txSelect();
            // 发送消息
            channel.basicPublish("tx.exchange", "tx.yzm", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            int i = 1;
            if (normal == 1) i = 1 / 0;
            log.info("提交事务");
            channel.txCommit();
        } catch (Exception e) {
            log.info("事务回滚");
            channel.txRollback();
        }
    }

    @GetMapping("//confirm/i")
    public void /confirm/i() {
        try {
            String message = "Hello world!";
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
            // 开启/confirm/i确认模式
            channel./confirm/iSelect();
            // 发送消息
            channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

            // 等待消息被确认
            if (channel.waitFor/confirm/is()) {
                System.out.println("生产者确认消费者收到消息");
            } else {
                // 返回false可以进行补发。
                System.out.println("消费者未收到消息,是否重发");
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // channel.waitForConfirms 可能返回超时异常
            // 可以进行补发。
        }

    }

    @GetMapping("//confirm/i2")
    public void /confirm/i2() {
        try {
            StringBuilder message = new StringBuilder("Hello world!");
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
            // 开启/confirm/i确认模式
            channel./confirm/iSelect();
            // 发送消息
            for (int i = 1; i <= 5; i++) {
                message.append("_").append(i);
                channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.toString().getBytes());
            }

            // 阻塞线程,等待消息被确认。该方法可以指定一个等待时间。该方法无返回值,只能根据抛出的异常进行判断。
            channel.waitFor/confirm/isOrDie();
            System.out.println("只有确认消费者收到消息了才会打印此日志");
        } catch (InterruptedException e) {
            // 可以进行补发。
        } catch (IOException e) {
            //
        }
    }

    @GetMapping("//confirm/i3")
    public void /confirm/i3() {
        try {
            StringBuilder message = new StringBuilder("Hello world!");
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
            // 开启/confirm/i确认模式
            channel./confirm/iSelect();
            // 发送消息
            for (int i = 1; i <= 5; i++) {
                message.append("_").append(i);
                channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.toString().getBytes());
            }

            //异步监听确认和未确认的消息
            channel.add/confirm/iListener(new /confirm/iListener() {
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("未确认消息,标识:" + deliveryTag + "是否批量处理:" + multiple);
                }

                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("已确认消息,标识:" + deliveryTag + "是否批量处理:" + multiple);
                }
            });

        } catch (IOException e) {
            //
        }

    }
}
package com.yzm.rabbitmq_10.receiver;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "tx_queue"),
            exchange = @Exchange(value = "tx.exchange", type = ExchangeTypes.DIRECT),
            key = {"tx.yzm"}
    ))
    public void receiveC(Message message) {
        System.out.println(" [ 消费者@tx号 ] Received ==> '" + new String(message.getBody()) + "'");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "/confirm/i_queue"),
            exchange = @Exchange(value = "/confirm/i.exchange", type = ExchangeTypes.FANOUT)
    ))
    public void receiveD(Message message) {
        System.out.println(" [ 消费者@cf号 ] Received ==> '" + new String(message.getBody()) + "'");
    }
}

普通/confirm/i模式
http://localhost:8080//confirm/i
运行结果:

生产者生产消息,发送到消费者,消费者确认后,生产者收到确认

批量/confirm/i模式
http://localhost:8080//confirm/i2
运行结果:

异步/confirm/i模式
http://localhost:8080//confirm/i3
运行结果:

相关链接

首页
上一篇:Channel
下一篇:集群搭建管理

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5605236.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存