@Slf4j @Configuration public class RabbitMqConfiguration { @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause)); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message)); return rabbitTemplate; } @Bean public Queue directoneQueue() { return new Queue(RabbitConstant.DIRECT_MODE_QUEUE_ONE); } @Bean public Queue queueTwo() { return new Queue(RabbitConstant.QUEUE_TWO); } @Bean public Queue queueThree() { return new Queue(RabbitConstant.QUEUE_THREE); } @Bean public Queue queueFour() { return new Queue(RabbitConstant.QUEUE_FOUR); } @Bean public Queue queueFive() { return new Queue(RabbitConstant.QUEUE_FIVE); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(RabbitConstant.FANOUT_MODE_QUEUE); } @Bean public Binding fanoutBinding1(Queue directOneQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(directOneQueue).to(fanoutExchange); } @Bean public Binding fanoutBinding2(Queue queueTwo, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueTwo).to(fanoutExchange); } @Bean public TopicExchange topicExchange() { return new TopicExchange(RabbitConstant.TOPIC_MODE_QUEUE); } @Bean public Binding topicBinding1(FanoutExchange fanoutExchange, TopicExchange topicExchange) { return BindingBuilder.bind(fanoutExchange).to(topicExchange).with(RabbitConstant.TOPIC_ROUTING_KEY_ONE); } @Bean public Binding topicBinding2(Queue queueTwo, TopicExchange topicExchange) { return BindingBuilder.bind(queueTwo).to(topicExchange).with(RabbitConstant.TOPIC_ROUTING_KEY_TWO); } @Bean public Binding topicBinding3(Queue queueThree, TopicExchange topicExchange) { return BindingBuilder.bind(queueThree).to(topicExchange).with(RabbitConstant.TOPIC_ROUTING_KEY_THREE); } @Bean public Queue delayQueue() { return new Queue(RabbitConstant.DELAY_QUEUE, true); } @Bean public CustomExchange delayExchange() { Mapargs = Maps.newHashMap(); args.put("x-delayed-type", "direct"); return new CustomExchange(RabbitConstant.DELAY_MODE_QUEUE, "x-delayed-message", true, false, args); } @Bean public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) { return BindingBuilder.bind(delayQueue).to(delayExchange).with(RabbitConstant.DELAY_QUEUE).noargs(); } }
@Slf4j @Component public class DelayQueueHandler { @RabbitListener(queues = RabbitConstant.DELAY_QUEUE) @RabbitHandler public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) { // 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 final long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { log.info("延迟队列,手动ACK,接收消息:{}", JsonUtil.toJson(messageStruct)); // 通知 MQ 消息已被成功消费,可以ACK了 channel.basicAck(deliveryTag, false); } catch (IOException e) { try { // 处理失败,重新压入MQ channel.basicRecover(); } catch (IOException e1) { e1.printStackTrace(); } } } }
public interface RabbitConstant { String DIRECT_MODE_QUEUE_ONE = "queue.direct.1"; String FANOUT_MODE_QUEUE = "fanout.mode"; String TOPIC_MODE_QUEUE = "topic.mode"; String TOPIC_ROUTING_KEY_ONE = "queue.#"; String TOPIC_ROUTING_KEY_TWO = "*.queue"; String TOPIC_ROUTING_KEY_THREE = "3.queue"; String DELAY_QUEUE = "delay.queue"; String DELAY_MODE_QUEUE = "delay.mode"; }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)