- 2021SC@SDUSC
- Preface
- SelectorThread
- Summary
This chapter steps into the source code to analyze, step by step, how the client and server establish network communication through ClientCnxn/ServerCnxn. The previous chapter covered how the client sends a request to the server; this chapter covers how the server responds to that request; and the next chapter will cover what the client does after receiving the server's response.
SelectorThread
As described in the previous chapter, when the client sends a request to the server, the request packet is written into a buffer. The server then uses a SelectorThread to read the buffer's contents and carry out the subsequent processing. The default SelectorThread is the NIO implementation, managed by the server connection factory, i.e. implemented in NIOServerCnxnFactory. The details of SelectorThread were covered in the earlier NIOServerCnxnFactory source analysis and will not be repeated here; only the parts relevant to this chapter are discussed.
① As described in the NIOServerCnxnFactory source analysis, SelectorThread's run() method first calls select() to check whether any established sockets have pending read or write events. In our scenario select() detects a read event, so the following condition holds and handleIO() is called.
```java
if (key.isReadable() || key.isWritable()) {
    handleIO(key);
}
```
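The select-then-dispatch pattern above can be reproduced with plain java.nio. The following is a standalone sketch, not ZooKeeper code: handleIO() here just counts ready keys instead of handing them to a worker pool, and a Pipe stands in for a client socket.

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Minimal sketch of a select loop that dispatches read-ready keys to a
// handler, mirroring the SelectorThread pattern described above.
public class SelectLoopSketch {
    static int handled = 0;

    static void handleIO(SelectionKey key) {
        handled++; // in ZooKeeper this would hand the key off to a worker
    }

    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        pipe.source().register(selector, SelectionKey.OP_READ);

        // Make the source readable, then run one select pass.
        pipe.sink().write(ByteBuffer.wrap(new byte[]{42}));
        selector.select(1000);
        for (SelectionKey key : selector.selectedKeys()) {
            if (key.isReadable() || key.isWritable()) {
                handleIO(key);
            }
        }
        System.out.println("handled=" + handled);
        selector.close();
        pipe.sink().close();
        pipe.source().close();
    }
}
```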
② handleIO() dispatches the event to a worker from the worker thread pool. The class that processes the event is ScheduledWorkRequest, whose run() method ultimately calls workRequest.doWork() to handle the request.
Step 1: call workerPool.schedule(workRequest);
```java
private void handleIO(SelectionKey key) {
    IOWorkRequest workRequest = new IOWorkRequest(this, key);
    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

    // Stop selecting this key while its connection is being processed
    cnxn.disableSelectable();
    key.interestOps(0);
    touchCnxn(cnxn);
    // Hand the work off to a worker from the worker thread pool
    workerPool.schedule(workRequest);
}
```
Step 2: call schedule(workRequest, 0);
```java
public void schedule(WorkRequest workRequest) {
    schedule(workRequest, 0);
}
```
Step 3: if the pool has workers, call worker.execute(scheduledWorkRequest); if the pool is empty, run scheduledWorkRequest.run() directly.
```java
public void schedule(WorkRequest workRequest, long id) {
    if (stopped) {
        workRequest.cleanup();
        return;
    }

    ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);

    // If the pool has workers, use it; otherwise do the work directly
    int size = workers.size();
    if (size > 0) {
        try {
            // make sure to map negative ids as well to [0, size-1]
            int workerNum = ((int) (id % size) + size) % size;
            ExecutorService worker = workers.get(workerNum);
            worker.execute(scheduledWorkRequest);
        } catch (RejectedExecutionException e) {
            LOG.warn("ExecutorService rejected execution", e);
            workRequest.cleanup();
        }
    } else {
        // When there is no worker thread pool, do the work directly
        // and wait for its completion
        scheduledWorkRequest.run();
    }
}
```
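The worker-selection arithmetic above is worth a closer look: a plain `id % size` can be negative for a negative long id, so the code adds `size` and takes the modulus again. This standalone sketch isolates just that expression:

```java
// Sketch of the worker-selection arithmetic in WorkerService.schedule():
// map any long id, including negative ones, onto [0, size-1].
public class WorkerIndexSketch {
    static int workerIndex(long id, int size) {
        return ((int) (id % size) + size) % size;
    }

    public static void main(String[] args) {
        System.out.println(workerIndex(7, 4));  // 3
        System.out.println(workerIndex(-7, 4)); // 1, not -3
    }
}
```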
Step 4: call workRequest.doWork();
```java
public void run() {
    try {
        // Check if stopped while request was on queue
        if (stopped) {
            workRequest.cleanup();
            return;
        }
        // Process the request
        workRequest.doWork();
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
        workRequest.cleanup();
    }
}
```
Step 5: the NIO-based WorkRequest implementation in NIOServerCnxnFactory is the IOWorkRequest class. Its details were covered in the NIOServerCnxnFactory analysis and are not repeated here; in short, it is what makes the worker do the actual work, and the core of that work is the doIO() method, invoked as cnxn.doIO(key).
```java
public void doWork() throws InterruptedException {
    if (!key.isValid()) {
        selectorThread.cleanupSelectionKey(key);
        return;
    }

    if (key.isReadable() || key.isWritable()) {
        // Perform the I/O -- the core operation
        cnxn.doIO(key);

        // Check if we shut down or doIO() closed this connection
        if (stopped) {
            cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
            return;
        }
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }
        touchCnxn(cnxn);
    }

    // Mark this connection as once again ready for selection
    cnxn.enableSelectable();
    // Push an update request on the queue to resume selecting
    // on the current set of interest ops, which may have changed
    // as a result of the I/O operations we just performed.
    if (!selectorThread.addInterestOpsUpdateRequest(key)) {
        cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
    }
}
```
Step 6: the NIO-based doIO() method was analyzed in detail in the NIOServerCnxn source analysis; it reads from or writes to the buffer. In the scenario here the key is readable, and the payload is read by the readPayload() method.
```java
void doIO(SelectionKey k) throws InterruptedException {
    // ……
    if (k.isReadable()) { // the key is readable
        // Read from the socket into the incoming buffer
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            // End of stream: nothing could be read from the client
            handleFailedRead();
        }
        // The buffer has been filled completely
        if (incomingBuffer.remaining() == 0) {
            boolean isPayload;
            if (incomingBuffer == lenBuffer) { // start of the next request
                // Flip the buffer so it can be read
                incomingBuffer.flip();
                // Read the first four bytes of lenBuffer: true if they are a
                // content length, false otherwise
                isPayload = readLength(k);
                // Clear the length buffer
                incomingBuffer.clear();
            } else {
                // readLength() already reallocated incomingBuffer to the payload length
                isPayload = true;
            }
            if (isPayload) {
                // Not a four-letter word: read the actual request content
                readPayload();
            } else {
                // A four-letter command; nothing more to do here
                return;
            }
        }
    }
    // ……
}
```
Step 7: readPayload() was also covered in the NIOServerCnxn analysis; its job is to read the payload carried by the request packet. At this point the server has fully received the packet sent by the client, and it then reads it with readConnectRequest() for a connection request, or readRequest() for any other request. In the scenario here it is a connection request, so readConnectRequest() is called.
```java
private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
    // The socket content has not been fully read yet
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        // Read from the socket into the buffer; sock is non-blocking, so ok
        int rc = sock.read(incomingBuffer);
        // End of stream: nothing could be read from the client
        if (rc < 0) {
            handleFailedRead();
        }
    }

    // The socket content has been fully read
    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        incomingBuffer.flip();
        // A packet has been received
        packetReceived(4 + incomingBuffer.remaining());
        if (!initialized) {
            // Not yet initialized: read a connection request
            readConnectRequest();
        } else {
            // Read an ordinary request
            readRequest();
        }
        // Clear the length buffer
        lenBuffer.clear();
        // Point incomingBuffer back at lenBuffer, i.e. reset the incoming buffer
        incomingBuffer = lenBuffer;
    }
}
```
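The two-phase read in doIO()/readPayload() is classic length-prefixed framing: first fill a fixed 4-byte length buffer, then allocate a payload buffer of exactly that length and fill it. The following is a simplified standalone sketch of that protocol, not ZooKeeper code; readFrame() plays the role of readLength() plus readPayload(), with a pre-filled ByteBuffer standing in for the socket:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of 4-byte length-prefixed framing: phase one reads the length,
// phase two reads a payload of exactly that many bytes.
public class FramingSketch {
    static String readFrame(ByteBuffer wire) {
        // Phase 1: fill the fixed-size length buffer (lenBuffer in ZooKeeper)
        ByteBuffer lenBuffer = ByteBuffer.allocate(4);
        while (lenBuffer.hasRemaining()) {
            lenBuffer.put(wire.get());
        }
        lenBuffer.flip();
        int len = lenBuffer.getInt(); // what readLength() extracts

        // Phase 2: reallocate the incoming buffer to the payload length and fill it
        ByteBuffer incomingBuffer = ByteBuffer.allocate(len);
        while (incomingBuffer.hasRemaining()) {
            incomingBuffer.put(wire.get());
        }
        incomingBuffer.flip();
        return StandardCharsets.UTF_8.decode(incomingBuffer).toString();
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer wire = ByteBuffer.allocate(4 + payload.length);
        wire.putInt(payload.length).put(payload);
        wire.flip();
        System.out.println(readFrame(wire)); // prints "hello"
    }
}
```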
Step 8: call processConnectRequest(), which lives in ZooKeeperServer. This makes sense: every ZooKeeperServer must be able to process connection requests, and any other request, sent by a client.
```java
private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException {
    if (!isZKServerRunning()) {
        throw new IOException("ZooKeeperServer not running");
    }
    // Process the connection request
    zkServer.processConnectRequest(this, incomingBuffer);
    initialized = true;
}
```
Step 9: process the connection request and call createSession() to create the session.
```java
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
        throws IOException, ClientCnxnLimitException {
    // A ConnectRequest packet has no header, so deserialize the content directly
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest();
    connReq.deserialize(bia, "connect");
    LOG.debug(
        "Session establishment request from client {} client's lastZxid is 0x{}",
        cnxn.getRemoteSocketAddress(),
        Long.toHexString(connReq.getLastZxidSeen()));

    // The session ID carried by the client (0 for a brand-new session)
    long sessionId = connReq.getSessionId();
    int tokensNeeded = 1;
    if (connThrottle.isConnectionWeightEnabled()) {
        if (sessionId == 0) {
            if (localSessionEnabled) {
                tokensNeeded = connThrottle.getRequiredTokensForLocal();
            } else {
                tokensNeeded = connThrottle.getRequiredTokensForGlobal();
            }
        } else {
            tokensNeeded = connThrottle.getRequiredTokensForRenew();
        }
    }

    if (!connThrottle.checkLimit(tokensNeeded)) {
        throw new ClientCnxnLimitException();
    }
    ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
    ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);

    boolean readOnly = false;
    try {
        readOnly = bia.readBool("readOnly");
        cnxn.isOldClient = false;
    } catch (IOException e) {
        // this is ok -- just a packet from an old client which
        // doesn't contain readOnly field
        LOG.warn(
            "Connection request from old client {}; will be dropped if server is in r-o mode",
            cnxn.getRemoteSocketAddress());
    }
    if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
        String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
        LOG.info(msg);
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
    }
    if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        String msg = "Refusing session request for client "
                     + cnxn.getRemoteSocketAddress()
                     + " as it has seen zxid 0x"
                     + Long.toHexString(connReq.getLastZxidSeen())
                     + " our last zxid is 0x"
                     + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                     + " client must try another server";
        LOG.info(msg);
        throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
    }

    // Negotiate the session parameters requested by the client
    int sessionTimeout = connReq.getTimeOut();
    byte[] passwd = connReq.getPasswd();
    int minSessionTimeout = getMinSessionTimeout();
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout();
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout);

    // Stop reading further requests until the session is established,
    // much like disabling interrupts in an operating system
    cnxn.disableRecv();
    if (sessionId == 0) {
        // Create a new session
        long id = createSession(cnxn, passwd, sessionTimeout);
        LOG.debug(
            "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
            Long.toHexString(id),
            Long.toHexString(connReq.getLastZxidSeen()),
            connReq.getTimeOut(),
            cnxn.getRemoteSocketAddress());
    } else {
        long clientSessionId = connReq.getSessionId();
        LOG.debug(
            "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
            Long.toHexString(clientSessionId),
            Long.toHexString(connReq.getLastZxidSeen()),
            connReq.getTimeOut(),
            cnxn.getRemoteSocketAddress());
        if (serverCnxnFactory != null) {
            serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        if (secureServerCnxnFactory != null) {
            secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
        }
        cnxn.setSessionId(sessionId);
        // Revalidate (reopen) the existing session
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
    }
}
```
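The timeout negotiation in the middle of processConnectRequest() is a simple clamp: the client-requested timeout is forced into [minSessionTimeout, maxSessionTimeout]. This standalone sketch isolates that logic; the concrete numbers in main() are illustrative only (they mirror ZooKeeper's default derivation from tickTime, but are not read from any config):

```java
// Sketch of the session-timeout negotiation in processConnectRequest():
// the requested timeout is clamped to the server's [min, max] window.
public class TimeoutClampSketch {
    static int negotiate(int requested, int min, int max) {
        if (requested < min) {
            requested = min;
        }
        if (requested > max) {
            requested = max;
        }
        return requested;
    }

    public static void main(String[] args) {
        // Illustrative defaults: min = 2 * tickTime, max = 20 * tickTime, tickTime = 2000 ms
        System.out.println(negotiate(1000, 4000, 40000));  // 4000: too low, raised
        System.out.println(negotiate(60000, 4000, 40000)); // 40000: too high, capped
        System.out.println(negotiate(10000, 4000, 40000)); // 10000: accepted as-is
    }
}
```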
Step 10: after creating the session, submitRequest() hands the request to the server's processor chain, which begins executing it.
```java
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
    if (passwd == null) {
        // Possible since it's just deserialized from a packet on the wire.
        passwd = new byte[0];
    }
    // Create the session; session IDs are assigned incrementally
    long sessionId = sessionTracker.createSession(timeout);
    // Generate the session password from a seeded Random
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    // Each server connection has exactly one session ID
    cnxn.setSessionId(sessionId);
    // Submit the request to the processor chain that follows
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
    submitRequest(si);
    return sessionId;
}
```
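Note the password scheme in createSession(): the password is just the first bytes of a Random stream seeded with `sessionId ^ superSecret`, so any server that knows the secret can re-derive and verify it without storing per-session state. The sketch below demonstrates the idea in isolation; the SECRET constant and the 16-byte length are stand-ins for illustration, not ZooKeeper's actual values:

```java
import java.util.Arrays;
import java.util.Random;

// Sketch of ZooKeeper's session-password idea: derive the password
// deterministically from (sessionId ^ secret), so it can be re-derived
// for verification instead of being stored.
public class SessionPasswordSketch {
    static final long SECRET = 0xCAFEBABEL; // stand-in for ZooKeeper's superSecret

    static byte[] generatePasswd(long sessionId) {
        byte[] passwd = new byte[16];
        new Random(sessionId ^ SECRET).nextBytes(passwd);
        return passwd;
    }

    static boolean checkPasswd(long sessionId, byte[] passwd) {
        // Session ID 0 is never valid; otherwise re-derive and compare
        return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId));
    }

    public static void main(String[] args) {
        byte[] p = generatePasswd(42L);
        System.out.println(checkPasswd(42L, p)); // true: same seed, same bytes
        System.out.println(checkPasswd(0L, p));  // false: session 0 rejected
    }
}
```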
Step 11: in the request chain, submitRequest() ends up calling requestThrottler.submitRequest(si);
```java
public void submitRequest(Request si) {
    enqueueRequest(si);
}

public void enqueueRequest(Request si) {
    if (requestThrottler == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request processors,
                // we must wait for the processor chain to be set up; the
                // state is updated to RUNNING once setup is complete.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (requestThrottler == null) {
                throw new RuntimeException("Not started");
            }
        }
    }
    requestThrottler.submitRequest(si);
}
```
Step 12: when the server actually submits the request, submitRequestNow() is called; it invokes firstProcessor.processRequest(), and the first processor begins preprocessing the request.
```java
public void submitRequestNow(Request si) {
    // ……
    touch(si.cnxn);
    boolean validpacket = Request.isValid(si.type);
    if (validpacket) {
        setLocalSessionFlag(si);
        // Hand off to the downstream processors, usually asynchronously for performance
        firstProcessor.processRequest(si);
        if (si.cnxn != null) {
            incInProcess();
        }
    } else {
        LOG.warn("Received packet at server of unknown type {}", si.type);
        // Update request accounting/throttling limits
        requestFinished(si);
        new UnimplementedRequestProcessor().processRequest(si);
    }
    // ……
}
```
Step 13: PrepRequestProcessor extends ZooKeeperCriticalThread, so it is itself a thread; it is the processor invoked in step 12. When it runs, its run() method takes requests off the submitted-request queue and calls pRequest() to preprocess them.
```java
public void run() {
    // ……
    while (true) {
        ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
        // Take the next submitted request (blocks while the queue is empty)
        Request request = submittedRequests.take();
        ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
            .add(Time.currentElapsedTime() - request.prepQueueStartTime);
        long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
        if (request.type == OpCode.ping) {
            traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
        }
        if (LOG.isTraceEnabled()) {
            ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
        }
        if (Request.requestOfDeath == request) {
            break;
        }
        // Record when preprocessing started
        request.prepStartTime = Time.currentElapsedTime();
        // Preprocess the request
        pRequest(request);
    }
    // ……
}
```
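The loop above follows the classic "poison pill" pattern that both PrepRequestProcessor and SyncRequestProcessor use: a dedicated thread drains a blocking queue and exits cleanly when it dequeues a special request-of-death sentinel. This standalone sketch shows the pattern with strings standing in for Request objects:

```java
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the queue-draining pattern in PrepRequestProcessor.run():
// block on take(), process each request, and stop on the death sentinel.
public class ProcessorLoopSketch {
    static final String REQUEST_OF_DEATH = "DEATH";
    static int processed = 0;

    public static void main(String[] args) throws Exception {
        LinkedBlockingQueue<String> submittedRequests = new LinkedBlockingQueue<>();
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String request = submittedRequests.take();
                    if (request == REQUEST_OF_DEATH) {
                        break; // poison pill: shut the processor down
                    }
                    processed++; // stands in for pRequest(request)
                }
            } catch (InterruptedException ignored) {
            }
        });
        worker.start();
        submittedRequests.put("createSession");
        submittedRequests.put("ping");
        submittedRequests.put(REQUEST_OF_DEATH);
        worker.join();
        System.out.println("processed=" + processed);
    }
}
```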
Step 14: pRequest() handles many opcodes. Since the scenario here is session creation, we look at the createSession branch (note that clients cannot issue the create-session or close-session opcodes directly). The core job is to reassemble the request into a form the downstream processors can work with, by calling pRequest2Txn().
```java
protected void pRequest(Request request) throws RequestProcessorException {
    // ……
    case OpCode.createSession:
    case OpCode.closeSession:
        if (!request.isLocalSession()) {
            // Assemble the request's header and txn here so the
            // downstream processors can work with it
            pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
        }
        break;
    // ……
    // Stamp the request with its zxid
    request.zxid = zks.getZxid();
    ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
    // Hand off to the next processor, usually asynchronously for performance
    nextProcessor.processRequest(request);
}
```
Step 15: since the scenario is session creation, here is what pRequest2Txn() does for the createSession opcode.
```java
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
        throws KeeperException, IOException, RequestProcessorException {
    // ……
    case OpCode.createSession:
        // Read the requested session timeout
        request.request.rewind();
        int to = request.request.getInt();
        // Assemble the concrete Record implementation, here a CreateSessionTxn,
        // for the downstream processors
        request.setTxn(new CreateSessionTxn(to));
        request.request.rewind();
        // only add the global session tracker but not to ZKDb
        zks.sessionTracker.trackSession(request.sessionId, to);
        zks.setOwner(request.sessionId, request.getOwner());
        break;
    // ……
}
```
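Steps 12 through 15 all rely on the same structural idea: a chain of RequestProcessor objects (Prep, Sync, Final in a standalone server), each doing its part and handing the request to the next. The following standalone sketch reduces that chain-of-responsibility pattern to its essentials, with a StringBuilder standing in for the Request:

```java
// Sketch of the processor-chain pattern: each processor transforms the
// request and delegates to the next one, like Prep -> Sync -> Final.
public class ChainSketch {
    interface RequestProcessor {
        void processRequest(StringBuilder request);
    }

    static String runChain() {
        RequestProcessor finalP = r -> r.append("->final");
        RequestProcessor syncP = r -> {
            r.append("->sync");
            finalP.processRequest(r); // delegate downstream
        };
        RequestProcessor prepP = r -> {
            r.append("prep");
            syncP.processRequest(r); // delegate downstream
        };
        StringBuilder request = new StringBuilder();
        prepP.processRequest(request); // submit to the first processor
        return request.toString();
    }

    public static void main(String[] args) {
        System.out.println(runChain()); // prep->sync->final
    }
}
```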
Step 16: nextProcessor.processRequest(request) from step 14 hands the reassembled request to the next processor, SyncRequestProcessor. Like the prep processor, it does its work in run(); its core job is to log the request durably to disk before it is handed further downstream.
```java
public void run() {
    try {
        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        resetSnapshotStats();
        lastFlushTime = Time.currentElapsedTime();
        while (true) {
            ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
            // Take the next preprocessed request
            long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
            Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
            // If there is no pending request, flush what we have, then block;
            // at that point every earlier request has been handled
            if (si == null) {
                flush();
                si = queuedRequests.take();
            }
            // A request-of-death means shut down
            if (si == REQUEST_OF_DEATH) {
                break;
            }

            // Record when processing started
            long startProcessTime = Time.currentElapsedTime();
            ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);

            // Append the request to the log output stream (serialize, then append).
            // It has not been flushed to disk yet; it is still in memory.
            if (zks.getZKDatabase().append(si)) {
                if (shouldSnapshot()) {
                    resetSnapshotStats();
                    // Roll the in-memory log over to disk
                    zks.getZKDatabase().rollLog();
                    // take a snapshot
                    if (!snapThreadMutex.tryAcquire()) {
                        LOG.warn("Too busy to snap, skipping");
                    } else {
                        new ZooKeeperThread("Snapshot Thread") {
                            public void run() {
                                try {
                                    zks.takeSnapshot();
                                } catch (Exception e) {
                                    LOG.warn("Unexpected exception", e);
                                } finally {
                                    snapThreadMutex.release();
                                }
                            }
                        }.start();
                    }
                }
            } else if (toFlush.isEmpty()) {
                // Nothing was appended (e.g. a read request) and nothing is
                // pending to flush: pass the request straight through
                if (nextProcessor != null) {
                    nextProcessor.processRequest(si);
                    if (nextProcessor instanceof Flushable) {
                        ((Flushable) nextProcessor).flush();
                    }
                }
                continue;
            }
            // A write request appended to the log stream above is queued here
            // so it can be flushed in a batch later
            toFlush.add(si);
            // Under load, several requests may accumulate before one flush;
            // once flushed, they can be handled by the downstream processors
            if (shouldFlush()) {
                flush();
            }
            ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    }
    LOG.info("SyncRequestProcessor exited!");
}
```
Step 17: whenever requests are flushed to disk, flush() hands them to the next processor, FinalRequestProcessor.
```java
private void flush() throws IOException, RequestProcessorException {
    if (this.toFlush.isEmpty()) {
        return;
    }

    ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());

    // Record when the flush started
    long flushStartTime = Time.currentElapsedTime();
    // Flush the previously appended log records to disk, closing the old
    // log file handle along the way
    zks.getZKDatabase().commit();
    ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);

    if (this.nextProcessor == null) {
        // No downstream processor: just drain the queue
        this.toFlush.clear();
    } else {
        // Hand every flushed request to the downstream processor
        while (!this.toFlush.isEmpty()) {
            final Request i = this.toFlush.remove();
            long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
            ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
            // Run the next processor
            this.nextProcessor.processRequest(i);
        }
        if (this.nextProcessor instanceof Flushable) {
            ((Flushable) this.nextProcessor).flush();
        }
        // Record when the flush finished
        lastFlushTime = Time.currentElapsedTime();
    }
}
```
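The append-then-batch-flush behavior of steps 16 and 17 can be reduced to a small standalone sketch, not ZooKeeper code: requests accumulate in a toFlush queue, and one flush() commits the whole batch and only then passes everything downstream. The FLUSH_THRESHOLD constant is an illustrative stand-in for ZooKeeper's flush conditions (batch size and elapsed time):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Sketch of SyncRequestProcessor's batching: append requests to a pending
// queue, commit them to the log in one batch, then hand them downstream.
public class BatchFlushSketch {
    static final int FLUSH_THRESHOLD = 3;        // illustrative batch size
    static List<String> log = new ArrayList<>(); // stands in for the on-disk txn log
    static ArrayDeque<String> toFlush = new ArrayDeque<>();
    static List<String> downstream = new ArrayList<>(); // what nextProcessor received

    static void append(String request) {
        toFlush.add(request); // appended, but not yet durable
        if (toFlush.size() >= FLUSH_THRESHOLD) {
            flush();
        }
    }

    static void flush() {
        log.addAll(toFlush); // one commit for the whole batch
        while (!toFlush.isEmpty()) {
            downstream.add(toFlush.remove()); // nextProcessor.processRequest(i)
        }
    }

    public static void main(String[] args) {
        append("r1");
        append("r2");
        System.out.println(downstream.size()); // 0: still buffered
        append("r3");
        System.out.println(downstream.size()); // 3: batch flushed together
    }
}
```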
Step 18: once FinalRequestProcessor has the result from the database, it calls zks.finishSessionInit(request.cnxn, true) to write the response back to the client through NIOServerCnxn.
```java
case OpCode.createSession: {
    lastOp = "SESS";
    updateStats(request, lastOp, lastZxid);
    zks.finishSessionInit(request.cnxn, true);
    return;
}
```
Step 19: write the response into the connection's outgoing buffer.
```java
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
    // ……
    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
    bb.putInt(bb.remaining() - 4).rewind();
    // Write the response back through the channel
    cnxn.sendBuffer(bb);

    if (valid) {
        LOG.debug(
            "Established session 0x{} with negotiated timeout {} for client {}",
            Long.toHexString(cnxn.getSessionId()),
            cnxn.getSessionTimeout(),
            cnxn.getRemoteSocketAddress());
        // Re-enable read events on the selector
        cnxn.enableRecv();
    }
    // ……
}
```
Step 20: NIOServerCnxn's sendBuffer() queues the response into the connection's outgoing buffers, from which it is written to the client.
```java
public void sendBuffer(ByteBuffer... buffers) {
    if (LOG.isTraceEnabled()) {
        LOG.trace("Add a buffer to outgoingBuffers, sk {} is valid: {}", sk, sk.isValid());
    }
    synchronized (outgoingBuffers) {
        for (ByteBuffer buffer : buffers) {
            outgoingBuffers.add(buffer);
        }
        outgoingBuffers.add(packetSentinel);
    }
    requestInterestOpsUpdate();
}
```

Summary
That is the entire process by which the server responds to a client's connection request. The server handles other client requests the same way, differing only in the opcode and a few minor method calls. The next chapter covers what the client does after receiving the server's response.