当消息多次消费失败后,只将消息放入死信队列其实是不够的,至少也得给个静默处理的结果吧,也就是所谓的降级处理。
引入依赖、创建topic等工作就不再赘述,可以参考上篇文章:【spring cloud】stream集成rabbitMQ,实现Topic订阅发布
配置如下:
#Fallback配置 spring.cloud.stream.bindings.fallback-consumer.destination=fallback-topic spring.cloud.stream.bindings.fallback-producer.destination=fallback-topic #重试次数(本机重试,不发回mq)(次数等于1相当于不重试) spring.cloud.stream.bindings.fallback-consumer.consumer.max-attempts=2 spring.cloud.stream.bindings.fallback-consumer.group=fallback-group
注意,这里的配置只是正常的绑定exchange、本地重试等配置,与降级无关。
降级真正的处理在于consumer的代码:
//Fallback+升版 @StreamListener(FallBackTopic.INPUT) public void goodbyeBadGut(Object payLoad, @Header("version") String version) { log.info(" Fallback - r u ok"); if ("1.0".equalsIgnoreCase(version)) { log.info("Fallback - fine, 3ku,and u?"); } else if ("2.0".equalsIgnoreCase(version)){ log.info("Fallback - Unsurported Version"); throw new RuntimeException("DLQ - I'm not OK"); } else { log.info("Fallback - version={}",version); } log.info("Fallback - Error message consumed successfully, payLoad={}", payLoad); } @ServiceActivator(inputChannel = "fallback-topic.fallback-group.errors") public void fallback(Message>message){ log.info("fallback entered"); }
通过注解 @ServiceActivator,使来自于信道(值为队列名称)的消息在抛出异常后实现降级。
而代码中的version字段,则可以使得consumer支持不同版本的接口,实现接口升版和兼容。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)