上一篇文章中,本地调试,发送的消息类型是SEND_MESSAGE_V2
broker和nameSrv一样,都是同个NettyRemotingServer来监听连接
在接收到消息之后,通过serverHandler处理
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { final Pairmatched = this.processorTable.get(cmd.getCode()); // DefaultRequestProcessor final Pair pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); log.info("处理消息:[{}]{}", opaque, cmd); ... }
可以看到,当matched为空的时候,会用defaultRequestProcessor去处理,来看看broker的processorTable处理逻辑
在BrokerController.initialize()方法中调用了 this.registerProcessor();
public void registerProcessor() { SendMessageProcessor sendProcessor = new SendMessageProcessor(this); sendProcessor.registerSendMessageHook(sendMessageHookList); sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this); replyMessageProcessor.registerSendMessageHook(sendMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor); NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this); this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor); ClientManageProcessor clientProcessor = new ClientManageProcessor(this); this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor); this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor); this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor); ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this); this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.remotingServer.registerProcessor(RequestCode.UPDATe_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor); AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this); this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor); }
@Override public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) { ExecutorService executorThis = executor; if (null == executor) { executorThis = this.publicExecutor; } Pairpair = new Pair (processor, executorThis); this.processorTable.put(requestCode, pair); }
通过registerProcessor()方法,将requestCode和processor保存在processorTable中,
SendMessageProcessor sendProcessor = new SendMessageProcessor(this); sendProcessor.registerSendMessageHook(sendMessageHookList); sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
可以看到SEND_MESSAGE_V2对应的SendMessageProcessor来处理
SendMessageProcessor.processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { RemotingCommand response = null; try { response = asyncProcessRequest(ctx, request).get(); } catch (InterruptedException | ExecutionException e) { log.error("process SendMessage error, request : " + request.toString(), e); } return response; }
public CompletableFutureasyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: return this.asyncConsumerSendMsgBack(ctx, request); default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return CompletableFuture.completedFuture(null); } mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); if (requestHeader.isBatch()) { return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader); } } }
private CompletableFutureasyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext mqtraceContext, SendMessageRequestHeader requestHeader) { final RemotingCommand response = preSend(ctx, request, requestHeader); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); if (response.getCode() != -1) { return CompletableFuture.completedFuture(response); } final byte[] body = request.getBody(); int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (queueIdInt < 0) { queueIdInt = randomQueueId(topicConfig.getWriteQueueNums()); } MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); // 构造msgInner ... if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) { // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message. // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it. String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue); } else { msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); } CompletableFuture putMessageResult = null; String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (transFlag != null && Boolean.parseBoolean(transFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return CompletableFuture.completedFuture(response); } putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); } else { // 调用messageStore保存 putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner); } return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt); }
brokerController中messageStore默认为DefaultMessageStore
DefaultMessageStore.asyncPutMessage
public CompletableFutureasyncPutMessage(MessageExtBrokerInner msg) { PutMessageStatus checkStoreStatus = this.checkStoreStatus(); if (checkStoreStatus != PutMessageStatus.PUT_OK) { return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null)); } PutMessageStatus msgCheckStatus = this.checkMessage(msg); if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null)); } long beginTime = this.getSystemClock().now(); // 调用commitLog保存 CompletableFuture putResultFuture = this.commitLog.asyncPutMessage(msg); putResultFuture.thenAccept((result) -> { long elapsedTime = this.getSystemClock().now() - beginTime; if (elapsedTime > 500) { log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); } this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); if (null == result || !result.isOk()) { this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); } }); return putResultFuture; }
到这里,broker已经接收到producer生产的消息了
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)