创建完会话之后,就要进行具体操作请求了。本文就重点来分析下服务端如何处理创建节点请求,也就是如何解析CreateRequest,如下图
1. ZooKeeper server监听create请求
具体见上一篇"server处理会话创建"的博客,本文不再赘述。
public class NIOServerCnxn extends ServerCnxn { private void readPayload() throws IOException, InterruptedException { if (incomingBuffer.remaining() != 0) { // have we read length bytes? int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok if (rc < 0) { throw new EndOfStreamException( "Unable to read additional data from client sessionid 0x" + Long.toHexString(sessionId) + ", likely client has closed socket"); } } // remaining()==0,说明已经读取到len个字节,数据已经全部读取到 if (incomingBuffer.remaining() == 0) { // have we read length bytes? packetReceived(); incomingBuffer.flip(); // 如果initialized初始化状态为false,说明是第一次请求,那么这个请求就是创建Session的请求 if (!initialized) { // 上文已经分析过的处理创建会话请求 readConnectRequest(); } else { // 这里是本文分析的重点,处理其他类型的请求都在这 readRequest(); } lenBuffer.clear(); incomingBuffer = lenBuffer; } } private void readRequest() throws IOException { // 交由ZookeeperServer来处理 zkServer.processPacket(this, incomingBuffer); } }2.ZooKeeperServer.processPacket()
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { // We have the request, now process and setup for next InputStream bais = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); // 解析请求头,请求头中的type代表了不同的请求类型 RequestHeader h = new RequestHeader(); h.deserialize(bia, "header"); incomingBuffer = incomingBuffer.slice(); // 权限控制相关,非本文重点,忽略 if (h.getType() == OpCode.auth) { ... } else { // sasl相关,非重点 if (h.getType() == OpCode.sasl) { ... } else { // 最终我们的其他类型请求都在这里进行处理 Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); // 包装后的Request,交由submitRequest()处理 submitRequest(si); } } cnxn.incrOutstandingRequests(h); } // public void submitRequest(Request si) { ... try { // session过期时间处理,每一次新请求的到来都会延迟session的过期 touch(si.cnxn); boolean validpacket = Request.isValid(si.type); if (validpacket) { // 还是交由requestProcessor处理 firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } else { LOG.warn("Received packet at server of unknown type " + si.type); new UnimplementedRequestProcessor().processRequest(si); } } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); } } }
3. RequestProcessor处理请求
根据之前的分析,RequestProcessor的处理顺序为 PrepRequestProcessor --> SyncRequestProcessor --> FinalRequestProcessor
3.1 PrepRequestProcessor.pRequest()public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { protected void pRequest(Request request) throws RequestProcessorException { // LOG.info("Prep>>> cxid = " + request.cxid + " type = " + // request.type + " id = 0x" + Long.toHexString(request.sessionId)); request.hdr = null; request.txn = null; try { switch (request.type) { case OpCode.create: CreateRequest createRequest = new CreateRequest(); // 创建节点请求处理 pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true); break; } ... } request.zxid = zks.getZxid(); nextProcessor.processRequest(request); } // 具体处理在这里 protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException { request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type); switch (type) { case OpCode.create: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); CreateRequest createRequest = (CreateRequest)record; if(deserialize) // 将客户端的请求体反序列化到CreateRequest对象中 ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); // path检查 String path = createRequest.getPath(); int lastSlash = path.lastIndexOf('/'); if (lastSlash == -1 || path.indexOf('') != -1 || failCreate) { LOG.info("Invalid path " + path + " with session 0x" + Long.toHexString(request.sessionId)); throw new KeeperException.BadArgumentsException(path); } // ACL权限检查 List listACL = removeDuplicates(createRequest.getAcl()); if (!fixupACL(request.authInfo, listACL)) { throw new KeeperException.InvalidACLException(path); } String parentPath = path.substring(0, lastSlash); ChangeRecord parentRecord = getRecordForPath(parentPath); checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo); int parentCVersion = parentRecord.stat.getCversion(); // 根据创建节点类型,重置path信息 CreateMode createMode = 
CreateMode.fromFlag(createRequest.getFlags()); if (createMode.isSequential()) { path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion); } validatePath(path, request.sessionId); try { if (getRecordForPath(path) != null) { throw new KeeperException.NodeExistsException(path); } } catch (KeeperException.NonodeException e) { // ignore this one } // 检查父节点是否临时节点 boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0; if (ephemeralParent) { throw new KeeperException.NoChildrenForEphemeralsException(path); } int newCversion = parentRecord.stat.getCversion()+1; // 补充request的txn对象信息,后续requestProcessor会用到 request.txn = new CreateTxn(path, createRequest.getData(), listACL, createMode.isEphemeral(), newCversion); StatPersisted s = new StatPersisted(); if (createMode.isEphemeral()) { s.setEphemeralOwner(request.sessionId); } // 修改父节点的stat信息 parentRecord = parentRecord.duplicate(request.hdr.getZxid()); parentRecord.childCount++; parentRecord.stat.setCversion(newCversion); addChangeRecord(parentRecord); addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s, 0, listACL)); break; } ... }
在PrepRequestProcessor的处理中,主要是对节点创建信息的一系列校验,path是否合法,父节点是否临时节点等等,后续处理交由SyncRequestProcessor 执行
3.2 SyncRequestProcessor
之前的博客中有关于事务日志分析和快照日志分析,其中详细介绍过SyncRequestProcessor的相关方法,本质上是交由run()方法执行的:
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor { public void run() { try { ... while (true) { // 获取到本次请求,也就是创建节点请求 Request si = null; if (toFlush.isEmpty()) { si = queuedRequests.take(); } else { si = queuedRequests.poll(); if (si == null) { flush(toFlush); continue; } } if (si == requestOfDeath) { break; } if (si != null) { // 直接添加到ZKDatabase中 if (zks.getZKDatabase().append(si)) { ... } } } } } }
3.3 FinalRequestProcessor.processRequest()public class FinalRequestProcessor implements RequestProcessor { public void processRequest(Request request) { ... ProcessTxnResult rc = null; synchronized (zks.outstandingChanges) { while (!zks.outstandingChanges.isEmpty() && zks.outstandingChanges.get(0).zxid <= request.zxid) { ChangeRecord cr = zks.outstandingChanges.remove(0); if (cr.zxid < request.zxid) { LOG.warn("Zxid outstanding " + cr.zxid + " is less than current " + request.zxid); } if (zks.outstandingChangesForPath.get(cr.path) == cr) { zks.outstandingChangesForPath.remove(cr.path); } } if (request.hdr != null) { TxnHeader hdr = request.hdr; Record txn = request.txn; // 事务信息处理,具体见3.3.1 rc = zks.processTxn(hdr, txn); } // do not add non quorum packets to the queue. if (Request.isQuorum(request.type)) { zks.getZKDatabase().addCommittedProposal(request); } } ... switch (request.type) { case OpCode.create: { lastOp = "CREA"; // 创建响应对象CreateResponse rsp = new CreateResponse(rc.path); err = Code.get(rc.err); break; } ... }
3.3.1 ZooKeeperServer.processTxn()
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { ProcessTxnResult rc; int opCode = hdr.getType(); long sessionId = hdr.getClientId(); // 交由ZKDatabase处理 rc = getZKDatabase().processTxn(hdr, txn); ... return rc; } }
3.3.2 ZKDatabase.processTxn()
public class ZKDatabase { public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { return dataTree.processTxn(hdr, txn); } } public class DataTree { public ProcessTxnResult processTxn(TxnHeader header, Record txn){ ProcessTxnResult rc = new ProcessTxnResult(); try { rc.clientId = header.getClientId(); rc.cxid = header.getCxid(); rc.zxid = header.getZxid(); rc.type = header.getType(); rc.err = 0; rc.multiResult = null; switch (header.getType()) { case OpCode.create: CreateTxn createTxn = (CreateTxn) txn; rc.path = createTxn.getPath(); // 直接创建节点信息,添加到DataTree中 createNode( createTxn.getPath(), createTxn.getData(), createTxn.getAcl(), createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(), header.getZxid(), header.getTime()); break; ... } } } }
总结:Zookeeper server端处理客户端创建节点请求,也是按照监听READ事件 --> 根据请求类型交由不同的方法处理 --> 检查创建节点请求各种合法性 --> 交由RequestProcessor处理 ,最终将节点信息保存到ZKDatabase中,并添加相关的事务日志信息(和快照日志信息)。