ZooKeeper源码分析之ServerCnxnFactory

ZooKeeper源码分析之ServerCnxnFactory,第1张

ZooKeeper源码分析之ServerCnxnFactory

文章目录
  • 2021SC@SDUSC
    • 类图
    • ServerCnxnFactory源码分析
    • 总结

2021SC@SDUSC

服务端为每一个客户端连接维护了一个ServerCnxn来进行网络IO操作。而ZooKeeper中使用ServerCnxnFactory来管理与维护客户端的连接。

类图 ServerCnxnFactory源码分析

(1)ServerCnxnFactory的属性,主要说明ServerCnxn管理器的信息。

	// System property key; createFactory() reads it to choose the ServerCnxnFactory implementation class.
    public static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "zookeeper.serverCnxnFactory";
    // System property key for the total-connection limit; read by initMaxCnxns().
    private static final String ZOOKEEPER_MAX_CONNECTION = "zookeeper.maxCnxns";
    // Default value for maxCnxns; limitTotalNumberOfCnxns() treats values <= 0 as "limit disabled".
    public static final int ZOOKEEPER_MAX_CONNECTION_DEFAULT = 0;
    // Class logger.
    private static final Logger LOG = LoggerFactory.getLogger(ServerCnxnFactory.class);
    // Marks this factory as the secure one; setZooKeeperServer() uses it to decide how to register with the server.
    protected boolean secure;
	
    // Zero-capacity buffer; presumably a sentinel meaning "close the connection" -- TODO confirm against the implementations.
    static final ByteBuffer closeConn = ByteBuffer.allocate(0);
	// Limit on the total number of connections; assigned by initMaxCnxns().
    protected int maxCnxns;
    // SASL callback handler; created in configureSaslLogin().
    protected SaslServerCallbackHandler saslServerCallbackHandler;
    // Login object; created in configureSaslLogin().
    public Login login;
    // The ZooKeeper server this factory is attached to; assigned by setZooKeeperServer().
    protected ZooKeeperServer zkServer;
    // Login user name; starts as Login.SYSTEM_USER and is replaced by configureSaslLogin() after a login is created.
    private static String loginUser = Login.SYSTEM_USER;
    

(2)核心函数——抽象方法,具体让NIO和Netty实现。

    // Returns the local port of this factory; the exact meaning is defined by the concrete implementation.
    public abstract int getLocalPort();
    /**
     * Returns the connections held by this factory.
     *
     * <p>The return type is parameterized instead of a raw {@code Iterable} so callers can
     * iterate without unchecked casts.
     *
     * @return an iterable over the {@link ServerCnxn} instances of this factory
     */
    public abstract Iterable<ServerCnxn> getConnections();
    // Configures the factory with an address, a connection limit (maxcc), a listen backlog and the secure flag.
    // NOTE(review): maxcc is presumably the per-host client connection limit -- confirm against the implementations.
    public abstract void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException;
    // Reconfigures the factory for the given address.
    public abstract void reconfigure(InetSocketAddress addr);
    
    // Returns the maximum number of client connections allowed per host.
    public abstract int getMaxClientCnxnsPerHost();

    
    // Sets the maximum number of client connections allowed per host.
    public abstract void setMaxClientCnxnsPerHost(int max);
    // Starts the factory for the given server. This overload exists to stay compatible with
    // startup(zks) and to allow sharing the zks when a secure connection factory is added.
    // NOTE(review): startServer presumably controls whether the ZooKeeperServer itself is started -- confirm.
    public abstract void startup(ZooKeeperServer zkServer, boolean startServer) throws IOException, InterruptedException;
    
    // Returns the socket listen backlog.
    public abstract int getSocketListenBacklog();
    // Blocks the caller; presumably until the factory's threads finish -- TODO confirm against the implementations.
    public abstract void join() throws InterruptedException;
    // Shuts the factory down.
    public abstract void shutdown();
    // Starts the factory.
    public abstract void start();
    // Presumably closes every connection, passing along the given disconnect reason -- TODO confirm.
    public abstract void closeAll(ServerCnxn.DisconnectReason reason);
    // The four overloads below create a factory from different parameter combinations.
    /**
     * Creates and configures a factory for the given client port, using a backlog of -1.
     *
     * @param clientPort port wrapped into an {@link InetSocketAddress}
     * @param maxClientCnxns connection limit handed to {@code configure}
     * @return the configured factory
     * @throws IOException if the factory cannot be instantiated or configured
     */
    public static ServerCnxnFactory createFactory(int clientPort, int maxClientCnxns) throws IOException {
        final InetSocketAddress addr = new InetSocketAddress(clientPort);
        return createFactory(addr, maxClientCnxns, -1);
    }

    /**
     * Creates and configures a factory for the given client port with an explicit backlog.
     *
     * @param clientPort port wrapped into an {@link InetSocketAddress}
     * @param maxClientCnxns connection limit handed to {@code configure}
     * @param backlog backlog value handed to {@code configure}
     * @return the configured factory
     * @throws IOException if the factory cannot be instantiated or configured
     */
    public static ServerCnxnFactory createFactory(int clientPort, int maxClientCnxns, int backlog) throws IOException {
        final InetSocketAddress addr = new InetSocketAddress(clientPort);
        return createFactory(addr, maxClientCnxns, backlog);
    }

    /**
     * Creates and configures a factory for the given address, using a backlog of -1.
     *
     * @param addr address handed to {@code configure}
     * @param maxClientCnxns connection limit handed to {@code configure}
     * @return the configured factory
     * @throws IOException if the factory cannot be instantiated or configured
     */
    public static ServerCnxnFactory createFactory(InetSocketAddress addr, int maxClientCnxns) throws IOException {
        final int backlog = -1;
        return createFactory(addr, maxClientCnxns, backlog);
    }

    /**
     * Instantiates a factory via {@code createFactory()} and configures it with the given values.
     *
     * @param addr address handed to {@code configure}
     * @param maxClientCnxns connection limit handed to {@code configure}
     * @param backlog backlog value handed to {@code configure}
     * @return the configured factory
     * @throws IOException if the factory cannot be instantiated or configured
     */
    public static ServerCnxnFactory createFactory(InetSocketAddress addr, int maxClientCnxns, int backlog) throws IOException {
        final ServerCnxnFactory created = createFactory();
        created.configure(addr, maxClientCnxns, backlog);
        return created;
    }
    // Returns the local socket address of this factory; the exact meaning is defined by the concrete implementation.
    public abstract InetSocketAddress getLocalAddress();
    // Resets the statistics of all connections.
    public abstract void resetAllConnectionStats();
    //获取所有连接信息
    public abstract Iterable> getAllConnectionInfo(boolean brief);
    

(3)核心函数——具体方法,确保管理器正常获取服务端的数据以及自身的服务内容。

    /**
     * Associates a session id with the connection that serves it.
     *
     * <p>Declared exactly once: a second method with the same signature is a duplicate-definition
     * compile error.
     *
     * @param sessionId id of the session
     * @param cnxn connection to store under that id in the session map
     */
    public void addSession(long sessionId, ServerCnxn cnxn) {
        sessionMap.put(sessionId, cnxn);
    }
    /**
     * Removes the session-map entry keyed by the connection's session id.
     *
     * @param cnxn connection whose session id is looked up; a session id of 0 is ignored
     */
    public void removeCnxnFromSessionMap(ServerCnxn cnxn) {
        final long id = cnxn.getSessionId();
        if (id == 0) {
            // Nothing is removed for session id 0.
            return;
        }
        sessionMap.remove(id);
    }
    
    /**
     * Removes the connection registered for a session and closes it.
     *
     * @param sessionId id of the session to close
     * @param reason disconnect reason forwarded to {@code ServerCnxn.close}
     * @return {@code true} if a connection was registered for the session (even when closing it
     *         threw an exception, which is only logged); {@code false} otherwise
     */
    public boolean closeSession(long sessionId, ServerCnxn.DisconnectReason reason) {
        final ServerCnxn removed = sessionMap.remove(sessionId);
        if (removed == null) {
            return false;
        }
        try {
            removed.close(reason);
        } catch (Exception e) {
            // Best effort: the entry is already gone from the map, so only log the failure.
            LOG.warn("exception during session close", e);
        }
        return true;
    }
    /**
     * Returns the number of connections currently tracked in {@code cnxns}.
     *
     * @return size of the connection collection
     */
    public int getNumAliveConnections() {
        final int alive = cnxns.size();
        return alive;
    }
    /**
     * Returns the server this factory is attached to.
     *
     * @return the value last passed to {@code setZooKeeperServer}; may be {@code null}
     */
    public final ZooKeeperServer getZooKeeperServer() {
        return this.zkServer;
    }
    /**
     * Configures the factory with a backlog of -1.
     *
     * @param addr address to configure
     * @param maxcc connection limit forwarded unchanged
     * @throws IOException if the underlying {@code configure} call fails
     */
    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
        final int backlog = -1;
        configure(addr, maxcc, backlog);
    }
        
    /**
     * Configures the factory as a non-secure factory.
     *
     * @param addr address to configure
     * @param maxcc connection limit forwarded unchanged
     * @param backlog backlog value forwarded unchanged
     * @throws IOException if the underlying {@code configure} call fails
     */
    public void configure(InetSocketAddress addr, int maxcc, int backlog) throws IOException {
        final boolean secureFlag = false;
        configure(addr, maxcc, backlog, secureFlag);
    }
    /**
     * Reports whether this factory is flagged as secure.
     *
     * @return the current value of the {@code secure} field
     */
    public boolean isSecure() {
        return this.secure;
    }
    /**
     * Attaches this factory to a server and registers the factory with it.
     *
     * <p>A non-null server receives this factory through {@code setSecureServerCnxnFactory}
     * when the factory is secure, and through {@code setServerCnxnFactory} otherwise.
     *
     * @param zks server to attach; {@code null} only clears the stored reference
     */
    public final void setZooKeeperServer(ZooKeeperServer zks) {
        this.zkServer = zks;
        if (zks == null) {
            return;
        }
        if (!secure) {
            zks.setServerCnxnFactory(this);
        } else {
            zks.setSecureServerCnxnFactory(this);
        }
    }
    /**
     * Instantiates the connection factory implementation reflectively.
     *
     * <p>The class name comes from the {@code zookeeper.serverCnxnFactory} system property;
     * when the property is unset, {@code NIOServerCnxnFactory} is used.
     *
     * @return a new, not yet configured factory
     * @throws IOException wrapping any exception raised while loading, instantiating or casting the class
     */
    public static ServerCnxnFactory createFactory() throws IOException {
        final String configured = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
        final String factoryClassName =
                (configured != null) ? configured : NIOServerCnxnFactory.class.getName();
        try {
            final Object instance = Class.forName(factoryClassName)
                                         .getDeclaredConstructor()
                                         .newInstance();
            // The cast stays inside the try block so a wrong type is reported as an IOException too.
            final ServerCnxnFactory factory = (ServerCnxnFactory) instance;
            LOG.info("Using {} as server connection factory", factoryClassName);
            return factory;
        } catch (Exception e) {
            throw new IOException("Couldn't instantiate " + factoryClassName, e);
        }
    }
    /**
     * Removes the JMX bean recorded for a connection and unregisters it from the MBean registry.
     *
     * @param serverCnxn connection whose bean should be dropped; does nothing if no bean is recorded
     */
    public void unregisterConnection(ServerCnxn serverCnxn) {
        final ConnectionBean bean = connectionBeans.remove(serverCnxn);
        if (bean == null) {
            return;
        }
        MBeanRegistry.getInstance().unregister(bean);
    }
    /**
     * Creates a JMX bean for a connection, registers it under the server's JMX bean and records it.
     *
     * <p>Does nothing while no server is attached. A {@code JMException} during registration is
     * logged and the bean is not recorded.
     *
     * @param serverCnxn connection to expose
     */
    public void registerConnection(ServerCnxn serverCnxn) {
        if (zkServer == null) {
            return;
        }
        final ConnectionBean bean = new ConnectionBean(serverCnxn, zkServer);
        try {
            MBeanRegistry.getInstance().register(bean, zkServer.jmxServerBean);
            connectionBeans.put(serverCnxn, bean);
        } catch (JMException e) {
            LOG.warn("Could not register connection", e);
        }
    }
    

    /**
     * Sets up the server-side SASL login from the JAAS configuration, when one is available.
     *
     * <p>The login context name is read from the system property
     * {@code ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY}, falling back to
     * {@code ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME}. If no configuration entries exist for
     * that name the method returns without enabling authentication, unless reading the configuration
     * raised a {@code SecurityException} while the user had explicitly set a login context name or a
     * JAAS file; in that case the problem is reported as an error.
     *
     * @throws IOException if the JAAS configuration was explicitly requested but could not be read,
     *         or if the login fails (the {@code LoginException} is attached as the cause)
     */
    protected void configureSaslLogin() throws IOException {
        String serverSection = System.getProperty(
                ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
                ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME);

        // "Configuration" here refers to javax.security.auth.login.Configuration.
        AppConfigurationEntry[] entries = null;
        SecurityException securityException = null;
        try {
            entries = Configuration.getConfiguration().getAppConfigurationEntry(serverSection);
        } catch (SecurityException e) {
            // Possibly harmless: the user may not intend to use JAAS authentication at all.
            securityException = e;
        }

        // No entry in jaas.conf.
        // If reading the configuration failed AND the user asked for SASL by setting
        // LOGIN_CONTEXT_NAME_KEY or a JAAS file, raise an error; otherwise continue
        // without authentication.
        if (entries == null) {
            String jaasFile = System.getProperty(Environment.JAAS_CONF_KEY);
            String loginContextName = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY);
            if (securityException != null && (loginContextName != null || jaasFile != null)) {
                String errorMessage = "No JAAS configuration section named '" + serverSection + "' was found";
                if (jaasFile != null) {
                    errorMessage += " in '" + jaasFile + "'.";
                }
                if (loginContextName != null) {
                    errorMessage += " But " + ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY + " was set.";
                }
                LOG.error(errorMessage);
                throw new IOException(errorMessage);
            }
            return;
        }

        // A jaas.conf entry is available: create the login and remember its user name.
        try {
            saslServerCallbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration());
            login = new Login(serverSection, saslServerCallbackHandler, new ZKConfig());
            setLoginUser(login.getUserName());
            login.startThreadIfNeeded();
        } catch (LoginException e) {
            // The message text is unchanged; the exception is additionally passed as the cause
            // so its stack trace is not lost.
            throw new IOException("Could not configure server because SASL configuration did not allow the "
                                  + " ZooKeeper server to authenticate itself properly: "
                                  + e, e);
        }
    }
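    // This static setter exists so that the static field is not written from an instance method,
    // which avoids the ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD static-analysis warning.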
    /**
     * Stores the login user name in the static {@code loginUser} field.
     *
     * @param name user name to store
     */
    private static void setLoginUser(String name) {
        loginUser = name;
    }
    
    /**
     * Returns the stored login user name.
     *
     * @return the value of the static {@code loginUser} field
     */
    public static String getUserName() {
        final String user = loginUser;
        return user;
    }
    
    /**
     * Returns the configured limit on the total number of connections.
     *
     * @return the current value of {@code maxCnxns}
     */
    public int getMaxCnxns() {
        return this.maxCnxns;
    }
    /**
     * Initializes {@code maxCnxns} from the {@code zookeeper.maxCnxns} system property.
     *
     * <p>{@code Integer.getInteger} yields the default when the property is missing or not a valid
     * integer. A negative value is replaced by the default and a warning is logged; a value equal
     * to the default is reported as "not configured"; any other value is logged as configured.
     */
    protected void initMaxCnxns() {
        maxCnxns = Integer.getInteger(ZOOKEEPER_MAX_CONNECTION, ZOOKEEPER_MAX_CONNECTION_DEFAULT);
        if (maxCnxns < 0) {
            maxCnxns = ZOOKEEPER_MAX_CONNECTION_DEFAULT;
            // Log message typo fixed: "vlaue" -> "value".
            LOG.warn("maxCnxns should be greater than or equal to 0, using default value {}.",
                    ZOOKEEPER_MAX_CONNECTION_DEFAULT);
        } else if (maxCnxns == ZOOKEEPER_MAX_CONNECTION_DEFAULT) {
            LOG.warn("maxCnxns is not configured, using default value {}.",
                    ZOOKEEPER_MAX_CONNECTION_DEFAULT);
        } else {
            LOG.info("maxCnxns configured value is {}.", maxCnxns);
        }
    }
    
    /**
     * Checks whether the total-connection limit has been reached.
     *
     * @return {@code true} if the limit is enabled ({@code maxCnxns > 0}) and the number of alive
     *         connections is at or above it; {@code false} otherwise
     */
    protected boolean limitTotalNumberOfCnxns() {
        if (maxCnxns <= 0) {
            // maxCnxns limit is disabled
            return false;
        }
        // Named aliveCnxns so the local does not shadow the cnxns member read by getNumAliveConnections().
        final int aliveCnxns = getNumAliveConnections();
        if (aliveCnxns >= maxCnxns) {
            // Parameterized logging; the rendered text is the same as the concatenated form.
            LOG.error("Too many connections {} - max is {}", aliveCnxns, maxCnxns);
            return true;
        }
        return false;
    }
总结

ZooKeeper作为一个服务器,自然要与客户端进行网络通信。如何高效地与客户端进行通信,让网络IO不成为ZooKeeper的瓶颈,是ZooKeeper急需解决的问题。
ZooKeeper中使用ServerCnxnFactory管理与客户端的连接,其有两个实现,一个是NIOServerCnxnFactory,使用Java原生NIO实现;一个是NettyServerCnxnFactory,使用netty实现。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4659636.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存