Implementing an MQ in Java


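An MQ (message queue) is built on the queue, one of the basic data structures, which is first-in, first-out (FIFO).

The data to be transmitted (the messages) is placed in a queue, and the queue mechanism carries it from sender to receiver: a producer creates a message and puts it into the queue, and a consumer then processes it.

A consumer can pull messages from a specified queue, or subscribe to a queue and have the MQ server push messages to it.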
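As a rough illustration of the producer, queue, and consumer roles described above, the sketch below uses the JDK's BlockingQueue inside a single process. The class name FifoQueueDemo and the message strings are made up for this example; a real MQ adds networking, persistence, and a broker on top of the same idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class FifoQueueDemo {

    /** Producer side: puts the messages into the queue in order. */
    static void produce(BlockingQueue<String> queue, List<String> messages) {
        try {
            for (String message : messages) {
                queue.put(message);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while producing", e);
        }
    }

    /** Consumer side: takes exactly count messages, blocking until each one arrives. */
    static List<String> consume(BlockingQueue<String> queue, int count) {
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // The producer runs on its own thread; the main thread plays the consumer.
        Thread producer = new Thread(
                () -> produce(queue, Arrays.asList("order-1", "order-2", "order-3")));
        producer.start();

        // Messages come out in the order they were put in.
        System.out.println(consume(queue, 3));
        producer.join();
    }
}
```

The consumer blocks in take() until a message is available, so neither side needs to know when the other one runs.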

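Message queue middleware is an important component of distributed systems. It mainly addresses application decoupling, asynchronous messaging, and traffic peak shaving, and helps build architectures that are high-performance, highly available, scalable, and eventually consistent.

Decoupling: when a business flow needs several modules, or one message has to be handled by several systems, the main flow only has to send one MQ message once it finishes. The other modules consume that message to do their part, which lowers the coupling between modules.

Asynchrony: after the main flow finishes, secondary work runs asynchronously through the MQ, which shortens the response time and improves the user experience.

Peak shaving: under high concurrency, work is queued and processed asynchronously, which raises the system's capacity during peak periods and keeps it from collapsing.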

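Consumer

Consumer pulls messages

The consumer pulls messages from the broker on a schedule. The advantages are that this is simple to implement and that each consumer can consume at a rate that matches its own processing capacity.

The drawback is relatively poor real-time behavior: a message may wait up to one pull interval before it is consumed.

Scheduled pull

Scheduled pulling is implemented by overriding the afterInit method of the push strategy.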
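To make the idea concrete before the full listing, here is a minimal sketch of pulling on a fixed schedule. It is not the project's code: the in-memory brokerQueue stands in for the remote broker, and the names ScheduledPullDemo and pullBatch are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledPullDemo {

    /** Pulls at most size messages from the stand-in broker queue. */
    static List<String> pullBatch(Queue<String> brokerQueue, int size) {
        List<String> batch = new ArrayList<>();
        String message;
        // Stop as soon as the batch is full or the queue is empty.
        while (batch.size() < size && (message = brokerQueue.poll()) != null) {
            batch.add(message);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<String> brokerQueue = new ConcurrentLinkedQueue<>();
        for (int i = 1; i <= 5; i++) {
            brokerQueue.add("msg-" + i);
        }

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Pull up to 2 messages every 100 ms; the consumer sets its own pace.
        scheduler.scheduleAtFixedRate(() -> {
            List<String> batch = pullBatch(brokerQueue, 2);
            if (!batch.isEmpty()) {
                System.out.println("pulled " + batch);
            }
        }, 0, 100, TimeUnit.MILLISECONDS);

        Thread.sleep(500);
        scheduler.shutdownNow();
    }
}
```

The batch size and the period together bound how fast the consumer drains the queue, which is the trade-off between latency and load mentioned above.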

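Pull acknowledgment

After pulling messages, the consumer must send an acknowledgment to tell the broker that the pull succeeded. The implementation is not complicated, so the code is not shown here.

Note that the same message can be consumed under different groupName values, so acknowledgments have to be tracked separately for each groupName.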

public class MqConsumerPull extends MqConsumerPush {
    @Override
    public void subscribe(String topicName, String tagRegex) {
        MqTopicTagDto tagDto = buildMqTopicTagDto(topicName,tagRegex);

        if(!subscribeList.contains(tagDto)){
            subscribeList.add(tagDto);
        }
    }

    @Override
    public void unSubscribe(String topicName, String tagRegex) {
        MqTopicTagDto tagDto = buildMqTopicTagDto(topicName,tagRegex);
        subscribeList.remove(tagDto);
    }

    private MqTopicTagDto buildMqTopicTagDto(String topicName,String tagRegex){
        MqTopicTagDto dto = new MqTopicTagDto();
        dto.setTagRegex(tagRegex);
        dto.setTopicName(topicName);
        // Active pulling is likely to be a problem here: once one groupName has consumed a message,
        // it is treated as consumed for the other groups as well.
        // What is really needed is a message + group mapping table, so that one message can be consumed once per group.
        // groupName + messageId + status ==> implemented at the database level
        dto.setGroupName(groupName);
        return dto;
    }

    /**
     * Start the scheduled message pull once initialization has completed.
     */
    @Override
    protected void afterInit() {

        // Pull on a fixed schedule: first run after pullInitDelaySeconds, then every pullPeriodSeconds.
        // Note: an exception that escapes run() suppresses all later executions of this task.
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                if(CollectionUtil.isEmpty(subscribeList)) {
                    log.warn("Subscription list is empty, skipping.");
                    return;
                }
                for(MqTopicTagDto tagDto:subscribeList){
                    final String topicName = tagDto.getTopicName();
                    final String tagRegex = tagDto.getTagRegex();
                    MqConsumerPullResp resp = consumerBrokerService.pull(topicName,tagRegex,size);
                    if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())){
                        List<MqMessage> mqMessageList = resp.getList();
                        if(CollectionUtil.isNotEmpty(mqMessageList)){
                            List<MqConsumerUpdateStatusDto> statusDtoList = new ArrayList<>(mqMessageList.size());
                            for(MqMessage mqMessage : mqMessageList){
                                IMqConsumerListenerContext context = new MqConsumerListenerContext();
                                final String messageId = mqMessage.getTraceId();
                                ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage,context);
                                log.info("Message: {} consume result {}",messageId,consumerStatus);

                                // Acknowledge each message individually
                                if(!ackBatchFlag){
                                    MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId,consumerStatus);
                                    log.info("Message: {} status ack result {}",messageId, JSON.toJSON(ackResp));
                                }else {
                                    // Collect the status for a batch acknowledgment
                                    MqConsumerUpdateStatusDto statusDto = new MqConsumerUpdateStatusDto();
                                    statusDto.setMessageId(messageId);
                                    statusDto.setMessageStatus(consumerStatus.getCode());
                                    statusDto.setConsumerGroupName(groupName);
                                    statusDtoList.add(statusDto);
                                }
                            }
                            // Send the batch acknowledgment
                            if(ackBatchFlag){
                                MqCommonResp ackResp = consumerBrokerService.consumerStatusAckBatch(statusDtoList);
                                log.info("Messages: {} batch status ack result {}",statusDtoList,JSON.toJSON(ackResp));
                            }
                        }
                    }else {
                        // The failure branch belongs to the response-code check, not to the empty-list check
                        log.error("Failed to pull messages: {}",JSON.toJSON(resp));
                    }
                }
            }
        },pullInitDelaySeconds,pullPeriodSeconds, TimeUnit.SECONDS);
    }
}
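The comment in the pull consumer above suggests keeping a groupName + messageId + status mapping so that one message can be consumed once by each group. The following is only an in-memory sketch of that idea; the class GroupAckStore, the "PENDING" default, and the status strings are assumptions for illustration, and the author proposes doing this at the database level instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GroupAckStore {

    // groupName -> (messageId -> status); each group tracks its own progress.
    private final Map<String, Map<String, String>> statusByGroup = new ConcurrentHashMap<>();

    /** Records the consume status of one message for one consumer group. */
    public void ack(String groupName, String messageId, String status) {
        statusByGroup
                .computeIfAbsent(groupName, g -> new ConcurrentHashMap<>())
                .put(messageId, status);
    }

    /** Returns the status for this group, or "PENDING" if the group has not acknowledged it. */
    public String status(String groupName, String messageId) {
        Map<String, String> byMessage = statusByGroup.get(groupName);
        if (byMessage == null) {
            return "PENDING";
        }
        return byMessage.getOrDefault(messageId, "PENDING");
    }

    public static void main(String[] args) {
        GroupAckStore store = new GroupAckStore();
        store.ack("groupA", "msg-1", "SUCCESS");

        // groupA has consumed msg-1; groupB still has to receive it.
        System.out.println(store.status("groupA", "msg-1"));
        System.out.println(store.status("groupB", "msg-1"));
    }
}
```

With a structure like this, the broker can decide per group whether a message still needs to be delivered.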
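Consumer receives messages via push

The broker pushes messages directly to the consumer, which gives good real-time behavior.

The drawback is that if the consumer cannot keep up, messages pile up on the consumer side and can overwhelm it.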

public class MqConsumerPush extends Thread implements IMqConsumer {

    /**
     * Validate the required parameters.
     */
    private void paramCheck(){
        ArgUtil.notEmpty(brokerAddress,"brokerAddress");
        ArgUtil.notEmpty(groupName,"groupName");
    }

    @Override
    public void run() {
        // Start the consumer
        log.info("MQ consumer starting, groupName: {}, brokerAddress: {}",groupName,brokerAddress);

        // 1. Validate parameters
        this.paramCheck();

        try {
            // 2. Build the configuration
            ConsumerBrokerConfig config = ConsumerBrokerConfig.newInstance()
                    .groupName(groupName).brokerAddress(brokerAddress).check(check)
                    .respTimeoutMills(respTimeoutMills).invokeService(invokeService)
                    .statusManager(statusManager).mqListenerService(mqListenerService)
                    .loadBalance(loadBalance).subscribeMaxAttempt(subscribeMaxAttempt)
                    .unSubscribeMaxAttempt(unSubscribeMaxAttempt).consumerStatusMaxAttempt(consumerStatusMaxAttempt)
                    .appKey(appKey).appSecret(appSecret);

            // 3. Initialize the channels
            this.consumerBrokerService.initChannelFutureList(config);

            // 4. Register with the broker
            this.consumerBrokerService.registerToBroker();

            // 5. Mark the consumer as available
            statusManager.status(true);

            // 6. Register the shutdown hook
            final DefaultShutdownHook rpcShutdownHook = new DefaultShutdownHook();
            rpcShutdownHook.setStatusManager(statusManager);
            rpcShutdownHook.setInvokeService(invokeService);
            rpcShutdownHook.setWaitMillsForRemainRequest(waitMillsForRemainRequest);
            rpcShutdownHook.setDestroyable(this.consumerBrokerService);
            ShutdownHooks.rpcShutdownHook(rpcShutdownHook);

            // 7. Post-startup hook (subclasses may override afterInit)
            afterInit();

            log.info("MQ consumer started");
        }catch (Exception e){
            log.error("MQ consumer failed to start",e);

            statusManager.initFailed(true);

            throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);
        }
    }

    /**
     * Hook invoked after initialization completes; does nothing by default.
     */
    protected void afterInit(){}

    @Override
    public void subscribe(String topicName, String tagRegex) {
        final String consumerType = getConsumerType();
        consumerBrokerService.subscribe(topicName,tagRegex,consumerType);
    }

    /**
     * Get the consumer strategy type.
     * @return the consumer type, PUSH by default
     */
    protected String getConsumerType(){
        return ConsumerTypeConst.PUSH;
    }



    @Override
    public void unSubscribe(String topicName, String tagRegex) {
        final String consumerType = getConsumerType();
        consumerBrokerService.unSubscribe(topicName,tagRegex,consumerType);
    }

    @Override
    public void registerListener(IMqConsumerListener listener) {
        this.mqListenerService.register(listener);
    }

}
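Because the broker decides when to push, a push consumer has to protect itself against bursts. One possible approach, which is not part of the code above, is a bounded buffer: the receiving side accepts a message only if there is room and otherwise reports the rejection, so the sender can back off or retry. The class BoundedPushBuffer and its method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedPushBuffer {

    private final BlockingQueue<String> buffer;

    public BoundedPushBuffer(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    /** Called for each pushed message; returns false when the buffer is full. */
    public boolean onPush(String message) {
        return buffer.offer(message);
    }

    /** Called by the worker that processes messages; returns null when nothing is buffered. */
    public String next() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        BoundedPushBuffer consumerBuffer = new BoundedPushBuffer(2);

        System.out.println(consumerBuffer.onPush("msg-1")); // accepted
        System.out.println(consumerBuffer.onPush("msg-2")); // accepted
        System.out.println(consumerBuffer.onPush("msg-3")); // rejected: buffer is full

        // Processing one message frees a slot for the next push.
        System.out.println(consumerBuffer.next());
        System.out.println(consumerBuffer.onPush("msg-3"));
    }
}
```

The capacity plays the same role as the pull batch size: it caps how much unprocessed work the consumer is willing to hold.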
Consumer handler class (business processing logic)
public class MqConsumerHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(MqConsumerHandler.class);

    /**
     * Invocation manager
     */
    private IInvokeService invokeService;

    /**
     * 消息监听服务类
     */
    private IMqListenerService mqListenerService;

    public void setInvokeService(IInvokeService invokeService){
        this.invokeService = invokeService;
    }

    public void setMqListenerService(IMqListenerService mqListenerService){
        this.mqListenerService = mqListenerService;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);

        RpcMessageDto rpcMessageDto = null;

        try {
            rpcMessageDto = JSON.parseObject(bytes,RpcMessageDto.class);
        }catch (Exception e){
            log.error("Failed to parse RpcMessageDto JSON: {}", new String(bytes));
            return;
        }

        if(rpcMessageDto.isRequest()){
            MqCommonResp commonResp = this.dispatch(rpcMessageDto,ctx);

            if(commonResp == null){
                log.debug("Response is null, nothing to write back.");
                return;
            }
            writeResponse(rpcMessageDto,commonResp,ctx);
        }else {
            final String traceId = rpcMessageDto.getTraceId();

            // Discard responses whose traceId is blank
            if(StringUtil.isBlank(traceId)){
                log.debug("[server Response] response traceId is blank, discarding: {}",JSON.toJSON(rpcMessageDto));
                return;
            }

            // Hand the response to the caller waiting on this traceId
            invokeService.addResponse(traceId,rpcMessageDto);
        }
    }


    /**
     * Dispatch a request according to its method type.
     *
     * @param rpcMessageDto the incoming message
     * @param ctx the channel context
     * @return the response to write back
     */
    private MqCommonResp dispatch(RpcMessageDto rpcMessageDto,ChannelHandlerContext ctx){
        final String methodType = rpcMessageDto.getMethodType();
        final String json = rpcMessageDto.getJson();

        String channelId = ChannelUtil.getChannelId(ctx);
        log.debug("channelId: {} received method: {} content: {}",channelId,methodType,json);
        // A message pushed by the broker
        if(MethodType.B_MESSAGE_PUSH.equals(methodType)){
            log.info("Received message from broker: {}",json);
            return this.consumer(json);
        }
        throw new UnsupportedOperationException("Unsupported method type: " + methodType);
    }

    /**
     * Consume a message.
     * @param json the raw request body
     * @return the consumption result
     */
    private MqCommonResp consumer(final String json){
        try {
            // If this were the broker, additional processing would be needed here
            MqMessage mqMessage = JSON.parseObject(json,MqMessage.class);
            IMqConsumerListenerContext context = new MqConsumerListenerContext();
            ConsumerStatus consumerStatus = this.mqListenerService.consumer(mqMessage,context);

            MqConsumerResultResp resp = new MqConsumerResultResp();
            resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
            resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
            resp.setConsumerStatus(consumerStatus.getCode());
            return resp;
        }catch (MqException mqException){
            log.error("Business exception while consuming message",mqException);
            MqConsumerResultResp resp = new MqConsumerResultResp();
            resp.setRespCode(mqException.getCode());
            resp.setRespMessage(mqException.getMsg());
            return resp;
        }catch (Exception exception){
            log.error("System exception while consuming message",exception);
            MqConsumerResultResp resp = new MqConsumerResultResp();
            resp.setRespCode(MqCommonRespCode.FAIL.getCode());
            resp.setRespMessage(MqCommonRespCode.FAIL.getMsg());
            return resp;
        }
    }

    /**
     * Write the result back to the peer.
     *
     * @param req the request being answered
     * @param resp the response body
     * @param ctx the channel context
     */
    private void writeResponse(RpcMessageDto req,
                               Object resp,
                               ChannelHandlerContext ctx){
        final String id = ctx.channel().id().asLongText();
        RpcMessageDto rpcMessageDto = new RpcMessageDto();

        // Mark this message as a response
        rpcMessageDto.setRequest(false);
        rpcMessageDto.setTraceId(req.getTraceId());
        rpcMessageDto.setMethodType(req.getMethodType());
        rpcMessageDto.setRequestTime(System.currentTimeMillis());
        String json = JSON.toJSONString(resp);
        rpcMessageDto.setJson(json);

        // Write the response back to the sender
        ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);
        ctx.writeAndFlush(byteBuf);
        log.debug("[server] channel {} response {}",id,JSON.toJSON(rpcMessageDto));
    }
}
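The handler above passes every response to invokeService.addResponse, keyed by traceId, and the sending side later blocks in getResponse for the same traceId. The real IInvokeService is not shown in this article, so the following is only a sketch of how such request/response correlation might be done with CompletableFuture. The class PendingResponses is hypothetical, and returning null on timeout is a simplification (the project appears to return a response carrying a TIMEOUT code instead).

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PendingResponses {

    // traceId -> future completed by the I/O thread when the reply arrives
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request before it is written to the channel. */
    public void addRequest(String traceId) {
        pending.put(traceId, new CompletableFuture<>());
    }

    /** Called from the channel handler; returns false if nobody is waiting for this traceId. */
    public boolean addResponse(String traceId, String json) {
        CompletableFuture<String> future = pending.get(traceId);
        return future != null && future.complete(json);
    }

    /** Blocks until the response arrives; returns null on timeout or interruption. */
    public String getResponse(String traceId, long timeoutMillis) {
        CompletableFuture<String> future = pending.get(traceId);
        if (future == null) {
            throw new IllegalStateException("unknown traceId: " + traceId);
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Always clean up so finished or timed-out requests do not accumulate.
            pending.remove(traceId);
        }
    }

    public static void main(String[] args) {
        PendingResponses responses = new PendingResponses();
        responses.addRequest("trace-1");

        // Simulates the I/O thread delivering the reply.
        new Thread(() -> responses.addResponse("trace-1", "{\"respCode\":\"ok\"}")).start();

        System.out.println(responses.getResponse("trace-1", 1000));
    }
}
```

This is what lets a blocking callServer-style method sit on top of an asynchronous channel: the calling thread waits on the future while the event loop thread completes it.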
Consumer and broker
public class ConsumerBrokerService implements IConsumerBrokerService {


    /**
     * Start the heartbeat task.
     */
    private void initHeartbeat(){
        // Send a heartbeat every 5 seconds
        scheduledExecutorService.scheduleAtFixedRate(()->{heartbeat();},5,5, TimeUnit.SECONDS);
    }
    
    private ChannelHandler initChannelHandler(){

        final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER);

        // initChannel runs once per channel. MqConsumerHandler is not annotated @Sharable,
        // so a new instance is created for every pipeline instead of reusing a single one.
        ChannelHandler handler = new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                final MqConsumerHandler mqConsumerHandler = new MqConsumerHandler();
                mqConsumerHandler.setInvokeService(invokeService);
                mqConsumerHandler.setMqListenerService(mqListenerService);

                ch.pipeline()
                        .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH, delimiterBuf))
                        .addLast(mqConsumerHandler);
            }
        };
        return handler;
    }

    @Override
    public void registerToBroker() {
        int successCount = 0;
        
        for(RpcChannelFuture channelFuture : this.channelFutureList){
            ServiceEntry serviceEntry = new ServiceEntry();
            serviceEntry.setGroupName(groupName);
            serviceEntry.setAddress(channelFuture.getAddress());
            serviceEntry.setPort(channelFuture.getPort());
            serviceEntry.setWeight(channelFuture.getWeight());

            BrokerRegisterReq brokerRegisterReq = new BrokerRegisterReq();
            brokerRegisterReq.setServiceEntry(serviceEntry);
            brokerRegisterReq.setMethodType(MethodType.C_REGISTER);
            brokerRegisterReq.setTraceId(IdHelper.uuid32());
            brokerRegisterReq.setAppKey(appKey);
            brokerRegisterReq.setAppSecret(appSecret);
            
            log.info("[Register] registering with broker: {}", JSON.toJSON(brokerRegisterReq));
            final Channel channel = channelFuture.getChannelFuture().channel();
            MqCommonResp resp = callServer(channel,brokerRegisterReq,MqCommonResp.class);
            log.info("[Register] finished registering with broker: {}",JSON.toJSON(resp));
            
            if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())){
                successCount++;
            }
        }
        
        if(successCount <=0 &&check){
            log.error("Broker availability check failed: 0 successful registrations");
            throw new MqException(MqCommonRespCode.C_REGISTER_TO_BROKER_FAILED);
        }
    }

    // The generic parameters were lost from the original listing; the bounds used here
    // (MqCommonReq, MqCommonResp) are assumptions based on how the arguments are used.
    @Override
    public <T extends MqCommonReq, R extends MqCommonResp> R callServer(Channel channel, T commonReq, Class<R> respClass) {
        final String traceId = commonReq.getTraceId();
        final long requestTime = System.currentTimeMillis();

        RpcMessageDto rpcMessageDto = new RpcMessageDto();
        rpcMessageDto.setTraceId(traceId);
        rpcMessageDto.setRequestTime(requestTime);
        rpcMessageDto.setJson(JSON.toJSONString(commonReq));
        rpcMessageDto.setMethodType(commonReq.getMethodType());
        rpcMessageDto.setRequest(true);
        
        // Register the pending request so the response can be matched by traceId
        invokeService.addRequest(traceId,respTimeoutMills);
        
        // Serialize the message and append the frame delimiter
        ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);
        
        // Write to the channel chosen by the caller
        channel.writeAndFlush(byteBuf);
        
        String channelId  = ChannelUtil.getChannelId(channel);
        log.debug("[Client] channelId {} sent message {}",channelId,JSON.toJSON(rpcMessageDto));
        if(respClass == null){
            log.debug("[Client] one-way message, not waiting for a response");
            return null;
        }else {
            // Block until the channel handler delivers the matching response
            RpcMessageDto messageDto = invokeService.getResponse(traceId);
            if(MqCommonRespCode.TIMEOUT.getCode().equals(messageDto.getRespCode())){
                throw new MqException(MqCommonRespCode.TIMEOUT);
            }
            
            String respJson = messageDto.getJson();
            return JSON.parseObject(respJson,respClass);
        }
    }

    @Override
    public Channel getChannel(String key) {
        // Wait until startup has completed
        while (!statusManager.status()){
            if(statusManager.initFailed()){
                log.error("Initialization failed");
                throw new MqException(MqCommonRespCode.C_INIT_FAILED);
            }
            log.debug("Waiting for initialization to complete...");
            DateUtil.sleep(100);
        }
        RpcChannelFuture rpcChannelFuture = RandomUtils.loadBalance(loadBalance,
                channelFutureList,key);
        return rpcChannelFuture.getChannelFuture().channel();
    }

    @Override
    public void subscribe(String topicName, String tagRegex, String consumerType) {
        final ConsumerSubscribeReq req = new ConsumerSubscribeReq();
        
        String messageId = IdHelper.uuid32();
        req.setTraceId(messageId);
        req.setMethodType(MethodType.C_SUBSCRIBE);
        req.setTopicName(topicName);
        req.setTagRegex(tagRegex);
        req.setGroupName(groupName);
        req.setConsumerType(consumerType);
        
        // Subscribe, retrying on failure
        Retryer.<String>newInstance()
                .maxAttempt(subscribeMaxAttempt)
                .callable(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        Channel channel = getChannel(null);
                        MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
                        if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
                            throw new MqException(ConsumerRespCode.SUBSCRIBE_FAILED);
                        }
                        return resp.getRespCode();
                    }
                }).retryCall();
    }

    @Override
    public void unSubscribe(String topicName, String tagRegex, String consumerType) {
        final ConsumerUnSubscribeReq req = new ConsumerUnSubscribeReq();

        String messageId = IdHelper.uuid32();
        req.setTraceId(messageId);
        req.setMethodType(MethodType.C_UN_SUBSCRIBE);
        req.setTopicName(topicName);
        req.setTagRegex(tagRegex);
        req.setGroupName(groupName);
        req.setConsumerType(consumerType);

        // Unsubscribe, retrying on failure
        Retryer.<String>newInstance()
                .maxAttempt(unSubscribeMaxAttempt)
                .callable(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        Channel channel = getChannel(null);
                        MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
                        if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
                            throw new MqException(ConsumerRespCode.UN_SUBSCRIBE_FAILED);
                        }
                        return resp.getRespCode();
                    }
                }).retryCall();
    }

    @Override
    public MqConsumerPullResp pull(String topicName, String tagRegex, int fetchSize) {
        MqConsumerPullReq req = new MqConsumerPullReq();
        req.setSize(fetchSize);
        req.setGroupName(groupName);
        req.setTagRegex(tagRegex);
        req.setTopicName(topicName);
        
        final String traceId = IdHelper.uuid32();
        req.setTraceId(traceId);
        req.setMethodType(MethodType.C_MESSAGE_PULL);
        Channel channel = getChannel(null);
        return this.callServer(channel,req,MqConsumerPullResp.class);
    }

    @Override
    public void heartbeat() {
        final MqHeartBeatReq req = new MqHeartBeatReq();
        final String traceId = IdHelper.uuid32();
        req.setTraceId(traceId);
        req.setMethodType(MethodType.C_HEARTBEAT);
        req.setAddress(NetUtil.getLocalHost());
        req.setPort(0);
        req.setTime(System.currentTimeMillis());
        
        log.debug("[HEARTBEAT] sending heartbeat to broker {}",JSON.toJSON(req));
        
        // Notify every connected broker
        for (RpcChannelFuture channelFuture : channelFutureList){
            try{
                Channel channel = channelFuture.getChannelFuture().channel();
                callServer(channel,req,null);
            }catch (Exception exception){
                log.error("[HEARTBEAT] failed to send heartbeat to broker",exception);
                
            }
        }
    }

    @Override
    public MqCommonResp consumerStatusAck(String messageId, ConsumerStatus consumerStatus) {
        final MqConsumerUpdateStatusReq req = new MqConsumerUpdateStatusReq();
        req.setMessageId(messageId);
        req.setMessageStatus(consumerStatus.getCode());
        req.setConsumerGroupName(groupName);
        
        final String traceId = IdHelper.uuid32();
        req.setTraceId(traceId);
        req.setMethodType(MethodType.C_CONSUMER_STATUS);
        
        // Acknowledge, retrying on failure
        return Retryer.<MqCommonResp>newInstance()
                .maxAttempt(consumerStatusMaxAttempt)
                .callable(new Callable<MqCommonResp>() {
                    @Override
                    public MqCommonResp call() throws Exception {
                        Channel channel = getChannel(null);
                        MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
                        if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
                            throw new MqException(ConsumerRespCode.CONSUMER_STATUS_ACK_FAILED);
                        }
                        return resp;
                    }
                }).retryCall();
    }

    @Override
    public MqCommonResp consumerStatusAckBatch(List<MqConsumerUpdateStatusDto> statusDtoList) {
        final MqConsumerUpdateStatusBatchReq req = new MqConsumerUpdateStatusBatchReq();
        req.setStatusList(statusDtoList);
        
        final String traceId = IdHelper.uuid32();
        req.setTraceId(traceId);
        req.setMethodType(MethodType.C_CONSUMER_STATUS_BATCH);
        
        // Acknowledge the batch, retrying on failure
        return Retryer.<MqCommonResp>newInstance()
                .maxAttempt(consumerStatusMaxAttempt)
                .callable(new Callable<MqCommonResp>() {
                    @Override
                    public MqCommonResp call() throws Exception {
                        Channel channel = getChannel(null);
                        MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
                        if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
                            throw new MqException(ConsumerRespCode.CONSUMER_STATUS_ACK_BATCH_FAILED);
                        }
                        return resp;
                    }
                }).retryCall();
        
    }

    @Override
    public void destroyAll() {
        for(RpcChannelFuture channelFuture : channelFutureList){
            Channel channel = channelFuture.getChannelFuture().channel();
            final String channelId = ChannelUtil.getChannelId(channel);
            log.info("Unregistering: {}",channelId);
            
            ServiceEntry serviceEntry = InnerChannelUtils.buildServiceEntry(channelFuture);
            BrokerRegisterReq brokerRegisterReq = new BrokerRegisterReq();
            brokerRegisterReq.setServiceEntry(serviceEntry);
            
            String messageId = IdHelper.uuid32();
            brokerRegisterReq.setTraceId(messageId);
            brokerRegisterReq.setMethodType(MethodType.C_UN_REGISTER);
            
            this.callServer(channel,brokerRegisterReq,null);
            
            log.info("Unregistered: {}",channelId);
        }
    }

}
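Subscribe, unsubscribe, and the status acknowledgments above are all wrapped in a Retryer with a maxAttempt setting. The Retryer class comes from a library that is not shown here, so the sketch below only illustrates the general retry-until-success idea in plain Java. SimpleRetry and retryCall are hypothetical names, and this version retries immediately, without any wait or backoff policy.

```java
import java.util.concurrent.Callable;

public class SimpleRetry {

    /**
     * Runs the callable until it returns normally or maxAttempt tries are used up.
     * Any exception counts as a failed attempt; the last one is attached as the cause.
     */
    public static <T> T retryCall(int maxAttempt, Callable<T> callable) {
        if (maxAttempt < 1) {
            throw new IllegalArgumentException("maxAttempt must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempt; attempt++) {
            try {
                return callable.call();
            } catch (Exception e) {
                last = e;
            }
        }
        throw new IllegalStateException("all " + maxAttempt + " attempts failed", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};

        // Fails twice, then succeeds on the third attempt.
        String result = retryCall(3, () -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("broker not ready");
            }
            return "subscribed";
        });

        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Throwing an exception on a non-success response code, as the listings above do, is what turns a failed response into a retry.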

Producer

Only the core code is shown.

Producer startup core class
public class MqProducer extends Thread implements IMqProducer {
    @Override
    public void run() {
        this.paramCheck();

        // Start the producer client
        log.info("MQ producer starting client, group: {} brokerAddress: {}",
                groupName,brokerAddress);

        try {
            // 0. Build the configuration
            ProducerBrokerConfig config = ProducerBrokerConfig.newInstance()
                    .groupName(groupName)
                    .brokerAddress(brokerAddress)
                    .check(check)
                    .respTimeoutMills(respTimeoutMills)
                    .invokeService(invokeService)
                    .statusManager(statusManager)
                    .loadBalance(loadBalance)
                    .maxAttempt(maxAttempt)
                    .appKey(appKey)
                    .appSecret(appSecret);


            // 1. Initialize the channels
            this.producerBrokerService.initChannelFutureList(config);

            // 2. Register with the broker
            this.producerBrokerService.registerToBroker();

            // 3. Mark the producer as available
            statusManager.status(true);

            // 4. Register the shutdown hook
            final DefaultShutdownHook rpcShutdownHook = new DefaultShutdownHook();
            rpcShutdownHook.setStatusManager(statusManager);
            rpcShutdownHook.setInvokeService(invokeService);
            rpcShutdownHook.setWaitMillsForRemainRequest(waitMillsForRemainRequest);
            rpcShutdownHook.setDestroyable(this.producerBrokerService);
            ShutdownHooks.rpcShutdownHook(rpcShutdownHook);

            log.info("MQ producer started");

        }catch (Exception e){
            log.error("MQ producer failed to start",e);
            // Mark initialization as failed
            statusManager.initFailed(true);

            throw new MqException(ProducerRespCode.RPC_INIT_FAILED);
        }
    }
}
public class MqProducerHandler extends SimpleChannelInboundHandler {

    private static final Log log = LogFactory.getLog(MqProducerHandler.class);


    /**
     * Invocation manager
     */
    private IInvokeService invokeService;

    public void setInvokeService(IInvokeService invokeService) {
        this.invokeService = invokeService;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;

        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);

        String text = new String(bytes);
        log.debug("[Client] channelId {} received message {}", ChannelUtil.getChannelId(ctx),text);

        RpcMessageDto rpcMessageDto = null;
        try {
            rpcMessageDto = JSON.parseObject(bytes,RpcMessageDto.class);
        }catch (Exception e){
            // Log the raw text; parsing the same bytes again here would only throw a second time
            log.error("Failed to parse RpcMessageDto JSON: {}",text);
            return;
        }
        if(rpcMessageDto.isRequest()){

            // Request messages: read here but not acted on in this listing
            final String methodType = rpcMessageDto.getMethodType();
            final String json = rpcMessageDto.getJson();
        }else {
            // Discard responses whose traceId is blank
            if(StringUtil.isBlank(rpcMessageDto.getTraceId())){
                log.debug("[Client] response traceId is blank, discarding: {}",JSON.toJSON(rpcMessageDto));
                return;
            }
            invokeService.addResponse(rpcMessageDto.getTraceId(),rpcMessageDto);
            log.debug("[Client] response is :{}",JSON.toJSON(rpcMessageDto));
        }
    }
}
public class ProducerBrokerService implements IProducerBrokerService {    
    @Override
    public void initChannelFutureList(ProducerBrokerConfig config) {
        // 1. Copy the configuration
        this.invokeService = config.invokeService();
        this.check = config.check();
        this.respTimeoutMills = config.respTimeoutMills();
        this.brokerAddress = config.brokerAddress();
        this.groupName = config.groupName();
        this.statusManager = config.statusManager();
        this.loadBalance = config.loadBalance();
        this.maxAttempt = config.maxAttempt();
        this.appKey = config.appKey();
        this.appSecret = config.appSecret();


        // 2. Open the channels to the broker
        this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress,
                initChannelHandler(),check);
    }

    private ChannelHandler initChannelHandler(){
        final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER);

        // initChannel runs once per channel. MqProducerHandler is not annotated @Sharable,
        // so a new instance is created for every pipeline instead of reusing a single one.
        ChannelHandler handler = new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                final MqProducerHandler mqProducerHandler = new MqProducerHandler();
                mqProducerHandler.setInvokeService(invokeService);

                ch.pipeline()
                        .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH,delimiterBuf))
                        .addLast(mqProducerHandler);
            }
        };
        return handler;
    }

    @Override
    public void registerToBroker() {
        int successCount = 0;
        for(RpcChannelFuture channelFuture : this.channelFutureList) {
            ServiceEntry serviceEntry = new ServiceEntry();
            serviceEntry.setGroupName(groupName);
            serviceEntry.setAddress(channelFuture.getAddress());
            serviceEntry.setPort(channelFuture.getPort());
            serviceEntry.setWeight(channelFuture.getWeight());

            BrokerRegisterReq brokerRegisterReq = new BrokerRegisterReq();
            brokerRegisterReq.setServiceEntry(serviceEntry);
            brokerRegisterReq.setMethodType(MethodType.P_REGISTER);
            brokerRegisterReq.setTraceId(IdHelper.uuid32());
            brokerRegisterReq.setAppKey(appKey);
            brokerRegisterReq.setAppSecret(appSecret);

            log.info("[Register] registering with broker: {}", JSON.toJSON(brokerRegisterReq));
            final Channel channel = channelFuture.getChannelFuture().channel();
            MqCommonResp resp = callServer(channel, brokerRegisterReq, MqCommonResp.class);
            log.info("[Register] finished registering with broker: {}", JSON.toJSON(resp));

            if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
                successCount++;
            }
        }

        if(successCount <= 0 && check) {
            log.error("Broker availability check failed: 0 successful registrations");
            throw new MqException(MqCommonRespCode.P_REGISTER_TO_BROKER_FAILED);
        }
    }

    // The generic parameters were lost from the original listing; the bounds used here
    // (MqCommonReq, MqCommonResp) are assumptions based on how the arguments are used.
    @Override
    public <T extends MqCommonReq, R extends MqCommonResp> R callServer(Channel channel, T commonReq, Class<R> respClass) {
        final String traceId = commonReq.getTraceId();
        final long requestTime = System.currentTimeMillis();

        RpcMessageDto rpcMessageDto = new RpcMessageDto();
        rpcMessageDto.setTraceId(traceId);
        rpcMessageDto.setRequestTime(requestTime);
        rpcMessageDto.setJson(JSON.toJSONString(commonReq));
        rpcMessageDto.setMethodType(commonReq.getMethodType());
        rpcMessageDto.setRequest(true);

        // Register the pending request so the response can be matched by traceId
        invokeService.addRequest(traceId,respTimeoutMills);

        // Serialize the message and append the frame delimiter
        ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);

        // Write to the channel chosen by the caller
        channel.writeAndFlush(byteBuf);

        String channelId = ChannelUtil.getChannelId(channel);
        log.debug("[Client] channelId {} sent message {}",channelId,JSON.toJSON(rpcMessageDto));

        if(respClass == null){
            log.debug("[Client] one-way message, not waiting for a response");
            return null;
        }else {
            // Block until the channel handler delivers the matching response
            RpcMessageDto messageDto = invokeService.getResponse(traceId);
            if(MqCommonRespCode.TIMEOUT.getCode().equals(messageDto.getRespCode())){
                throw new MqException(MqCommonRespCode.TIMEOUT);
            }
            String respJson = messageDto.getJson();
            return JSON.parseObject(respJson,respClass);
        }
    }
}
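Both the consumer and the producer frame their messages with a delimiter: the sender appends it to each serialized message, and a DelimiterBasedFrameDecoder on the receiving side cuts the byte stream back into messages. The sketch below shows the same idea without Netty, working on text instead of raw bytes for brevity. The delimiter value and the class DelimiterFraming are made up; the project's DelimiterUtil is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;

public class DelimiterFraming {

    // Hypothetical delimiter; it must never occur inside a message body.
    static final String DELIMITER = "$$END$$";

    /** Sender side: appends the delimiter so the receiver can find the message boundary. */
    static String encode(String json) {
        return json + DELIMITER;
    }

    /**
     * Receiver side: extracts every complete frame from the buffer.
     * An incomplete trailing frame stays in the buffer until more data arrives.
     */
    static List<String> decode(StringBuilder buffer) {
        List<String> frames = new ArrayList<>();
        int index;
        while ((index = buffer.indexOf(DELIMITER)) >= 0) {
            frames.add(buffer.substring(0, index));
            buffer.delete(0, index + DELIMITER.length());
        }
        return frames;
    }

    public static void main(String[] args) {
        // TCP may deliver one and a half messages in a single read.
        String stream = encode("{\"id\":1}") + encode("{\"id\":2}");
        StringBuilder buffer = new StringBuilder(stream.substring(0, stream.length() - 5));

        System.out.println(decode(buffer)); // only the first message is complete

        // The rest of the second message arrives with the next read.
        buffer.append(stream.substring(stream.length() - 5));
        System.out.println(decode(buffer));
    }
}
```

Framing is needed because TCP is a byte stream: without a boundary marker, the receiver cannot tell where one JSON message ends and the next begins.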
Broker

Introducing a broker as the intermediary for messages decouples producers from consumers.
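The decoupling comes from the broker holding the subscriptions: producers only name a topic and a tag, and the broker works out which consumer groups should receive the message. Below is a small sketch of that lookup, using the topicName, tagRegex, and groupName fields seen in the consumer code. The class SubscriptionRegistry is hypothetical, and matching the whole tag against the regex is an assumption about how tagRegex is meant to be applied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Pattern;

public class SubscriptionRegistry {

    /** One consumer group's interest in a topic, filtered by a tag regex. */
    private static final class Subscription {
        final String groupName;
        final String topicName;
        final Pattern tagPattern;

        Subscription(String groupName, String topicName, String tagRegex) {
            this.groupName = groupName;
            this.topicName = topicName;
            this.tagPattern = Pattern.compile(tagRegex);
        }
    }

    private final List<Subscription> subscriptions = new CopyOnWriteArrayList<>();

    public void subscribe(String groupName, String topicName, String tagRegex) {
        subscriptions.add(new Subscription(groupName, topicName, tagRegex));
    }

    /** Returns each group subscribed to the topic whose regex matches the whole tag. */
    public List<String> matchGroups(String topicName, String tag) {
        List<String> groups = new ArrayList<>();
        for (Subscription s : subscriptions) {
            if (s.topicName.equals(topicName)
                    && s.tagPattern.matcher(tag).matches()
                    && !groups.contains(s.groupName)) {
                groups.add(s.groupName);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        SubscriptionRegistry registry = new SubscriptionRegistry();
        registry.subscribe("billing", "ORDER", "PAY.*");
        registry.subscribe("support", "ORDER", "REFUND");

        // The producer only knows the topic and tag, not who is listening.
        System.out.println(registry.matchGroups("ORDER", "PAY_OK"));
        System.out.println(registry.matchGroups("ORDER", "REFUND"));
    }
}
```

Adding a new consumer group then requires no change on the producer side, only a new subscription at the broker.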

Broker startup core class
public class MqBroker extends Thread implements IMqBroker {
    @Override
    public void run() {
        // Start the server
        log.info("MQ broker starting server on port: {}",port);

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            final ByteBuf delimiterBuf = DelimiterUtil.getByteBuf(DelimiterUtil.DELIMITER);
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // group(parentGroup, childGroup): the boss group accepts connections, the worker group handles their I/O
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new DelimiterBasedFrameDecoder(DelimiterUtil.LENGTH,delimiterBuf))
                                    .addLast(initChannelHandler());
                        }
                    })
                    // SO_BACKLOG bounds the queue of connections that have not yet been accepted
                    .option(ChannelOption.SO_BACKLOG,128)
                    // SO_KEEPALIVE makes TCP send keepalive probes on an idle connection to check whether the peer is still alive
                    .childOption(ChannelOption.SO_KEEPALIVE,true);

            // Bind the port and start accepting incoming connections
            ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
            log.info("MQ broker started, listening on port {}",port);

            channelFuture.channel().closeFuture().syncUninterruptibly();
            log.info("MQ broker shut down");
        }catch (Exception e){
            log.error("MQ broker failed to start",e);
            throw new MqException(BrokerRespCode.RPC_INIT_FAILED);
        }finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
public class MqBrokerHandler extends SimpleChannelInboundHandler {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);

        RpcMessageDto rpcMessageDto = null;
        try {
            rpcMessageDto = JSON.parseObject(bytes, RpcMessageDto.class);
        } catch (Exception e) {
            log.error("Failed to parse RpcMessageDto JSON: {}", new String(bytes));
            return;
        }

        if (rpcMessageDto.isRequest()) {
            MqCommonResp commonResp = this.dispatch(rpcMessageDto,ctx);

            if(commonResp == null){
                log.debug("Response is null (one-way request), nothing to write back.");
                return;
            }
            writeResponse(rpcMessageDto,commonResp,ctx);
        }else {
            final String traceId = rpcMessageDto.getTraceId();

            // Discard responses whose traceId is blank
            if(StringUtil.isBlank(traceId)){
                log.debug("[Server Response] response traceId is blank, discarding: {}",JSON.toJSON(rpcMessageDto));
                return;
            }

            // Store the response so that the waiting caller can pick it up
            invokeService.addResponse(traceId,rpcMessageDto);
        }
    }

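    /**
     * Dispatches a request to the matching handler.
     *
     * @param rpcMessageDto the request
     * @param ctx           the channel context
     * @return the response, or null when no response has to be written back
     */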
    private MqCommonResp dispatch(RpcMessageDto rpcMessageDto, ChannelHandlerContext ctx) {
        try {
            final String methodType = rpcMessageDto.getMethodType();
            final String json = rpcMessageDto.getJson();

            String channelId = ChannelUtil.getChannelId(ctx);
            final Channel channel = ctx.channel();
            log.debug("channelId: {} received method: {} content: {}", channelId, methodType, json);

            // Producer registration
            if (MethodType.P_REGISTER.equals(methodType)) {
                BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
                if (!brokerRegisterValidService.producerValid(registerReq)) {
                    log.error("{} producer registration validation failed", JSON.toJSON(registerReq));
                    throw new MqException(MqBrokerRespCode.P_REGISTER_VALID_FAILED);
                }

                return registerProducerService.register(registerReq.getServiceEntry(), channel);
            }
            // Producer unregistration
            if (MethodType.P_UN_REGISTER.equals(methodType)) {
                registerProducerService.checkValid(channelId);

                BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
                return registerProducerService.unRegister(registerReq.getServiceEntry(), channel);
            }
            // Producer sends a message
            if (MethodType.P_SEND_MSG.equals(methodType)) {
                registerProducerService.checkValid(channelId);

                return handleProducerSendMsg(channelId, json);
            }
            // Producer sends a message (one-way, no response)
            if (MethodType.P_SEND_MSG_ONE_WAY.equals(methodType)) {
                registerProducerService.checkValid(channelId);

                handleProducerSendMsg(channelId, json);

                return null;
            }
            // Producer sends a batch of messages
            if (MethodType.P_SEND_MSG_BATCH.equals(methodType)) {
                registerProducerService.checkValid(channelId);

                return handleProducerSendMsgBatch(channelId, json);
            }
            // Producer sends a batch of messages (one-way, no response)
            if (MethodType.P_SEND_MSG_ONE_WAY_BATCH.equals(methodType)) {
                registerProducerService.checkValid(channelId);

                handleProducerSendMsgBatch(channelId, json);

                return null;
            }

            // Consumer registration
            if (MethodType.C_REGISTER.equals(methodType)) {
                BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
                if (!brokerRegisterValidService.consumerValid(registerReq)) {
                    log.error("{} consumer registration validation failed", JSON.toJSON(registerReq));
                    throw new MqException(MqBrokerRespCode.C_REGISTER_VALID_FAILED);
                }

                return registerConsumerService.register(registerReq.getServiceEntry(), channel);
            }
            // Consumer unregistration
            if (MethodType.C_UN_REGISTER.equals(methodType)) {
                registerConsumerService.checkValid(channelId);

                BrokerRegisterReq registerReq = JSON.parseObject(json, BrokerRegisterReq.class);
                return registerConsumerService.unRegister(registerReq.getServiceEntry(), channel);
            }
            // Consumer subscribes to a topic
            if (MethodType.C_SUBSCRIBE.equals(methodType)) {
                registerConsumerService.checkValid(channelId);

                ConsumerSubscribeReq req = JSON.parseObject(json, ConsumerSubscribeReq.class);
                return registerConsumerService.subscribe(req, channel);
            }
            // Consumer unsubscribes from a topic
            if (MethodType.C_UN_SUBSCRIBE.equals(methodType)) {
                registerConsumerService.checkValid(channelId);

                ConsumerUnSubscribeReq req = JSON.parseObject(json, ConsumerUnSubscribeReq.class);
                return registerConsumerService.unSubscribe(req, channel);
            }
            // Consumer actively pulls messages
            if (MethodType.C_MESSAGE_PULL.equals(methodType)) {
                registerConsumerService.checkValid(channelId);

                MqConsumerPullReq req = JSON.parseObject(json, MqConsumerPullReq.class);
                return mqBrokerPersist.pull(req, channel);
            }
            // Consumer heartbeat
            if (MethodType.C_HEARTBEAT.equals(methodType)) {
                registerConsumerService.checkValid(channelId);

                MqHeartBeatReq req = JSON.parseObject(json, MqHeartBeatReq.class);
                registerConsumerService.heartbeat(req, channel);
                return null;
            }
            // Consumer reports consumption status (ACK)
            if (MethodType.C_CONSUMER_STATUS.equals(methodType)) {
                registerConsumerService.checkValid(channelId);

                MqConsumerUpdateStatusReq req = JSON.parseObject(json, MqConsumerUpdateStatusReq.class);
                final String messageId = req.getMessageId();
                final String messageStatus = req.getMessageStatus();
                final String consumerGroupName = req.getConsumerGroupName();
                return mqBrokerPersist.updateStatus(messageId, consumerGroupName, messageStatus);
            }
            // Consumer reports consumption status (ACK, batch)
            if (MethodType.C_CONSUMER_STATUS_BATCH.equals(methodType)) {
                registerConsumerService.checkValid(channelId);

                MqConsumerUpdateStatusBatchReq req = JSON.parseObject(json, MqConsumerUpdateStatusBatchReq.class);
                final List<MqConsumerUpdateStatusDto> statusDtoList = req.getStatusList();
                return mqBrokerPersist.updateStatusBatch(statusDtoList);
            }

            log.error("Unsupported method type {}", methodType);
            throw new MqException(MqBrokerRespCode.B_NOT_SUPPORT_METHOD);
        } catch (MqException mqException) {
            log.error("Business exception", mqException);
            MqCommonResp resp = new MqCommonResp();
            resp.setRespCode(mqException.getCode());
            resp.setRespMessage(mqException.getMsg());
            return resp;
        } catch (Exception exception) {
            log.error("Unexpected exception", exception);
            MqCommonResp resp = new MqCommonResp();
            resp.setRespCode(MqCommonRespCode.FAIL.getCode());
            resp.setRespMessage(MqCommonRespCode.FAIL.getMsg());
            return resp;
        }
    }
    /**
     * Handles a message sent by a producer.
     *
     * @param channelId channel identifier
     * @param json message body
     * @return the persistence result
     */
    private MqCommonResp handleProducerSendMsg(String channelId,String json){
        MqMessage mqMessage = JSON.parseObject(json,MqMessage.class);
        MqMessagePersistPut persistPut = new MqMessagePersistPut();
        persistPut.setMqMessage(mqMessage);
        persistPut.setMessageStatus(MessageStatusConst.WAIT_CONSUMER);
        // Record the producer's address
        final ServiceEntry serviceEntry = registerProducerService.getServiceEntry(channelId);
        persistPut.setRpcAddress(serviceEntry);

        MqCommonResp commonResp = mqBrokerPersist.put(persistPut);
        // Push to the subscribed consumers asynchronously, as the batch path does
        this.asyncHandleMessage(persistPut);
        return commonResp;
    }

    /**
     * Pushes a message to its subscribers asynchronously.
     * @param put the persisted message
     */
    private void asyncHandleMessage(MqMessagePersistPut put){
        final MqMessage mqMessage = put.getMqMessage();
        List<ChannelGroupNameDto> channelList = registerConsumerService.getPushSubscribeList(mqMessage);
        if(CollectionUtil.isEmpty(channelList)){
            log.info("No subscribers for this message, skipping push");
            return;
        }
        BrokerPushContext brokerPushContext = BrokerPushContext.newInstance()
                .channelList(channelList)
                .mqMessagePersistPut(put)
                .mqBrokerPersist(mqBrokerPersist)
                .invokeService(invokeService)
                .respTimeoutMills(respTimeoutMills)
                .pushMaxAttempt(pushMaxAttempt);
        brokerPushService.asyncPush(brokerPushContext);
    }

    /**
     * Handles a batch of messages sent by a producer.
     * @param channelId channel identifier
     * @param json message body
     */
    private MqCommonResp handleProducerSendMsgBatch(String channelId,String json){
        MqMessageBatchReq batchReq = JSON.parseObject(json,MqMessageBatchReq.class);
        final ServiceEntry serviceEntry = registerProducerService.getServiceEntry(channelId);

        List<MqMessagePersistPut> putList = buildPersistPutList(batchReq,serviceEntry);

        MqCommonResp commonResp = mqBrokerPersist.putBatch(putList);

        // Push each message asynchronously
        for(MqMessagePersistPut persistPut : putList){
            this.asyncHandleMessage(persistPut);
        }
        return commonResp;
    }

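    /**
     * Builds the list of persistence entries for a batch request.
     * @param batchReq the batch request
     * @param serviceEntry the producer's service entry
     * @return the entries to persist
     */
    private List<MqMessagePersistPut> buildPersistPutList(MqMessageBatchReq batchReq,
                                                          final ServiceEntry serviceEntry){
        List<MqMessagePersistPut> resultList = new ArrayList<>();

        // Build one entry per message
        List<MqMessage> messageList = batchReq.getMqMessageList();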
        for(MqMessage mqMessage : messageList){
            MqMessagePersistPut put = new MqMessagePersistPut();
            put.setRpcAddress(serviceEntry);
            put.setMessageStatus(MessageStatusConst.WAIT_CONSUMER);
            put.setMqMessage(mqMessage);

            resultList.add(put);
        }
        return resultList;
    }

    /**
     * Writes the response back to the client.
     * @param req the request
     * @param resp the response
     * @param ctx the channel context
     */
    private void writeResponse(RpcMessageDto req,
                               Object resp,
                               ChannelHandlerContext ctx){
        final String id = ctx.channel().id().asLongText();

        RpcMessageDto rpcMessageDto = new RpcMessageDto();
        // Mark the message as a response
        rpcMessageDto.setRequest(false);
        rpcMessageDto.setTraceId(req.getTraceId());
        rpcMessageDto.setMethodType(req.getMethodType());
        rpcMessageDto.setRequestTime(System.currentTimeMillis());
        String json = JSON.toJSONString(resp);
        rpcMessageDto.setJson(json);

        // Write back to the client
        ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);
        ctx.writeAndFlush(byteBuf);
        log.debug("[Server] channel {} response {}",id,JSON.toJSON(rpcMessageDto));
    }

}
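The dispatch method above selects a handler through a chain of if statements on methodType. As a design alternative, the same lookup can be expressed as a table from method type to handler. The following is a minimal sketch under simplifying assumptions: MethodDispatcher is a hypothetical class that is not part of the project, and handlers take and return plain strings instead of the project's request and response types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical table-driven dispatcher; not part of the original project. */
final class MethodDispatcher {

    // methodType -> handler that turns the JSON body into a response (null means "no response")
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    /** Registers the handler for one method type and returns this for chaining. */
    MethodDispatcher register(String methodType, Function<String, String> handler) {
        handlers.put(methodType, handler);
        return this;
    }

    /** Looks up the handler; unknown method types are rejected, as in the if-chain version. */
    String dispatch(String methodType, String json) {
        Function<String, String> handler = handlers.get(methodType);
        if (handler == null) {
            throw new IllegalArgumentException("Unsupported method type: " + methodType);
        }
        return handler.apply(json);
    }
}
```

With this shape, supporting a new method type means registering one more entry instead of adding another branch.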
public class BrokerPushService implements IBrokerPushService {

    private static final Log log = LogFactory.getLog(BrokerPushService.class);

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();


    @Override
    public void asyncPush(final BrokerPushContext context) {
        EXECUTOR_SERVICE.submit(new Runnable() {
            @Override
            public void run() {
                log.info("Start async push {}", JSON.toJSON(context));
                final MqMessagePersistPut persistPut = context.mqMessagePersistPut();
                final MqMessage mqMessage = persistPut.getMqMessage();
                final List<ChannelGroupNameDto> channelList = context.channelList();
                final IMqBrokerPersist mqBrokerPersist = context.mqBrokerPersist();
                final IInvokeService invokeService = context.invokeService();
                final long responseTime = context.respTimeoutMills();
                final int pushMaxAttempt = context.pushMaxAttempt();

                // Mark the message as being processed
                final String messageId = mqMessage.getTraceId();
                log.info("Marking message as in process: {}",messageId);

                for(final ChannelGroupNameDto channelGroupNameDto : channelList){
                    final Channel channel = channelGroupNameDto.getChannel();
                    final String consumerGroupName = channelGroupNameDto.getConsumerGroupName();

                    try {
                        mqBrokerPersist.updateStatus(messageId,consumerGroupName, MessageStatusConst.TO_CONSUMER_PROCESS);

                        String channelId = ChannelUtil.getChannelId(channel);

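                        log.info("Start processing channelId: {}",channelId);
                        // 1. Push the message to the consumer
                        mqMessage.setMethodType(MethodType.B_MESSAGE_PUSH);

                        // Retry the push up to pushMaxAttempt times
                        MqConsumerResultResp resultResp = Retryer.<MqConsumerResultResp>newInstance()
                                .maxAttempt(pushMaxAttempt)
                                .callable(new Callable<MqConsumerResultResp>() {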
                                    @Override
                                    public MqConsumerResultResp call() throws Exception {
                                        MqConsumerResultResp resp = callServer(channel,mqMessage,
                                                MqConsumerResultResp.class,invokeService,responseTime);

                                        // Treat a missing or unsuccessful result as a failure so that the push is retried
                                        if(resp == null
                                        || !ConsumerStatus.SUCCESS.getCode().equals(resp.getConsumerStatus())){
                                            throw new MqException(BrokerRespCode.MSG_PUSH_FAILED);
                                        }
                                        return resp;
                                    }
                                }).retryCall();
                        // 2. Update the status
                        // 2.1 The push succeeded: store the consumption status reported by the consumer
                        if(MqCommonRespCode.SUCCESS.getCode().equals(resultResp.getRespCode())){
                            mqBrokerPersist.updateStatus(messageId,consumerGroupName,resultResp.getConsumerStatus());
                        }else {
                            // 2.2 The push failed
                            log.error("Consumption failed: {}",JSON.toJSON(resultResp));
                            mqBrokerPersist.updateStatus(messageId,consumerGroupName,MessageStatusConst.TO_CONSUMER_FAILED);
                        }
                        log.info("Finished processing channelId: {}",channelId);
                    }catch (Exception e){
                        // Log the exception itself so that the cause is not lost
                        log.error("Push failed with an exception",e);
                        mqBrokerPersist.updateStatus(messageId,consumerGroupName,MessageStatusConst.TO_CONSUMER_FAILED);
                    }
                }
                log.info("Async push finished");
            }
        });
    }


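    /**
     * Sends a request over the channel and waits for the response.
     * @param channel channel to write to
     * @param commonRep the request
     * @param respClass response class, or null for a one-way message
     * @param invokeService invocation manager
     * @param respTimeoutMills response timeout in milliseconds
     * @param <T> request type (the bound name MqCommonReq is assumed; only getTraceId() and getMethodType() are needed)
     * @param <R> response type (the bound name MqCommonResp is assumed)
     * @return the response, or null for a one-way message
     */
    private <T extends MqCommonReq, R extends MqCommonResp> R callServer(Channel channel,
                                                                        T commonRep,
                                                                        Class<R> respClass,
                                                                        IInvokeService invokeService,
                                                                        long respTimeoutMills){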
        final String traceId = commonRep.getTraceId();
        final long requestTime = System.currentTimeMillis();

        RpcMessageDto rpcMessageDto = new RpcMessageDto();
        rpcMessageDto.setTraceId(traceId);
        rpcMessageDto.setRequestTime(requestTime);
        rpcMessageDto.setJson(JSON.toJSONString(commonRep));
        rpcMessageDto.setMethodType(commonRep.getMethodType());
        rpcMessageDto.setRequest(true);


        // Register the pending request so that the response can be matched by traceId
        invokeService.addRequest(traceId,respTimeoutMills);

        // Serialize the message and append the frame delimiter
        ByteBuf byteBuf = DelimiterUtil.getMessageDelimiterBuffer(rpcMessageDto);

        // The channel is chosen by the caller (for example, by load balancing)
        channel.writeAndFlush(byteBuf);

        String channelId = ChannelUtil.getChannelId(channel);
        log.debug("[Client] channelId {} sent message {}",channelId,JSON.toJSON(rpcMessageDto));
        if(respClass == null){
            log.debug("[Client] one-way message, ignoring the response");
            return null;
        }else {
            // The response is stored by the channel handler; block until it arrives
            RpcMessageDto messageDto = invokeService.getResponse(traceId);
            if(MqCommonRespCode.TIMEOUT.getCode().equals(messageDto.getRespCode())){
                throw new MqException(MqCommonRespCode.TIMEOUT);
            }

            String respJson = messageDto.getJson();
            return JSON.parseObject(respJson,respClass);
        }

    }
}
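The push above is wrapped in a retryer limited by pushMaxAttempt. To make the idea concrete without depending on the retry library used by the project, here is a minimal sketch of a bounded retry loop. SimpleRetry is a hypothetical helper written for illustration; it is not the Retryer API and has no backoff or wait strategy.

```java
import java.util.function.Supplier;

/** Hypothetical bounded-retry helper; a simplified stand-in for a retry library. */
final class SimpleRetry {

    private SimpleRetry() {}

    /**
     * Runs the task until it returns normally or maxAttempt attempts have failed.
     * The exception of the last failed attempt is rethrown.
     */
    static <T> T call(int maxAttempt, Supplier<T> task) {
        if (maxAttempt < 1) {
            throw new IllegalArgumentException("maxAttempt must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempt; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                // Remember the failure and try again while attempts remain
                last = e;
            }
        }
        throw last;
    }
}
```

In the push scenario the task would throw when the consumer reports a non-success status, which is what turns a failed consumption into another attempt.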
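Default consumer handling implementation

The logic of getPushSubscribeList may look slightly complicated, but it simply finds the subscribed consumers that match an incoming message.

Because a message is consumed only once per groupName, the matching consumers first have to be grouped by groupName.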

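public class LocalBrokerConsumerService implements IBrokerConsumerService {

    // The field declarations are not shown in this listing; the ones below are assumptions inferred from how the fields are used.
    private static final Log log = LogFactory.getLog(LocalBrokerConsumerService.class);

    private final Map<String, BrokerServiceEntryChannel> registerMap = new ConcurrentHashMap<>();

    private final Map<String, BrokerServiceEntryChannel> heartbeatMap = new ConcurrentHashMap<>();

    private final Map<String, Set<ConsumerSubscribeBo>> pushSubscribeMap = new ConcurrentHashMap<>();

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    private ILoadBalance loadBalance;
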
    public LocalBrokerConsumerService(){
        // A consumer is evicted when its last heartbeat is more than 120 s old
        final long limitMills = 2*60*1000;

        // Scan every 120 s
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                for(Map.Entry<String, BrokerServiceEntryChannel> entry : heartbeatMap.entrySet()){
                    String key = entry.getKey();
                    long lastAccessTime  = entry.getValue().getLastAccessTime();
                    long currentTime = System.currentTimeMillis();
                    if(currentTime - lastAccessTime > limitMills){
                        removeByChannelId(key);
                    }
                }
            }
        },2*60,2*60, TimeUnit.SECONDS);
    }

    /**
     * Removes the registration and heartbeat entries of a channel.
     * @param channelId channel identifier
     */
    private void removeByChannelId(final String channelId){
        BrokerServiceEntryChannel channelRegister = registerMap.remove(channelId);
        log.info("Removed registration, id: {}, channel: {}",channelId, JSON.toJSON(channelRegister));
        BrokerServiceEntryChannel channelHeartBeat = heartbeatMap.remove(channelId);
        log.info("Removed heartbeat, id: {}, channel: {}",channelId,JSON.toJSON(channelHeartBeat));
    }

    @Override
    public void loadBalance(ILoadBalance loadBalance) {
        this.loadBalance = loadBalance;
    }

    @Override
    public MqCommonResp register(ServiceEntry serviceEntry, Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);
        registerMap.put(channelId, entryChannel);

        entryChannel.setLastAccessTime(System.currentTimeMillis());
        heartbeatMap.put(channelId, entryChannel);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public MqCommonResp unRegister(ServiceEntry serviceEntry, Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        removeByChannelId(channelId);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public MqCommonResp subscribe(ConsumerSubscribeReq serviceEntry, Channel clientChannel) {
        final String channelId = ChannelUtil.getChannelId(clientChannel);
        final String topicName = serviceEntry.getTopicName();

        final String consumerType = serviceEntry.getConsumerType();
        Map<String, Set<ConsumerSubscribeBo>> subscribeMap = getSubscribeMapByConsumerType(consumerType);

        ConsumerSubscribeBo subscribeBo = new ConsumerSubscribeBo();
        subscribeBo.setChannelId(channelId);
        subscribeBo.setGroupName(serviceEntry.getGroupName());
        subscribeBo.setTopicName(topicName);
        subscribeBo.setTagRegex(serviceEntry.getTagRegex());

        // Add the subscription to the topic's set
        MapUtil.putToSetMap(subscribeMap,topicName,subscribeBo);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());

        return resp;

    }

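    private Map<String, Set<ConsumerSubscribeBo>> getSubscribeMapByConsumerType(String consumerType){
        // Only push subscriptions are tracked here, so consumerType is currently ignored
        return pushSubscribeMap;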
    }


    @Override
    public MqCommonResp unSubscribe(ConsumerUnSubscribeReq serviceEntry, Channel clientChannel) {
        final String channelId = ChannelUtil.getChannelId(clientChannel);
        final String topicName = serviceEntry.getTopicName();
        final String consumerType = serviceEntry.getConsumerType();
        Map<String, Set<ConsumerSubscribeBo>> subscribeMap = getSubscribeMapByConsumerType(consumerType);

        ConsumerSubscribeBo subscribeBo = new ConsumerSubscribeBo();
        subscribeBo.setChannelId(channelId);
        subscribeBo.setGroupName(serviceEntry.getGroupName());
        subscribeBo.setTopicName(topicName);
        subscribeBo.setTagRegex(serviceEntry.getTagRegex());


        // Remove the subscription from the topic's set
        Set<ConsumerSubscribeBo> set = subscribeMap.get(topicName);
        if(CollectionUtil.isNotEmpty(set)){
            set.remove(subscribeBo);
        }

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }


    @Override
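    public List<ChannelGroupNameDto> getPushSubscribeList(MqMessage mqMessage) {
        // 1. Find the subscriptions of the topic
        final String topicName = mqMessage.getTopic();
        Set<ConsumerSubscribeBo> set = pushSubscribeMap.get(topicName);
        if(CollectionUtil.isEmpty(set)){
            return Collections.emptyList();
        }

        // 2. Keep the subscriptions whose tag regex matches, grouped by groupName
        final List<String> tagNameList = mqMessage.getTags();

        Map<String, List<ConsumerSubscribeBo>> groupMap = new HashMap<>();
        for(ConsumerSubscribeBo bo:set){
            String tagRegex = bo.getTagRegex();

            if(RegexUtil.hasMatch(tagNameList,tagRegex)){
                String groupName = bo.getGroupName();

                MapUtil.putToListMap(groupMap,groupName,bo);
            }
        }

        // 3. After grouping by groupName, return only one consumer per group.
        // The choice is delegated to the load balancer; ideally it should be based on the sharding key.
        final String shardingKey = mqMessage.getShardingKey();
        List<ChannelGroupNameDto> channelGroupNameList = new ArrayList<>();

        for(Map.Entry<String, List<ConsumerSubscribeBo>> entry : groupMap.entrySet()){
            List<ConsumerSubscribeBo> list = entry.getValue();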

            ConsumerSubscribeBo bo = RandomUtils.loadBalance(loadBalance,list,shardingKey);
            final String channelId = bo.getChannelId();
            BrokerServiceEntryChannel entryChannel = registerMap.get(channelId);
            if(entryChannel == null){
                log.warn("channelId: {} has no registered channel information",channelId);
                continue;
            }

            final String groupName = entry.getKey();
            ChannelGroupNameDto channelGroupNameDto = ChannelGroupNameDto.of(groupName, entryChannel.getChannel());
            channelGroupNameList.add(channelGroupNameDto);

        }
        return channelGroupNameList;
    }

    @Override
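    public void heartbeat(MqHeartBeatReq mqHeartBeatReq, Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        log.info("[HEARTBEAT] received consumer heartbeat {}, channelId: {}",
                JSON.toJSON(mqHeartBeatReq),channelId);
        // Refresh the last access time; otherwise the eviction task would remove a live consumer
        BrokerServiceEntryChannel entryChannel = heartbeatMap.get(channelId);
        if(entryChannel != null){
            entryChannel.setLastAccessTime(System.currentTimeMillis());
        }
    }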

    @Override
    public void checkValid(String channelId) {
        if(!registerMap.containsKey(channelId)){
            log.error("channelId: {} is not registered",channelId);
            throw new MqException(MqBrokerRespCode.C_REGISTER_CHANNEL_NOT_VALID);
        }
    }

}
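The selection performed by getPushSubscribeList can be condensed into two steps: filter and group the subscribers by group name, then pick one subscriber per group. The sketch below illustrates this under stated assumptions: GroupSelector and Subscriber are hypothetical stand-ins for the project's classes, a tag matches when the whole tag matches the regex, and the pick uses the hash of the sharding key, which is one possible strategy rather than the project's load balancer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified version of the subscriber selection. */
final class GroupSelector {

    /** Minimal stand-in for ConsumerSubscribeBo. */
    static final class Subscriber {
        final String groupName;
        final String channelId;
        final String tagRegex;

        Subscriber(String groupName, String channelId, String tagRegex) {
            this.groupName = groupName;
            this.channelId = channelId;
            this.tagRegex = tagRegex;
        }
    }

    private GroupSelector() {}

    /** Returns one matching subscriber per group name. */
    static Map<String, Subscriber> pickOnePerGroup(List<Subscriber> subscribers,
                                                   List<String> tags,
                                                   String shardingKey) {
        // 1. Group the subscribers whose tag regex matches at least one tag
        Map<String, List<Subscriber>> byGroup = new LinkedHashMap<>();
        for (Subscriber s : subscribers) {
            if (matchesAny(tags, s.tagRegex)) {
                byGroup.computeIfAbsent(s.groupName, k -> new ArrayList<>()).add(s);
            }
        }

        // 2. Pick one subscriber per group; the same sharding key always picks the same index
        int hash = shardingKey == null ? 0 : shardingKey.hashCode();
        Map<String, Subscriber> chosen = new LinkedHashMap<>();
        for (Map.Entry<String, List<Subscriber>> entry : byGroup.entrySet()) {
            List<Subscriber> candidates = entry.getValue();
            chosen.put(entry.getKey(), candidates.get(Math.floorMod(hash, candidates.size())));
        }
        return chosen;
    }

    private static boolean matchesAny(List<String> tags, String regex) {
        for (String tag : tags) {
            if (tag.matches(regex)) {
                return true;
            }
        }
        return false;
    }
}
```

Picking by sharding key instead of at random keeps messages with the same key on the same consumer as long as the group membership does not change.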
Default producer handling implementation
/**
 * Producer registration service.
 */
public class LocalBrokerProducerService implements IBrokerProducerService {

    private static final Log log = LogFactory.getLog(LocalBrokerProducerService.class);

    private final Map<String, BrokerServiceEntryChannel> registerMap = new ConcurrentHashMap<>();

    @Override
    public MqCommonResp register(ServiceEntry serviceEntry, Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry,channel);
        registerMap.put(channelId,entryChannel);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public MqCommonResp unRegister(ServiceEntry serviceEntry, Channel channel) {
        final String channelId = ChannelUtil.getChannelId(channel);
        registerMap.remove(channelId);

        MqCommonResp resp = new MqCommonResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return resp;
    }

    @Override
    public ServiceEntry getServiceEntry(String channelId) {
        return registerMap.get(channelId);
    }

    @Override
    public void checkValid(String channelId) {
        if(!registerMap.containsKey(channelId)){
            log.error("channelId: {} is not registered",channelId);
            throw new MqException(MqBrokerRespCode.P_REGISTER_CHANNEL_NOT_VALID);
        }
    }
}
Invocation manager
/**
 * Invocation service: matches responses to pending requests.
 */
public class InvokeService implements IInvokeService {

    private static final Log logger = LogFactory.getLog(InvokeService.class);

    /**
     * Pending requests.
     *
     * key: seqId, which uniquely identifies a request
     * value: the latest time at which the request is still valid, used for timeout checks and scheduled removal
     */
    private final ConcurrentHashMap<String, Long> requestMap;

    /**
     * Responses, keyed by seqId
     */
    private final ConcurrentHashMap<String, RpcMessageDto> responseMap;

    public InvokeService() {
        this.requestMap = new ConcurrentHashMap<>();
        this.responseMap = new ConcurrentHashMap<>();

        final Runnable timeoutThread = new TimeoutCheckThread(requestMap, responseMap);
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(timeoutThread, 60, 60, TimeUnit.SECONDS);
    }

    @Override
    public IInvokeService addRequest(String seqId, long timeoutMills) {
        logger.debug("[Invoke] start add request for seqId :{}, timeoutMills:{}", seqId, timeoutMills);

        final long expireTime = System.currentTimeMillis() + timeoutMills;
        requestMap.putIfAbsent(seqId, expireTime);
        return this;
    }

    @Override
    public IInvokeService addResponse(String seqId, RpcMessageDto rpcResponse) {
        // 1. Check that the request is still pending.
        // If it is not, it has probably timed out and been removed by the scheduled job
        // before the response arrived, so the response is ignored.
        Long expireTime = this.requestMap.get(seqId);
        if (ObjectUtil.isNull(expireTime)) {
            return this;
        }

        // 2. Check for timeout
        if (System.currentTimeMillis() > expireTime) {
            logger.debug("[Invoke] seqId: {} has timed out, returning a timeout result", seqId);
            rpcResponse = RpcMessageDto.timeout();
        }

        // Store the result
        responseMap.putIfAbsent(seqId, rpcResponse);
        logger.debug("[Invoke] result stored, seqId: {}, rpcResponse: {}", seqId, rpcResponse);

        // Remove the entry from the request map
        requestMap.remove(seqId);
        logger.debug("[Invoke] seqId: {} removed from request map", seqId);

        // Wake up all waiting callers
        synchronized (this) {
            this.notifyAll();
            logger.debug("[Invoke] {} notifyAll()", seqId);
        }
        return this;
    }

    @Override
    public RpcMessageDto getResponse(String seqId) {
        try {
            RpcMessageDto rpcResponse = this.responseMap.get(seqId);
            if (ObjectUtil.isNotNull(rpcResponse)) {
                logger.debug("[Invoke] seq {} result already available: {}", seqId, rpcResponse);
                return rpcResponse;
            }

            // Wait for the result. The map is re-checked while holding the lock,
            // so a notifyAll() issued between the check and wait() cannot be missed.
            synchronized (this) {
                while ((rpcResponse = this.responseMap.get(seqId)) == null) {
                    logger.debug("[Invoke] seq {} has no result yet, waiting", seqId);
                    this.wait();
                }
            }
            logger.debug("[Invoke] seq {} result obtained after waiting: {}", seqId, rpcResponse);
            return rpcResponse;
        } catch (InterruptedException ex) {
            logger.error("Failed to get the response", ex);
            throw new MqException(MqCommonRespCode.RPC_GET_RESP_FAILED);
        }
    }

    @Override
    public boolean remainRequest() {
        return this.requestMap.size() > 0;
    }
}

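Here getResponse blocks when no result is available yet, until addResponse wakes it up.

But what happens if the response to a request is lost? This can be handled by adding a timeout check thread.

Timeout check thread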
/**
 * Timeout check task
 */
public class TimeoutCheckThread implements Runnable {

    /**
     * Pending requests: seqId to expiry time
     */
    private final ConcurrentHashMap<String, Long> requestMap;

    /**
     * Responses: seqId to result
     */
    private final ConcurrentHashMap<String, RpcMessageDto> responseMap;

    /**
     * Creates the task.
     * @param requestMap request map
     * @param responseMap response map
     */
    public TimeoutCheckThread(ConcurrentHashMap<String, Long> requestMap,
                              ConcurrentHashMap<String, RpcMessageDto> responseMap){
        ArgUtil.notNull(requestMap,"requestMap");
        this.requestMap = requestMap;
        this.responseMap = responseMap;
    }

    @Override
    public void run() {
        for(Map.Entry<String, Long> entry:requestMap.entrySet()){
            long expireTime = entry.getValue();
            long currentTime = System.currentTimeMillis();

            if(currentTime>expireTime){
                final String key = entry.getKey();
                // Record a timeout result, then remove the entry from the request map
                responseMap.putIfAbsent(key,RpcMessageDto.timeout());
                requestMap.remove(key);
            }
        }
    }
}
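In the listing above the timeout task only stores a timeout result; as shown, it does not wake up a caller that is blocked in getResponse, and the scan runs once per minute. One alternative is to give every request its own future and let the caller wait with a deadline. The sketch below shows this technique with CompletableFuture from the JDK; it is a swapped-in approach, not the project's implementation. PendingRequests, its methods, and the string payloads are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical request/response matcher with a per-request timeout. */
final class PendingRequests {

    static final String TIMEOUT = "TIMEOUT";

    // seqId -> future completed by the I/O thread when the response arrives
    private final ConcurrentHashMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request before it is written to the channel. */
    void addRequest(String seqId) {
        pending.putIfAbsent(seqId, new CompletableFuture<>());
    }

    /** Completes the matching request; responses for unknown or expired requests are ignored. */
    void addResponse(String seqId, String response) {
        CompletableFuture<String> future = pending.get(seqId);
        if (future != null) {
            future.complete(response);
        }
    }

    /** Waits for the response at most timeoutMillis and returns TIMEOUT when it does not arrive. */
    String getResponse(String seqId, long timeoutMillis) {
        CompletableFuture<String> future = pending.get(seqId);
        if (future == null) {
            throw new IllegalStateException("Unknown seqId: " + seqId);
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return TIMEOUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for " + seqId, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Always clean up so that the map cannot grow without bound
            pending.remove(seqId);
        }
    }
}
```

Each caller waits only on its own future, so a response for one request does not wake up callers waiting for other requests, and no background scan is needed.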
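Solving the Netty sticky-packet problem

TCP is a byte stream, so several messages may arrive in a single read, or one message may be split across reads. This is solved by appending an explicit delimiter to every message, which lets the receiver split the stream back into frames.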

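import java.nio.charset.StandardCharsets;

public class DelimiterUtil {

    private DelimiterUtil(){}

    /**
     * Frame delimiter
     */
    public static final String DELIMITER = "~!@#$%^&*";

    /**
     * Maximum frame length.
     *
     * A limit is required so that a stream without a delimiter cannot exhaust the buffer.
     */
    public static final int LENGTH = 65535;

    /**
     * Delimiter buffer
     */
    public static final ByteBuf DELIMITER_BUF = Unpooled.copiedBuffer(DELIMITER.getBytes(StandardCharsets.UTF_8));

    /**
     * Returns a buffer holding the bytes of the text.
     * UTF-8 is used explicitly so that the result does not depend on the platform default charset.
     * @param text the text
     * @return the buffer
     */
    public static ByteBuf getByteBuf(String text){
        return Unpooled.copiedBuffer(text.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Serializes the message to JSON and appends the delimiter.
     * @param rpcMessageDto the message
     * @return the buffer to write to the channel
     */
    public static ByteBuf getMessageDelimiterBuffer(RpcMessageDto rpcMessageDto){
        String json = JSON.toJSONString(rpcMessageDto);
        String jsonDelimiter = json+DELIMITER;

        return Unpooled.copiedBuffer(jsonDelimiter.getBytes(StandardCharsets.UTF_8));
    }

}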
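To see what a delimiter-based decoder has to do, it helps to reproduce the framing without Netty. The sketch below is a simplified illustration under stated assumptions: DelimiterFraming and its Decoder are hypothetical classes, there is no maximum frame length, and the buffer is copied on every read, which a real decoder would avoid. It reuses the delimiter string from DelimiterUtil.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Netty-free illustration of delimiter-based framing. */
final class DelimiterFraming {

    static final String DELIMITER = "~!@#$%^&*";
    private static final byte[] DELIMITER_BYTES = DELIMITER.getBytes(StandardCharsets.UTF_8);

    private DelimiterFraming() {}

    /** Sender side: append the delimiter so that the receiver can find the end of the frame. */
    static byte[] encode(String json) {
        return (json + DELIMITER).getBytes(StandardCharsets.UTF_8);
    }

    /** Receiver side: accumulates raw reads and emits every complete frame. */
    static final class Decoder {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        List<String> feed(byte[] chunk) {
            buffer.write(chunk, 0, chunk.length);
            byte[] data = buffer.toByteArray();

            List<String> frames = new ArrayList<>();
            int start = 0;
            int end;
            while ((end = indexOf(data, DELIMITER_BYTES, start)) >= 0) {
                frames.add(new String(data, start, end - start, StandardCharsets.UTF_8));
                start = end + DELIMITER_BYTES.length;
            }

            // Keep the incomplete tail for the next read
            buffer.reset();
            buffer.write(data, start, data.length - start);
            return frames;
        }
    }

    /** Returns the index of the first occurrence of pattern at or after from, or -1. */
    private static int indexOf(byte[] data, byte[] pattern, int from) {
        outer:
        for (int i = from; i <= data.length - pattern.length; i++) {
            for (int j = 0; j < pattern.length; j++) {
                if (data[i + j] != pattern[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }
}
```

If one read contains the first message plus the beginning of the second, feed returns only the first message and keeps the remainder until the rest of the second message arrives. The approach relies on the delimiter never appearing inside a message body.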
Message persistence

Persistence is simplified here to an in-memory map; it could also be implemented with Spring Boot and MySQL.
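The updateStatus method of the persistence interface receives a consumerGroupName, which suggests tracking the consumption status per message and per consumer group, since the same message can be consumed by several groups. A minimal in-memory sketch of such a mapping follows. GroupStatusStore and its methods are hypothetical and not part of the project, and identifiers and statuses are plain strings for simplicity.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical store that keeps one status per (messageId, groupName) pair. */
final class GroupStatusStore {

    // messageId -> (groupName -> status)
    private final Map<String, Map<String, String>> statusByMessage = new ConcurrentHashMap<>();

    /** Sets the status of a message for one consumer group only. */
    void updateStatus(String messageId, String groupName, String status) {
        statusByMessage
                .computeIfAbsent(messageId, k -> new ConcurrentHashMap<>())
                .put(groupName, status);
    }

    /** Returns the status for the group, or defaultStatus when the group has not touched the message. */
    String getStatus(String messageId, String groupName, String defaultStatus) {
        Map<String, String> byGroup = statusByMessage.get(messageId);
        if (byGroup == null) {
            return defaultStatus;
        }
        return byGroup.getOrDefault(groupName, defaultStatus);
    }
}
```

With this layout, one group acknowledging a message leaves the message pending for every other group. In a database-backed implementation the same idea would become a table keyed by message id and group name.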

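/**
 * Local (in-memory) persistence strategy
 */
public class LocalMqBrokerPersist implements IMqBrokerPersist {

    private static final Log log = LogFactory.getLog(LocalMqBrokerPersist.class);

    /**
     * Messages per topic.
     * Note: this is a simplified implementation that does not yet deal with concurrency issues.
     */
    private final Map<String, List<MqMessagePersistPut>> map = new ConcurrentHashMap<>();

    // 1. Receive
    // 2. Persist
    // 3. Notify consumers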

    @Override
    public synchronized MqCommonResp put(MqMessagePersistPut mqMessage) {
        this.doPut(mqMessage);

        MqCommonResp commonResp = new MqCommonResp();
        commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return commonResp;
    }

    private void doPut(MqMessagePersistPut put){
        log.info("put elem :{}", JSON.toJSON(put));

        MqMessage mqMessage = put.getMqMessage();
        final String topic = mqMessage.getTopic();

        // Append the message to the list for its topic
        MapUtil.putToListMap(map,topic,put);
    }

    @Override
    public MqCommonResp putBatch(List<MqMessagePersistPut> putList) {
        // Store every message in the batch
        for(MqMessagePersistPut put:putList){
            this.doPut(put);
        }

        MqCommonResp commonResp = new MqCommonResp();
        commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        return commonResp;
    }

    @Override
    public MqCommonResp updateStatus(String messageId, String consumerGroupName, String status) {
        // Performance is poor here, so this must not be used in production; it is for test verification only
        this.doUpdateStatus(messageId,consumerGroupName,status);

        MqCommonResp commonResp = new MqCommonResp();
        commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return commonResp;
    }

    private void doUpdateStatus(String messageId, String consumerGroupName,String status){
        // Full scan of every topic: too slow for production, test verification only.
        // Note: consumerGroupName is not used here, so the status is stored per message, not per group.
        for(List<MqMessagePersistPut> list : map.values()){
            for(MqMessagePersistPut put : list){
                MqMessage mqMessage = put.getMqMessage();
                if(mqMessage.getTraceId().equals(messageId)){
                    put.setMessageStatus(status);

                    // Leaves only the inner loop; the remaining topics are still scanned
                    break;
                }
            }
        }
    }

    @Override
    public MqCommonResp updateStatusBatch(List<MqConsumerUpdateStatusDto> statusDtoList) {
        for(MqConsumerUpdateStatusDto statusDto : statusDtoList){
            this.doUpdateStatus(statusDto.getMessageId(),statusDto.getConsumerGroupName(),
                    statusDto.getMessageStatus());
        }

        MqCommonResp commonResp = new MqCommonResp();
        commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        return commonResp;
    }

    @Override
    public MqConsumerPullResp pull(MqConsumerPullReq pull, Channel channel) {
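        //1. Pull the matching messages
        //2. Update their status to "being processed"
        //3. How should the final consumption status be updated?

        // Fetch messages that are waiting to be consumed (status W)
        final int fetchSize = pull.getSize();
        final String topic = pull.getTopicName();
        final String tagRegex = pull.getTagRegex();

        List<MqMessage> resultList = new ArrayList<>(fetchSize);
        List<MqMessagePersistPut> putList = map.get(topic);
        // Linear scan: poor performance
        if(CollectionUtil.isNotEmpty(putList)){
            for(MqMessagePersistPut put : putList){
                if(!isEnableStatus(put)){
                    continue;
                }

                final MqMessage  mqMessage = put.getMqMessage();
                List<String> tagList = mqMessage.getTags();
                if(RegexUtil.hasMatch(tagList,tagRegex)){
                    // Mark as being processed
                    //TODO: when is the final status of the message updated?
                    // The consumer can send the broker an ACK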

                    put.setMessageStatus(MessageStatusConst.TO_CONSUMER_PROCESS);
                    resultList.add(mqMessage);
                }
                if (resultList.size() >= fetchSize){
                    break;
                }
            }
        }

        MqConsumerPullResp resp = new MqConsumerPullResp();
        resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
        resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
        resp.setList(resultList);
        return resp;
    }


    private boolean isEnableStatus(final MqMessagePersistPut persistPut){
        final String status = persistPut.getMessageStatus();

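        // With a database, a field such as "time to be consumed" could be added and used for sorting
        // This is only a simplified implementation, for testing
        List<String> statusList = Arrays.asList(MessageStatusConst.WAIT_CONSUMER,MessageStatusConst.CONSUMER_LATER);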
        return statusList.contains(status);
    }
}
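The updateStatus method above accepts a consumerGroupName but stores a single status per message, so an ACK from one group changes the status seen by every group. One possible way to track status per group is to key it by both values. The sketch below is only an illustration under that assumption: the class GroupStatusStore and the status strings "W" and "S" are hypothetical and are not taken from the project.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-group message status store; all names are illustrative only. */
public class GroupStatusStore {
    // Assumed status codes for this sketch, not the project's MessageStatusConst values
    public static final String WAIT = "W";
    public static final String SUCCESS = "S";

    // key: groupName + '\n' + messageId, value: status of that message for that group
    private final Map<String, String> statusMap = new ConcurrentHashMap<>();

    private static String key(String groupName, String messageId) {
        return groupName + '\n' + messageId;
    }

    /** Records the status of one message for one consumer group only. */
    public void updateStatus(String messageId, String groupName, String status) {
        statusMap.put(key(groupName, messageId), status);
    }

    /** Returns the status for the group, or WAIT if that group has not touched the message. */
    public String getStatus(String messageId, String groupName) {
        return statusMap.getOrDefault(key(groupName, messageId), WAIT);
    }
}
```

With this layout, a message acknowledged by one group still reads as waiting for any other group, which is the behaviour a database table of groupName + messageId + status would give.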
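Implementing graceful shutdown
public class DefaultShutdownHook extends AbstractShutdownHook {
    /**
     * (1) Set the status to shutting down
     * (2) Check whether {@link IInvokeService#remainRequest()} still reports pending requests
     * (3) Timeout check: optional; if shutdown cannot complete, simply force it
     * (4) Release all thread-pool resources
     * (5) Set the status to shut down successfully
     */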
    @Override
    protected void doHook() {
        // Set the status to waiting for shutdown
        statusManager.status(false);
        logger.info("[Shutdown] set status to wait for shutdown.");

        // Loop until the requests currently in flight have finished, or the timeout is reached
        long startMills = System.currentTimeMillis();
        while (invokeService.remainRequest()){
            long currentMills = System.currentTimeMillis();
            long costMills = currentMills - startMills;
            if(costMills >= waitMillsForRemainRequest){
                logger.warn("[Shutdown] still remains request, but timeout, break.");
                break;
            }
            logger.debug("[Shutdown] still remains request,wait for a while");
            DateUtil.sleep(100);
        }

        // Destroy all registered resources
        destroyable.destroyAll();

        // Set the status to shut down successfully
        statusManager.status(false);
        logger.info("[Shutdown] set status to shutdown success");

    }
}
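The bounded wait in the hook above can be tried in isolation. Below is a minimal sketch, assuming in-flight requests are tracked with a simple counter. InFlightTracker and its methods are hypothetical stand-ins for the project's IInvokeService and do not reproduce its real API.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical in-flight request tracker used to illustrate a bounded shutdown wait. */
public class InFlightTracker {
    private final AtomicInteger inFlight = new AtomicInteger();

    public void begin() { inFlight.incrementAndGet(); }

    public void end() { inFlight.decrementAndGet(); }

    public boolean remainRequest() { return inFlight.get() > 0; }

    /**
     * Polls until no request remains or the timeout elapses.
     * @return true if all requests drained, false on timeout or interruption
     */
    public boolean awaitDrain(long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (remainRequest()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

In a standalone program such a wait would typically run inside a thread registered with Runtime.getRuntime().addShutdownHook(...), followed by releasing thread pools and other resources.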
Heartbeat detection

Heartbeat implementation

A heartbeat can be a very simple message body.

@Override
public void heartbeat() {
    final MqHeartBeatReq req = new MqHeartBeatReq();
    final String traceId = IdHelper.uuid32();
    req.setTraceId(traceId);
    req.setMethodType(MethodType.C_HEARTBEAT);
    req.setAddress(NetUtil.getLocalHost());
    req.setPort(0);
    req.setTime(System.currentTimeMillis());

    log.debug("[HEARTBEAT] sending heartbeat to the server {}", JSON.toJSON(req));

    // Notify every broker
    for(RpcChannelFuture channelFuture : channelFutureList) {
        try {
            Channel channel = channelFuture.getChannelFuture().channel();
            callServer(channel, req, null);
        } catch (Exception exception) {
            log.error("[HEARTBEAT] failed to send heartbeat to the server", exception);
        }
    }
}

The consumer sends its heartbeat to every broker.

Scheduling the heartbeat

We start a scheduled task that runs every 5 seconds.

/**
 * Initialize the heartbeat
 * @since 0.0.6
 */
private void initHeartbeat() {
    // Send a heartbeat every 5 seconds
    scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            heartbeat();
        }
    }, 5, 5, TimeUnit.SECONDS);
}

The heartbeat starts as soon as the connection to the broker has been established:

@Override
public void initChannelFutureList(ConsumerBrokerConfig config) {
    //1. Initialize the configuration
    //...

    //2. Initialize the channel futures
    this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress,
            initChannelHandler(), check);

    //3. Initialize the heartbeat
    this.initHeartbeat();
}
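Broker implementation

Consumers send heartbeats periodically, so the broker has to receive them.

Receiving heartbeats

To keep things simple, heartbeats are ONE-WAY: the broker sends no response.

// Consumer heartbeat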
if(MethodType.C_HEARTBEAT.equals(methodType)) {
    MqHeartBeatReq req = JSON.parseObject(json, MqHeartBeatReq.class);
    registerConsumerService.heartbeat(req, channel);
    return null;
}
Heartbeat handling

Each time a heartbeat arrives, we record the channelId of the request and store the latest access time.

@Override
public void heartbeat(MqHeartBeatReq mqHeartBeatReq, Channel channel) {
    final String channelId = ChannelUtil.getChannelId(channel);
    log.info("[HEARTBEAT] received consumer heartbeat {}, channelId: {}",
            JSON.toJSON(mqHeartBeatReq), channelId);

    ServiceEntry serviceEntry = new ServiceEntry();
    serviceEntry.setAddress(mqHeartBeatReq.getAddress());
    serviceEntry.setPort(mqHeartBeatReq.getPort());

    BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);
    entryChannel.setLastAccessTime(mqHeartBeatReq.getTime());
    heartbeatMap.put(channelId, entryChannel);
}
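Removing consumers

If a consumer has sent no heartbeat for a long time, we assume its service is down.

When the LocalBrokerConsumerService starts, it also starts a scheduled cleanup task.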

public LocalBrokerConsumerService() {
    // 120 s: idle limit, which is also the scan interval
    final long limitMills = 2 * 60 * 1000;

    scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            for(Map.Entry<String, BrokerServiceEntryChannel> entry : heartbeatMap.entrySet()) {
                String key  = entry.getKey();
                long lastAccessTime = entry.getValue().getLastAccessTime();
                long currentTime = System.currentTimeMillis();
                if(currentTime - lastAccessTime > limitMills) {
                    removeByChannelId(key);
                }
            }
        }
    }, 2 * 60, 2 * 60, TimeUnit.SECONDS);
}

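This task runs every 2 minutes; a consumer whose last heartbeat is more than 2 minutes old is removed. Because the scan itself only runs every 2 minutes, a dead consumer can stay registered for up to roughly 4 minutes.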

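Load balancing

Load balancing was covered in an earlier blog post; here we simply call that interface, so it is not described again in detail.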

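Where load balancing is used in the MQ

Producer sending

When a producer sends a message, it can send it to any broker.

Broker pushing to consumers

After receiving a message, the broker can likewise pick any one consumer when pushing it.

Consumer ACK

After consuming a message, the consumer returns the status receipt to the broker; any broker may be chosen.

Sticky messages

Some messages are special, for example those that must be consumed in order. For these, a shardingKey can pin them to a fixed shard during load balancing.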
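The selection rule described above can be sketched as follows: pick any node when there is no shardingKey, and map the same shardingKey to the same node otherwise. This is a simplified illustration, not the project's load-balance interface. StickySelector and select are hypothetical names, and a plain hash-modulo is assumed rather than consistent hashing.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical sticky selector: random without a key, hash-pinned with one. */
public final class StickySelector {
    private StickySelector() {}

    public static <T> T select(List<T> nodes, String shardingKey) {
        if (nodes == null || nodes.isEmpty()) {
            throw new IllegalArgumentException("nodes must not be empty");
        }
        if (shardingKey == null || shardingKey.isEmpty()) {
            // No ordering requirement: any node will do
            return nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
        }
        // floorMod keeps the index non-negative even when hashCode() is negative
        int index = Math.floorMod(shardingKey.hashCode(), nodes.size());
        return nodes.get(index);
    }
}
```

The mapping is stable only while the node list keeps the same size and order; when brokers or consumers join or leave, keys may move to a different node.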

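Registration authentication

We have implemented the basic functions of the MQ, but one problem remains.

The MQ performs no authentication.

If it is deployed on the public internet, any machine can connect to our service, which is clearly unacceptable.

The author's approach is a username and password: when a producer registers with the broker, it supplies these two attributes, and consumers do the same. The credentials are then verified when sending, pushing, and pulling messages. It is very simple.

Authentication can also be done with a token, which is more secure, although the implementation may be somewhat more complex.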
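A username and password check of the kind described above might look like the sketch below. It assumes the broker keeps the expected credentials in memory. SimpleAuthChecker is a hypothetical name, not a class from the project, and a real deployment would store password hashes rather than plain text.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical credential check performed when a producer or consumer registers. */
public class SimpleAuthChecker {
    // username -> expected password (plain text only to keep the sketch short)
    private final Map<String, String> credentials = new ConcurrentHashMap<>();

    public void register(String username, String password) {
        credentials.put(username, password);
    }

    public boolean isValid(String username, String password) {
        if (username == null || password == null) {
            return false;
        }
        String expected = credentials.get(username);
        if (expected == null) {
            return false;
        }
        // MessageDigest.isEqual avoids the early exit of String.equals on the first mismatch
        return MessageDigest.isEqual(
                expected.getBytes(StandardCharsets.UTF_8),
                password.getBytes(StandardCharsets.UTF_8));
    }
}
```

The broker would call isValid on registration and reject the channel when it returns false.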

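Summary of the project's features
  •  check: broker startup check

  •  Notify the register center on shutdown

  •  Timeout setting for graceful shutdown

  •  heartbeat detection mechanism

  •  Improved load-balance implementation + shardingKey sticky consumption and requests

  •  Extensible failure retry

  •  Consumer pull strategy

  •  ACK handling for pulled messages

  •  Spring Boot implementation of the broker

  •  Message ACK handling based on groupName

  •  Replaying messages by offset

  •  Batch sending and batch ACK

  •  Registration authentication for security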

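Planned updates
  •  Ordered messages

  •  Transactional messages

  •  Scheduled messages

  •  Flow control (back-pressure)

  •  Message reliability

  •  offline message: offline messages

  •  dead message: dead-letter queue

  •  Reconnecting after a dropped connection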

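Project repository: Zzzzs1/MQ-demo: an MQ implemented in Java (github.com)

Feel free to share; when reposting, please credit the source: 内存溢出 (outofmemory.cn)

Original article: http://outofmemory.cn/langs/916554.html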
