package org.jeecg.boot.starter.rabbitmq.config; import org.jeecg.boot.starter.rabbitmq.constant.MqConstant; import org.jeecg.boot.starter.rabbitmq.core.MapMessageConverter; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.SerializerMessageConverter; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitConfig { @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMandatory(true); // template.setMessageConverter(new MapMessageConverter()); template.setMessageConverter(new SerializerMessageConverter()); //使用单独的发送连接,避免生产者由于各种原因阻塞而导致消费者同样阻塞 // template.setUsePublisherConnection(true); // template.setConfirmCallback(); // template.setReturnCallback(); return template; } }
package org.jeecg.boot.starter.rabbitmq.constant;

/**
 * Names of the queues, exchanges and routing keys used by the RabbitMQ demo.
 *
 * <p>Several identifiers are misspelled ({@code DLX_ROUTERKRY}, {@code TPOIC_ROUTERKEY}) or
 * break constant naming ({@code delay}); they are kept unchanged because other classes
 * reference them by these exact names.
 */
public class MqConstant {

    // ---- plain direct exchange, first pair ----
    public static final String TEST_QUEUE = "test_queue";
    public static final String TEST_EXCHANGE = "test_exchange";
    public static final String ROUTERKEY = "test_queue";

    // ---- plain direct exchange, second pair ----
    public static final String TEST_QUEUE1 = "test_queue1";
    public static final String ROUTERKEY1 = "test_queue1";
    public static final String TEST_EXCHANGE1 = "test_exchange1";

    // ---- direct exchange whose queue dead-letters into the DLX below ----
    public static final String DIRECT_QUEUE = "direct_queue";
    public static final String DIRECT_EXCHANGE = "direct_exchange";
    public static final String DIRECT_ROUTERKEY = "direct_routerKey";

    // ---- dead-letter exchange ----
    public static final String DLX_EXCHANGE = "dlx_exchange";
    public static final String DLX_QUEUE = "dlx_queue";
    public static final String DLX_ROUTERKRY = "dlx_routerkey";

    // ---- topic mode ----
    public static final String TOPIC_EXCHANGE = "topic_exchange";
    public static final String TOPIC_QUEUE = "topic_queue";
    public static final String TPOIC_ROUTERKEY = "topic.#";

    // ---- fanout mode ----
    public static final String FANOUT_EXCHANGE = "fanout_exchange";
    public static final String FANOUT_QUEUE = "fanout_queue";

    // ---- delayed queue ----
    /** Delay applied to delayed messages (unit not stated here; presumably milliseconds). */
    public static final Integer delay = 10000;
    public static final String LAZY_EXCHANGE = "Lazy_Exchange";
    public static final String LAZY_QUEUE = "Lazy_Queue";
    public static final String LAZY_KEY = "lazy.#";
}
package org.jeecg.boot.starter.rabbitmq.callback; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; @Component @Slf4j public class CustomConfirmAndReturnCallback implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage回调方法>>>" + new String(message.getBody(), StandardCharsets.UTF_8) + ",replyCode:" + replyCode + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey); } @Override public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) { log.info("/confirm/i回调方法>>>回调消息ID为: " + correlationData.getId()); if (isSendSuccess) { log.info("/confirm/i回调方法>>>消息发送到交换机成功!"); } else { log.info("/confirm/i回调方法>>>消息发送到交换机失败!,原因 : [{}]", error); } } }
package org.jeecg.boot.starter.rabbitmq.core; import org.jeecg.boot.starter.rabbitmq.constant.MqConstant; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.util.StringUtils; import java.io.Serializable; public class MqMessageFactoryInit { public static Message init(String msg, MessageDeliveryMode deliveryMode, String headerKey, String headerValue, String expiration, Integer delay, String contentType, String correlationId, String messageId, Integer priority, String replyTo) { MessageProperties properties = new MessageProperties(); if (null != deliveryMode) { properties.setDeliveryMode(deliveryMode); } if (!StringUtils.isEmpty(headerKey) && !StringUtils.isEmpty(headerValue)) { properties.setHeader(headerKey, headerValue); } if (!StringUtils.isEmpty(expiration)) { properties.setExpiration(expiration); } if (null != delay) { properties.setDelay(delay); //设置延迟的时间 } if (!StringUtils.isEmpty(contentType)) { properties.setContentType(contentType); } if (!StringUtils.isEmpty(correlationId)) { properties.setCorrelationId(correlationId); } if (!StringUtils.isEmpty(messageId)) { properties.setMessageId(messageId); } if (null != priority) { properties.setPriority(priority); } if (!StringUtils.isEmpty(replyTo)) { properties.setReplyTo(replyTo); } return new Message(msg.getBytes(), properties); } public static CorrelationData init(String id) { return new CorrelationData(id); } }
package org.jeecg.boot.starter.rabbitmq.core; import java.util.HashMap; import java.util.concurrent.CountDownLatch; public class SharedVariableUtils { //回调函数结果:Result管理仓库,每个发布都对应一个result,一个主线程,二个回调的子线程共享这个result,执行完后消除 private static HashMapresultMap = new HashMap<>(); //countDownLatch管理仓库,每个发布消息请求有两个回调:/confirm/iCallback和returnCallback,但第二个成功时就不会执行所以很难得到结果,所以这里并没有使用 //每个回调线程和主线程分别共享各自的CountDownLatch,这个主键可以由appId+tenantId+回调类型来区分。 private static HashMap countDownLatchMap = new HashMap<>(); //发布失败后执行次数的计数器,需要每个线程单独使用 private static ThreadLocal count = ThreadLocal.withInitial(() -> new Integer(0)); public static String confirmCall = "/confirm/i"; public static String returnCall = "return"; //==============================Result===================================== private static void deleteResult(String resultName) { resultMap.remove(resultName); } public static ResultVo getResult(String resultName) { ResultVo result = resultMap.get(resultName); deleteResult(resultName); return result; } public static void setResult(String resultName, String key, String value) { ResultVo result = new ResultVo(); result.put(key, value); resultMap.put(resultName, result); } //========================================countDownLaunch======================================================= public static void await(String countDownLatchName) { CountDownLatch countDownLatch = countDownLatchMap.get(countDownLatchName); if (countDownLatch == null) { countDownLatch = new CountDownLatch(1); countDownLatchMap.put(countDownLatchName, countDownLatch); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void deleteCountDownLatch(String countDownLatchName) { countDownLatchMap.remove(countDownLatchName); } public static void countDown(String countDownLatchName) { CountDownLatch countDownLatch = countDownLatchMap.get(countDownLatchName); if (countDownLatch == null) { countDownLatch = new CountDownLatch(1); countDownLatchMap.put(countDownLatchName, 
countDownLatch); } countDownLatch.countDown(); } // ============================================count值=============================================================== //获取count的值 public static int getCount() { return count.get(); } //count++ public static void countUp() { count.set(count.get() + 1); } //设置count public static void setCount(int i) { count.set(0); } //remove }
package org.jeecg.boot.starter.rabbitmq.exchange; import org.jeecg.boot.starter.rabbitmq.constant.MqConstant; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class DeadDirectExchangeConfig { @Bean public Queue directQueue() { Mapagruments = new HashMap (); agruments.put("x-dead-letter-exchange", MqConstant.DLX_EXCHANGE); // agruments.put("x-message-ttl", 5000); // queue:queue的名称 // exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点: // 1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列; // 2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同; // 3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。 // autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。 return new Queue(MqConstant.DIRECT_QUEUE, true, false, false, agruments); } @Bean public DirectExchange directExchange() { return new DirectExchange(MqConstant.DIRECT_EXCHANGE, true, false); } @Bean public Binding directBinding() { return BindingBuilder.bind(directQueue()).to(directExchange()).with(MqConstant.DIRECT_ROUTERKEY); } }
package org.jeecg.boot.starter.rabbitmq.exchange; import org.jeecg.boot.starter.rabbitmq.constant.MqConstant; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class LazyExchangeConfig { @Bean public TopicExchange lazyExchange() { TopicExchange exchange = new TopicExchange(MqConstant.LAZY_EXCHANGE, true, false); exchange.setDelayed(true); //开启延迟队列 return exchange; } @Bean public Queue lazyQueue() { return new Queue(MqConstant.LAZY_QUEUE, true); } @Bean public Binding lazyBinding() { return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(MqConstant.LAZY_KEY); } }
package org.jeecg.boot.starter.rabbitmq.exchange; import org.jeecg.boot.starter.rabbitmq.constant.MqConstant; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TestDirectExchangeConfig { @Bean public Queue createQueue() { //durable(耐用的)是为了防止宕机等异常而导致消息无法及时接收设计的。这个对queue无太多影响,但对topic影响比较大。 return new Queue(MqConstant.TEST_QUEUE, true); } @Bean public DirectExchange createExchange() { //autoDelete:true自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。 return new DirectExchange(MqConstant.TEST_EXCHANGE, true, false); } @Bean public Binding binding() { return BindingBuilder.bind(createQueue()).to(createExchange()).with(MqConstant.ROUTERKEY); } @Bean public Queue createQueue1() { //durable(耐用的)是为了防止宕机等异常而导致消息无法及时接收设计的。这个对queue无太多影响,但对topic影响比较大。 return new Queue(MqConstant.TEST_QUEUE1, true); } @Bean public DirectExchange createExchange1() { //autoDelete:true自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。 return new DirectExchange(MqConstant.TEST_EXCHANGE1, true, false); } @Bean public Binding binding1() { return BindingBuilder.bind(createQueue1()).to(createExchange1()).with(MqConstant.ROUTERKEY1); } }
package org.jeecg.boot.starter.rabbitmq.exchange;

import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the dead-letter exchange and its queue. The exchange is a topic exchange and
 * the queue is bound with the pattern {@code "#"}.
 */
@Configuration
public class TopicExchangeConfig {

    /** @return the durable dead-letter queue */
    @Bean
    public Queue dlxQueue() {
        final boolean durable = true;
        return new Queue(MqConstant.DLX_QUEUE, durable);
    }

    /** @return the durable, non-auto-delete dead-letter topic exchange */
    @Bean
    public TopicExchange dlxExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(MqConstant.DLX_EXCHANGE, durable, autoDelete);
    }

    /** @return the binding of the dead-letter queue to its exchange under the pattern {@code "#"} */
    @Bean
    public Binding dlxBinding() {
        Queue queue = dlxQueue();
        TopicExchange exchange = dlxExchange();
        return BindingBuilder.bind(queue).to(exchange).with("#");
    }
}
package org.jeecg.modules.pQuery.controller; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; import org.jeecg.boot.starter.rabbitmq.constant.MqConstant; import org.jeecg.modules.pQuery.rabitMq.Order; import org.jeecg.modules.pQuery.sender.SendMqService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.CrossOrigin; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.io.Serializable; @Api(tags = "消息队列重写测试") @RestController @RequestMapping("/mq") @CrossOrigin @Slf4j public class RabbitMqTest { @Autowired private SendMqService sendMqService; @Value("${spring.rabbitmq.publisher-/confirm/is}") private String /confirm/is; @ApiOperation(value = "test1") @GetMapping(value = "/send") public void send(String msg) { log.info("============:"+/confirm/is); sendMqService.send(msg, MqConstant.ROUTERKEY); } @ApiOperation(value = "死信队列") @GetMapping(value = "/dlsSend") public void dlsSend(String msg) { sendMqService.dlsSend(msg, MqConstant.DIRECT_ROUTERKEY); } @ApiOperation(value = "一对多的模式") @GetMapping(value = "/sendTopic") public void sendTopic(String msg) { sendMqService.sendTopic(msg, "topic.message"); } @ApiOperation(value = "广播模式") @GetMapping(value = "/fanoutSender") public void fanoutSender() { Order order = new Order(2,"你好,我是fanout队列"); sendMqService.fanoutSender(order); } @ApiOperation(value = "延迟队列") @GetMapping(value = "/lazySender") public void lazySender(String msg) { sendMqService.lazySender(msg,MqConstant.LAZY_KEY); } @ApiOperation(value = "测试死性队列/事务回滚/重试机制") @GetMapping(value = "/testDb") public void testDb() { sendMqService.testDb(); } }
package org.jeecg.modules.pQuery.receive; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.jeecg.boot.starter.rabbitmq.constant.MqConstant; import org.jeecg.boot.starter.rabbitmq.listenter.MqListener; import org.jeecg.boot.starter.rabbitmq.receive.baseReceive; import org.jeecg.common.base.baseMap; import org.jeecg.common.config.mqtoken.UserTokenContext; import org.jeecg.common.websocket.DebugContext; import org.jeecg.modules.pQuery.controller.RabbitMqTest; import org.jeecg.modules.pQuery.mapper.PeOrderInfoMapper; import org.jeecg.modules.pQuery.rabitMq.Order; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.io.IOException; import java.util.Map; @Component @Slf4j public class ReceiveMqService { @Autowired PeOrderInfoMapper peOrderInfoMapper; private String token = UserTokenContext.getToken(); @RabbitListener(queues = MqConstant.TEST_QUEUE) public void customer(Message message, Channel channel) throws IOException { UserTokenContext.setToken(token); long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicQos(0, 1, false); log.info("deliveryTag:{}", deliveryTag); Integer num = (Integer) 
message.getMessageProperties().getHeaders().get("num"); if (num == 0) { //不回队列一个个不消费 channel.basicNack(deliveryTag, false, false); } else { log.info("basicAck:{}", "一个个消费"); //一个个消费 channel.basicAck(deliveryTag, false); } } @RabbitListener(queues = MqConstant.DIRECT_QUEUE) public void customer2(Message message, Channel channel) throws IOException, InterruptedException { UserTokenContext.setToken(token); channel.basicQos(0, 1, false); String correlationId = message.getMessageProperties().getCorrelationId(); System.out.println(correlationId + "*******************"); long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.err.println("-----------consume message----------"); System.err.println("body: " + new String(message.getBody())); log.info("deliveryTag为:" + deliveryTag); // channel.basicAck(deliveryTag, false); channel.basicReject(deliveryTag, false); } @RabbitListener(queues = MqConstant.DLX_QUEUE) public void dlxCustomer(Message message, Channel channel) throws IOException, InterruptedException { UserTokenContext.setToken(token); channel.basicQos(0, 1, false); long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.err.println("-----------consume message----------"); System.err.println("body: " + new String(message.getBody())); Integer num = (Integer) message.getMessageProperties().getHeaders().get("num"); log.info("死信队列里面的deliveryTag为:" + deliveryTag); log.info("私信队列准备ack消息了"); channel.basicAck(deliveryTag, false); } @RabbitListener( bindings = @QueueBinding( exchange = @Exchange(value = MqConstant.TOPIC_EXCHANGE, type = ExchangeTypes.TOPIC, durable = "true", autoDelete = "false"), value = @Queue(value = MqConstant.TOPIC_QUEUE, durable = "true"), key = MqConstant.TPOIC_ROUTERKEY)) public void topicCustomer(Message message, Channel channel) throws IOException { UserTokenContext.setToken(token); long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println(deliveryTag + "*************"); 
channel.basicAck(deliveryTag, false); } @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(value = MqConstant.FANOUT_EXCHANGE, type = ExchangeTypes.FANOUT, durable = "true"), value = @Queue(value = MqConstant.FANOUT_QUEUE, durable = "true"))) public void fanoutCustomer(@Payload Order order, @Headers Mapmap, Channel channel,Message message) throws IOException { UserTokenContext.setToken(token); System.out.println(order.getId()); Long delivery = (Long) map.get(AmqpHeaders.DELIVERY_TAG); log.info("批次: {},序列号:{}", delivery,message.getMessageProperties().getDeliveryTag()); try { System.out.println("消费成功"); channel.basicAck(delivery, false); } catch (IOException e) { System.out.println("消费失败"); channel.basicNack( delivery, false, false ); } } @RabbitListener(queues = MqConstant.LAZY_QUEUE) public void lazyCustomer(Message message, Channel channel) { UserTokenContext.setToken(token); long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.err.println("body: " + new String(message.getBody())); try { channel.basicAck(deliveryTag, false); } catch (IOException e) { try { channel.basicNack(deliveryTag, false, false); } catch (IOException e1) { log.info("失败"); } } } @RabbitListener(queues = MqConstant.TEST_QUEUE1) @Transactional public void customer1(String msg, Channel channel, Message message) throws IOException { UserTokenContext.setToken(token); try { // 这里模拟一个空指针异常, log.info("【Consumer01】批次: {}", message.getMessageProperties().getDeliveryTag()); log.info("【Consumer01成功接收到消息】>>> {}", msg); peOrderInfoMapper.setValues(); String string = null; string.length(); // 确认收到消息,只确认当前消费者的一个消息收到 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); //次重复消息投递 log.info("【Consumer01】批次: {}", message.getMessageProperties().getDeliveryTag()); // 拒绝消息,并且不再重新进入队列,重试机制两个条件1抛出异常,2不能重入队列而是重试,此处设置false channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); throw e; } } }
package org.jeecg.modules.pQuery.sender; import lombok.extern.slf4j.Slf4j; import org.jeecg.boot.starter.rabbitmq.callback.Custom/confirm/iAndReturnCallback; import org.jeecg.boot.starter.rabbitmq.constant.MqConstant; import org.jeecg.common.base.baseMap; import org.jeecg.modules.pQuery.controller.RabbitMqTest; import org.jeecg.modules.pQuery.rabitMq.Order; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.UUID; @Component @Slf4j public class SendMqService extends CustomConfirmAndReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; // @Autowired // public SendMqService(RabbitTemplate amqpTemplate) { // amqpTemplate.setConfirmCallback(this::/confirm/i); // amqpTemplate.setReturnCallback(this::returnedMessage); // this.rabbitTemplate = amqpTemplate; // } @PostConstruct public void init() { //指定 ConfirmCallback rabbitTemplate.setConfirmCallback(this); //指定 ReturnCallback rabbitTemplate.setReturnCallback(this); } public void confirm(CorrelationData correlationData, boolean ack, String cause) { super.confirm(correlationData, ack, cause); log.info("数据编号========correlationdata:{}======ack:{}", ack, correlationData); if (!ack) { log.info("异常处理...."+cause); } } public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { super.returnedMessage(message, replyCode, replyText, exchange, routingKey); log.info("return exchange: {}, routingKey: {}, replyCode: {}, replyText: {}", exchange, routingKey, replyCode, replyText); } public void send(String msg, String routingKey) { for (int i = 0; i <= 5; i++) { baseMap map 
= new baseMap(); map.put("num", i); // MessageProperties properties = new MessageProperties(); // properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // properties.setHeader("num", i); // Message message = new Message(msg.getBytes(), map); // amqpTemplate.convertAndSend(MqConstant.TEST_EXCHANGE, routingKey, map); // amqpTemplate.convertAndSend(MqConstant.TEST_EXCHANGE, routingKey, map, message -> { // return message; // }); MessageProperties properties = new MessageProperties(); properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); properties.setHeader("num",i); Message message = new Message(msg.getBytes(), properties); rabbitTemplate.convertAndSend(MqConstant.TEST_EXCHANGE, routingKey, message); } } public void dlsSend(String msg, String directRouterkey) { for (int i = 0; i <= 5; i++) { MessageProperties properties = new MessageProperties(); properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); properties.setHeader("num",i); properties.setExpiration("5000"); Message message = new Message(msg.getBytes(), properties); String a = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(String.valueOf(i)); rabbitTemplate.convertAndSend(MqConstant.DIRECT_EXCHANGE, MqConstant.DIRECT_ROUTERKEY, message,correlationData); } } public void sendTopic(String msg, String routingKey) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(MqConstant.TOPIC_EXCHANGE,routingKey,msg,correlationData); } public void fanoutSender(Order order) { CorrelationData correlationData = new CorrelationData(order.getId().toString()); rabbitTemplate.convertAndSend(MqConstant.FANOUT_EXCHANGE,null,order,correlationData); } public void lazySender(String msg, String routingKey) { MessageProperties messageProperties = new MessageProperties(); messageProperties.setDelay(MqConstant.delay); //设置延迟的时间 //设置消息投递模式持久性和临时性,临时性重启会丢失或者没有接收者会丢失 
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message message = new Message(msg.getBytes(),messageProperties); rabbitTemplate.convertAndSend(MqConstant.LAZY_EXCHANGE,routingKey,message); } public void testDb() { for (int i = 0; i < 1; i++) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // log.info("【Producer】发送的消费ID = {}", correlationData.getId()); String msg = "hello confirm message" + i; // log.info("【Producer】发送的消息 = {}", msg); rabbitTemplate.convertAndSend(MqConstant.TEST_EXCHANGE1, MqConstant.ROUTERKEY1, msg, correlationData); } } }
依赖
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
配置
#rabbitmq配置(位于 spring: 节点下)
rabbitmq:
  host: pe-boot-rabbitmq
  username: admin
  password: admin
  port: 5672
  virtual-host: /
  connection-timeout: 15000
  #开启confirm模式
  publisher-confirms: true
  #开启return模式,前提是下面的mandatory设置为true否则会删除消息
  publisher-returns: true
  #消息无法路由到队列时退回给生产者(触发return回调),而不是直接丢弃
  template.mandatory: true
  #新版本publisher-confirms已经修改为publisher-confirm-type,默认为NONE,CORRELATED值是发布消息成功到交换器会触发回调
  publisher-confirm-type: correlated
  listener:
    simple:
      #消费者端使用手动ack模式
      acknowledge-mode: manual
      #并发消费者的最大值
      max-concurrency: 5
      #并发消费者的初始化值
      concurrency: 1
      #每个消费者每次监听时可拉取的消息数
      prefetch: 1
      # 重试机制
      retry:
        #是否开启消费者重试
        enabled: true
        #最大重试次数
        max-attempts: 5
        #重试间隔时间(单位毫秒)
        initial-interval: 5000
        #重试最大时间间隔(单位毫秒)
        max-interval: 1200000
        #间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
        multiplier: 2
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)