rabbitmq在商户通知中的使用代码实例

rabbitmq在商户通知中的使用代码实例,第1张

rabbitmq在商户通知中的使用代码实例

文章目录

controller层:MQ 消息发送器 接口RabbitMQ的配置项

RabbitMQBeanProcessorMQ 线程池配置 mq清除商户登录信息通知mq支付订单商户通知

代码选自jeepay

controller层:
@RestController
@RequestMapping("/api/mchNotify")
public class MchNotifyController extends CommonCtrl {

    @Autowired private MchNotifyRecordService mchNotifyService;
    @Autowired private IMQSender mqSender;

@PreAuthorize("hasAuthority('ENT_MCH_NOTIFY_RESEND')")
@RequestMapping(value="resend/{notifyId}", method = RequestMethod.POST)
public ApiRes resend(@PathVariable("notifyId") Long notifyId) {
    MchNotifyRecord mchNotify = mchNotifyService.getById(notifyId);
    if (mchNotify == null) {
        return ApiRes.fail(ApiCodeEnum.SYS_OPERATION_FAIL_SELETE);
    }
    if (mchNotify.getState() != MchNotifyRecord.STATE_FAIL) {
        throw new BizException("请选择失败的通知记录");
    }

    //更新通知中
    mchNotifyService.getbaseMapper().updateIngAndAddNotifyCountLimit(notifyId);

    //调起MQ重发
    mqSender.send(PayOrderMchNotifyMQ.build(notifyId));

    return ApiRes.ok(mchNotify);
}
    
}

MQ 消息发送器 接口
public interface IMQSender {

    
    void send(AbstractMQ mqModel);

    
    void send(AbstractMQ mqModel, int delay);

}

有三个实现类,根据自己使用的mq类型来选择:

这里看rabbitmq的:

@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
//在spring boot中有时候需要控制配置类是否生效,可以使用@ConditionalOnProperty注解来控制@Configuration是否生效.

public class RabbitMQSender implements IMQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(AbstractMQ mqModel) {

        if(mqModel.getMQType() == MQSendTypeEnum.QUEUE){

            rabbitTemplate.convertAndSend(mqModel.getMQName(), mqModel.toMessage());
        }else{

            // fanout模式 的 routeKEY 没意义。
            this.rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME_PREFIX + mqModel.getMQName(), null, mqModel.toMessage());
        }
    }

    @Override
    public void send(AbstractMQ mqModel, int delay) {


        if(mqModel.getMQType() == MQSendTypeEnum.QUEUE){

            rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE_NAME, mqModel.getMQName(), mqModel.toMessage(), messagePostProcessor ->{
                messagePostProcessor.getMessageProperties().setDelay(Math.toIntExact(delay * 1000));
                return messagePostProcessor;
            });
        }else{

            // fanout模式 的 routeKEY 没意义。  没有延迟属性
            this.rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME_PREFIX + mqModel.getMQName(), null, mqModel.toMessage());
        }
    }

}

PayOrderMchNotifyMQ的 build方法:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class PayOrderMchNotifyMQ extends AbstractMQ {

    
    public static final String MQ_NAME = "QUEUE_PAY_ORDER_MCH_NOTIFY";

    
    private MsgPayload payload;

    
    @Data
    @AllArgsConstructor
    public static class MsgPayload {

        
        private Long notifyId;

    }

    @Override
    public String getMQName() {
        return MQ_NAME;
    }

    
    @Override
    public MQSendTypeEnum getMQType(){
        return MQSendTypeEnum.QUEUE;  // QUEUE - 点对点 、 BROADCAST - 广播模式
    }

    @Override
    public String toMessage() {
        return JSONObject.toJSONString(payload);
    }

    
    public static PayOrderMchNotifyMQ build(Long notifyId){
        return new PayOrderMchNotifyMQ(new MsgPayload(notifyId));
    }

    
    public static MsgPayload parse(String msg){
        return JSON.parseObject(msg, MsgPayload.class);
    }

    
    public interface IMQReceiver{
        void receive(MsgPayload payload);
    }

}
RabbitMQ的配置项

@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
public class RabbitMQConfig {

    
    public static final String DELAYED_EXCHANGE_NAME = "delayedExchange";

    
    public static final String FANOUT_EXCHANGE_NAME_PREFIX = "fanout_exchange_";

    
    @Autowired
    @Qualifier(DELAYED_EXCHANGE_NAME)
    private CustomExchange delayedExchange;

    
    @Autowired
    private RabbitMQBeanProcessor rabbitMQBeanProcessor;

    
    @PostConstruct
    public void init(){

        // 获取到所有的MQ定义
        Set> set = ClassUtil.scanPackageBySuper(ClassUtil.getPackage(AbstractMQ.class), AbstractMQ.class);

        for (Class aClass : set) {

            // 实例化
            AbstractMQ amq = (AbstractMQ) ReflectUtil.newInstance(aClass);

            // 注册Queue === new Queue(name),  queue名称/bean名称 = mqName
            rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(amq.getMQName(),
                    BeanDefinitionBuilder.rootBeanDefinition(Queue.class).addConstructorArgValue(amq.getMQName()).getBeanDefinition());

            // 广播模式
            if(amq.getMQType() == MQSendTypeEnum.BROADCAST){

                // 动态注册交换机, 交换机名称/bean名称 =  FANOUT_EXCHANGE_NAME_PREFIX + amq.getMQName()
                rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(FANOUT_EXCHANGE_NAME_PREFIX +amq.getMQName(),
                        BeanDefinitionBuilder.genericBeanDefinition(FanoutExchange.class, () ->{

                            // 普通FanoutExchange 交换机
                             return new FanoutExchange(FANOUT_EXCHANGE_NAME_PREFIX +amq.getMQName(),true,false);

                            // 支持 延迟的 FanoutExchange 交换机, 配置无效果。
//                            Map args = new HashMap<>();
//                            args.put("x-delayed-type", ExchangeTypes.FANOUT);
//                            return new CustomExchange(RabbitMQConfig.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
                        }

                        ).getBeanDefinition()
                );

            }else{

                // 延迟交换机与Queue进行绑定, 绑定Bean名称 = mqName_DelayedBind
                rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(amq.getMQName() + "_DelayedBind",
                        BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () ->
                                BindingBuilder.bind(SpringBeansUtil.getBean(amq.getMQName(), Queue.class)).to(delayedExchange).with(amq.getMQName()).noargs()

                        ).getBeanDefinition()
                );
            }
        }
    }
}

RabbitMQBeanProcessor

@Configuration
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
public class RabbitMQBeanProcessor implements BeanDefinitionRegistryPostProcessor {
//对标准BeanFactoryPostProcessor SPI 的扩展,允许在常规 BeanFactoryPostProcessor 检测开始之前注册进一步的 bean 定义。
// 特别是,BeanDefinitionRegistryPostProcessor 可以注册进一步的 bean 定义,
// 这些定义反过来定义 BeanFactoryPostProcessor 实例
    
    protected BeanDefinitionRegistry beanDefinitionRegistry;

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
        this.beanDefinitionRegistry = beanDefinitionRegistry;
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
    }

    
    @Bean(name = RabbitMQConfig.DELAYED_EXCHANGE_NAME)
    CustomExchange delayedExchange() {
        Map args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(RabbitMQConfig.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

}

MQ 线程池配置
@Configuration
@EnableAsync
public class MqThreadExecutor {

    public static final String EXECUTOR_PAYORDER_MCH_NOTIFY = "mqQueue4PayOrderMchNotifyExecutor";

    
    @Bean
    public Executor mqQueue4PayOrderMchNotifyExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20); // 线程池维护线程的最少数量
        executor.setMaxPoolSize(300);  // 线程池维护线程的最大数量
        executor.setQueueCapacity(10); // 缓存队列
        executor.setThreadNamePrefix("payOrderMchNotifyExecutor-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //对拒绝task的处理策略
        executor.setKeepAliveSeconds(60); // 允许的空闲时间
        executor.initialize();
        return executor;
    }

}

mq清除商户登录信息通知
    public static final String MQ_NAME = "QUEUE_CLEAN_MCH_LOGIN_AUTH_CACHE";

    
    private MsgPayload payload;

    
    @Data
    @AllArgsConstructor
    public static class MsgPayload {

        
        private List userIdList;

    }

    @Override
    public String getMQName() {
        return MQ_NAME;
    }

    
    @Override
    public MQSendTypeEnum getMQType(){
        return MQSendTypeEnum.QUEUE;  // QUEUE - 点对点 、 BROADCAST - 广播模式
    }

    @Override
    public String toMessage() {
        return JSONObject.toJSONString(payload);
    }

    
    public static CleanMchLoginAuthCacheMQ build(List userIdList){
        return new CleanMchLoginAuthCacheMQ(new MsgPayload(userIdList));
    }

    
    public static MsgPayload parse(String msg){
        return JSON.parseObject(msg, MsgPayload.class);
    }

    
    public interface IMQReceiver{
        void receive(MsgPayload payload);
    }

}

对应消息接收器:

    @Override
    @Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
    @RabbitListener(queues = CleanMchLoginAuthCacheMQ.MQ_NAME)
    public void receiveMsg(String msg){
        mqReceiver.receive(CleanMchLoginAuthCacheMQ.parse(msg));
    }

}


@Slf4j
@Component
public class CleanMchLoginAuthCacheMQReceiver implements CleanMchLoginAuthCacheMQ.IMQReceiver {

    @Override
    public void receive(CleanMchLoginAuthCacheMQ.MsgPayload payload) {

        log.info("成功接收删除商户用户登录的订阅通知, msg={}", payload);
        // 字符串转List
        List userIdList = payload.getUserIdList();
        // 删除redis用户缓存
        if(userIdList == null || userIdList.isEmpty()){
            log.info("用户ID为空");
            return ;
        }
        for (Long sysUserId : userIdList) {
            Collection cacheKeyList = RedisUtil.keys(CS.getCacheKeyToken(sysUserId, "*"));
            if(cacheKeyList == null || cacheKeyList.isEmpty()){
                continue;
            }
            for (String cacheKey : cacheKeyList) {
                // 删除用户Redis信息
                RedisUtil.del(cacheKey);
                continue;
            }
        }
        log.info("无权限登录用户信息已清除");
    }
}

mq支付订单商户通知
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
@ConditionalOnBean(PayOrderMchNotifyMQ.IMQReceiver.class)
public class PayOrderMchNotifyRabbitMQReceiver implements IMQMsgReceiver {

    @Autowired
    private PayOrderMchNotifyMQ.IMQReceiver mqReceiver;

    
    @Override
    @Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
// @Async中的参数用于指定使用哪一个线程池执行
    @RabbitListener(queues = PayOrderMchNotifyMQ.MQ_NAME)
    public void receiveMsg(String msg){
        mqReceiver.receive(PayOrderMchNotifyMQ.parse(msg));
    }

}

PayOrderMchNotifyMQ:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class PayOrderMchNotifyMQ extends AbstractMQ {

    
    public static final String MQ_NAME = "QUEUE_PAY_ORDER_MCH_NOTIFY";

    
    private MsgPayload payload;

    
    @Data
    @AllArgsConstructor
    public static class MsgPayload {

        
        private Long notifyId;

    }

    @Override
    public String getMQName() {
        return MQ_NAME;
    }

    
    @Override
    public MQSendTypeEnum getMQType(){
        return MQSendTypeEnum.QUEUE;  // QUEUE - 点对点 、 BROADCAST - 广播模式
    }

    @Override
    public String toMessage() {
        return JSONObject.toJSONString(payload);
    }

    
    public static PayOrderMchNotifyMQ build(Long notifyId){
        return new PayOrderMchNotifyMQ(new MsgPayload(notifyId));
    }

    
    public static MsgPayload parse(String msg){
        return JSON.parseObject(msg, MsgPayload.class);
    }

    
    public interface IMQReceiver{
        void receive(MsgPayload payload);
    }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5717820.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存