1 github:源码地址 2 rabbitmq05 子工程1./confirm/iCallback、ReturnsCallback回调
2.事务
3./confirm/i确认模式
4.0.0 rabbitmq05 com.yzm rabbitmq0.0.1-SNAPSHOT ../pom.xml 0.0.1-SNAPSHOT jar rabbitmq05 Demo project for Spring Boot org.springframework.boot spring-boot-maven-plugin
项目结构
application.yml
spring: rabbitmq: port: 5672 host: 127.0.0.1 username: guest password: guest publisher-/confirm/i-type: correlated publisher-returns: true3 消息回调
配置类
package com.yzm.rabbitmq05.config; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; @Configuration @EnableScheduling public class RabbitConfig { @Bean("rabbit") public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); //数据转换为json存入消息队列 template.setMessageConverter(new Jackson2JsonMessageConverter()); template.set/confirm/iCallback(/confirm/iCallback()); template.setReturnsCallback(returnCallback()); template.setMandatory(true); return template; } public static RabbitTemplate./confirm/iCallback /confirm/iCallback() { return new RabbitTemplate./confirm/iCallback() { @Override public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { // ack判断消息发送到交换机是否成功 System.out.println("回调id:" + correlationData.getId()); if (ack) { // 消息发送成功到达交换机 // ... System.out.println("消息成功到达交换机"); } else { System.out.println("消息到达交换机失败"); System.out.println("错误信息:" + cause); } } }; } public static RabbitTemplate.ReturnsCallback returnCallback() { return new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { // 该交换机没有路由键匹配对应的消息队列 // 如果信息走了该回调,就不会走/confirm/i回调了 System.out.println("交换机:" + returnedMessage.getExchange()); System.out.println("路由键:" + returnedMessage.getRoutingKey()); System.out.println("消息主体 : " + returnedMessage.getMessage()); System.out.println("回复代码 : " + returnedMessage.getReplyCode()); System.out.println("描述:" + returnedMessage.getReplyText()); } }; } }
package com.yzm.rabbitmq05.service; import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.UUID; @Component public class SenderService { @Resource(name = "rabbit") private RabbitTemplate rabbitTemplate; @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000) public void sendA() { // 全局唯一 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); String message = "Hello world! @yzm"; System.out.println(" [ 生产者 ] Sent ==> '" + message + "'"); rabbitTemplate.convertAndSend("callback.exchange", "callback.a.yzm", message, correlationData); } }
消费者
package com.yzm.rabbitmq05.service; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class ReceiverService { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "callback-a"), exchange = @Exchange(value = "callback.exchange", type = ExchangeTypes.TOPIC), key = {"callback.a.*", "callback.*.a"} )) public void receiveA(Message message) { System.out.println(" [ 消费者@A号 ] Received ==> '" + new String(message.getBody()) + "'"); } }
运行结果:
- 消息正确到达交换机触发回调
修改生产者、指定一个不存在的交换机
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000) public void sendA() { // 全局唯一 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); String message = "Hello world! @yzm"; System.out.println(" [ 生产者 ] Sent ==> '" + message + "'"); rabbitTemplate.convertAndSend("yzm.callback.exchange", "callback.a.yzm", message, correlationData); }
运行结果:
- 消息找不到交换机触发回调
修改生产者,指定一个不存在的路由键
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000) public void sendA() { // 全局唯一 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); String message = "Hello world! @yzm"; System.out.println(" [ 生产者 ] Sent ==> '" + message + "'"); rabbitTemplate.convertAndSend("callback.exchange", "callback.yzm.yzm", message, correlationData); }
运行结果:
- 消息路由失败触发回调
路由失败还可以通过添加监听器处理
修改配置类
@Configuration @EnableScheduling public class RabbitConfig { @Bean(name = "channel") public Channel channel(ConnectionFactory connectionFactory) { // true,启动事务; return connectionFactory.createConnection().createChannel(false); } ... }
生产者
package com.yzm.rabbitmq05.service; import com.rabbitmq.client.*; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.IOException; import java.util.UUID; @Component public class SenderService { @Resource(name = "rabbit") private RabbitTemplate rabbitTemplate; @Resource(name = "channel") private Channel channel; // @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000) public void sendA() { // 全局唯一 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); String message = "Hello world! @yzm"; System.out.println(" [ 生产者 ] Sent ==> '" + message + "'"); rabbitTemplate.convertAndSend("callback.exchange", "callback.yzm.yzm", message, correlationData); } @Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000) public void sendB() throws IOException { //消息不可达,回调 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("replyCode:" + replyCode); System.out.println("replyText:" + replyText); System.out.println("exchange:" + exchange); System.out.println("routingKey:" + routingKey); System.out.println("properties:" + properties); System.out.println("body:" + new String(body)); } }); String message = "Hello world! @yzm"; System.out.println(" [ 生产者 ] Sent ==> '" + message + "'"); channel.basicPublish("callback.exchange", "callback.yzm.yzm", true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); } }
4 事务运行结果:
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
channel.txSelect()声明启动事务模式;
channel.txComment()提交事务;
channel.txRollback()回滚事务;
事务createChannel(true)跟publisher-/confirm/i-type: correlated不能同时设置
修改配置
spring: rabbitmq: port: 5672 host: 127.0.0.1 username: guest password: guest publisher-/confirm/i-type: none publisher-returns: true @Bean(name = "channel") public Channel channel(ConnectionFactory connectionFactory) { // true,启动事务; return connectionFactory.createConnection().createChannel(true); }
生产者
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000) public void sendC() throws IOException { String message = "Hello world! @yzm"; System.out.println(" [ 生产者 ] Sent ==> '" + message + "'"); try { // 声明事务 channel.txSelect(); // 发送消息 channel.basicPublish("tx.exchange", "tx.yzm", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); //int i = 1/0; // 提交事务 channel.txCommit(); } catch (Exception e) { // 事务回滚 channel.txRollback(); } }
消费者
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "tx-a"), exchange = @Exchange(value = "tx.exchange", type = ExchangeTypes.DIRECT), key = {"tx.yzm"} )) public void receiveC(Message message) { System.out.println(" [ 消费者@C号 ] Received ==> '" + new String(message.getBody()) + "'"); }
5 /confirm/i确认模式无异常
制造异常
消息回滚,消费者没有消息可消费
/confirm/i发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。
/confirm/i的三种实现方式:
channel.waitForConfirms()普通发送方确认模式;
channel.waitForConfirmsOrDie()批量确认模式;
channel.addConfirmListener()异步监听发送方确认模式;
修改配置
spring: rabbitmq: port: 5672 host: 127.0.0.1 username: guest password: guest publisher-/confirm/i-type: correlated publisher-returns: true @Bean(name = "channel") public Channel channel(ConnectionFactory connectionFactory) { // true,启动事务; return connectionFactory.createConnection().createChannel(false); }
生产者、普通/confirm/i模式
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000) public void sendD() { try { String message = "Hello world! @yzm"; System.out.println(" [ 生产者 ] Sent ==> '" + message + "'"); // 开启/confirm/i确认模式 channel./confirm/iSelect(); // 发送消息 channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); // 等待消息被确认 if (channel.waitFor/confirm/is()) { System.out.println("消息发送成功"); } else { // 返回false可以进行补发。 } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { // channel.waitForConfirms 可能返回超时异常 // 可以进行补发。 } }
消费者
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "/confirm/i-a"), exchange = @Exchange(value = "/confirm/i.exchange", type = ExchangeTypes.FANOUT) )) public void receiveD(Message message) { System.out.println(" [ 消费者@D号 ] Received ==> '" + new String(message.getBody()) + "'"); }
运行结果:
生产者生产消息,发送到消费者,消费者确认后,生产者收到确认
生产者、批量/confirm/i模式;消费者不变
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000) public void sendE() { try { StringBuilder message = new StringBuilder("Hello world! @yzm"); System.out.println(" [ 生产者 ] Sent ==> '" + message + "'"); // 开启/confirm/i确认模式 channel./confirm/iSelect(); // 发送消息 for (int i = 1; i <= 5; i++) { message.append("_").append(i); channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.toString().getBytes()); } // 阻塞线程,等待消息被确认。该方法可以指定一个等待时间。该方法无返回值,只能根据抛出的异常进行判断。 channel.waitFor/confirm/isOrDie(); } catch (InterruptedException e) { // 可以进行补发。 } catch (IOException e) { // } System.out.println("全部执行完成"); }
运行结果:
生产者、异步/confirm/i模式;消费者不变
@Scheduled(fixedDelay = 60 * 60 * 1000, initialDelay = 5000) public void sendF() { try { StringBuilder message = new StringBuilder("Hello world! @yzm"); System.out.println(" [ 生产者 ] Sent ==> '" + message + "'"); // 开启/confirm/i确认模式 channel./confirm/iSelect(); // 发送消息 for (int i = 1; i <= 5; i++) { message.append("_").append(i); channel.basicPublish("/confirm/i.exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.toString().getBytes()); } //异步监听确认和未确认的消息 channel.add/confirm/iListener(new /confirm/iListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("未确认消息,标识:" + deliveryTag + "是否批量处理:" + multiple); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("已确认消息,标识:" + deliveryTag + "是否批量处理:" + multiple); } }); } catch (IOException e) { // } }
运行结果:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)