- Why does a single Kafka broker's TPS drop by an order of magnitude once the number of topics grows to 100+, while RocketMQ still keeps a high TPS with a massive number of topics?
Kafka can reach tens of thousands of TPS when there are only a few topics, but once there are many topics it falls to roughly 1,000-2,000 TPS. RocketMQ, by contrast, keeps a high TPS even with tens of thousands of topics.
- How does the "random read" of the CommitLog affect performance?
RocketMQ is a file-based store: the body of every message is written to the CommitLog. Producing messages is a sequential write and therefore very efficient, but consuming is per topic, and one topic's messages are scattered across the CommitLog, so reading them is effectively random. What does that mean for RocketMQ?
Kafka lays its files out per Topic/partition: every partition gets its own physical directory, and writes are sequential only at the level of a single partition file. If a cluster has hundreds or thousands of topics, each with hundreds of partitions, the I/O of highly concurrent writes becomes scattered (the scattered placement of messages makes disk I/O contention the bottleneck), so the writes effectively behave like random I/O. In other words, Kafka's write performance first rises and then falls as the number of topics and partitions grows.
RocketMQ, on the other hand, pursues strictly sequential writes: all messages, regardless of topic, are appended in order to the commitlog files, so the sequential pattern is not affected by the number of topics and partitions.
When producers and consumers coexist, Kafka's throughput drops sharply as the topic count rises, while RocketMQ stays stable. Kafka therefore suits scenarios with few topics and few consumers (where its throughput is higher), whereas RocketMQ suits scenarios with many topics and many consumers (which makes it a better fit for platform-style tooling that creates lots of topics; Huawei's MQS, for example, is a packaged version of RocketMQ).
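To picture the difference in on-disk layout, here is a toy sketch (the directory and file names are made up; this is neither Kafka nor RocketMQ code, just an illustration of the two layouts): a per-topic/per-partition layout spreads the same messages over hundreds of files, while a commitlog-style layout appends them all to one file.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FileLayoutSketch {
    static final ByteBuffer MSG = ByteBuffer.allocate(1024);   // a fake 1 KB message

    // Kafka-style layout: one log file per topic/partition, so concurrent producers
    // scatter their appends over many separate files.
    static void kafkaStyle(Path dir, int topics, int partitionsPerTopic) throws IOException {
        for (int t = 0; t < topics; t++) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                Path log = dir.resolve("topic-" + t).resolve(String.valueOf(p)).resolve("00000000.log");
                Files.createDirectories(log.getParent());
                try (FileChannel ch = FileChannel.open(log,
                        StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
                    MSG.clear();
                    ch.write(MSG);                // each partition file is written separately
                }
            }
        }
    }

    // RocketMQ-style layout: every message of every topic is appended to the same commitlog file.
    static void rocketmqStyle(Path dir, int messages) throws IOException {
        try (FileChannel ch = FileChannel.open(dir.resolve("commitlog"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            for (int i = 0; i < messages; i++) {
                MSG.clear();
                ch.write(MSG);                    // one file, strictly sequential appends
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path kafkaDir = Files.createTempDirectory("kafka-layout");
        Path rocketDir = Files.createTempDirectory("rocketmq-layout");
        kafkaStyle(kafkaDir, 100, 4);             // 100 topics x 4 partitions = 400 separate log files
        rocketmqStyle(rocketDir, 400);            // the same 400 messages end up in a single file
        try (var kafkaFiles = Files.walk(kafkaDir); var rocketFiles = Files.walk(rocketDir)) {
            System.out.println("kafka-style log files:    " + kafkaFiles.filter(Files::isRegularFile).count());
            System.out.println("rocketmq-style log files: " + rocketFiles.filter(Files::isRegularFile).count());
        }
    }
}
```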
CommitLog: the Message format
Store architecture: message sending
The layering of the whole storage design is very clear; roughly, the layers are as follows:
- The business layer (you could also call it the network layer): once a message is received, it is generally handed to SendMessageProcessor, which decides which piece of business logic will handle it.
- DefaultMessageStore: the core entry point of the storage layer.
- Another important one is CommitLog.
These are the three core classes; a minimal sketch of how they hand a message to each other follows below.
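The sketch below only illustrates the layering described above with simplified stand-in classes; it is not the real RocketMQ code.

```java
// A simplified sketch of the three layers: processor -> store -> commitlog (stand-in classes).
public class StoreLayersSketch {

    // business / network layer: decodes the request and decides who handles it
    static class SendMessageProcessorSketch {
        private final DefaultMessageStoreSketch store = new DefaultMessageStoreSketch();

        void processRequest(String topic, int queueId, byte[] body) {
            // hand the decoded message over to the storage layer
            store.putMessage(topic, queueId, body);
        }
    }

    // storage-layer entry point: checks and statistics, then delegates to the commitlog
    static class DefaultMessageStoreSketch {
        private final CommitLogSketch commitLog = new CommitLogSketch();

        void putMessage(String topic, int queueId, byte[] body) {
            if (body == null || body.length == 0) {
                throw new IllegalArgumentException("message body is empty");
            }
            commitLog.append(topic, queueId, body);
        }
    }

    // the append-only message log; in the real broker this appends to a memory-mapped commitlog file
    static class CommitLogSketch {
        void append(String topic, int queueId, byte[] body) {
            System.out.printf("append %d bytes for %s, queue %d%n", body.length, topic, queueId);
        }
    }

    public static void main(String[] args) {
        new SendMessageProcessorSketch().processRequest("TopicTest", 0, "hello".getBytes());
    }
}
```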
Entry point of the Store layer
Handling a message in the storage layer is part of an RPC request, so we look for the entry point, which of course starts from the Broker startup.
```java
// org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {
    //todo load the Broker's topic configuration (json)
    boolean result = this.topicConfigManager.load();
    //todo load consume offsets
    result = result && this.consumerOffsetManager.load();
    //todo load subscription information
    result = result && this.subscriptionGroupManager.load();
    //todo load consumer filter information
    result = result && this.consumerFilterManager.load();
    if (result) {
        try {
            //create the message store component
            this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
                this.messageArrivingListener, this.brokerConfig);
            //if the multi-replica (DLedger) mechanism is enabled, register a roleChangeHandler with the
            //cluster's leader elector, i.e. the handler invoked after a node's role changes
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                DLedgerRoleChangeHandler roleChangeHandler =
                    new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog())
                    .getdLedgerServer().getdLedgerLeaderElector()
                    .addRoleChangeHandler(roleChangeHandler);
            }
            ...
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }
    result = result && this.messageStore.load();
    if (result) {
        ...
        this.consumerManageExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(),
                new ThreadFactoryImpl("ConsumerManageThread_"));
        //register the processors for the different kinds of requests
        this.registerProcessor(); // here
        ...
    }
    return result;
}

public void registerProcessor() {
    //todo the core processor for incoming messages
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    //register processors for the various request codes
    //todo register the send-message processor
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
}

public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    ExecutorService executorThis = executor;
    if (null == executor) {
        executorThis = this.publicExecutor;
    }
    Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
    this.processorTable.put(requestCode, pair);
}
```
```java
// org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {
    if (result) {
        //build the Netty server
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
        //todo build the thread pool dedicated to handling message sends
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60, TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_"));
        this.consumerManageExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(),
                new ThreadFactoryImpl("ConsumerManageThread_"));
        //register the processors for the different kinds of requests
        this.registerProcessor();
    }
    ...
}
```
SendMessageProcessor.processRequest
RocketMQ uses Netty for its networking; once the framework receives a request, handling enters processRequest.
```java
// SendMessageProcessor#processRequest
//todo handle an incoming request
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException {
    RemotingCommand response = null;
    try {
        response = asyncProcessRequest(ctx, request).get();
    } catch (InterruptedException | ExecutionException e) {
        log.error("process SendMessage error, request : " + request.toString(), e);
    }
    return response;
}

// SendMessageProcessor#asyncProcessRequest()
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) {
    final SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.asyncConsumerSendMsgBack(ctx, request);
        default:
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return CompletableFuture.completedFuture(null);
            }
            //todo this part implements message tracing (not fully implemented yet)
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            if (requestHeader.isBatch()) {
                //todo batch messages
                return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                //todo non-batch messages (the common path)
                return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader); // here
            }
    }
}

private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
        SendMessageContext mqtraceContext, SendMessageRequestHeader requestHeader) {
    //todo pre-send handling: mark the RPC sequence number, check the queue's read/write permission,
    //auto-create the Topic, and so on
    final RemotingCommand response = preSend(ctx, request, requestHeader); // here
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
    if (response.getCode() != -1) {
        return CompletableFuture.completedFuture(response);
    }
    ...
    //todo if no queueId was specified, the system picks one at random
    if (queueIdInt < 0) {
        queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    }
    //todo build the Message used inside the Broker (an extra wrapper layer)
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic()); //topic
    msgInner.setQueueId(queueIdInt); //queueId
    //todo dead-letter handling: if the consume retry count has reached the limit, the message goes to the dead-letter queue
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return CompletableFuture.completedFuture(response);
    }
    ...
    if (transFlag != null && Boolean.parseBoolean(transFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            return CompletableFuture.completedFuture(response);
        }
        //todo the Prepare message of a transactional message
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    } else {
        //todo normal messages, plus the Commit/Rollback messages of transactions
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader,
        mqtraceContext, ctx, queueIdInt);
}
```
DefaultMessageStore.asyncPutMessage
```java
// org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }
    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }
    long beginTime = this.getSystemClock().now();
    //this is where we enter the commitlog's message handling
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg); // here
    putResultFuture.thenAccept((result) -> {
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
    });
    return putResultFuture;
}
```
CommitLog.asyncPutMessage
```java
// org.apache.rocketmq.store.CommitLog#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    //the time at which the Message is stored on the Broker
    msg.setStoreTimestamp(System.currentTimeMillis());
    //crc checksum of the Message body, to guard against the content being tampered with or corrupted
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;
    //store-latency information; these metrics can be collected and reported to a monitoring system
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    String topic = msg.getTopic();
    int queueId = msg.getQueueId();
    //handle delayed (scheduled) messages
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            ...
        }
    }
    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    //get the memory-mapped file of the most recent CommitLog file (zero copy)
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    //todo putMessage is executed by multiple threads in parallel, so it must take a lock; the broker can be
    //configured to use a reentrant lock or a spin lock. The default is false, i.e. a spin lock.
    //A spin lock is recommended for asynchronous flushing, a reentrant lock for synchronous flushing.
    putMessageLock.lock(); //spin or ReentrantLock, depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;
        //set the timestamp again after acquiring the lock, so that ordering is global
        msg.setStoreTimestamp(beginLockTimestamp);
        //if the latest Commitlog file is full, create a new one
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        //todo 3.1 write this Broker-internal Message into the MappedFile's memory (note: not flushed to disk yet)
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
    }
}
```
Storing the message into a MappedFile of the MappedFileQueue
```java
//todo 3.1 write this Broker-internal Message into the MappedFile's memory (note: not flushed to disk yet)
result = mappedFile.appendMessage(msg, this.appendMessageCallback);

// org.apache.rocketmq.store.MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;
    //the current write position inside this MappedFile
    int currentPos = this.wrotePosition.get(); // here
    if (currentPos < this.fileSize) {
        //with asynchronous flushing there are two further flush modes to choose from
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {
            //todo non-batch handling
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
```
We won't go into the details here; the rest is just formatting of the message data.
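As a side note, the "memory-mapped file" that MappedFile wraps is an ordinary MappedByteBuffer obtained from FileChannel.map. The standalone sketch below (the file name and size are made up; it is not RocketMQ code) shows the same idea: write into the mapping first, then force() to flush, which mirrors the append-then-flush split above.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapAppendSketch {
    public static void main(String[] args) throws IOException {
        final int fileSize = 1024 * 1024; // 1 MB, a stand-in for a 1 GB commitlog segment
        try (FileChannel channel = FileChannel.open(Path.of("demo-commitlog"),
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // map the file into memory once; afterwards writes go to the page cache, no write() syscall per message
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);

            byte[] msg = "hello commitlog".getBytes(StandardCharsets.UTF_8);
            int wrotePosition = 0;                 // what MappedFile tracks in this.wrotePosition
            mapped.position(wrotePosition);
            mapped.put(msg);                       // step 1: append into the mapped memory (not yet on disk)
            wrotePosition += msg.length;

            mapped.force();                        // step 2: flush the dirty pages to disk (what a flush service does)
            System.out.println("wrotePosition = " + wrotePosition);
        }
    }
}
```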
Synchronous flush: GroupCommitService (a dedicated thread)
The flush services are started as dedicated threads right in the CommitLog constructor.
```java
// org.apache.rocketmq.store.CommitLog#CommitLog
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
        ...
    this.defaultMessageStore = defaultMessageStore;
    //todo synchronous flush
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        //todo asynchronous flush
        //the FlushRealTimeService thread flushes newly appended MappedByteBuffer data to disk every 500ms by default
        this.flushCommitLogService = new FlushRealTimeService();
    }
    //the CommitRealTimeService thread commits newly appended ByteBuffer data into the MappedByteBuffer every 200ms
    this.commitLogService = new CommitRealTimeService();
    ...
}

// CommitLog.GroupCommitService#run
public void run() {
    while (!this.isStopped()) {
        try {
            this.waitForRunning(10);
            this.doCommit(); // here
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    this.doCommit(); // here
}

// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit
private void doCommit() {
    //take the lock
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                //todo the message may span two files (the first was full and a new one was created), so flush up to twice
                for (int i = 0; i < 2 && !flushOK; i++) {
                    //flush to disk
                    CommitLog.this.mappedFileQueue.flush(0);
                    //todo the flush pointer being >= the requested commit pointer means the flush succeeded
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                }
                //wake up the client that sent the message
                req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
            //update the flush checkpoint
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}
```
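To connect this with the send path: after appendMessage returns, the sending thread submits a group-commit request and blocks until GroupCommitService wakes it up once the data is on disk. The snippet below is only a simplified sketch of that handshake; the class and method names are stand-ins, not the exact RocketMQ API (older versions of GroupCommitRequest are built on a CountDownLatch in roughly this spirit).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SyncFlushSketch {
    // Simplified stand-in for GroupCommitRequest: the send thread waits on the latch,
    // the flush thread counts it down once data up to nextOffset has been flushed.
    static class FlushRequest {
        final long nextOffset;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile boolean flushOK;

        FlushRequest(long nextOffset) { this.nextOffset = nextOffset; }

        boolean waitForFlush(long timeoutMillis) throws InterruptedException {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) && flushOK;
        }

        void wakeup(boolean ok) { this.flushOK = ok; latch.countDown(); }
    }

    static final List<FlushRequest> pending = new ArrayList<>();
    static volatile long flushedWhere = 0;   // how far the pretend "commitlog" has been flushed

    public static void main(String[] args) throws InterruptedException {
        // the flush thread, playing the role of GroupCommitService
        Thread flushService = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                List<FlushRequest> batch;
                synchronized (pending) {
                    batch = new ArrayList<>(pending);
                    pending.clear();
                }
                for (FlushRequest req : batch) {
                    flushedWhere = Math.max(flushedWhere, req.nextOffset);  // pretend we flushed up to here
                    req.wakeup(flushedWhere >= req.nextOffset);
                }
                try { Thread.sleep(10); } catch (InterruptedException e) { return; }
            }
        });
        flushService.start();

        // the send thread: after appending a 128-byte message at offset 0, wait for the synchronous flush
        FlushRequest req = new FlushRequest(128);
        synchronized (pending) { pending.add(req); }
        boolean ok = req.waitForFlush(5000);
        System.out.println(ok ? "PUT_OK" : "FLUSH_DISK_TIMEOUT");
        flushService.interrupt();
    }
}
```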
Asynchronous flush: CommitRealTimeService / FlushCommitLogService (dedicated threads)
```java
// org.apache.rocketmq.store.CommitLog#CommitLog
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
        ...
    this.defaultMessageStore = defaultMessageStore;
    //todo synchronous flush
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        //todo asynchronous flush
        //the FlushRealTimeService thread flushes newly appended MappedByteBuffer data to disk every 500ms by default
        this.flushCommitLogService = new FlushRealTimeService();
    }
    //the CommitRealTimeService thread commits newly appended ByteBuffer data into the MappedByteBuffer every 200ms
    this.commitLogService = new CommitRealTimeService();
    ...
}
```
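Below is a rough sketch of the asynchronous division of labour described in the comments above. It is stand-in code, not the real services, and it assumes the transient store pool is enabled so that messages first land in an off-heap writeBuffer: one task "commits" the writeBuffer into the mapped file every 200ms, another flushes the mapped file every 500ms, and the producer returns without waiting for either.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AsyncFlushSketch {
    public static void main(String[] args) throws IOException, InterruptedException {
        try (FileChannel channel = FileChannel.open(Path.of("demo-async-commitlog"),
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024);
            // stand-in for the transient store pool's writeBuffer (messages are appended here first)
            ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024 * 1024);

            ScheduledExecutorService services = Executors.newScheduledThreadPool(2);
            // "CommitRealTimeService": every 200ms, commit what producers appended to writeBuffer into the mapped file
            services.scheduleWithFixedDelay(() -> {
                synchronized (writeBuffer) {
                    writeBuffer.flip();
                    mapped.put(writeBuffer);
                    writeBuffer.clear();
                }
            }, 200, 200, TimeUnit.MILLISECONDS);
            // "FlushRealTimeService": every 500ms, flush the dirty pages of the mapped file to disk
            services.scheduleWithFixedDelay(mapped::force, 500, 500, TimeUnit.MILLISECONDS);

            // a producer appends a message and returns immediately, without waiting for commit or flush
            synchronized (writeBuffer) {
                writeBuffer.put("hello async flush".getBytes());
            }
            Thread.sleep(1000);           // let both services run at least once
            services.shutdownNow();
        }
    }
}
```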
Store architecture: message consumption
How do the CommitLog's random reads and writes affect performance?
In RocketMQ, all queues are stored in one file (the commitlog), so RocketMQ writes sequentially but reads randomly. Each time a message is read, the broker first reads the entry's metadata from the logical queue, the ConsumeQueue, and then fetches the message body from the commitlog, which adds overhead (a small sketch of this two-step lookup is given at the end of this section).
So how does RocketMQ optimize this?
- Both the Commitlog files and the Consumequeue files are accessed through MMAP memory mapping (a zero-copy technique).
- The Commitlog storage also uses a copy-on-write container, separating reads from writes, which to a large extent improves efficiency as well (copy-on-write).
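To make the two-step read concrete: a ConsumeQueue entry is a fixed 20 bytes (an 8-byte commitlog offset, a 4-byte message size and an 8-byte tag hashcode), so consumption means "read the small index entry, then read the body at that offset in the commitlog". The sketch below replays that lookup on plain byte buffers; it is only an illustration, not the RocketMQ implementation.

```java
import java.nio.ByteBuffer;

public class ConsumeQueueLookupSketch {
    static final int CQ_ENTRY_SIZE = 20;   // 8-byte commitlog offset + 4-byte size + 8-byte tag hashcode

    public static void main(String[] args) {
        // pretend this is the commitlog: one message body written at physical offset 0
        ByteBuffer commitLog = ByteBuffer.allocate(1024);
        byte[] body = "hello consumequeue".getBytes();
        long physicalOffset = 0;
        commitLog.position((int) physicalOffset);
        commitLog.put(body);

        // pretend this is one ConsumeQueue file: append the 20-byte index entry for that message
        ByteBuffer consumeQueue = ByteBuffer.allocate(1024);
        consumeQueue.putLong(physicalOffset);      // where the body lives in the commitlog
        consumeQueue.putInt(body.length);          // how long the body is
        consumeQueue.putLong("TagA".hashCode());   // tag hashcode used for server-side filtering

        // consumer side: logical index 0 in the queue -> read the index entry -> read the body from the commitlog
        int logicalIndex = 0;
        ByteBuffer entry = consumeQueue.duplicate();
        entry.position(logicalIndex * CQ_ENTRY_SIZE);
        long offset = entry.getLong();
        int size = entry.getInt();
        long tagsCode = entry.getLong();

        byte[] out = new byte[size];
        ByteBuffer log = commitLog.duplicate();
        log.position((int) offset);
        log.get(out);                              // this is the "random read" into the commitlog
        System.out.println(new String(out) + " (tagsCode=" + tagsCode + ")");
    }
}
```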