ZooKeeper源码分析之完整网络通信流程(二)

ZooKeeper源码分析之完整网络通信流程(二),第1张

ZooKeeper源码分析之完整网络通信流程(二)

文章目录
  • 2021SC@SDUSC
  • 前言
  • SelectorThread
  • 总结

2021SC@SDUSC 前言

本章进入源码世界来一步一步分析客户端与服务端之间是如何通过ClientCnxn/ServerCnxn来建立起网络通信的。在上一章中已经介绍客户端是如何将请求发送到服务端的,本章将介绍服务端是如何响应客户端请求的,下一章将介绍客户端收到服务端的响应之后是如何 *** 作的。

SelectorThread

在上一章讲述到客户端将请求发送到服务端时,会将传输的请求包写入缓冲区中。此时服务端将通过SelectorThread去读缓冲区的内容,进而执行后续 *** 作。SelectorThread默认为NIO实现的SelectorThread,显然这是受服务连接工厂管理的,即在NIOServerCnxnFactory中实现。 SelectorThread具体实现细节在之前分析NIOServerCnxnFactory源码时讲述过,这里不再进行说明,只对涉及到本章的内容进行讲解。

①在NIOServerCnxnFactory源码分析中介绍到SelectorThread的run()方法中,首先会调用select()方法去监控连接完成的socket是否有读或写事件,在这情景下,显然select()方法检测到有读事件。即满足以下条件,会调用handleIO()方法。

    if (key.isReadable() || key.isWritable()) {
          handleIO(key);
      }

②在handleIO()中,会启动woker线程池中的一个worker来处理这个事件,处理事件的主类是ScheduledWorkRequest,最终会调用run函数中的workRequest.doWork();来处理请求。

第一步:调用workerPool.schedule(workRequest);

        
        private void handleIO(SelectionKey key) {
            IOWorkRequest workRequest = new IOWorkRequest(this, key);
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            //在处理其连接时停止选择此键
            cnxn.disableSelectable();
            key.interestOps(0);
            touchCnxn(cnxn);
            //启动worker线程池中的一个worker
            workerPool.schedule(workRequest);
        }

第二步:调用schedule(workRequest, 0);

    public void schedule(WorkRequest workRequest) {
        schedule(workRequest, 0);
    }

第三步:如果有工作池则worker.execute(scheduledWorkRequest); 如果工作池里没有worker则scheduledWorkRequest.run();

    public void schedule(WorkRequest workRequest, long id) {
        if (stopped) {
            workRequest.cleanup();
            return;
        }
        // 调用工作请求
        ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);

        //如果工作池里有worker,那就使用工作池,如果没有,那就直接工作
        int size = workers.size();
        if (size > 0) {
            try {
                // make sure to map negative ids as well to [0, size-1]
                int workerNum = ((int) (id % size) + size) % size;
                ExecutorService worker = workers.get(workerNum);
                worker.execute(scheduledWorkRequest);
            } catch (RejectedExecutionException e) {
                LOG.warn("ExecutorService rejected execution", e);
                workRequest.cleanup();
            }
        } else {
            // When there is no worker thread pool, do the work directly
            // and wait for its completion
            scheduledWorkRequest.run();
        }
    }

第四步:调用workRequest.doWork();

        public void run() {
            try {
                // Check if stopped while request was on queue
                if (stopped) {
                    workRequest.cleanup();
                    return;
                }
                //处理请求
                workRequest.doWork();
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
                workRequest.cleanup();
            }
        }

    }

第五步:而在基于NIO连接实现的WorkRequest,在NIOServerCnxnFactory中,即IOWorkRequest类。 其详细功能在NIOServerCnxnFactory中有介绍,这里不再阐述。主要就是为了让worker开始工作。而工作的核心 *** 作就是doIO()方法,调用cnxn.doIO(key);

        public void doWork() throws InterruptedException {
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }

            if (key.isReadable() || key.isWritable()) {
            	//进行IO *** 作,核心 *** 作
                cnxn.doIO(key);

                // 检查是否关闭或doIO()是否关闭了此连接
                if (stopped) {
                    cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                    return;
                }
                if (!key.isValid()) {
                    selectorThread.cleanupSelectionKey(key);
                    return;
                }
                touchCnxn(cnxn);
            }

            //将此连接再次标记为可供选择
            cnxn.enableSelectable();
            // 在队列上推送更新请求以继续选择
            // 在当前感兴趣的 *** 作集上,可能已更改
            // 作为我们刚才执行的I/O *** 作的结果。
            if (!selectorThread.addInterestOpsUpdateRequest(key)) {
                cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
            }
        }

第六步:基于NIO连接的doIO()方法在NIOServerCnxn源码分析中进行详细解读过。 主要就是对缓冲区中的内容进行读或写 *** 作。在这里给定的情形下,显然是可读 *** 作,其中读取数据内容(有效载荷)是readPayLoad()方法。

    void doIO(SelectionKey k) throws InterruptedException {
						……
						……
            if (k.isReadable()) {//key可读
                //将内容从socket写入incoming缓冲
                int rc = sock.read(incomingBuffer);
                if (rc < 0) {//流结束异常,无法从客户端读取数据
                    handleFailedRead();
                }
                //缓冲区已经写满
                if (incomingBuffer.remaining() == 0) {
                    boolean isPayload;
                    //读取下个请求
                    if (incomingBuffer == lenBuffer) {
                        //翻转缓冲区,可读
                        incomingBuffer.flip();
                        //读取lenBuffer的前四个字节,当读取的是内容长度时则为true,否则为false
                        isPayload = readLength(k);
                        //清除缓冲
                        incomingBuffer.clear();
                    } else {
                        //因为在readLength中根据Len已经重新分配了incomingBuffer
                        isPayload = true;
                    }
                    if (isPayload) { //不为四个字母,为实际内容
                        //读取数据内容
                        readPayload();
                    } else {
                        //四个字母,为四字母的命令
                        return;
                    }
                }
            }
  					      ……
  					      ……

第七步:readPayload()在NIOServerCnxn源码分析中也有写到,具体内容如下,主要功能就是读取请求包中携带的有效载荷。到这里服务端就已经完整接收到了客户端发送过来的 连接请求包(请求包),然后服务端会调用readConnectRequest()方法 (readRequest()方法)读取这个连接请求包(请求包)。 根据给定的场景,显然这里是调用readConnectRequest()方法。

    
    private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
        // 表示还未读取完socket中内容
        if (incomingBuffer.remaining() != 0) { // have we read length bytes?
            // 将socket的内容读入缓冲
            int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
            // 流结束异常,无法从客户端读取数据
            if (rc < 0) {
                handleFailedRead();
            }
        }
        // 表示已经读取完了Socket中内容
        if (incomingBuffer.remaining() == 0) { // have we read length bytes?
            incomingBuffer.flip();
            // 接收到packet
            packetReceived(4 + incomingBuffer.remaining());
            // 未初始化
            if (!initialized) {
                // 读取连接请求
                readConnectRequest();
            } else {
                // 读取请求
                readRequest();
            }
            // 清除缓冲
            lenBuffer.clear();
            // 赋值incomingBuffer,即清除incoming缓冲
            incomingBuffer = lenBuffer;
        }
    }

第八步:调用processConnectRequest()方法,该方法在ZooKeeperServer中,这是很显然的,每一个ZooKeeperServer必然都能够处理客户端发送的连接请求或者是其他请求

    private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException {
        if (!isZKServerRunning()) {
            throw new IOException("ZooKeeperServer not running");
        }
        // 处理连接请求
        zkServer.processConnectRequest(this, incomingBuffer);
        initialized = true;
    }

第九步:处理连接请求,调用createSession()方法创建会话

    public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
        throws IOException, ClientCnxnLimitException {
        // ConnectReq的packet是没有header的,所以直接读内容,反序列化
        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
        ConnectRequest connReq = new ConnectRequest();
        connReq.deserialize(bia, "connect");
        LOG.debug(
            "Session establishment request from client {} client's lastZxid is 0x{}",
            cnxn.getRemoteSocketAddress(),
            Long.toHexString(connReq.getLastZxidSeen()));
        // 生成会话ID
        long sessionId = connReq.getSessionId();
        int tokensNeeded = 1;
        if (connThrottle.isConnectionWeightEnabled()) {
            if (sessionId == 0) {
                if (localSessionEnabled) {
                    tokensNeeded = connThrottle.getRequiredTokensForLocal();
                } else {
                    tokensNeeded = connThrottle.getRequiredTokensForGlobal();
                }
            } else {
                tokensNeeded = connThrottle.getRequiredTokensForRenew();
            }
        }

        if (!connThrottle.checkLimit(tokensNeeded)) {
            throw new ClientCnxnLimitException();
        }
        ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());

        ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);

        boolean readonly = false;
        try {
            readonly = bia.readBool("readOnly");
            cnxn.isOldClient = false;
        } catch (IOException e) {
            // this is ok -- just a packet from an old client which
            // doesn't contain readonly field
            LOG.warn(
                "Connection request from old client {}; will be dropped if server is in r-o mode",
                cnxn.getRemoteSocketAddress());
        }
        if (!readonly && this instanceof ReadOnlyZooKeeperServer) {
            String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
            LOG.info(msg);
            throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
        }
        if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
            String msg = "Refusing session request for client "
                         + cnxn.getRemoteSocketAddress()
                         + " as it has seen zxid 0x"
                         + Long.toHexString(connReq.getLastZxidSeen())
                         + " our last zxid is 0x"
                         + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                         + " client must try another server";

            LOG.info(msg);
            throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
        }
        // 设置客户端请求的session相关参数
        int sessionTimeout = connReq.getTimeOut();
        byte[] passwd = connReq.getPasswd();
        int minSessionTimeout = getMinSessionTimeout();
        if (sessionTimeout < minSessionTimeout) {
            sessionTimeout = minSessionTimeout;
        }
        int maxSessionTimeout = getMaxSessionTimeout();
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        cnxn.setSessionTimeout(sessionTimeout);
        //暂时先不读后续请求,直到session建立,类似于 *** 作系统的关中断
        cnxn.disableRecv();
        if (sessionId == 0) {
            // 创建会话
            long id = createSession(cnxn, passwd, sessionTimeout);
            LOG.debug(
                "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
                Long.toHexString(id),
                Long.toHexString(connReq.getLastZxidSeen()),
                connReq.getTimeOut(),
                cnxn.getRemoteSocketAddress());
        } else {
            long clientSessionId = connReq.getSessionId();
                LOG.debug(
                    "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
                    Long.toHexString(clientSessionId),
                    Long.toHexString(connReq.getLastZxidSeen()),
                    connReq.getTimeOut(),
                    cnxn.getRemoteSocketAddress());
            if (serverCnxnFactory != null) {
                serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
            }
            if (secureServerCnxnFactory != null) {
                secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
            }
            //可以读取后续请求,类似于 *** 作系统的开中断
            cnxn.setSessionId(sessionId);
            //重新打开会话
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
            ServerMetrics.getMetrics().CONNECTION_RevalIDATE_COUNT.add(1);

        }
    }

第十步:创建完会话后,会调用 submitRequest()方法,将请求提交给服务端执行链,执行链会开始执行请求

    long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
        if (passwd == null) {
            // Possible since it's just deserialized from a packet on the wire.
            passwd = new byte[0];
        }
        // 创建会话,会话ID自增
        long sessionId = sessionTracker.createSession(timeout);
        //随机密码
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(timeout);
        //每个服务连接都只有一个会话ID
        cnxn.setSessionId(sessionId);
        //提交请求给后面的执行链
        Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
        submitRequest(si);
        return sessionId;
    }

第十一步:请求链,请求处理器会调用requestThrottler.submitRequest(si);

    public void submitRequest(Request si) {
        enqueueRequest(si);
    }

    public void enqueueRequest(Request si) {
        if (requestThrottler == null) {
            synchronized (this) {
                try {
                    //由于所有请求都传递给请求处理器,因此它应该等待设置请求处理器链。安装后,状态将更新为正在运行。
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (requestThrottler == null) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        requestThrottler.submitRequest(si);
    }

第十二步:当服务端开始提交请求,会调用submitRequestNow()方法,里面执行了firstProcessor.processRequest() *** 作,第一个Processor开始对请求进行预处理。

    public void submitRequestNow(Request si) {
    ……
    ……
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                setLocalSessionFlag(si);
                //提交给后续的processor执行,一般用异步以提升性能
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type {}", si.type);
                // Update request accounting/throttling limits
                requestFinished(si);
                new UnimplementedRequestProcessor().processRequest(si);
            }
            ……
            ……
 }

第十三步:PrepRequestProcessor继承了ZooKeeperCriticalThread,即PrepRequestProcessor也是一个线程,在十二步中调用了该线程处理请求 *** 作,当该线程运行时即调用了run()方法,会从提交上来的请求链中获取请求,然后调用pRequest()方法对请求进行加工预处理。

    public void run() {
    ……
            while (true) {
                ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
                // 处理提交上来的请求
                Request request = submittedRequests.take();
                ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
                    .add(Time.currentElapsedTime() - request.prepQueueStartTime);
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                }
                if (Request.requestOfDeath == request) {
                    break;
                }
                //预处理开始时间
                request.prepStartTime = Time.currentElapsedTime();
                //预处理请求
                pRequest(request);
            }
            ……
 }

第十四步:pRequest()预处理方法中,有很多 *** 作码。这里给定的场景的创建会话。所以分析预处理创建会话的 *** 作码。 在ZooKeeper中创建或者删除会话时,不能申请请求 *** 作码。核心是将需要预处理的请求进行重新组装,方便后续Processor进行处理,调用pRequest2Txn()方法。

    protected void pRequest(Request request) throws RequestProcessorException {
   	 ……
   	            case OpCode.createSession:
           		case OpCode.closeSession:
                if (!request.isLocalSession()) {
                    //在这里,组装了Request的header和txh实现,方便后续processor处理
                    pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                }
                break;
 	 ……
		 	    //请求的zxid
		        request.zxid = zks.getZxid();
		        ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
		        //让后续processor处理,这里一般是异步以提高性能
		        nextProcessor.processRequest(request);

第十五步:由于所给定的场景是创建会话,所以这里分析创建会话 *** 作码的具体 *** 作。

 protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
 ……
        case OpCode.createSession:
            //设置会话超时时间
            request.request.rewind();
            int to = request.request.getInt();
            //组装具体的Record实现,这里是CreateSessionTxn,方便后续processor处理
            request.setTxn(new CreateSessionTxn(to));
            request.request.rewind();
            // only add the global session tracker but not to ZKDb
            zks.sessionTracker.trackSession(request.sessionId, to);
            zks.setOwner(request.sessionId, request.getOwner());
            break;
 ……
 }

第十六步:由十四步中调用的nextProcessor.processRequest(request),即下一个SyncRequestProcessor对重装的请求进行处理。与预处理类似,当SyncRequestProcessor运行时,即执行run()方法,核心功能是执行请求返回数据。

    public void run() {
        try {
            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            resetSnapshotStats();
            lastFlushTime = Time.currentElapsedTime();
            while (true) {
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
                //获取预处理过的请求
                long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
                Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                //如果没有请求,则flush,阻塞等待,代表之前的请求都被处理了
                if (si == null) {
                    flush();
                    si = queuedRequests.take();
                }
                //如果是消亡请求,则直接退出
                if (si == REQUEST_OF_DEATH) {
                    break;
                }
                //开始处理时的时间
                long startProcessTime = Time.currentElapsedTime();
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);

                //将Request append到log输出流,先序列化再append,此时request还没flush到磁盘,还在内存
                if (zks.getZKDatabase().append(si)) {
                    if (shouldSnapshot()) {
                        resetSnapshotStats();
                        //将内存中的log flush到磁盘
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (!snapThreadMutex.tryAcquire()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    } finally {
                                        snapThreadMutex.release();
                                    }
                                }
                            }.start();
                        }
                    }
                } else if (toFlush.isEmpty()) {//如果是写请求,而且flush队列为空,执行往下执行
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable) nextProcessor).flush();
                        }
                    }
                    continue;
                }
                //写请求前面append到log输出流后,在这里加入到flush队列,后续批量处理
                toFlush.add(si);
                //如果系统压力大,可能需要多个request才会flush,flush之后可以被后续processor处理
                if (shouldFlush()) {
                    flush();
                }
                ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        }
        LOG.info("SyncRequestProcessor exited!");
    }

第十七步:每当请求flush到磁盘后,会调用下一个Processor,即FinalRequestProcessor。

    private void flush() throws IOException, RequestProcessorException {
        if (this.toFlush.isEmpty()) {
            return;
        }

        ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
        //flush开始时间
        long flushStartTime = Time.currentElapsedTime();
        //将之前的append log flush到磁盘,并顺便关闭旧的log文件句柄
        zks.getZKDatabase().commit();
        ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
        //如果没有下一个处理线程,则清理队列里的请求
        if (this.nextProcessor == null) {
            this.toFlush.clear();
        } else {
            //如果队列里有请求
            while (!this.toFlush.isEmpty()) {
                final Request i = this.toFlush.remove();
                long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
                //执行后面的processor
                this.nextProcessor.processRequest(i);
            }
            if (this.nextProcessor instanceof Flushable) {
                ((Flushable) this.nextProcessor).flush();
            }
            //flush结束的时间
            lastFlushTime = Time.currentElapsedTime();
        }
    }

第十八步:在FinalRequestProcessor拿到database的处理结果后,调用zks.finishSessionInit(request.cnxn, true)将响应通过NIOServerCnxn写回给客户端。

            case OpCode.createSession: {
                lastOp = "SESS";
                updateStats(request, lastOp, lastZxid);

                zks.finishSessionInit(request.cnxn, true);
                return;
            }

第十九步:向客户端的缓冲区中写入内容

    public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
    ……
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.remaining() - 4).rewind();
            //通过channel写回
            cnxn.sendBuffer(bb);

            if (valid) {
                LOG.debug(
                    "Established session 0x{} with negotiated timeout {} for client {}",
                    Long.toHexString(cnxn.getSessionId()),
                    cnxn.getSessionTimeout(),
                    cnxn.getRemoteSocketAddress());
                //打开selector的读事件
                cnxn.enableRecv();
               
      }
       ……
 }

第二十步:通过NIOServerCnxn的sendBuffer()方法写入客户端的缓冲区中。

    
    public void sendBuffer(ByteBuffer... buffers) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Add a buffer to outgoingBuffers, sk {} is valid: {}", sk, sk.isValid());
        }

        synchronized (outgoingBuffers) {
            for (ByteBuffer buffer : buffers) {
                outgoingBuffers.add(buffer);
            }
            outgoingBuffers.add(packetSentinel);
        }
        requestInterestOpsUpdate();
    }
总结

以上就服务端响应客户端连接请求的所有过程,服务端响应客户端其他请求也是如此,只是其中的 *** 作码改变以及一些细微的方法调用差异。下一章将介绍客户端收到服务端的响应后的执行流程。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5669781.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存