rabbitmq事务和重试机制

rabbitmq事务和重试机制,第1张

rabbitmq事务和重试机制
package org.jeecg.boot.starter.rabbitmq.config;

import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.boot.starter.rabbitmq.core.MapMessageConverter;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import java.util.HashMap;
import java.util.Map;


/**
 * Spring configuration that exposes the {@link RabbitTemplate} used for publishing.
 */
@Configuration
public class RabbitConfig {

    /**
     * Creates a {@link RabbitTemplate} bound to the given connection factory.
     *
     * <p>The bean is prototype-scoped, so each injection point receives its own
     * template instance and can register its own callbacks on it.
     *
     * @param connectionFactory connection factory the template publishes through
     * @return a template with Java-serialization message conversion and the
     *         mandatory flag switched on
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate prototypeTemplate = new RabbitTemplate(connectionFactory);

        // SerializerMessageConverter is the converter in use; MapMessageConverter
        // (imported by this file) is the alternative that was left disabled.
        prototypeTemplate.setMessageConverter(new SerializerMessageConverter());

        // Mandatory publishing is enabled; it is the prerequisite for return
        // callbacks on messages the broker cannot route.
        prototypeTemplate.setMandatory(true);

        // Left disabled on purpose in the original: a dedicated publisher connection
        // (setUsePublisherConnection(true)), which would keep a blocked producer from
        // also blocking consumers, and the confirm/return callbacks, which are
        // registered by the component that owns the template.
        return prototypeTemplate;
    }

}
package org.jeecg.boot.starter.rabbitmq.constant;


/**
 * Names of the exchanges, queues and routing keys used by the RabbitMQ demo.
 *
 * <p>Some identifiers are misspelled ({@code DLX_ROUTERKRY}, {@code TPOIC_ROUTERKEY})
 * or not upper-case ({@code delay}); they are kept as-is because other classes
 * reference them by these names.
 */
public class MqConstant {

    // ---- plain direct exchange #1 ----
    public static final String TEST_EXCHANGE = "test_exchange";
    public static final String TEST_QUEUE = "test_queue";
    public static final String ROUTERKEY = "test_queue";

    // ---- plain direct exchange #2 ----
    public static final String TEST_EXCHANGE1 = "test_exchange1";
    public static final String TEST_QUEUE1 = "test_queue1";
    public static final String ROUTERKEY1 = "test_queue1";

    // ---- direct exchange whose queue dead-letters into the DLX below ----
    public static final String DIRECT_EXCHANGE = "direct_exchange";
    public static final String DIRECT_QUEUE = "direct_queue";
    public static final String DIRECT_ROUTERKEY = "direct_routerKey";

    // ---- dead-letter exchange ----
    public static final String DLX_EXCHANGE = "dlx_exchange";
    public static final String DLX_QUEUE = "dlx_queue";
    public static final String DLX_ROUTERKRY = "dlx_routerkey";

    // ---- topic mode ----
    public static final String TOPIC_EXCHANGE = "topic_exchange";
    public static final String TOPIC_QUEUE = "topic_queue";
    public static final String TPOIC_ROUTERKEY = "topic.#";

    // ---- fanout mode ----
    public static final String FANOUT_EXCHANGE = "fanout_exchange";
    public static final String FANOUT_QUEUE = "fanout_queue";

    // ---- delayed queue ----
    /** Delay applied to delayed messages (passed to MessageProperties#setDelay). */
    public static final Integer delay = 10000;
    public static final String LAZY_EXCHANGE = "Lazy_Exchange";
    public static final String LAZY_QUEUE = "Lazy_Queue";
    public static final String LAZY_KEY = "lazy.#";

}

package org.jeecg.boot.starter.rabbitmq.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;


@Component
@Slf4j
public class CustomConfirmAndReturnCallback implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback {

    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage回调方法>>>" + new String(message.getBody(), StandardCharsets.UTF_8) + ",replyCode:" + replyCode
                + ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
    }

    
    @Override
    public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
        log.info("/confirm/i回调方法>>>回调消息ID为: " + correlationData.getId());
        if (isSendSuccess) {
            log.info("/confirm/i回调方法>>>消息发送到交换机成功!");
        } else {
            log.info("/confirm/i回调方法>>>消息发送到交换机失败!,原因 : [{}]", error);
        }
    }

}
package org.jeecg.boot.starter.rabbitmq.core;

import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.util.StringUtils;

import java.io.Serializable;



/**
 * Factory helpers for building AMQP {@link Message} and {@link CorrelationData} objects.
 */
public class MqMessageFactoryInit {

    /**
     * Builds a message from a text payload, setting only the properties that were supplied.
     *
     * <p>Every argument except {@code msg} is optional: {@code null} (or, for strings,
     * empty) means "leave this property unset".
     *
     * @param msg           text payload; encoded as UTF-8
     * @param deliveryMode  delivery mode, or {@code null}
     * @param headerKey     name of a single custom header; only applied together with {@code headerValue}
     * @param headerValue   value of the custom header; only applied together with {@code headerKey}
     * @param expiration    value for the message expiration property, or {@code null}/empty
     * @param delay         value for the message delay property, or {@code null}
     * @param contentType   content type, or {@code null}/empty
     * @param correlationId correlation id, or {@code null}/empty
     * @param messageId     message id, or {@code null}/empty
     * @param priority      message priority, or {@code null}
     * @param replyTo       reply-to address, or {@code null}/empty
     * @return the assembled message
     * @throws NullPointerException if {@code msg} is {@code null}
     */
    public static Message init(String msg, MessageDeliveryMode deliveryMode, String headerKey, String headerValue, String expiration, Integer delay, String contentType,
                               String correlationId, String messageId, Integer priority, String replyTo) {
        // Fail fast with a named argument instead of an anonymous NPE at getBytes().
        java.util.Objects.requireNonNull(msg, "msg");

        MessageProperties properties = new MessageProperties();
        if (deliveryMode != null) {
            properties.setDeliveryMode(deliveryMode);
        }
        // StringUtils.hasLength(String) is the non-deprecated equivalent of !isEmpty(...).
        if (StringUtils.hasLength(headerKey) && StringUtils.hasLength(headerValue)) {
            properties.setHeader(headerKey, headerValue);
        }
        if (StringUtils.hasLength(expiration)) {
            properties.setExpiration(expiration);
        }
        if (delay != null) {
            properties.setDelay(delay);
        }
        if (StringUtils.hasLength(contentType)) {
            properties.setContentType(contentType);
        }
        if (StringUtils.hasLength(correlationId)) {
            properties.setCorrelationId(correlationId);
        }
        if (StringUtils.hasLength(messageId)) {
            properties.setMessageId(messageId);
        }
        if (priority != null) {
            properties.setPriority(priority);
        }
        if (StringUtils.hasLength(replyTo)) {
            properties.setReplyTo(replyTo);
        }

        // Explicit charset: the no-arg getBytes() depends on the JVM default and would
        // disagree with readers that decode the body as UTF-8.
        return new Message(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8), properties);
    }

    /**
     * Wraps an id in a {@link CorrelationData}.
     *
     * @param id correlation id to attach to a publish
     * @return correlation data carrying {@code id}
     */
    public static CorrelationData init(String id) {
        return new CorrelationData(id);
    }
}
package org.jeecg.boot.starter.rabbitmq.core;

import java.util.HashMap;
import java.util.concurrent.CountDownLatch;


/**
 * Static holders for state shared between a publishing thread and its callback threads:
 * named results, named {@link CountDownLatch}es, and a per-thread retry counter.
 *
 * <p>Result and latch names must be non-null (the backing maps are concurrent maps).
 */
public class SharedVariableUtils {
    // Result store for callbacks: each publish has one result, shared by the main
    // thread and its two callback threads, and removed once it has been read.
    private static final java.util.concurrent.ConcurrentMap<String, ResultVo> resultMap =
            new java.util.concurrent.ConcurrentHashMap<>();
    // Latch store. Each publish has two callbacks (ConfirmCallback and ReturnCallback),
    // but the second is not invoked on success, so its result is hard to obtain and it
    // is not used here. Each callback thread shares its own latch with the main thread;
    // the key can be built from appId + tenantId + callback type.
    private static final java.util.concurrent.ConcurrentMap<String, CountDownLatch> countDownLatchMap =
            new java.util.concurrent.ConcurrentHashMap<>();
    // Number of attempts after a failed publish; kept per thread.
    private static final ThreadLocal<Integer> count = ThreadLocal.withInitial(() -> 0);
    public static String confirmCall = "confirm";
    public static String returnCall = "return";


    //==============================Result=====================================

    /**
     * Returns the result stored under {@code resultName} and removes it from the store.
     *
     * @param resultName name the result was stored under
     * @return the stored result, or {@code null} if none is present
     */
    public static ResultVo getResult(String resultName) {
        // A single remove() makes read-and-delete atomic.
        return resultMap.remove(resultName);
    }

    /**
     * Stores a new result containing one key/value entry, replacing any previous
     * result with the same name.
     *
     * @param resultName name to store the result under
     * @param key        entry key put into the result
     * @param value      entry value put into the result
     */
    public static void setResult(String resultName, String key, String value) {
        ResultVo result = new ResultVo();
        result.put(key, value);
        resultMap.put(resultName, result);
    }

    //========================================countDownLatch=======================================================

    /**
     * Blocks until the named latch has been counted down, creating the latch
     * (count 1) if it does not exist yet.
     *
     * <p>If the waiting thread is interrupted, the method returns early with the
     * thread's interrupt flag set.
     *
     * @param countDownLatchName name of the latch to wait on
     */
    public static void await(String countDownLatchName) {
        // computeIfAbsent guarantees that await() and countDown() racing on the same
        // name end up with the same latch instance.
        CountDownLatch latch = countDownLatchMap.computeIfAbsent(countDownLatchName, name -> new CountDownLatch(1));
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes the named latch from the store.
     *
     * @param countDownLatchName name of the latch to remove
     */
    public static void deleteCountDownLatch(String countDownLatchName) {
        countDownLatchMap.remove(countDownLatchName);
    }

    /**
     * Counts down the named latch, creating it (count 1) first if it does not exist,
     * so that a later {@link #await(String)} on the same name returns immediately.
     *
     * @param countDownLatchName name of the latch to release
     */
    public static void countDown(String countDownLatchName) {
        countDownLatchMap.computeIfAbsent(countDownLatchName, name -> new CountDownLatch(1)).countDown();
    }


    //============================================count===============================================================

    /** Returns the current thread's counter value (0 if never set). */
    public static int getCount() {
        return count.get();
    }

    /** Increments the current thread's counter by one. */
    public static void countUp() {
        count.set(count.get() + 1);
    }

    /**
     * Sets the current thread's counter.
     *
     * @param i new counter value (the argument used to be ignored and 0 was always stored)
     */
    public static void setCount(int i) {
        count.set(i);
    }

    /**
     * Drops the current thread's counter so a pooled thread does not carry a stale
     * value into its next task.
     */
    public static void removeCount() {
        count.remove();
    }
}
package org.jeecg.boot.starter.rabbitmq.exchange;

import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


/**
 * Declares the direct exchange and queue whose dead-lettered messages are forwarded
 * to {@link MqConstant#DLX_EXCHANGE}.
 */
@Configuration
public class DeadDirectExchangeConfig {

    /**
     * Declares {@link MqConstant#DIRECT_QUEUE} with a dead-letter exchange argument.
     *
     * <p>Constructor flags, in order: durable = true, exclusive = false,
     * autoDelete = false.
     * <ul>
     *   <li>exclusive: the queue is visible only to the connection that first declared
     *       it and is deleted when that connection closes. Channels of the same
     *       connection can share it; other connections cannot declare a queue with the
     *       same name; it is deleted on disconnect even if it is durable. Suited to a
     *       single client that both sends and reads.</li>
     *   <li>autoDelete: the queue is deleted once it has no subscribed consumers.
     *       Suited to temporary queues.</li>
     * </ul>
     *
     * @return the queue definition
     */
    @Bean
    public Queue directQueue() {
        // Typed map instead of the raw Map/HashMap pair.
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", MqConstant.DLX_EXCHANGE);
        // A queue-level TTL ("x-message-ttl", 5000) was left disabled in the original.
        return new Queue(MqConstant.DIRECT_QUEUE, true, false, false, arguments);
    }

    /**
     * Declares {@link MqConstant#DIRECT_EXCHANGE} (durable = true, autoDelete = false).
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(MqConstant.DIRECT_EXCHANGE, true, false);
    }

    /**
     * Binds the direct queue to the direct exchange with {@link MqConstant#DIRECT_ROUTERKEY}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding directBinding() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with(MqConstant.DIRECT_ROUTERKEY);
    }
}
package org.jeecg.boot.starter.rabbitmq.exchange;

import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Declares the delayed topic exchange, its queue and the binding between them.
 */
@Configuration
public class LazyExchangeConfig {

    /**
     * Declares {@link MqConstant#LAZY_EXCHANGE} (durable = true, autoDelete = false)
     * and marks it as a delayed exchange.
     *
     * @return the exchange definition
     */
    @Bean
    public TopicExchange lazyExchange() {
        final TopicExchange delayedExchange = new TopicExchange(MqConstant.LAZY_EXCHANGE, true, false);
        // Switch on delayed delivery for this exchange.
        delayedExchange.setDelayed(true);
        return delayedExchange;
    }

    /**
     * Declares the durable queue {@link MqConstant#LAZY_QUEUE}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue lazyQueue() {
        final boolean durable = true;
        return new Queue(MqConstant.LAZY_QUEUE, durable);
    }

    /**
     * Binds the lazy queue to the lazy exchange using the pattern {@link MqConstant#LAZY_KEY}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding lazyBinding() {
        final Queue queue = lazyQueue();
        final TopicExchange exchange = lazyExchange();
        return BindingBuilder.bind(queue).to(exchange).with(MqConstant.LAZY_KEY);
    }
}
package org.jeecg.boot.starter.rabbitmq.exchange;

import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Declares two independent direct exchange / queue / binding triples used by the tests.
 */
@Configuration
public class TestDirectExchangeConfig {

    // ------------------------------------------------------------------ first triple

    /**
     * Declares {@link MqConstant#TEST_QUEUE}. It is durable; the original author notes
     * that durability is meant to guard against messages not being received in time
     * after a crash.
     *
     * @return the queue definition
     */
    @Bean
    public Queue createQueue() {
        final boolean durable = true;
        return new Queue(MqConstant.TEST_QUEUE, durable);
    }

    /**
     * Declares {@link MqConstant#TEST_EXCHANGE} (durable = true, autoDelete = false).
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange createExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(MqConstant.TEST_EXCHANGE, durable, autoDelete);
    }

    /**
     * Binds the first queue to the first exchange with {@link MqConstant#ROUTERKEY}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding binding() {
        final Queue queue = createQueue();
        final DirectExchange exchange = createExchange();
        return BindingBuilder.bind(queue).to(exchange).with(MqConstant.ROUTERKEY);
    }

    // ----------------------------------------------------------------- second triple

    /**
     * Declares the durable queue {@link MqConstant#TEST_QUEUE1}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue createQueue1() {
        final boolean durable = true;
        return new Queue(MqConstant.TEST_QUEUE1, durable);
    }

    /**
     * Declares {@link MqConstant#TEST_EXCHANGE1} (durable = true, autoDelete = false).
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange createExchange1() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(MqConstant.TEST_EXCHANGE1, durable, autoDelete);
    }

    /**
     * Binds the second queue to the second exchange with {@link MqConstant#ROUTERKEY1}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding binding1() {
        final Queue queue = createQueue1();
        final DirectExchange exchange = createExchange1();
        return BindingBuilder.bind(queue).to(exchange).with(MqConstant.ROUTERKEY1);
    }
}
package org.jeecg.boot.starter.rabbitmq.exchange;

import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Declares the dead-letter topic exchange, its queue and a catch-all binding.
 */
@Configuration
public class TopicExchangeConfig {

    /**
     * Declares the durable queue {@link MqConstant#DLX_QUEUE}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue dlxQueue() {
        final boolean durable = true;
        return new Queue(MqConstant.DLX_QUEUE, durable);
    }

    /**
     * Declares {@link MqConstant#DLX_EXCHANGE} (durable = true, autoDelete = false).
     *
     * @return the exchange definition
     */
    @Bean
    public TopicExchange dlxExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(MqConstant.DLX_EXCHANGE, durable, autoDelete);
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange with the pattern
     * {@code "#"}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding dlxBinding() {
        final Queue queue = dlxQueue();
        final TopicExchange exchange = dlxExchange();
        return BindingBuilder.bind(queue).to(exchange).with("#");
    }
}
package org.jeecg.modules.pQuery.controller;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.modules.pQuery.rabitMq.Order;
import org.jeecg.modules.pQuery.sender.SendMqService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.Serializable;


@Api(tags = "消息队列重写测试")
@RestController
@RequestMapping("/mq")
@CrossOrigin
@Slf4j
public class RabbitMqTest {
    @Autowired
    private SendMqService sendMqService;

    @Value("${spring.rabbitmq.publisher-/confirm/is}")
    private String /confirm/is;
    
    @ApiOperation(value = "test1")
    @GetMapping(value = "/send")
    public void send(String msg) {
        log.info("============:"+/confirm/is);
        sendMqService.send(msg, MqConstant.ROUTERKEY);

    }

    
    @ApiOperation(value = "死信队列")
    @GetMapping(value = "/dlsSend")
    public void dlsSend(String msg) {
        sendMqService.dlsSend(msg, MqConstant.DIRECT_ROUTERKEY);
    }

    
    @ApiOperation(value = "一对多的模式")
    @GetMapping(value = "/sendTopic")
    public void sendTopic(String msg) {
        sendMqService.sendTopic(msg, "topic.message");
    }

    
    @ApiOperation(value = "广播模式")
    @GetMapping(value = "/fanoutSender")
    public void fanoutSender() {
        Order order = new Order(2,"你好,我是fanout队列");
        sendMqService.fanoutSender(order);

    }

    
    @ApiOperation(value = "延迟队列")
    @GetMapping(value = "/lazySender")
    public void lazySender(String msg) {

        sendMqService.lazySender(msg,MqConstant.LAZY_KEY);
    }

    
    @ApiOperation(value = "测试死性队列/事务回滚/重试机制")
    @GetMapping(value = "/testDb")
    public void testDb() {
        sendMqService.testDb();
    }


}
package org.jeecg.modules.pQuery.receive;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import org.jeecg.boot.starter.rabbitmq.receive.baseReceive;
import org.jeecg.common.base.baseMap;
import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.websocket.DebugContext;
import org.jeecg.modules.pQuery.controller.RabbitMqTest;
import org.jeecg.modules.pQuery.mapper.PeOrderInfoMapper;
import org.jeecg.modules.pQuery.rabitMq.Order;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.util.Map;


/**
 * RabbitMQ listeners for the demo queues. Every listener acknowledges or rejects its
 * delivery explicitly through the {@link Channel}.
 */
@Component
@Slf4j
public class ReceiveMqService {
    @Autowired
    PeOrderInfoMapper peOrderInfoMapper;
    // Read once when this bean is instantiated and re-applied at the start of every listener call.
    // NOTE(review): confirm a token is actually available at bean-creation time.
    private String token = UserTokenContext.getToken();

    /**
     * Consumes {@link MqConstant#TEST_QUEUE}: rejects (without requeue) messages whose
     * {@code num} header equals 0 and acknowledges every other message.
     *
     * @param message the delivery
     * @param channel channel used for the ack/nack
     * @throws IOException if the channel operation fails
     */
    @RabbitListener(queues = MqConstant.TEST_QUEUE)
    public void customer(Message message, Channel channel) throws IOException {
        UserTokenContext.setToken(token);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        channel.basicQos(0, 1, false);
        log.info("deliveryTag:{}", deliveryTag);
        // A missing or non-Integer header used to fail on the cast/unboxing and leave the
        // delivery un-acked; it is now treated as "not zero" and acknowledged.
        Object num = message.getMessageProperties().getHeaders().get("num");
        boolean numIsZero = num instanceof Number && ((Number) num).intValue() == 0;
        if (numIsZero) {
            // Reject this single delivery without returning it to the queue.
            channel.basicNack(deliveryTag, false, false);
        } else {
            log.info("basicAck:{}", "一个个消费");
            // Acknowledge this single delivery.
            channel.basicAck(deliveryTag, false);
        }
    }

    /**
     * Consumes {@link MqConstant#DIRECT_QUEUE} and always rejects the delivery without
     * requeue. The queue is declared with a dead-letter exchange in
     * DeadDirectExchangeConfig.
     *
     * @param message the delivery
     * @param channel channel used for the reject
     * @throws IOException          if the channel operation fails
     * @throws InterruptedException declared by the original signature; not thrown by this body
     */
    @RabbitListener(queues = MqConstant.DIRECT_QUEUE)
    public void customer2(Message message, Channel channel) throws IOException, InterruptedException {
        UserTokenContext.setToken(token);
        channel.basicQos(0, 1, false);
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println(correlationId + "*******************");
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.err.println("-----------consume message----------");
        // Decode with an explicit charset rather than the JVM default.
        System.err.println("body: " + new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        log.info("deliveryTag为:" + deliveryTag);
        channel.basicReject(deliveryTag, false);

    }

    /**
     * Consumes {@link MqConstant#DLX_QUEUE} and acknowledges every delivery.
     *
     * @param message the delivery
     * @param channel channel used for the ack
     * @throws IOException          if the channel operation fails
     * @throws InterruptedException declared by the original signature; not thrown by this body
     */
    @RabbitListener(queues = MqConstant.DLX_QUEUE)
    public void dlxCustomer(Message message, Channel channel) throws IOException, InterruptedException {
        UserTokenContext.setToken(token);
        channel.basicQos(0, 1, false);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.err.println("-----------consume message----------");
        System.err.println("body: " + new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        log.info("死信队列里面的deliveryTag为:" + deliveryTag);
        log.info("私信队列准备ack消息了");
        channel.basicAck(deliveryTag, false);

    }

    /**
     * Declares and consumes the topic queue bound with {@link MqConstant#TPOIC_ROUTERKEY};
     * acknowledges every delivery.
     *
     * @param message the delivery
     * @param channel channel used for the ack
     * @throws IOException if the channel operation fails
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    exchange = @Exchange(value = MqConstant.TOPIC_EXCHANGE, type = ExchangeTypes.TOPIC,
                            durable = "true", autoDelete = "false"),
                    value = @Queue(value = MqConstant.TOPIC_QUEUE, durable = "true"),
                    key = MqConstant.TPOIC_ROUTERKEY))
    public void topicCustomer(Message message, Channel channel) throws IOException {
        UserTokenContext.setToken(token);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println(deliveryTag + "*************");
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Declares and consumes the fanout queue; acknowledges the delivery and falls back
     * to a nack (without requeue) if the ack itself fails.
     *
     * @param order   converted payload
     * @param map     message headers; the delivery tag is read from it
     * @param channel channel used for the ack/nack
     * @param message the raw delivery
     * @throws IOException if the fallback nack fails
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = MqConstant.FANOUT_EXCHANGE, type = ExchangeTypes.FANOUT, durable = "true"),
            value = @Queue(value = MqConstant.FANOUT_QUEUE, durable = "true")))
    public void fanoutCustomer(@Payload Order order, @Headers Map<String, Object> map, Channel channel,Message message) throws IOException {
        UserTokenContext.setToken(token);
        System.out.println(order.getId());
        Long delivery = (Long) map.get(AmqpHeaders.DELIVERY_TAG);
        log.info("批次: {},序列号:{}", delivery,message.getMessageProperties().getDeliveryTag());
        try {
            System.out.println("消费成功");
            channel.basicAck(delivery, false);
        } catch (IOException e) {
            System.out.println("消费失败");
            channel.basicNack(
                    delivery, false, false
            );
        }
    }

    /**
     * Consumes {@link MqConstant#LAZY_QUEUE}; acknowledges the delivery and falls back
     * to a nack (without requeue) if the ack fails.
     *
     * @param message the delivery
     * @param channel channel used for the ack/nack
     */
    @RabbitListener(queues = MqConstant.LAZY_QUEUE)
    public void lazyCustomer(Message message, Channel channel) {
        UserTokenContext.setToken(token);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.err.println("body: " + new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        try {
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            try {
                channel.basicNack(deliveryTag, false, false);
            } catch (IOException e1) {
                log.info("失败");
            }
        }
    }

    /**
     * Consumes {@link MqConstant#TEST_QUEUE1} inside a transaction and deliberately
     * fails, to demonstrate rollback, rejection and retry.
     *
     * @param msg     converted payload
     * @param channel channel used for the ack/reject
     * @param message the raw delivery
     * @throws IOException if the channel operation fails
     */
    @RabbitListener(queues = MqConstant.TEST_QUEUE1)
    @Transactional
    public void customer1(String msg, Channel channel, Message message) throws IOException {
        UserTokenContext.setToken(token);

        try {
            log.info("【Consumer01】批次: {}", message.getMessageProperties().getDeliveryTag());
            log.info("【Consumer01成功接收到消息】>>> {}", msg);
            peOrderInfoMapper.setValues();
            // Deliberate NullPointerException to simulate a processing failure after the
            // database write above.
            String string = null;
            string.length();
            // Acknowledge only this delivery (never reached while the simulated failure is in place).
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            e.printStackTrace();
            // Logged again on every redelivery/retry attempt.
            log.info("【Consumer01】批次: {}", message.getMessageProperties().getDeliveryTag());
            // Reject without requeue. Per the original author, retry needs two things:
            // the exception must propagate, and the message must not be requeued.
            // NOTE(review): with listener retry enabled this method may be re-invoked for
            // the same delivery after it was already rejected here — confirm that
            // rejecting the same delivery tag again is acceptable to the broker.
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            throw e;
        }

    }
}


package org.jeecg.modules.pQuery.sender;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.callback.CustomConfirmAndReturnCallback;
import org.jeecg.boot.starter.rabbitmq.constant.MqConstant;
import org.jeecg.common.base.baseMap;
import org.jeecg.modules.pQuery.controller.RabbitMqTest;
import org.jeecg.modules.pQuery.rabitMq.Order;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.UUID;



@Component
@Slf4j
public class SendMqService extends CustomConfirmAndReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

//    @Autowired
//    public SendMqService(RabbitTemplate amqpTemplate) {
//        amqpTemplate.setConfirmCallback(this::/confirm/i);
//        amqpTemplate.setReturnCallback(this::returnedMessage);
//        this.rabbitTemplate = amqpTemplate;
//    }
    
    @PostConstruct
    public void init() {
        //指定 ConfirmCallback
        rabbitTemplate.setConfirmCallback(this);
        //指定 ReturnCallback
        rabbitTemplate.setReturnCallback(this);
    }




        
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        super.confirm(correlationData, ack, cause);
        log.info("数据编号========correlationdata:{}======ack:{}", ack, correlationData);
        if (!ack) {
            log.info("异常处理...."+cause);
        }
    }

    
    public void returnedMessage(Message message, int replyCode,
                                String replyText, String exchange, String routingKey) {
        super.returnedMessage(message, replyCode, replyText, exchange, routingKey);
        log.info("return exchange: {}, routingKey: {}, replyCode: {}, replyText: {}",
                exchange, routingKey, replyCode, replyText);
    }
    public void send(String msg, String routingKey) {
        for (int i = 0; i <= 5; i++) {
            baseMap map = new baseMap();
            map.put("num",  i);
//            MessageProperties properties = new MessageProperties();
//            properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//            properties.setHeader("num", i);
//            Message message = new Message(msg.getBytes(), map);
//            amqpTemplate.convertAndSend(MqConstant.TEST_EXCHANGE, routingKey, map);
//            amqpTemplate.convertAndSend(MqConstant.TEST_EXCHANGE, routingKey, map, message -> {
//                return message;
//            });
            MessageProperties properties = new MessageProperties();
            properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            properties.setHeader("num",i);
            Message message = new Message(msg.getBytes(), properties);
            rabbitTemplate.convertAndSend(MqConstant.TEST_EXCHANGE, routingKey, message);
        }
    }

    public void dlsSend(String msg, String directRouterkey) {
        for (int i = 0; i <= 5; i++) {
            MessageProperties properties = new MessageProperties();
            properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            properties.setHeader("num",i);
            properties.setExpiration("5000");
            Message message = new Message(msg.getBytes(), properties);
            String a = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(String.valueOf(i));
            rabbitTemplate.convertAndSend(MqConstant.DIRECT_EXCHANGE, MqConstant.DIRECT_ROUTERKEY, message,correlationData);
        }
    }

    public void sendTopic(String msg, String routingKey) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(MqConstant.TOPIC_EXCHANGE,routingKey,msg,correlationData);
    }

    public void fanoutSender(Order order) {
        CorrelationData correlationData = new CorrelationData(order.getId().toString());
        rabbitTemplate.convertAndSend(MqConstant.FANOUT_EXCHANGE,null,order,correlationData);
    }

    public void lazySender(String msg, String routingKey) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDelay(MqConstant.delay);  //设置延迟的时间
        //设置消息投递模式持久性和临时性,临时性重启会丢失或者没有接收者会丢失
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        Message message = new Message(msg.getBytes(),messageProperties);
        rabbitTemplate.convertAndSend(MqConstant.LAZY_EXCHANGE,routingKey,message);
    }

    public void testDb() {
        for (int i = 0; i < 1; i++) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//            log.info("【Producer】发送的消费ID = {}", correlationData.getId());
            String msg = "hello confirm message" + i;
//            log.info("【Producer】发送的消息 = {}", msg);
            rabbitTemplate.convertAndSend(MqConstant.TEST_EXCHANGE1, MqConstant.ROUTERKEY1, msg, correlationData);
        }
    }
}

依赖

 
        
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>
        

配置

#rabbitmq配置
  rabbitmq:
    host: pe-boot-rabbitmq
    username: admin
    password: admin
    port: 5672
    virtual-host: /
    connection-timeout: 15000
    #开启confirm模式
    publisher-confirms: true
    #开启return模式,前提是下面的mandatory设置为true否则会删除消息
    publisher-returns: true
    #mandatory为true时,无法路由的消息会通过return回调退回给生产者,而不是被直接丢弃
    template.mandatory: true
    #新版本publisher-confirms已经修改为publisher-confirm-type,默认为NONE,CORRELATED值是发布消息成功到交换器会触发回调
    publisher-confirm-type: correlated
    listener:
      simple:
        acknowledge-mode: manual
        #并发消费者的最大值
        max-concurrency: 5
        #并发消费者的初始化值
        concurrency: 1
        #每个消费者每次监听时可拉取的消息数量
        prefetch: 1
        # 重试机制
        retry:
          #是否开启消费者重试
          enabled: true 
          #最大重试次数
          max-attempts: 5
          #重试间隔时间(单位毫秒)
          initial-interval: 5000
          #重试最大时间间隔(单位毫秒)
          max-interval: 1200000
          #间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
          multiplier: 2 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5678027.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存