controller层:MQ 消息发送器 接口RabbitMQ的配置项
RabbitMQBeanProcessorMQ 线程池配置 mq清除商户登录信息通知mq支付订单商户通知
代码选自jeepay
controller层:@RestController @RequestMapping("/api/mchNotify") public class MchNotifyController extends CommonCtrl { @Autowired private MchNotifyRecordService mchNotifyService; @Autowired private IMQSender mqSender; @PreAuthorize("hasAuthority('ENT_MCH_NOTIFY_RESEND')") @RequestMapping(value="resend/{notifyId}", method = RequestMethod.POST) public ApiRes resend(@PathVariable("notifyId") Long notifyId) { MchNotifyRecord mchNotify = mchNotifyService.getById(notifyId); if (mchNotify == null) { return ApiRes.fail(ApiCodeEnum.SYS_OPERATION_FAIL_SELETE); } if (mchNotify.getState() != MchNotifyRecord.STATE_FAIL) { throw new BizException("请选择失败的通知记录"); } //更新通知中 mchNotifyService.getbaseMapper().updateIngAndAddNotifyCountLimit(notifyId); //调起MQ重发 mqSender.send(PayOrderMchNotifyMQ.build(notifyId)); return ApiRes.ok(mchNotify); } }MQ 消息发送器 接口
public interface IMQSender { void send(AbstractMQ mqModel); void send(AbstractMQ mqModel, int delay); }
有三个实现类,根据自己使用的mq类型来选择:
这里看rabbitmq的:
@Component @ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ) //在spring boot中有时候需要控制配置类是否生效,可以使用@ConditionalOnProperty注解来控制@Configuration是否生效. public class RabbitMQSender implements IMQSender { @Autowired private RabbitTemplate rabbitTemplate; @Override public void send(AbstractMQ mqModel) { if(mqModel.getMQType() == MQSendTypeEnum.QUEUE){ rabbitTemplate.convertAndSend(mqModel.getMQName(), mqModel.toMessage()); }else{ // fanout模式 的 routeKEY 没意义。 this.rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME_PREFIX + mqModel.getMQName(), null, mqModel.toMessage()); } } @Override public void send(AbstractMQ mqModel, int delay) { if(mqModel.getMQType() == MQSendTypeEnum.QUEUE){ rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE_NAME, mqModel.getMQName(), mqModel.toMessage(), messagePostProcessor ->{ messagePostProcessor.getMessageProperties().setDelay(Math.toIntExact(delay * 1000)); return messagePostProcessor; }); }else{ // fanout模式 的 routeKEY 没意义。 没有延迟属性 this.rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME_PREFIX + mqModel.getMQName(), null, mqModel.toMessage()); } } }
PayOrderMchNotifyMQ的 build方法:
@Data @NoArgsConstructor @AllArgsConstructor public class PayOrderMchNotifyMQ extends AbstractMQ { public static final String MQ_NAME = "QUEUE_PAY_ORDER_MCH_NOTIFY"; private MsgPayload payload; @Data @AllArgsConstructor public static class MsgPayload { private Long notifyId; } @Override public String getMQName() { return MQ_NAME; } @Override public MQSendTypeEnum getMQType(){ return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 、 BROADCAST - 广播模式 } @Override public String toMessage() { return JSONObject.toJSONString(payload); } public static PayOrderMchNotifyMQ build(Long notifyId){ return new PayOrderMchNotifyMQ(new MsgPayload(notifyId)); } public static MsgPayload parse(String msg){ return JSON.parseObject(msg, MsgPayload.class); } public interface IMQReceiver{ void receive(MsgPayload payload); } }RabbitMQ的配置项
@Component @ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ) public class RabbitMQConfig { public static final String DELAYED_EXCHANGE_NAME = "delayedExchange"; public static final String FANOUT_EXCHANGE_NAME_PREFIX = "fanout_exchange_"; @Autowired @Qualifier(DELAYED_EXCHANGE_NAME) private CustomExchange delayedExchange; @Autowired private RabbitMQBeanProcessor rabbitMQBeanProcessor; @PostConstruct public void init(){ // 获取到所有的MQ定义 SetRabbitMQBeanProcessor> set = ClassUtil.scanPackageBySuper(ClassUtil.getPackage(AbstractMQ.class), AbstractMQ.class); for (Class> aClass : set) { // 实例化 AbstractMQ amq = (AbstractMQ) ReflectUtil.newInstance(aClass); // 注册Queue === new Queue(name), queue名称/bean名称 = mqName rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(amq.getMQName(), BeanDefinitionBuilder.rootBeanDefinition(Queue.class).addConstructorArgValue(amq.getMQName()).getBeanDefinition()); // 广播模式 if(amq.getMQType() == MQSendTypeEnum.BROADCAST){ // 动态注册交换机, 交换机名称/bean名称 = FANOUT_EXCHANGE_NAME_PREFIX + amq.getMQName() rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(FANOUT_EXCHANGE_NAME_PREFIX +amq.getMQName(), BeanDefinitionBuilder.genericBeanDefinition(FanoutExchange.class, () ->{ // 普通FanoutExchange 交换机 return new FanoutExchange(FANOUT_EXCHANGE_NAME_PREFIX +amq.getMQName(),true,false); // 支持 延迟的 FanoutExchange 交换机, 配置无效果。 // Map args = new HashMap<>(); // args.put("x-delayed-type", ExchangeTypes.FANOUT); // return new CustomExchange(RabbitMQConfig.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } ).getBeanDefinition() ); }else{ // 延迟交换机与Queue进行绑定, 绑定Bean名称 = mqName_DelayedBind rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(amq.getMQName() + "_DelayedBind", BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder.bind(SpringBeansUtil.getBean(amq.getMQName(), Queue.class)).to(delayedExchange).with(amq.getMQName()).noargs() ).getBeanDefinition() ); } } } }
@Configuration @ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ) public class RabbitMQBeanProcessor implements BeanDefinitionRegistryPostProcessor { //对标准BeanFactoryPostProcessor SPI 的扩展,允许在常规 BeanFactoryPostProcessor 检测开始之前注册进一步的 bean 定义。 // 特别是,BeanDefinitionRegistryPostProcessor 可以注册进一步的 bean 定义, // 这些定义反过来定义 BeanFactoryPostProcessor 实例 protected BeanDefinitionRegistry beanDefinitionRegistry; @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException { this.beanDefinitionRegistry = beanDefinitionRegistry; } @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException { } @Bean(name = RabbitMQConfig.DELAYED_EXCHANGE_NAME) CustomExchange delayedExchange() { MapMQ 线程池配置args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(RabbitMQConfig.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } }
@Configuration @EnableAsync public class MqThreadExecutor { public static final String EXECUTOR_PAYORDER_MCH_NOTIFY = "mqQueue4PayOrderMchNotifyExecutor"; @Bean public Executor mqQueue4PayOrderMchNotifyExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(20); // 线程池维护线程的最少数量 executor.setMaxPoolSize(300); // 线程池维护线程的最大数量 executor.setQueueCapacity(10); // 缓存队列 executor.setThreadNamePrefix("payOrderMchNotifyExecutor-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //对拒绝task的处理策略 executor.setKeepAliveSeconds(60); // 允许的空闲时间 executor.initialize(); return executor; } }mq清除商户登录信息通知
public static final String MQ_NAME = "QUEUE_CLEAN_MCH_LOGIN_AUTH_CACHE"; private MsgPayload payload; @Data @AllArgsConstructor public static class MsgPayload { private ListuserIdList; } @Override public String getMQName() { return MQ_NAME; } @Override public MQSendTypeEnum getMQType(){ return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 、 BROADCAST - 广播模式 } @Override public String toMessage() { return JSONObject.toJSONString(payload); } public static CleanMchLoginAuthCacheMQ build(List userIdList){ return new CleanMchLoginAuthCacheMQ(new MsgPayload(userIdList)); } public static MsgPayload parse(String msg){ return JSON.parseObject(msg, MsgPayload.class); } public interface IMQReceiver{ void receive(MsgPayload payload); } }
对应消息接收器:
@Override @Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY) @RabbitListener(queues = CleanMchLoginAuthCacheMQ.MQ_NAME) public void receiveMsg(String msg){ mqReceiver.receive(CleanMchLoginAuthCacheMQ.parse(msg)); } }
@Slf4j @Component public class CleanMchLoginAuthCacheMQReceiver implements CleanMchLoginAuthCacheMQ.IMQReceiver { @Override public void receive(CleanMchLoginAuthCacheMQ.MsgPayload payload) { log.info("成功接收删除商户用户登录的订阅通知, msg={}", payload); // 字符串转Listmq支付订单商户通知List userIdList = payload.getUserIdList(); // 删除redis用户缓存 if(userIdList == null || userIdList.isEmpty()){ log.info("用户ID为空"); return ; } for (Long sysUserId : userIdList) { Collection cacheKeyList = RedisUtil.keys(CS.getCacheKeyToken(sysUserId, "*")); if(cacheKeyList == null || cacheKeyList.isEmpty()){ continue; } for (String cacheKey : cacheKeyList) { // 删除用户Redis信息 RedisUtil.del(cacheKey); continue; } } log.info("无权限登录用户信息已清除"); } }
@Component @ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ) @ConditionalOnBean(PayOrderMchNotifyMQ.IMQReceiver.class) public class PayOrderMchNotifyRabbitMQReceiver implements IMQMsgReceiver { @Autowired private PayOrderMchNotifyMQ.IMQReceiver mqReceiver; @Override @Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY) // @Async中的参数用于指定使用哪一个线程池执行 @RabbitListener(queues = PayOrderMchNotifyMQ.MQ_NAME) public void receiveMsg(String msg){ mqReceiver.receive(PayOrderMchNotifyMQ.parse(msg)); } }
PayOrderMchNotifyMQ:
@Data @NoArgsConstructor @AllArgsConstructor public class PayOrderMchNotifyMQ extends AbstractMQ { public static final String MQ_NAME = "QUEUE_PAY_ORDER_MCH_NOTIFY"; private MsgPayload payload; @Data @AllArgsConstructor public static class MsgPayload { private Long notifyId; } @Override public String getMQName() { return MQ_NAME; } @Override public MQSendTypeEnum getMQType(){ return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 、 BROADCAST - 广播模式 } @Override public String toMessage() { return JSONObject.toJSONString(payload); } public static PayOrderMchNotifyMQ build(Long notifyId){ return new PayOrderMchNotifyMQ(new MsgPayload(notifyId)); } public static MsgPayload parse(String msg){ return JSON.parseObject(msg, MsgPayload.class); } public interface IMQReceiver{ void receive(MsgPayload payload); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)