【设计模式-策略模式】

【设计模式-策略模式】,第1张

【设计模式-策略模式】

使用策略模式改造公司的一段关于 RabbitMQ 延时队列的一段代码

改造之前的代码如下:

@RabbitListener(queues = "${queue.name}")
    public void process(Message content, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        String msg = new String(content.getBody());
        log.info("队列接收消息: msg = {}", msg);
        RabbitMsg rmsg= JSONObject.parseObject(msg, RabbitMsg.class);
        try {
            String pType = rmsg.getProcessType();
            if ("A".equals(pType)) {
            	//do something...
            } else if ("B".equals(pType)) {
            	//do something...
            } else if ("C".equals(pType)) {
            	//do something...
            } else if ("D".equals(pType)) {
            	//do something...
            } else if ("E".equals(pType)) {
            	//do something...
            }
            
             else if ("ZZ".equals(pType)) {
            	//do something...
            }
        } catch (Exception e) {
        	// 增加try catch,防止消费失败后,一直循环消费
            log.error("消息消费异常:参数->{}", msg);
            RabbitExceptionInfo info= new RabbitExceptionInfo();
            info.setData(rmsg.getData());
            info.setTime(DateUtil.format(new Date(), DateUtil.DATETYPE_1));
            indo.setRemarks(EXCEPTION_REMARK);
            //加入异常队列
            produceExceptionMsg(rabbitExceptionInfo);
        }
        //手动ack
        channel.basicAck(tag, false);
    }

这样根据一个 processType 来判断做什么事情,用策略模式是比较适合的

动手改造吧

  • 先将根据 processType 的处理逻辑抽象出来:
public interface RabbitmqDlxHandler {
    
    void doProcess(RabbitMsg rmsg);
}
  • 有接口,就会有具体的接口实现类来完成具体的业务逻辑:

@Component
@ProcessType(value = "A")
public class processTypeA implements RabbitmqDlxHandler {

	
	@Override
	void doProcess(RabbitMsg rmsg) {
		//do something about processType A...
	}
}

@Component
@ProcessType(value = "B")
public class processTypeB implements RabbitmqDlxHandler {

	
	@Override
	void doProcess(RabbitMsg rmsg) {
		//do something about processType B...
	}
}

@Component
@ProcessType(value = "C")
public class processTypeC implements RabbitmqDlxHandler {

	
	@Override
	void doProcess(RabbitMsg rmsg) {
		//do something about processType C...
	}
}
  • 定义一个注解 @ProcessType 来标注处理器的类型,并使用此注解将处理器初始化(后面会用到):
@Inherited
@documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ProcessType {

    
    @NotEmpty
    String value();
}
  • 项目启动,需要先将具体的处理器初始化到上下文中:
@Slf4j
@Component
public class RabbitmqDlxContext implements ApplicationContextAware {

    private static final Map HANDLER_MAP = new HashMap<>();

    @Autowired
    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map beansWithAnnotation = applicationContext.getBeansWithAnnotation(ProcessType.class);
        if (!CollectionUtils.isEmpty(beansWithAnnotation)) {
            beansWithAnnotation.forEach((k, v) -> HANDLER_MAP.put(v.getClass().getAnnotation(ProcessType.class).value(), v.getClass()));
            log.info("死信队列处理器初始化完毕:HANDLER_MAP = {}", HANDLER_MAP);
        }
    }

    public RabbitmqDlxHandler getConcertHandler(String processType) {
        Class aClass = HANDLER_MAP.get(processType);
        if (Objects.isNull(aClass))
            throw new ApplicationException("8848", "系统中没有orderType =  " + processType+ " 对应的处理器", null);
        return (RabbitmqDlxHandler) applicationContext.getBean(aClass);
    }
}
  • 准备工作完成了,接着来改造 RabbitMQ 延时队列的逻辑代码吧:
	@Resource
    private RabbitmqDlxContext rabbitmqDlxContext;
    
	@RabbitListener(queues = "${queue.name}")
    public void process(Message content, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        String msg = new String(content.getBody());
        log.info("队列接收消息: msg = {}", msg);
        RabbitMsg rmsg = JSONObject.parseObject(msg, RabbitMsg.class);
        try {
            String pType = rmsg.getProcessType();
            rabbitmqDlxContext.getConcertHandler(pType).doProcess(rmsg);
        } catch (Exception e) {
        	// 增加try catch,防止消费失败后,一直循环消费
            log.error("消息消费异常:参数->{}", msg);
            RabbitExceptionInfo info= new RabbitExceptionInfo();
            info.setData(rmsg.getData());
            info.setTime(DateUtil.format(new Date(), DateUtil.DATETYPE_1));
            indo.setRemarks(EXCEPTION_REMARK);
            //加入异常队列
            produceExceptionMsg(rabbitExceptionInfo);
        }
        //手动ack
        channel.basicAck(tag, false);
    }

  • 后续需要扩展处理器的类型,只需要增加一个处理器然后实现抽象处理器接口,然后加上 @ProcessType 注解即可
  • 消息处理异常后,加入异常队列,监听异常队列消费信息,此时需要考虑消息消费的幂等性哟

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5681120.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存