- 2021SC@SDUSC
- 类图
- ServerCnxnFactory源码分析
- 总结
服务端为每一个客户端连接维护了一个ServerCnxn来进行网络IO *** 作。而ZooKeeper中使用ServerCnxnFactory来管理与维护客户端的连接。
类图 ServerCnxnFactory源码分析(1)ServerCnxnFactory的属性,主要说明ServerCnxn管理器的信息。
//表明此管理器为ServerCnxnFactory public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory"; //管理的最大连接数量 private static final String ZOOKEEPER_MAX_CONNECTION = "zookeeper.maxCnxns"; //当不存在连接时连接数量为0 public static final int ZOOKEEPER_MAX_CONNECTION_DEFAULT = 0; //获取日志 private static final Logger LOG = LoggerFactory.getLogger(ServerCnxnFactory.class); //说明此管理器是否启用了SSL protected boolean secure; static final ByteBuffer closeConn = ByteBuffer.allocate(0); // ZooKeeper服务器接受的连接总数 protected int maxCnxns; //SaslServer调用返回处理 protected SaslServerCallbackHandler saslServerCallbackHandler; //登录状态 public Login login; //zk服务器 protected ZooKeeperServer zkServer; //登录的用户 private static String loginUser = Login.SYSTEM_USER;
(2)核心函数——抽象方法,具体让NIO和Netty实现。
//获取本地接口 public abstract int getLocalPort(); //获取连接 public abstract IterablegetConnections(); //管理器配置 public abstract void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException; //重配置 public abstract void reconfigure(InetSocketAddress addr); public abstract int getMaxClientCnxnsPerHost(); public abstract void setMaxClientCnxnsPerHost(int max); //这种方法是为了保持启动(zks)的兼容性并实现zks的共享 //当我们添加secureCnxnFactory时。 public abstract void startup(ZooKeeperServer zkServer, boolean startServer) throws IOException, InterruptedException; public abstract int getSocketListenBacklog(); //休眠 public abstract void join() throws InterruptedException; //关机 public abstract void shutdown(); //开机 public abstract void start(); //释放所有资源 public abstract void closeAll(ServerCnxn.DisconnectReason reason); //通过不同参数,有四种创建管理器的方法 public static ServerCnxnFactory createFactory(int clientPort, int maxClientCnxns) throws IOException { return createFactory(new InetSocketAddress(clientPort), maxClientCnxns, -1); } public static ServerCnxnFactory createFactory(int clientPort, int maxClientCnxns, int backlog) throws IOException { return createFactory(new InetSocketAddress(clientPort), maxClientCnxns, backlog); } public static ServerCnxnFactory createFactory(InetSocketAddress addr, int maxClientCnxns) throws IOException { return createFactory(addr, maxClientCnxns, -1); } public static ServerCnxnFactory createFactory(InetSocketAddress addr, int maxClientCnxns, int backlog) throws IOException { ServerCnxnFactory factory = createFactory(); factory.configure(addr, maxClientCnxns, backlog); return factory; } //获取本地地址 public abstract InetSocketAddress getLocalAddress(); //重置所有连接数据 public abstract void resetAllConnectionStats(); //获取所有连接信息 public abstract Iterable
(3)核心函数——具体方法,确保管理器正常获取服务端的数据以及自身的服务内容。
//添加会话 public void addSession(long sessionId, ServerCnxn cnxn) { sessionMap.put(sessionId, cnxn); } //添加会话 public void addSession(long sessionId, ServerCnxn cnxn) { sessionMap.put(sessionId, cnxn); } //移除从会话映射的连接 public void removeCnxnFromSessionMap(ServerCnxn cnxn) { long sessionId = cnxn.getSessionId(); if (sessionId != 0) { sessionMap.remove(sessionId); } } //关闭会话 public boolean closeSession(long sessionId, ServerCnxn.DisconnectReason reason) { ServerCnxn cnxn = sessionMap.remove(sessionId); if (cnxn != null) { try { cnxn.close(reason); } catch (Exception e) { LOG.warn("exception during session close", e); } return true; } return false; } //获取活跃的连接数量 public int getNumAliveConnections() { return cnxns.size(); } //获取服务器 public final ZooKeeperServer getZooKeeperServer() { return zkServer; } //配置 public void configure(InetSocketAddress addr, int maxcc) throws IOException { configure(addr, maxcc, -1); } public void configure(InetSocketAddress addr, int maxcc, int backlog) throws IOException { configure(addr, maxcc, backlog, false); } //判断是否启用SSL public boolean isSecure() { return secure; } //为服务器启动SSL public final void setZooKeeperServer(ZooKeeperServer zks) { this.zkServer = zks; if (zks != null) { if (secure) { zks.setSecureServerCnxnFactory(this); } else { zks.setServerCnxnFactory(this); } } } //创建管理器 public static ServerCnxnFactory createFactory() throws IOException { String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY); if (serverCnxnFactoryName == null) { serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } try { ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName) .getDeclaredConstructor() .newInstance(); LOG.info("Using {} as server connection factory", serverCnxnFactoryName); return serverCnxnFactory; } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName, e); throw ioe; } } //用户无注册的连接 public void unregisterConnection(ServerCnxn serverCnxn) { ConnectionBean jmxConnectionBean = connectionBeans.remove(serverCnxn); if (jmxConnectionBean != null) { MBeanRegistry.getInstance().unregister(jmxConnectionBean); } } //用户有注册的连接 public void registerConnection(ServerCnxn serverCnxn) { if (zkServer != null) { ConnectionBean jmxConnectionBean = new ConnectionBean(serverCnxn, zkServer); try { MBeanRegistry.getInstance().register(jmxConnectionBean, zkServer.jmxServerBean); connectionBeans.put(serverCnxn, jmxConnectionBean); } catch (JMException e) { LOG.warn("Could not register connection", e); } } } protected void configureSaslLogin() throws IOException { String serverSection = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY, ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME); //此处的“配置”指的是javax.security.auth.login.Configuration。 AppConfigurationEntry[] entries = null; SecurityException securityException = null; try { entries = Configuration.getConfiguration().getAppConfigurationEntry(serverSection); } catch (SecurityException e) { //如果用户不打算使用JAAS身份验证,可能是无害的。 securityException = e; } // jaas.conf中没有条目 // 如果在获取jaas文件和用户通过指定LOGIN_CONTEXT_NAME_KEY或jaas文件要求sasl // 那么抛出一个异常,否则将在没有身份验证的情况下继续。 if (entries == null) { String jaasFile = System.getProperty(Environment.JAAS_CONF_KEY); String loginContextName = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY); if (securityException != null && (loginContextName != null || jaasFile != null)) { String errorMessage = "No JAAS configuration section named '" + serverSection + "' was found"; if (jaasFile != null) { errorMessage += " in '" + jaasFile + "'."; } if (loginContextName != null) { errorMessage += " But " + ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY + " was set."; } LOG.error(errorMessage); throw new IOException(errorMessage); } return; } //jaas.conf条目可用 try { saslServerCallbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration()); login = new Login(serverSection, saslServerCallbackHandler, new ZKConfig()); setLoginUser(login.getUserName()); login.startThreadIfNeeded(); } catch (LoginException e) { throw new IOException("Could not configure server because SASL configuration did not allow the " + " ZooKeeper server to authenticate itself properly: " + e); } } //创建此方法是为了避免 ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD 找到bug问题 private static void setLoginUser(String name) { loginUser = name; } public static String getUserName() { return loginUser; } public int getMaxCnxns() { return maxCnxns; } //初始化最大连接数 protected void initMaxCnxns() { maxCnxns = Integer.getInteger(ZOOKEEPER_MAX_CONNECTION, ZOOKEEPER_MAX_CONNECTION_DEFAULT); if (maxCnxns < 0) { maxCnxns = ZOOKEEPER_MAX_CONNECTION_DEFAULT; LOG.warn("maxCnxns should be greater than or equal to 0, using default vlaue {}.", ZOOKEEPER_MAX_CONNECTION_DEFAULT); } else if (maxCnxns == ZOOKEEPER_MAX_CONNECTION_DEFAULT) { LOG.warn("maxCnxns is not configured, using default value {}.", ZOOKEEPER_MAX_CONNECTION_DEFAULT); } else { LOG.info("maxCnxns configured value is {}.", maxCnxns); } } protected boolean limitTotalNumberOfCnxns() { if (maxCnxns <= 0) { // maxCnxns limit is disabled return false; } int cnxns = getNumAliveConnections(); if (cnxns >= maxCnxns) { LOG.error("Too many connections " + cnxns + " - max is " + maxCnxns); return true; } return false; }总结
Zookeeper作为一个服务器,自然要与客户端进行网络通信,如何高效的与客户端进行通信,让网络IO不成为ZooKeeper的瓶颈是ZooKeeper急需解决的问题。
ZooKeeper中使用ServerCnxnFactory管理与客户端的连接,其有两个实现,一个是NIOServerCnxnFactory,使用Java原生NIO实现;一个是NettyServerCnxnFactory,使用netty实现。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)