broker 启动需要指定启动参数,-c D:IDEA_ProjectRulerocketmqconfbroker.conf, 其中的配置文件,主要是包括存储消息的路径、nameServer 地址,刷盘方式等,如下, 然后执行 org.apache.rocketmq.broker.BrokerStartup#start 完成启动。
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH # namesrvAddr地址 namesrvAddr=127.0.0.1:9876 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH autoCreateTopicEnable=true autoCreateTopicEnable=true # 存储路径 storePathRootDir=D:IDEA_ProjectRulerocketmqdatarocketmqdataDir # commitLog路径 storePathCommitLog=D:IDEA_ProjectRulerocketmqdatarocketmqdataDircommitlog # 消息队列存储路径 storePathConsumeQueue=D:IDEA_ProjectRulerocketmqdatarocketmqdataDirconsumequeue # 消息索引存储路径 storePathIndex=D:IDEA_ProjectRulerocketmqdatarocketmqdataDirindex # checkpoint文件路径 storeCheckpoint=D:IDEA_ProjectRulerocketmqdatarocketmqdataDircheckpoint # abort文件存储路径 abortFile=D:IDEA_ProjectRulerocketmqdatarocketmqdataDirabort
-
启动大致流程,broker 的启动与 nameServer 很类似,都是先实例化一个控制器,然后调用 org.apache.rocketmq.broker.BrokerController#initialize 初始化控制器,最后调用控制器启动。
public static void main(String[] args) { start(createBrokerController(args)); } public static BrokerController start(BrokerController controller) { // 控制器启动 controller.start(); // 打印日志 String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", " + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); if (null != controller.getBrokerConfig().getNamesrvAddr()) { tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr(); } log.info(tip); System.out.printf("%s%n", tip); return controller; }
org.apache.rocketmq.broker.BrokerStartup#createBrokerController:创建 broker 控制器,设置存储消息的存储路径、nameServer 地址等。
-
设置 netty 的发送、接收配置, brokerServer 的监听端口,broker 角色等。
// 设置默认 netty 的 socket 发送配置,128字节 if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { NettySystemConfig.socketSndbufSize = 131072; } // 设置默认 netty 的 socket 接收配置,128字节 if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { NettySystemConfig.socketRcvbufSize = 131072; } // broker 服务的监听端口为 10911 nettyServerConfig.setListenPort(10911); final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); // 默认为异步 master if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) { int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10; messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio); }
-
解析启动参数,将参数中对应的配置文件内容,填充到对应的配置信息中,最后实例化 brokerController。
if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { configFile = file; InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); // 加载配置文件 properties.load(in); properties2SystemEnv(properties); // 填充 broker 配置信息 MixAll.properties2Object(properties, brokerConfig); // 填充 nettyServer 配置信息 MixAll.properties2Object(properties, nettyServerConfig); // 填充 nettyClient 配置信息 MixAll.properties2Object(properties, nettyClientConfig); // 填充消息存储配置信息 MixAll.properties2Object(properties, messageStoreConfig); // 设置 broker 文件地址 BrokerPathConfigHelper.setBrokerConfigPath(file); in.close(); } } // 实例化 broker final BrokerController controller = new BrokerController( brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
调用 org.apache.rocketmq.broker.BrokerController#initialize:初始化 brokerController。
-
加载 config 路径下的配置文件,例如主题配置、消费偏移量等配置文件,最后实例化消息存储服务。
// 主题配置管理加载,配置文件为:xx/config/topics.json boolean result = this.topicConfigManager.load(); // 主题消费偏移量加载,配置文件为:xx/config/consumerOffset.json result = result && this.consumerOffsetManager.load(); // 订阅组加载,配置文件为:xx/config/subscriptionGroup.json result = result && this.subscriptionGroupManager.load(); // 消费过滤加载,配置文件为:xx/config/consumerFilter.json result = result && this.consumerFilterManager.load(); if (result) { // 上述文件都加载成功,实例化消息存储服务 this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.build(context, this.messageStore); this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); } public boolean load() { String fileName = null; try { // ScheduleMessageService: 路径在 xx/config/delayOffset.json // TopicConfigManager:路径在 xx/config/topics.json // .... fileName = this.configFilePath(); String jsonString = MixAll.file2String(fileName); // 获取不到,则从备份文件中获取 if (null == jsonString || jsonString.length() == 0) { return this.loadBak(); } else { this.decode(jsonString); log.info("load " + fileName + " OK"); return true; } } catch (Exception e) { log.error("load " + fileName + " failed, and try to load backup file", e); return this.loadBak(); } }
-
调用 org.apache.rocketmq.store.MessageStore#load:使用 messageStore 消息存储服务,加载 commitlog、消费队列文件。
// 加载具体的进度文件,commitlog、主题消费队列文件等 result = result && this.messageStore.load(); public boolean load() { boolean result = true; // 加载 abort 文件,判断是否正常退出,正常退出会产生 abort 文件 boolean lastExitOK = !this.isTempFileExist(); log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally"); // 加载延迟级别对应偏移量文件 delayOffset.json if (null != scheduleMessageService) { result = result && this.scheduleMessageService.load(); } // load Commit Log result = result && this.commitLog.load(); // load Consume Queue result = result && this.loadConsumeQueue(); if (result) { // 加载 checkpoint 文件 this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); // 加载 index 文件,如果不是正常退出 并且比 checkpoint 文件时间大,则删除 index 文件 this.indexService.load(lastExitOK); // 还原 cosumequeue 文件,接着根据是否异常退出,恢复不正常的 commitlog 文件 this.recover(lastExitOK); log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); } return result; }
-
启动 broker 远程服务,并且启动 vip 通道服务。
// 启动 broker 服务 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); // vip 通道的监听端口-2 fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
-
定义线程池,用于发送消息、拉取消息、查询消息、监控 broker 、心跳检测、消费者管理线程池。并注册处理器,其中的请求任务,放入刚定义好的线程池中。
// 发送消息线程池 this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getSendMessageThreadPoolNums(), this.brokerConfig.getSendMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.sendThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_")); // 拉取消息线程池 this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_")); // 查询消息线程池 this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getQueryMessageThreadPoolNums(), this.brokerConfig.getQueryMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.queryThreadPoolQueue, new ThreadFactoryImpl("QueryMessageThread_")); // 监控 broker 线程池 this.adminBrokerExecutor = Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( "AdminBrokerThread_")); // 客户端管理线程池 this.clientManageExecutor = new ThreadPoolExecutor( this.brokerConfig.getClientManageThreadPoolNums(), this.brokerConfig.getClientManageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientManagerThreadPoolQueue, new ThreadFactoryImpl("ClientManageThread_")); // 发送心跳线程池 this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getHeartbeatThreadPoolNums(), this.brokerConfig.getHeartbeatThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.heartbeatThreadPoolQueue, new ThreadFactoryImpl("HeartbeatThread_",true)); // 消费者管理线程池 this.consumerManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( "ConsumerManageThread_")); // 注册处理器 this.registerProcessor();
-
定义一系列的定时任务,按照频率检查 broker状态、统计接收、拉取消息数量、持久化消息文件、修改 nameServer 地址等。
final long period = 1000 * 60 * 60 * 24; // 每隔 1天定时记录 broker 状态:发送、拉取消息 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.getBrokerStats().record(); } catch (Throwable e) { log.error("schedule record error.", e); } } }, initialDelay, period, TimeUnit.MILLISECONDS); // 每隔 10s 持久化消息队列偏移文件 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); // 每隔 10s 持久化消费者过滤文件 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerFilterManager.persist(); } catch (Throwable e) { log.error("schedule persist consumer filter error.", e); } } }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); // 每隔 3min 检查消费队列是否消费缓慢,如果disableConsumeIfConsumerReadSlowly=true,则停止消费 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.protectBroker(); } catch (Throwable e) { log.error("protectBroker error.", e); } } }, 3, 3, TimeUnit.MINUTES); // 每隔 1s 打印统计的流控日志 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printWaterMark(); } catch (Throwable e) { log.error("printWaterMark error.", e); } } }, 10, 1, TimeUnit.SECONDS); // 每隔 1min 记录剩余需要重放消息字节数 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); } catch (Throwable e) { log.error("schedule dispatchBehindBytes error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); if (this.brokerConfig.getNamesrvAddr() != null) { // 设置修改 nameServer 地址 this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr()); } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) { // 每隔 2min 获取 nameServer 地址 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); } catch (Throwable e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); }
-
从 master 节点同步 broker 的 salve 节点的数据。
// broker 为从节点时,更新主节点的 HA if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); this.updateMasterHAServerAddrPeriodically = false; } else { this.updateMasterHAServerAddrPeriodically = true; } // 每隔 1min,从节点从 master 节点同步数据 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.slaveSynchronize.syncAll(); } catch (Throwable e) { log.error("ScheduledTask syncAll slave exception", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); } else { // 每隔 1min,打印主从节点的差异 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printMasterAndSlaveDiff(); } catch (Throwable e) { log.error("schedule printMasterAndSlaveDiff error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); }
-
实例化文件监听服务,并且初始化事务消息服务。
fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); ((NettyRemotingServer) fastRemotingServer).loadSslContext(); } }); private void initialTransaction() { // 实例化事务消息服务 this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class); if (null == this.transactionalMessageService) { this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore())); log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName()); } // 实例化事务消息回查服务 this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class); if (null == this.transactionalMessageCheckListener) { this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener(); log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName()); } this.transactionalMessageCheckListener.setBrokerController(this); this.transactionalMessageCheckService = new TransactionalMessageCheckService(this); }
broker 正常关闭时,会执行钩子方法,其中会停止上述实例化、初始化的服务、向 nameServer 请求注销服务,删除 abort 文件。
// broker 正常 shutdown 时,执行该钩子方法 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; private AtomicInteger shutdownTimes = new AtomicInteger(0); @Override public void run() { synchronized (this) { log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); if (!this.hasShutdown) { this.hasShutdown = true; long beginTime = System.currentTimeMillis(); // 下线 *** 作 controller.shutdown(); long consumingTimeTotal = System.currentTimeMillis() - beginTime; log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } } }, "ShutdownHook"));2.4 启动 broker 控制器
调用 org.apache.rocketmq.broker.BrokerController#start:开启实例化时注册的各种的服务,并注册 broker。
public void start() throws Exception { // 开启消息存储服务 if (this.messageStore != null) { this.messageStore.start(); } // 开启与 nameServer 通信服务 if (this.remotingServer != null) { this.remotingServer.start(); } // 开启 vip 通道,与 nameServer 通信服务 if (this.fastRemotingServer != null) { this.fastRemotingServer.start(); } // 文件监听服务 if (this.fileWatchService != null) { this.fileWatchService.start(); } if (this.brokerOuterAPI != null) { this.brokerOuterAPI.start(); } if (this.pullRequestHoldService != null) { this.pullRequestHoldService.start(); } if (this.clientHousekeepingService != null) { this.clientHousekeepingService.start(); } if (this.filterServerManager != null) { this.filterServerManager.start(); } // 注册 broker this.registerBrokerAll(true, false, true); // 每隔最少 10s 注册 broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); if (this.brokerStatsManager != null) { this.brokerStatsManager.start(); } if (this.brokerFastFailure != null) { this.brokerFastFailure.start(); } // master 节点开启事务回查服务 if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) { if (this.transactionalMessageCheckService != null) { log.info("Start transaction service!"); this.transactionalMessageCheckService.start(); } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)