ZooKeeper源码分析之NIOServerCnxnFactory

ZooKeeper源码分析之NIOServerCnxnFactory,第1张

ZooKeeper源码分析之NIOServerCnxnFactory

文章目录
  • 2021SC@SDUSC
  • NIOServerCnxnFactory工作流程(一)
  • NIOServerCnxnFactory工作流程(二)
  • NIOServerCnxnFactory.configure
  • NIOServerCnxnFactory.start()启动服务该函数用来启动后台服务
  • AcceptThread
  • SelectorThread
  • ConnectionExpirerThread
  • 总结

2021SC@SDUSC

在讲述NIOServerCnxnFactory之前,先了解一下NIOServerCnxnFactory的工作流程。

NIOServerCnxnFactory工作流程(一)

说明:AcceptThread:监听端口接收连接
Select:连接放到这个队列上
Selector:通过Selector取出相应连接放到工作池上
workpool:处理连接请求

NIOServerCnxnFactory工作流程(二) NIOServerCnxnFactory.configure
    public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
        if (secure) {
            throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
        }
        configureSaslLogin();

        maxClientCnxns = maxcc;
        initMaxCnxns();
        sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
        
        cnxnExpiryQueue = new ExpiryQueue(sessionlessCnxnTimeout);
        expirerThread = new ConnectionExpirerThread();

        int numCores = Runtime.getRuntime().availableProcessors();
        // 32核的最佳点是4个选择器线程
        numSelectorThreads = Integer.getInteger(
            ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
            Math.max((int) Math.sqrt((float) numCores / 2), 1));
        if (numSelectorThreads < 1) {
            throw new IOException("numSelectorThreads must be at least 1");
        }

        numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
        workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

        String logMsg = "Configuring NIO connection handler with "
            + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, "
            + numSelectorThreads + " selector thread(s), "
            + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and "
            + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
        LOG.info(logMsg);
        for (int i = 0; i < numSelectorThreads; ++i) {
            selectorThreads.add(new SelectorThread(i));
        }

        listenBacklog = backlog;
        this.ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true);
        LOG.info("binding to port {}", addr);
        if (listenBacklog == -1) {
            ss.socket().bind(addr);
        } else {
            ss.socket().bind(addr, listenBacklog);
        }
        ss.configureBlocking(false);
        acceptThread = new AcceptThread(ss, addr, selectorThreads);
    }
NIOServerCnxnFactory.start()启动服务该函数用来启动后台服务

主要完成以下事项:
(1) 创建一个WorkerService,该线程执行器的创建和管理。
(2) 启动所有的SelectorThread线程,处理已经和客户端建立好的连接发送过来的连接请求。
(3) 启动AcceptThread线程,该线程用来接收客户端的连接请求,完成连接,并把完成的连接交给SelectorThread线程接手
(4) 启动expirerThread线程,该线程用来处理断开,或产生异常的连接

    public void start() {
        stopped = false;
        // 启动worker线程池
        if (workerPool == null) {
            workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
        }
        // 启动selecotr线程池
        for (SelectorThread thread : selectorThreads) {
            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }
        }
        // 确保线程只启动一次, 启动accept线程
        if (acceptThread.getState() == Thread.State.NEW) {
            acceptThread.start();
        }
        // 启动终止线程
        if (expirerThread.getState() == Thread.State.NEW) {
            expirerThread.start();
        }
    }
AcceptThread

功能:该线程主要是接收来自客户端的连接请求,并完成三次握手,建立tcp连接。主要完成以下事项:
(1) 在run()函数中实现线程的主要逻辑。在run()函数中主要调用select()函数。

     public void run() {
         try {
             while (!stopped && !acceptSocket.socket().isClosed()) {
                 try {
                 //调用select,将连接加入队列中
                     select();
                 } catch (RuntimeException e) {
                     LOG.warn("Ignoring unexpected runtime exception", e);
                 } catch (Exception e) {
                     LOG.warn("Ignoring unexpected exception", e);
                 }
             }
         } finally {
             closeSelector();
 			// 这将唤醒选择器线程,并告诉工作线程池将开始关闭.
             if (!reconfiguring) {
                 NIOServerCnxnFactory.this.stop();
             }
             LOG.info("accept thread exitted run method");
         }
     }

(2) 在select()函数中,会调用java的nio库中的函数:selector.select()对多个socket进行监控,看是否有读、写事件发生。若没有读、写事件发生,该函数会一直阻塞。

     private void select() {
         try {
             selector.select();

             Iterator selectedKeys = selector.selectedKeys().iterator();
             while (!stopped && selectedKeys.hasNext()) {
                 SelectionKey key = selectedKeys.next();
                 selectedKeys.remove();
                 // 未获取key即无读写事件发生,阻塞
                 if (!key.isValid()) {
                     continue;
                 }
                 // 获取到key,即有读写事件发生
                 if (key.isAcceptable()) {
                     if (!doAccept()) {
                         // 如果无法从服务器上拔出新连接,请接受
                         // 排队,暂停接受,给我们自由时间
                         // 启动文件描述符,因此接受线程
                         // 不会在一个紧密的循环中旋转。
                         pauseAccept(10);
                     }
                 } else {
                     LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                 }
             }
         } catch (IOException e) {
             LOG.warn("Ignoring IOException while selecting", e);
         }
     }

(3)若有能够accepted事件发生,则调用doAccept()函数进行处理。在函数doAccept中,会调用socket的accept函数,来完成和客户端的三次握手,建立起tcp连接。然后把已经完成连接的socket,设置成非阻塞:sc.configureBlocking(false); 接下来选择一个selector线程,并把连接好的socket添加到该selector线程的acceptedQueue队列中。 可见,accepted队列是一个阻塞队列,添加到该队列后,就需要selector线程来接管已连接socket的后续的消息,所以需要唤醒selector队列。在addAcceptedConnection把已连接socket添加到阻塞队列中后,调用wakeupSelector();唤醒对应的selector线程。

        private boolean doAccept() {
            // 阻塞
            boolean accepted = false;
            SocketChannel sc = null;
            try {
            	//完成和客户端的三次握手,建立起tcp连接
                sc = acceptSocket.accept();
                //非阻塞
                accepted = true;
                if (limitTotalNumberOfCnxns()) {
                    throw new IOException("Too many connections max allowed is " + maxCnxns);
                }
                InetAddress ia = sc.socket().getInetAddress();
                int cnxncount = getClientCnxnCount(ia);

                if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                    throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
                }

                LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
							// 设置成非阻塞
                sc.configureBlocking(false);

                // 循环将此连接分配给选择器线程
                if (!selectorIterator.hasNext()) {
                    selectorIterator = selectorThreads.iterator();
                }
                SelectorThread selectorThread = selectorIterator.next();
                //唤醒对应的selector线程
                if (!selectorThread.addAcceptedConnection(sc)) {
                    throw new IOException("Unable to add connection to selector queue"
                                          + (stopped ? " (shutdown in progress)" : ""));
                }
                acceptErrorLogger.flush();
            } catch (IOException e) {
                // 接受,maxClientCnxns,配置阻止
                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
                fastCloseSock(sc);
            }
            return accepted;
        }

    }
SelectorThread

该线程接管连接完成的socket,接收来自该socket的命令处理命令,把处理结果返回给客户端。在主流程中,会调用select()函数来监控socket是否有读和写事件,若有读和写事件会调用handleIO(key)函数对事件进行处理。

        private void select() {
            try {
                selector.select();

                Set selected = selector.selectedKeys();
                ArrayList selectedList = new ArrayList(selected);
                Collections.shuffle(selectedList);
                Iterator selectedKeys = selectedList.iterator();
                //获取选择key
                while (!stopped && selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selected.remove(key);
									//如果key无效
                    if (!key.isValid()) {
                        cleanupSelectionKey(key);
                        continue;
                    }
                    //拥有key且有可读或者可写事件
                    if (key.isReadable() || key.isWritable()) {
                        handleIO(key);
                    } else {
                        LOG.warn("Unexpected ops in select {}", key.readyOps());
                    }
                }
            } catch (IOException e) {
                LOG.warn("Ignoring IOException while selecting", e);
            }
        }

在handleIO中,会启动woker线程池中的一个worker来处理这个事件,处理事件的主类是ScheduledWorkRequest,最终会调用run函数中的workRequest.doWork();来处理请求。

        
        private void handleIO(SelectionKey key) {
            IOWorkRequest workRequest = new IOWorkRequest(this, key);
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            //在处理其连接时停止选择此键
            cnxn.disableSelectable();
            key.interestOps(0);
            touchCnxn(cnxn);
            workerPool.schedule(workRequest);
        }

在IOWorkRequest.doWork()中会判断key的合法性,然后调用NIOServerCnxn.doIO(key)来处理事件,在doIO函数中,对读的事件会调用readPayload()函数来处理,对于写事件会调用handleWrite(k)来处理。其中doIO是NIOServerCnxn中的一个具体函数。通过doIO完成整个处理过程。

    
    private class IOWorkRequest extends WorkerService.WorkRequest {

        private final SelectorThread selectorThread;
        private final SelectionKey key;
        private final NIOServerCnxn cnxn;

        IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
            this.selectorThread = selectorThread;
            this.key = key;
            this.cnxn = (NIOServerCnxn) key.attachment();
        }

        public void doWork() throws InterruptedException {
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }
					//判断key的合法性
            if (key.isReadable() || key.isWritable()) {
                cnxn.doIO(key);

                // 检查是否关闭或doIO()是否关闭了此连接
                if (stopped) {
                    cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                    return;
                }
                if (!key.isValid()) {
                    selectorThread.cleanupSelectionKey(key);
                    return;
                }
                touchCnxn(cnxn);
            }

            //将此连接再次标记为可供选择
            cnxn.enableSelectable();
            // 在队列上推送更新请求以继续选择
            // 在当前感兴趣的 *** 作集上,可能已更改
            // 作为我们刚才执行的I/O *** 作的结果。
            if (!selectorThread.addInterestOpsUpdateRequest(key)) {
                cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
            }
        }
ConnectionExpirerThread

该类的主要任务是从ExpiryQueue cnxnExpiryQueue这个终止队列中获取已经终止的session,并对这些session和连接进行关闭。

    
    private class ConnectionExpirerThread extends ZooKeeperThread {

        ConnectionExpirerThread() {
            super("ConnnectionExpirer");
        }

        public void run() {
            try {
                while (!stopped) {
                    long waitTime = cnxnExpiryQueue.getWaitTime();
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                        continue;
                    }
                    for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
                        ServerMetrics.getMetrics().SESSIONLESS_CONNECTIONS_EXPIRED.add(1);
                        conn.close(ServerCnxn.DisconnectReason.CONNECTION_EXPIRED);
                    }
                }

            } catch (InterruptedException e) {
                LOG.info("ConnnectionExpirerThread interrupted");
            }
        }

    }
总结

本章讲述了NIOServerCnxnFactory的运行流程以及源码执行过程

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5179298.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-18
下一篇 2022-11-18

发表评论

登录后才能评论

评论列表(0条)

保存