spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest

事务
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
channel.txSelect()声明启动事务模式;
channel.txCommit()提交事务;
channel.txRollback()回滚事务;
package com.yzm.rabbitmq_10.config; import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean(name = "channel") public Channel channel(ConnectionFactory connectionFactory) { // true,启动事务; return connectionFactory.createConnection().createChannel(true); } }
package com.yzm.rabbitmq_10.sender;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.MessageProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * REST endpoint that publishes a message inside a RabbitMQ channel transaction.
 *
 * <p>NOTE(review): the injected {@link Channel} is a single shared bean while HTTP requests
 * may arrive on different threads; confirm whether concurrent use is acceptable here.
 */
@Slf4j
@RestController
public class Sender {

    @Resource(name = "channel")
    private Channel channel;

    /**
     * Publishes "Hello world!" to {@code tx.exchange} with routing key {@code tx.yzm}
     * inside a transaction.
     *
     * @param normal pass {@code 1} to simulate a failure after publishing, which triggers
     *               a rollback; any other value commits the transaction
     * @throws IOException if rolling back the transaction fails
     */
    @GetMapping("/tx")
    public void tx(int normal) throws IOException {
        String message = "Hello world!";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        try {
            // Put the channel into transactional mode.
            channel.txSelect();
            // Publish; the message only becomes visible after txCommit().
            channel.basicPublish("tx.exchange", "tx.yzm",
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(StandardCharsets.UTF_8));

            if (normal == 1) {
                // Deliberate failure between publish and commit to demonstrate rollback.
                throw new IllegalStateException("simulated failure before commit");
            }

            log.info("提交事务");
            channel.txCommit();
        } catch (Exception e) {
            // Broad catch is intentional: any failure before commit must roll back.
            log.warn("事务回滚", e);
            channel.txRollback();
        }
    }
}
package com.yzm.rabbitmq_10.receiver;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener component for the transaction demo queue.
 */
@Component
public class Receiver {

    /**
     * Prints the body of every message arriving on {@code tx_queue}, which is bound to the
     * direct exchange {@code tx.exchange} with routing key {@code tx.yzm}.
     *
     * @param message the delivered AMQP message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("tx_queue"),
            exchange = @Exchange(value = "tx.exchange", type = ExchangeTypes.DIRECT),
            key = "tx.yzm"
    ))
    public void receiveC(Message message) {
        final String body = new String(message.getBody());
        System.out.println(" [ 消费者@tx号 ] Received ==> '" + body + "'");
    }
}
无异常
Confirm确认模式制造异常
消息回滚,消费者没有消息可消费
Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。
Confirm的三种实现方式:
channel.waitForConfirms()普通发送方确认模式;
channel.waitForConfirmsOrDie()批量确认模式;
channel.addConfirmListener()异步监听发送方确认模式;
事务跟Confirm不能同时使用,因此需要先关闭事务:
@Bean(name = "channel") public Channel channel(ConnectionFactory connectionFactory) { // true,启动事务; return connectionFactory.createConnection().createChannel(false); }
package com.yzm.rabbitmq_10.sender;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.MessageProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * REST endpoints demonstrating transactional publishing and the three publisher-confirm
 * styles (single wait, batch wait, asynchronous listener).
 *
 * <p>NOTE(review): all endpoints use the same injected {@link Channel}. A channel cannot be
 * in transactional mode and confirm mode at once, so calling {@code /tx} and a confirm
 * endpoint on the same channel is expected to fail; verify the intended usage.
 */
@Slf4j
@RestController
public class Sender {

    @Resource(name = "channel")
    private Channel channel;

    /**
     * Publishes "Hello world!" to {@code tx.exchange} with routing key {@code tx.yzm}
     * inside a transaction.
     *
     * @param normal pass {@code 1} to simulate a failure after publishing, which triggers
     *               a rollback; any other value commits the transaction
     * @throws IOException if rolling back the transaction fails
     */
    @GetMapping("/tx")
    public void tx(int normal) throws IOException {
        String message = "Hello world!";
        System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        try {
            // Put the channel into transactional mode.
            channel.txSelect();
            // Publish; the message only becomes visible after txCommit().
            channel.basicPublish("tx.exchange", "tx.yzm",
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(StandardCharsets.UTF_8));

            if (normal == 1) {
                // Deliberate failure between publish and commit to demonstrate rollback.
                throw new IllegalStateException("simulated failure before commit");
            }

            log.info("提交事务");
            channel.txCommit();
        } catch (Exception e) {
            // Broad catch is intentional: any failure before commit must roll back.
            log.warn("事务回滚", e);
            channel.txRollback();
        }
    }

    /**
     * Publishes one message and blocks until it has been confirmed or rejected.
     */
    @GetMapping("/confirm")
    public void confirm() {
        try {
            String message = "Hello world!";
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
            // Enable publisher confirms on this channel.
            channel.confirmSelect();
            channel.basicPublish("confirm.exchange", "",
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(StandardCharsets.UTF_8));
            // Wait for the outcome of the publish.
            if (channel.waitForConfirms()) {
                System.out.println("生产者确认消费者收到消息");
            } else {
                // false means the message was nack-ed; it may be re-published.
                System.out.println("消费者未收到消息,是否重发");
            }
        } catch (IOException e) {
            log.error("Publishing in confirm mode failed", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the message may be re-published by the caller.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for publisher confirm", e);
        }
    }

    /**
     * Publishes five messages and blocks once until all of them have been confirmed.
     */
    @GetMapping("/confirm2")
    public void confirm2() {
        try {
            StringBuilder message = new StringBuilder("Hello world!");
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
            // Enable publisher confirms on this channel.
            channel.confirmSelect();
            for (int i = 1; i <= 5; i++) {
                message.append("_").append(i);
                channel.basicPublish("confirm.exchange", "",
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.toString().getBytes(StandardCharsets.UTF_8));
            }
            // Blocks until every outstanding message is confirmed. A timeout variant exists.
            // There is no return value: failure is signalled only by an exception.
            channel.waitForConfirmsOrDie();
            System.out.println("只有确认消费者收到消息了才会打印此日志");
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the batch may be re-published by the caller.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for batch publisher confirms", e);
        } catch (IOException e) {
            log.error("Batch publishing in confirm mode failed", e);
        }
    }

    /**
     * Publishes five messages and reports acks/nacks asynchronously through a listener.
     *
     * <p>NOTE(review): every call registers another listener on the shared channel;
     * consider registering it once if this endpoint is hit repeatedly.
     */
    @GetMapping("/confirm3")
    public void confirm3() {
        try {
            StringBuilder message = new StringBuilder("Hello world!");
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
            // Enable publisher confirms on this channel.
            channel.confirmSelect();

            // Register the listener BEFORE publishing so early acks cannot be missed.
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("未确认消息,标识:" + deliveryTag + "是否批量处理:" + multiple);
                }

                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("已确认消息,标识:" + deliveryTag + "是否批量处理:" + multiple);
                }
            });

            for (int i = 1; i <= 5; i++) {
                message.append("_").append(i);
                channel.basicPublish("confirm.exchange", "",
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.toString().getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            log.error("Asynchronous confirm publishing failed", e);
        }
    }
}

package com.yzm.rabbitmq_10.receiver;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listeners for the transaction demo queue and the publisher-confirm demo queue.
 */
@Component
public class Receiver {

    /**
     * Prints messages from {@code tx_queue}, bound to the direct exchange
     * {@code tx.exchange} with routing key {@code tx.yzm}.
     *
     * @param message the delivered AMQP message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "tx_queue"),
            exchange = @Exchange(value = "tx.exchange", type = ExchangeTypes.DIRECT),
            key = {"tx.yzm"}
    ))
    public void receiveC(Message message) {
        System.out.println(" [ 消费者@tx号 ] Received ==> '" + new String(message.getBody()) + "'");
    }

    /**
     * Prints messages from {@code confirm_queue}, bound to the fanout exchange
     * {@code confirm.exchange} that the confirm endpoints of the sender publish to.
     *
     * @param message the delivered AMQP message
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "confirm_queue"),
            exchange = @Exchange(value = "confirm.exchange", type = ExchangeTypes.FANOUT)
    ))
    public void receiveD(Message message) {
        System.out.println(" [ 消费者@cf号 ] Received ==> '" + new String(message.getBody()) + "'");
    }
}
普通Confirm模式
http://localhost:8080/confirm
运行结果:
生产者生产消息,发送到消费者,消费者确认后,生产者收到确认(注:发送方确认由Broker返回,表示Broker已接收消息,并不代表消费者已经消费该消息)
批量Confirm模式
http://localhost:8080/confirm2
运行结果:
相关链接异步Confirm模式
http://localhost:8080/confirm3
运行结果:
首页
上一篇:Channel
下一篇:集群搭建管理
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)