利用redis实现,因此先导入Redis依赖:
org.springframework.boot spring-boot-starter-data-redis
编写配置文件,增加redis的ip和端口配置
spring: rabbitmq: host: 192.168.200.129 port: 5672 username: test password: test virtual-host: /test listener: simple: acknowledge-mode: manual publisher-/confirm/i-type: simple publisher-returns: true redis: host: 192.168.200.129 port: 6379
修改生产者,
@Test void contextLoads() throws IOException { CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());//data,id的d巧记 rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);//增加一个data即封装id的对象,方法重载 System.in.read(); }
修改消费者,利用redis和setnx方法的特点和id控制标记法,解决消息被消费者重复消费的问题
@Autowired private StringRedisTemplate redisTemplate;//换类StringRedisTemplate是RedisTemplate的子类,更加强大 @RabbitListener(queues = "boot-queue") public void getMessage(String msg, Channel channel, Message message) throws IOException { //0. 通过消息属性的头,或者spring被动返回的消息相关,数据比如获取MessageId String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation"); //1. 设置key到Redis if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {//原来是10秒测试redis中是否有消费完消息后id为1的数据,设置为10秒太快消失不便于下面的测试查看 //2. 消费消息,暂时先打印消息来模拟消费,至于消息以后用来干啥得看具体的需求↓ System.out.println("接收到消息:" + msg); //3. 设置key的value为1 redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS); //4. 手动ack channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }else { //5. 获取Redis中的value即可 如果是1,手动ack if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } } }
启动测试,可以看到效果。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)