namesrv官方介绍启动流程启动入口`NamesrvStartup`
1.创建NamesrvController2.`NamesrvController`构造函数3.NamesrvController初始化`initialize()`
3.1注册消息处理器`registerProcessor()` 4.NamesrvController的start()
namesrv官方介绍NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。
主要包括两个功能:
- Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
启动流程 启动入口NamesrvStartupnamesrv启动的入口是在NamesrvStartup,方法是main
public static NamesrvController main0(String[] args) { try { //创建NamesrvController NamesrvController controller = createNamesrvController(args); //启动namesrc start(controller); }1.创建NamesrvController
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { //设置Rokcetmq的版本号 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); //PackageConflictDetect.detectFastjson(); //命令行选项解析 Options options = ServerUtil.buildCommandlineOptions(new Options()); //解析命令行为 ‘mqnamesrv’的参数 commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); //如果为空,直接退出 if (null == commandLine) { System.exit(-1); return null; } //创建namesrv,netty的相关配置对象 final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); //设置netty的服务端监听的端口 9876 nettyServerConfig.setListenPort(9876); //解析命令行参数'-c':指定namesrv的配置文件路径 if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } //解析命令行参数'-p':启动时候日志打印配置信息 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } //初始化logback配置 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"); log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); //初始化NamesrvController final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); // remember all configs to prevent discard // 记住所有的配置以防止丢弃 controller.getConfiguration().registerConfig(properties); return controller; }2.NamesrvController构造函数
构造入参说明
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { this.namesrvConfig = namesrvConfig; this.nettyServerConfig = nettyServerConfig; //kv的配置管理类 this.kvConfigManager = new KVConfigManager(this); //路由信息管理类 this.routeInfoManager = new RouteInfoManager(); //心跳连接处理类 this.brokerHousekeepingService = new BrokerHousekeepingService(this); this.configuration = new Configuration( log, this.namesrvConfig, this.nettyServerConfig ); this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath"); }
重点看一下这个RouteInfoManager构造方法
public RouteInfoManager() { //topic跟queue的映射关系,存储每个topic对应的broker名称、queue数量等信息 this.topicQueueTable = new HashMap3.NamesrvController初始化initialize()>(1024); //同一个BrokerName可能对应多台机器,一个Master和多个Slave。这个结构储存着一个BrokerName对应的属性信息,包括所属Cluster名称,一个Master Broker地址和多个SlaveBroker的地址等 this.brokerAddrTable = new HashMap (128); //储存集群中Cluster的信息,一个cluster下的多个BrokerName this.clusterAddrTable = new HashMap >(32); //存储这台Broker机器的实时状态,包括上次更新状态的时间戳,NameServer会定期检查这个时间戳,超时没有更新就认为这个Broker无效了,将其从Broker列表里清除。 this.brokerLiveTable = new HashMap (256); //过滤服务器,是RocketMQ的一种服务端过滤方式,一个Broker可以有一个或多个FilterServer。这个结构的Key是Broker的地址,Value是和这个Broker关联的多个Filter Server的地址 this.filterServerTable = new HashMap >(256); }
public boolean initialize() { //加载namesrvController指定的kvConfig配置文件(常为xxx/kvConfig.json)到内存 this.kvConfigManager.load(); this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); //注册默认的处理器 this.registerProcessor(); //定时扫描不活跃的broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //定时打印configTable信息 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } return true; }3.1注册消息处理器registerProcessor()
该方法主要是给remotingServer注册默认的DefaultRequestProcessor处理器,当有请求到达nettyServer时会根据RequestCode处理不同的逻辑
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { //省略...... //重点是这个,broker注册请求 case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } }
请求最后会转发到RouteInforManager类的registerBroker()方法中
public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final ListfilterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { this.lock.writeLock().lockInterruptibly(); Set brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet (); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); boolean registerFirst = false; BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap ()); this.brokerAddrTable.put(brokerName, brokerData); } Map brokerAddrsMap = brokerData.getBrokerAddrs(); //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT> //The same IP:PORT must only have one record in brokerAddrTable Iterator > it = brokerAddrsMap.entrySet().iterator(); while (it.hasNext()) { Entry item = it.next(); if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { it.remove(); } } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); if (null == prevBrokerLiveInfo) { log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr); } if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } } if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); } return result; }
createAndUpdateQueueData方法
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) { QueueData queueData = new QueueData(); queueData.setBrokerName(brokerName); queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); queueData.setReadQueueNums(topicConfig.getReadQueueNums()); queueData.setPerm(topicConfig.getPerm()); queueData.setTopicSysFlag(topicConfig.getTopicSysFlag()); ListqueueDataList = this.topicQueueTable.get(topicConfig.getTopicName()); if (null == queueDataList) { queueDataList = new linkedList (); queueDataList.add(queueData); this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData); } else { boolean addNewOne = true; Iterator it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next(); if (qd.getBrokerName().equals(brokerName)) { if (qd.equals(queueData)) { addNewOne = false; } else { log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData); it.remove(); } } } if (addNewOne) { queueDataList.add(queueData); } } }
我们以RequestCode.REGISTER_BROKER大致以图的形式总结一下broker注册请求的流程
start方法相对来说比较简单
public void start() throws Exception { //开启服务端 this.remotingServer.start(); }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)