- 2021SC@SDUSC
- NIOServerCnxnFactory工作流程(一)
- NIOServerCnxnFactory工作流程(二)
- NIOServerCnxnFactory.configure
- NIOServerCnxnFactory.start()启动服务该函数用来启动后台服务
- AcceptThread
- SelectorThread
- ConnectionExpirerThread
- 总结
在讲述NIOServerCnxnFactory之前,先了解一下NIOServerCnxnFactory的工作流程。
NIOServerCnxnFactory工作流程(一)说明:AcceptThread:监听端口接收连接
Select:连接放到这个队列上
Selector:通过Selector取出相应连接放到工作池上
workpool:处理连接请求
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException { if (secure) { throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn"); } configureSaslLogin(); maxClientCnxns = maxcc; initMaxCnxns(); sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000); cnxnExpiryQueue = new ExpiryQueueNIOServerCnxnFactory.start()启动服务该函数用来启动后台服务(sessionlessCnxnTimeout); expirerThread = new ConnectionExpirerThread(); int numCores = Runtime.getRuntime().availableProcessors(); // 32核的最佳点是4个选择器线程 numSelectorThreads = Integer.getInteger( ZOOKEEPER_NIO_NUM_SELECTOR_THREADS, Math.max((int) Math.sqrt((float) numCores / 2), 1)); if (numSelectorThreads < 1) { throw new IOException("numSelectorThreads must be at least 1"); } numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores); workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000); String logMsg = "Configuring NIO connection handler with " + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, " + numSelectorThreads + " selector thread(s), " + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and " + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers.")); LOG.info(logMsg); for (int i = 0; i < numSelectorThreads; ++i) { selectorThreads.add(new SelectorThread(i)); } listenBacklog = backlog; this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port {}", addr); if (listenBacklog == -1) { ss.socket().bind(addr); } else { ss.socket().bind(addr, listenBacklog); } ss.configureBlocking(false); acceptThread = new AcceptThread(ss, addr, selectorThreads); }
主要完成以下事项:
(1) 创建一个WorkerService,该线程执行器的创建和管理。
(2) 启动所有的SelectorThread线程,处理已经和客户端建立好的连接发送过来的连接请求。
(3) 启动AcceptThread线程,该线程用来接收客户端的连接请求,完成连接,并把完成的连接交给SelectorThread线程接手
(4) 启动expirerThread线程,该线程用来处理断开,或产生异常的连接
public void start() { stopped = false; // 启动worker线程池 if (workerPool == null) { workerPool = new WorkerService("NIOWorker", numWorkerThreads, false); } // 启动selecotr线程池 for (SelectorThread thread : selectorThreads) { if (thread.getState() == Thread.State.NEW) { thread.start(); } } // 确保线程只启动一次, 启动accept线程 if (acceptThread.getState() == Thread.State.NEW) { acceptThread.start(); } // 启动终止线程 if (expirerThread.getState() == Thread.State.NEW) { expirerThread.start(); } }AcceptThread
功能:该线程主要是接收来自客户端的连接请求,并完成三次握手,建立tcp连接。主要完成以下事项:
(1) 在run()函数中实现线程的主要逻辑。在run()函数中主要调用select()函数。
public void run() { try { while (!stopped && !acceptSocket.socket().isClosed()) { try { //调用select,将连接加入队列中 select(); } catch (RuntimeException e) { LOG.warn("Ignoring unexpected runtime exception", e); } catch (Exception e) { LOG.warn("Ignoring unexpected exception", e); } } } finally { closeSelector(); // 这将唤醒选择器线程,并告诉工作线程池将开始关闭. if (!reconfiguring) { NIOServerCnxnFactory.this.stop(); } LOG.info("accept thread exitted run method"); } }
(2) 在select()函数中,会调用java的nio库中的函数:selector.select()对多个socket进行监控,看是否有读、写事件发生。若没有读、写事件发生,该函数会一直阻塞。
private void select() { try { selector.select(); IteratorselectedKeys = selector.selectedKeys().iterator(); while (!stopped && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); // 未获取key即无读写事件发生,阻塞 if (!key.isValid()) { continue; } // 获取到key,即有读写事件发生 if (key.isAcceptable()) { if (!doAccept()) { // 如果无法从服务器上拔出新连接,请接受 // 排队,暂停接受,给我们自由时间 // 启动文件描述符,因此接受线程 // 不会在一个紧密的循环中旋转。 pauseAccept(10); } } else { LOG.warn("Unexpected ops in accept select {}", key.readyOps()); } } } catch (IOException e) { LOG.warn("Ignoring IOException while selecting", e); } }
(3)若有能够accepted事件发生,则调用doAccept()函数进行处理。在函数doAccept中,会调用socket的accept函数,来完成和客户端的三次握手,建立起tcp连接。然后把已经完成连接的socket,设置成非阻塞:sc.configureBlocking(false); 接下来选择一个selector线程,并把连接好的socket添加到该selector线程的acceptedQueue队列中。 可见,accepted队列是一个阻塞队列,添加到该队列后,就需要selector线程来接管已连接socket的后续的消息,所以需要唤醒selector队列。在addAcceptedConnection把已连接socket添加到阻塞队列中后,调用wakeupSelector();唤醒对应的selector线程。
private boolean doAccept() { // 阻塞 boolean accepted = false; SocketChannel sc = null; try { //完成和客户端的三次握手,建立起tcp连接 sc = acceptSocket.accept(); //非阻塞 accepted = true; if (limitTotalNumberOfCnxns()) { throw new IOException("Too many connections max allowed is " + maxCnxns); } InetAddress ia = sc.socket().getInetAddress(); int cnxncount = getClientCnxnCount(ia); if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) { throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns); } LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress()); // 设置成非阻塞 sc.configureBlocking(false); // 循环将此连接分配给选择器线程 if (!selectorIterator.hasNext()) { selectorIterator = selectorThreads.iterator(); } SelectorThread selectorThread = selectorIterator.next(); //唤醒对应的selector线程 if (!selectorThread.addAcceptedConnection(sc)) { throw new IOException("Unable to add connection to selector queue" + (stopped ? " (shutdown in progress)" : "")); } acceptErrorLogger.flush(); } catch (IOException e) { // 接受,maxClientCnxns,配置阻止 ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1); acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage()); fastCloseSock(sc); } return accepted; } }SelectorThread
该线程接管连接完成的socket,接收来自该socket的命令处理命令,把处理结果返回给客户端。在主流程中,会调用select()函数来监控socket是否有读和写事件,若有读和写事件会调用handleIO(key)函数对事件进行处理。
private void select() { try { selector.select(); Setselected = selector.selectedKeys(); ArrayList selectedList = new ArrayList (selected); Collections.shuffle(selectedList); Iterator selectedKeys = selectedList.iterator(); //获取选择key while (!stopped && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selected.remove(key); //如果key无效 if (!key.isValid()) { cleanupSelectionKey(key); continue; } //拥有key且有可读或者可写事件 if (key.isReadable() || key.isWritable()) { handleIO(key); } else { LOG.warn("Unexpected ops in select {}", key.readyOps()); } } } catch (IOException e) { LOG.warn("Ignoring IOException while selecting", e); } }
在handleIO中,会启动woker线程池中的一个worker来处理这个事件,处理事件的主类是ScheduledWorkRequest,最终会调用run函数中的workRequest.doWork();来处理请求。
private void handleIO(SelectionKey key) { IOWorkRequest workRequest = new IOWorkRequest(this, key); NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment(); //在处理其连接时停止选择此键 cnxn.disableSelectable(); key.interestOps(0); touchCnxn(cnxn); workerPool.schedule(workRequest); }
在IOWorkRequest.doWork()中会判断key的合法性,然后调用NIOServerCnxn.doIO(key)来处理事件,在doIO函数中,对读的事件会调用readPayload()函数来处理,对于写事件会调用handleWrite(k)来处理。其中doIO是NIOServerCnxn中的一个具体函数。通过doIO完成整个处理过程。
private class IOWorkRequest extends WorkerService.WorkRequest { private final SelectorThread selectorThread; private final SelectionKey key; private final NIOServerCnxn cnxn; IOWorkRequest(SelectorThread selectorThread, SelectionKey key) { this.selectorThread = selectorThread; this.key = key; this.cnxn = (NIOServerCnxn) key.attachment(); } public void doWork() throws InterruptedException { if (!key.isValid()) { selectorThread.cleanupSelectionKey(key); return; } //判断key的合法性 if (key.isReadable() || key.isWritable()) { cnxn.doIO(key); // 检查是否关闭或doIO()是否关闭了此连接 if (stopped) { cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN); return; } if (!key.isValid()) { selectorThread.cleanupSelectionKey(key); return; } touchCnxn(cnxn); } //将此连接再次标记为可供选择 cnxn.enableSelectable(); // 在队列上推送更新请求以继续选择 // 在当前感兴趣的 *** 作集上,可能已更改 // 作为我们刚才执行的I/O *** 作的结果。 if (!selectorThread.addInterestOpsUpdateRequest(key)) { cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED); } }ConnectionExpirerThread
该类的主要任务是从ExpiryQueue cnxnExpiryQueue这个终止队列中获取已经终止的session,并对这些session和连接进行关闭。
private class ConnectionExpirerThread extends ZooKeeperThread { ConnectionExpirerThread() { super("ConnnectionExpirer"); } public void run() { try { while (!stopped) { long waitTime = cnxnExpiryQueue.getWaitTime(); if (waitTime > 0) { Thread.sleep(waitTime); continue; } for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) { ServerMetrics.getMetrics().SESSIONLESS_CONNECTIONS_EXPIRED.add(1); conn.close(ServerCnxn.DisconnectReason.CONNECTION_EXPIRED); } } } catch (InterruptedException e) { LOG.info("ConnnectionExpirerThread interrupted"); } } }总结
本章讲述了NIOServerCnxnFactory的运行流程以及源码执行过程
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)