使用策略模式改造公司的一段关于 RabbitMQ 延时队列的一段代码
改造之前的代码如下:
@RabbitListener(queues = "${queue.name}") public void process(Message content, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { String msg = new String(content.getBody()); log.info("队列接收消息: msg = {}", msg); RabbitMsg rmsg= JSONObject.parseObject(msg, RabbitMsg.class); try { String pType = rmsg.getProcessType(); if ("A".equals(pType)) { //do something... } else if ("B".equals(pType)) { //do something... } else if ("C".equals(pType)) { //do something... } else if ("D".equals(pType)) { //do something... } else if ("E".equals(pType)) { //do something... } else if ("ZZ".equals(pType)) { //do something... } } catch (Exception e) { // 增加try catch,防止消费失败后,一直循环消费 log.error("消息消费异常:参数->{}", msg); RabbitExceptionInfo info= new RabbitExceptionInfo(); info.setData(rmsg.getData()); info.setTime(DateUtil.format(new Date(), DateUtil.DATETYPE_1)); indo.setRemarks(EXCEPTION_REMARK); //加入异常队列 produceExceptionMsg(rabbitExceptionInfo); } //手动ack channel.basicAck(tag, false); }
这样根据一个 processType 来判断做什么事情,用策略模式是比较适合的
动手改造吧
- 先将根据 processType 的处理逻辑抽象出来:
public interface RabbitmqDlxHandler { void doProcess(RabbitMsg rmsg); }
- 有接口,就会有具体的接口实现类来完成具体的业务逻辑:
@Component @ProcessType(value = "A") public class processTypeA implements RabbitmqDlxHandler { @Override void doProcess(RabbitMsg rmsg) { //do something about processType A... } }
@Component @ProcessType(value = "B") public class processTypeB implements RabbitmqDlxHandler { @Override void doProcess(RabbitMsg rmsg) { //do something about processType B... } }
@Component @ProcessType(value = "C") public class processTypeC implements RabbitmqDlxHandler { @Override void doProcess(RabbitMsg rmsg) { //do something about processType C... } }
- 定义一个注解 @ProcessType 来标注处理器的类型,并使用此注解将处理器初始化(后面会用到):
@Inherited @documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface ProcessType { @NotEmpty String value(); }
- 项目启动,需要先将具体的处理器初始化到上下文中:
@Slf4j @Component public class RabbitmqDlxContext implements ApplicationContextAware { private static final MapHANDLER_MAP = new HashMap<>(); @Autowired private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map beansWithAnnotation = applicationContext.getBeansWithAnnotation(ProcessType.class); if (!CollectionUtils.isEmpty(beansWithAnnotation)) { beansWithAnnotation.forEach((k, v) -> HANDLER_MAP.put(v.getClass().getAnnotation(ProcessType.class).value(), v.getClass())); log.info("死信队列处理器初始化完毕:HANDLER_MAP = {}", HANDLER_MAP); } } public RabbitmqDlxHandler getConcertHandler(String processType) { Class aClass = HANDLER_MAP.get(processType); if (Objects.isNull(aClass)) throw new ApplicationException("8848", "系统中没有orderType = " + processType+ " 对应的处理器", null); return (RabbitmqDlxHandler) applicationContext.getBean(aClass); } }
- 准备工作完成了,接着来改造 RabbitMQ 延时队列的逻辑代码吧:
@Resource private RabbitmqDlxContext rabbitmqDlxContext; @RabbitListener(queues = "${queue.name}") public void process(Message content, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { String msg = new String(content.getBody()); log.info("队列接收消息: msg = {}", msg); RabbitMsg rmsg = JSONObject.parseObject(msg, RabbitMsg.class); try { String pType = rmsg.getProcessType(); rabbitmqDlxContext.getConcertHandler(pType).doProcess(rmsg); } catch (Exception e) { // 增加try catch,防止消费失败后,一直循环消费 log.error("消息消费异常:参数->{}", msg); RabbitExceptionInfo info= new RabbitExceptionInfo(); info.setData(rmsg.getData()); info.setTime(DateUtil.format(new Date(), DateUtil.DATETYPE_1)); indo.setRemarks(EXCEPTION_REMARK); //加入异常队列 produceExceptionMsg(rabbitExceptionInfo); } //手动ack channel.basicAck(tag, false); }
- 后续需要扩展处理器的类型,只需要增加一个处理器然后实现抽象处理器接口,然后加上 @ProcessType 注解即可
- 消息处理异常后,加入异常队列,监听异常队列消费信息,此时需要考虑消息消费的幂等性哟
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)