- 一、NameServer作用
- 1.1 通信
- 二、NameServer 的启动
- 2.1 创建NamesrvController
- 解析配置
- NamesrvConfig 默认配置
- NettyServerConfig 默认配置
- 初始化NamesrvController
- 2.2通过 NamesrvController 启动
- 2.2.1 初始化
- 2.2.2 添加优雅关闭线程池的钩子函数
- 2.2.3 启动NettyServer
- 三、路由管理器RouteInfoManager
- 3.1 路由元数据信息
- 3.2 移除不活跃Broker
- 四、网络请求处理器DefaultRequestProcessor
- 总结
NameServer作为 RocketMQ 的服务路由中心,其主要起到了如下的作用:
-
服务注册
Broker 在启动的时候会向 NameServer 注册自己的信息,Broker 宕机时 NameServer 也会剔除该 Broker 信息
-
路由发现
Produer 发送消息或 Consumer 拉取消息,都需要从 NameServer 获取路由信息
-
通信、维持心跳
Producer、Consumer和 Broker 都会与 NameServer 进行通信或定时发送心跳
(NameServer 集群互相独立,因此就可能会造成某时刻数据不一致,但不会产生太大的影响,最多是某时刻proder 发送的消息不均衡)
1.1 通信- Broker 会每隔30s 向 NameServer 发送心跳(如果是 NameServer 集群,则会发送给每个 NameServer),同时包含自己的相关信息,如名称、地址、topic等,NameServer会维护每个 Broker 的相关信息,并记录收到心跳时的时间戳,用于判断 Broker 是否宕机(默认120s 内没有收到 Broker 的心跳则会将其从列表中剔除);
- Producer 和 Consumer 会每30s 从 NameServer 获取最新的 topic 信息更新到本地,Producer 会用于负载均衡处理消息发送
- NameServer 和所有的 Broker 保持一个长连接,且每隔10s 会扫描一次本地维护Broker 信息的 table,比较每个 broker 上次收到心跳的时间戳,如果已经超过120s 了,说明120s 内没收到该 broker 的心跳,则会将该 broker 的信息移除,更新 topic 的路由信息。
启动类:org.apache.rocketmq.namesrv.NamesrvStartup
public static NamesrvController main0(String[] args) { try { NamesrvController controller = createNamesrvController(args); start(controller); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
整个流程很简单,分量大步:
- 创建NamesrvController
- 启动(NamesrvController)
NamesrvController是 NameServer 的核心组件,用于接收网络请求。
2.1 创建NamesrvController 解析配置首先要做的就是解析传入的配置参数等信息,将其保存到NamesrvConfig和NettyServerConfig两个类中。
-
NamesrvConfig
主要包含了 NameServer 自身运行的参数
-
NettyServerConfig
主要包含了 Netty 服务端的配置参数
首先看下两个类的默认配置:
根据 debug的显示,在创建后就有的默认配置:
NamesrvConfig 默认配置//RocketMQ的主目录 private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); //NameServer 存储 KV 配置属性的文件地址 private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; // 默认配置文件的地址,指定配置文件启动通过-c configPath 命令 private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; //生产环境名称 private String productEnvName = "center"; //是否启动集群测试 private boolean clusterTest = false; //是否支持有序消息,默认不支持 private boolean orderMessageEnable = false;NettyServerConfig 默认配置
//端口号,下面启动时被默认覆盖为9876 private int listenPort = 8888; //Netty工作线程数 private int serverWorkerThreads = 8; //Netty的public线程池的线程数,默认是0 private int serverCallbackExecutorThreads = 0; //Netty的IO线程池线程数量。主要负责处理网络请求,解析请求包,再转发到各个业务线程池。最后返回结果 private int serverSelectorThreads = 3; //Broker端的两个配置参数 //sendoneWay 消息请求的最大并发度 private int serveronewaySemaphorevalue = 256; // 异步消息发送的最大并发数 private int serverAsyncSemaphorevalue = 64; //网络连接最大空闲时间默认120s,超过该空闲时间的连接被关闭 private int serverChannelMaxIdleTimeSeconds = 120; //网络Socket发送缓冲区大小 默认64KB,下同 private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; //接收端缓存区大小 private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; // 是否开启 ByteBuffer 缓存 private boolean serverPooledByteBufAllocatorEnable = true; //是否启用Epoll模型,Linux 下默认是开启的 private boolean useEpollNativeSelector = false;
-
serverCallbackExecutorThreads
puclic 任务线程数。Netty 根据 RequestCode 确定业务类型,内部每个业务类型对应一个线程池处理,如果没有对应的 RequestCode,则由该 public 线程池处理
在2.2.1中创建 NettyServer 网络处理器时,如果为0,则会改为4个
1:-c 处理
- 设置NameServer 监听的端口,硬编码:9876
- 如果指定了-c,即指定配置文件启动,则进行加载,并设置到两个配置类中(具体怎么做的就不深究了)
final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876); //解析三个配置对象 if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); in.close(); } }
2:-p 指令处理
指定了-p 的话,则仅将当前的配置打印出来就退出。
if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } // 形如 --property value 形式的指令加载到配置中 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
最后根据两个参数配置类创建 NamesrvController后返回“
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); ........... return controller;初始化NamesrvController
以下是 NamesrvController 的构造方法:
初始化了一系列组件
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { this.namesrvConfig = namesrvConfig; this.nettyServerConfig = nettyServerConfig; //kv 配置管理器 this.kvConfigManager = new KVConfigManager(this); // 路由信息管理器 this.routeInfoManager = new RouteInfoManager(); this.brokerHousekeepingService = new BrokerHousekeepingService(this); //包含两个配置类的所有配置的类Configuration,会保存到内部的Properties对象 this.configuration = new Configuration( log, this.namesrvConfig, this.nettyServerConfig ); this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath"); }2.2通过 NamesrvController 启动
//初始化,主要是初始化几个定时任务 boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable() { @Override public Void call() throws Exception { controller.shutdown(); return null; } })); controller.start();
分三步走:
- controller.initialize();进行初始化工作
- 添加优雅关闭线程池的钩子
- 启动
-
从 kv 文件中加载配置到 kv 管理器
-
创建 NettyServer 网络处理对象
- 初始化 Netty 相关组件,如ServerBootstrap、ChannelEventListener等
- 初始化 public 线程池,线程数量为4默认;
- 初始化eventLoopGroupBoss和eventLoopGroupSelector【这块需要了解下 Netty 基础知识】判断是否为 Linux 机器,如果是 Linux 且配置开启 Epoll,则使用 Epoll 模型EpollEventLoopGroup,否则使用Nio 模型NioEventLoopGroup,
-
初始化 Netty 工作线程池
-
注册Processor,把remotingExecutor注入到remotingServer中
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
其中,DefaultRequestProcessor组件会处理 NettyServer 接收到的所有网络请求
-
创建两个定时任务:
-
定时任务1:
每10s 扫描一次 Broker,移除不活跃的 Broker
-
定时任务2:
每10min 打印一次 KV 配置信息
-
public boolean initialize() { //1.加载KV配置 this.kvConfigManager.load(); //2.创建NettyServer网络处理对象 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); //3.Netty服务器的工作线程池 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); /, List> topicQueueTable; private final HashMap brokerAddrTable; private final HashMap > clusterAddrTable; private final HashMap brokerLiveTable; private final HashMap > filterServerTable;
以上是保存对应信息的数据结构:
-
topicQueueTable
topic 消息队列的路由信息,Produer 就根据该路由表进行负载均衡的发送消息。
key 是 topic 名,value 是 topic 下的队列信息,结构如下:
//broker 名 private String brokerName; //读队列数 private int readQueueNums; //写队列数 private int writeQueueNums; //perm 值 读写的权限 private int perm; //topic 的同步标记 private int topicSynFlag;
-
brokerAddrTable
key 是 brokerName,value 是 BrokerData,其结构如下:
//1.集群名称 private String cluster; //2. broker 名称 private String brokerName; //3. brokerId 和 Broker 地址的映射 private HashMap
brokerAddrs; -
clusterAddrTable
集群的信息。key 为集群名,value 为集群下的所有 broker 名
-
brokerLiveTable
Broker 的状态信息。key 为 broker 的地址,value 为 Broker 存活的状态信息:
//上次收到该 Broker 心跳的时间戳 【主要是这个时间戳重要】 private long lastUpdateTimestamp; //版本 private DataVersion dataVersion; //channel private Channel channel; //高可用地址 private String haServerAddr;
Broker每30s 发一次心跳,NameServer 收到心跳后,就会将对应的时间戳保存到该 Broker 下的状态信息中,用于后面每10s 扫描判断是否超过120s 未收到心跳。
在 Controller 的初始化方法中,创建了一个定时任务(10s 一次),会调用路由管理器内的扫描不活跃的 Broker 方法:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS);
看下如何做的:
public void scanNotActiveBroker() { //扫描的就是这个BrokerLiveTable Iterator> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); //根据心跳时间判断是否存活的核心逻辑。 if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } }
主要就是两步:
- 扫描brokerLiveTable,上面介绍了,内部保存了每个 broker 的上次心跳的时间;
- 上次心跳时间加120秒,如果小于当前时间,说明已经120s 没有收到该 broker 的心跳了;则直接remove 掉
- 在onChannelDestroy方法中,就是从上面5个元数据中找到该 broker 有关的信息然后都删除掉
该处理器就是专门负责处理 NameServer 收到的网络请求,对应的方法为processRequest:
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.QUERY_DATA_VERSION: return queryBrokerTopicConfig(ctx, request); //先关注注册Broker的请求 case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { //注册Broker的实际方法 return this.registerBroker(ctx, request); } case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request); case RequestCode.GET_ROUTEINFO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request); case RequestCode.GET_BROKER_CLUSTER_INFO: return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETe_TOPIC_IN_NAMESRV: return deleteTopicInNamesrv(ctx, request); case RequestCode.GET_KVLIST_BY_NAMESPACE: return this.getKVListByNamespace(ctx, request); case RequestCode.GET_TOPICS_BY_CLUSTER: return this.getTopicsByCluster(ctx, request); case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: return this.getSystemTopicListFromNs(ctx, request); case RequestCode.GET_UNIT_TOPIC_LIST: return this.getUnitTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: return this.getHasUnitSubTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: return this.getHasUnitSubUnUnitTopicList(ctx, request); case RequestCode.UPDATE_NAMESRV_CONFIG: return this.updateConfig(ctx, request); case RequestCode.GET_NAMESRV_CONFIG: return this.getConfig(ctx, request); default: break; } return null; }
可以看到,就是根据不同的请求类型调用对应的处理逻辑,这里不关注具体处理细节,只看下有请求类型RequestCode:
- KV配置相关的(添加、获取、删除)
- 注册 Broker
- 根据 topic 获取路由信息
- TOPIC 相关的(获取所有 topic 列表、删除 topic、获取系统 topic 等等)
- 获取以及更新 NameServer 配置
我们看到有一个注册 Broker 的请求,该请求肯定是在 Broker 启动时由 Broker 端调用来把自己注册到 NameServer 的;
而根据 topic 获取路由信息肯定就是用于路由发现,如 Producer 发消息时进行调用实现负载均衡。
总结NameServer 启动流程总体还是比较简单的,这里简单梳理下:
- 解析配置,保存到 NamesrvConfig 和 NettyServerConfig,并初始化 NamesrvController
- NamesrvController 是 NameServer 端的核心组件,里面比较重要的是这个路由管理器RouteInfoManager;
- 路由管理器内维护了 Broker 路由的元数据信息,具体结构可以参考3.1节;
- NamesrvController 中会开启一个定时任务,10s扫描一次路由管理器内的Broker 活跃列表,并移除超过120s 未发送心跳的 broker 元数据信息;
- NameServer 端会处理各类从 Broker 或 Produer 或 Consumer 发送来的网络请求,这些请求都是在网络请求处理器DefaultRequestProcessor中处理,并通过RequestCode区分不同的请求
下一节将带大家梳理 Broker 端的启动流程。
更多内容请关注公众号:JavaBeat
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)