OpCode.createSession & zks.finishSessionInit(request.cnxn, true);
FinalRequestProcessor
LOG.debug("Processing request:: {}", request); // 日志记录 if (LOG.isTraceEnabled()) { long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; if (request.type == OpCode.ping) { traceMask = ZooTrace.SERVER_PING_TRACE_MASK; } ZooTrace.logRequest(LOG, traceMask, 'E', request, ""); } ProcessTxnResult rc = null; // 请求未被过滤--对其处理 if (!request.isThrottled()) { rc = applyRequest(request); } // 需要进行回复的请求对象会包含cnxn if (request.cnxn == null) { return; } // 获取请求中的ServerCcxn以便进行回复 ServerCnxn cnxn = request.cnxn; // 获得LastProcessedZxid long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); String lastOp = "NA"; // 更新zk服务对象处理中请求个数 zks.decInProcess(); // zk服务对象执行请求完成处理 zks.requestFinished(request); // 大请求长度 int largeRequestLength = request.getLargeRequestSize(); if (largeRequestLength != -1) { // 更新currentLargeRequestBytes currentLargeRequestBytes.addAndGet(-largeRequestLength); } Code err = Code.OK; Record rsp = null; String path = null; int responseSize = 0; try { // 如果请求的事务头不是null // 且 请求的事务头的类型为错误 // 表示请求处理出错了 if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) { AuditHelper.addAuditLog(request, rc, true); // 获取请求中的异常对象 if (request.getException() != null) { // 再次抛出 throw request.getException(); } else { // 构造异常对象并抛出 throw KeeperException. 
create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr())); } } // 获取请求中的异常对象 KeeperException ke = request.getException(); // 如果是会话移动异常 if (ke instanceof SessionMovedException) { // 再次抛出 throw ke; } // 如果请求存在异常对象,且不是复合请求 if (ke != null && request.type != OpCode.multi) { // 再次抛出 throw ke; } LOG.debug("{}", request); // 如果请求已经过期, if (request.isStale()) { // 统计 ServerMetrics.getMetrics().STALE_REPLIES.add(1); } // 如果请求被过滤 if (request.isThrottled()) { // 抛出异常 throw KeeperException.create(Code.THROTTLEDOP); } AuditHelper.addAuditLog(request, rc); // 请求类型 switch (request.type) { // 请求为ping case OpCode.ping: { lastOp = "PING"; updateStats(request, lastOp, lastZxid); // 发送回复 responseSize = cnxn.sendResponse( new ReplyHeader(ClientCnxn.PING_XID, lastZxid, 0), null, "response"); return; } // 请求为createSession case OpCode.createSession: { lastOp = "SESS"; updateStats(request, lastOp, lastZxid); zks.finishSessionInit(request.cnxn, true); return; } case OpCode.multi: { lastOp = "MULT"; rsp = new MultiResponse(); for (ProcessTxnResult subTxnResult : rc.multiResult) { OpResult subResult; switch (subTxnResult.type) { case OpCode.check: subResult = new CheckResult(); break; case OpCode.create: subResult = new CreateResult(subTxnResult.path); break; case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: subResult = new CreateResult(subTxnResult.path, subTxnResult.stat); break; case OpCode.delete: case OpCode.deleteContainer: subResult = new DeleteResult(); break; case OpCode.setdata: subResult = new SetDataResult(subTxnResult.stat); break; case OpCode.error: subResult = new ErrorResult(subTxnResult.err); if (subTxnResult.err == Code.SESSIONMOVED.intValue()) { throw new SessionMovedException(); } break; default: throw new IOException("Invalid type of op"); } ((MultiResponse) rsp).add(subResult); } break; } case OpCode.multiRead: { lastOp = "MLTR"; MultiOperationRecord multiReadRecord = new MultiOperationRecord(); 
ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord); rsp = new MultiResponse(); OpResult subResult; for (Op readOp : multiReadRecord) { try { Record rec; switch (readOp.getType()) { case OpCode.getChildren: rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo); subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren()); break; case OpCode.getdata: rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo); GetDataResponse gdr = (GetDataResponse) rec; subResult = new GetDataResult(gdr.getData(), gdr.getStat()); break; default: throw new IOException("Invalid type of readOp"); } } catch (KeeperException e) { subResult = new ErrorResult(e.code().intValue()); } ((MultiResponse) rsp).add(subResult); } break; } case OpCode.create: { lastOp = "CREA"; rsp = new CreateResponse(rc.path); err = Code.get(rc.err); requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: { lastOp = "CREA"; rsp = new Create2Response(rc.path, rc.stat); err = Code.get(rc.err); requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.delete: case OpCode.deleteContainer: { lastOp = "DELE"; err = Code.get(rc.err); requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.setdata: { lastOp = "SETD"; rsp = new SetDataResponse(rc.stat); err = Code.get(rc.err); requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.reconfig: { lastOp = "RECO"; rsp = new GetDataResponse(((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(UTF_8), rc.stat); err = Code.get(rc.err); break; } case OpCode.setACL: { lastOp = "SETA"; rsp = new SetACLResponse(rc.stat); err = Code.get(rc.err); requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.closeSession: { lastOp = "CLOS"; err = Code.get(rc.err); 
break; } case OpCode.sync: { lastOp = "SYNC"; SyncRequest syncRequest = new SyncRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest); rsp = new SyncResponse(syncRequest.getPath()); requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath()); break; } case OpCode.check: { lastOp = "CHEC"; rsp = new SetDataResponse(rc.stat); err = Code.get(rc.err); break; } case OpCode.exists: { lastOp = "EXIS"; ExistsRequest existsRequest = new ExistsRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest); path = existsRequest.getPath(); if (path.indexOf('') != -1) { throw new KeeperException.BadArgumentsException(); } Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null); rsp = new ExistsResponse(stat); requestPathMetricsCollector.registerRequest(request.type, path); break; } case OpCode.getdata: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest); path = getDataRequest.getPath(); rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo); requestPathMetricsCollector.registerRequest(request.type, path); break; } case OpCode.setWatches: { lastOp = "SETW"; SetWatches setWatches = new SetWatches(); request.request.rewind(); ByteBufferInputStream.byteBuffer2Record(request.request, setWatches); long relativeZxid = setWatches.getRelativeZxid(); zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), setWatches.getChildWatches(), Collections.emptyList(), Collections.emptyList(), cnxn); break; } case OpCode.setWatches2: { lastOp = "STW2"; SetWatches2 setWatches = new SetWatches2(); request.request.rewind(); ByteBufferInputStream.byteBuffer2Record(request.request, setWatches); long relativeZxid = setWatches.getRelativeZxid(); zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), 
setWatches.getChildWatches(), setWatches.getPersistentWatches(), setWatches.getPersistentRecursiveWatches(), cnxn); break; } case OpCode.addWatch: { lastOp = "ADDW"; AddWatchRequest addWatcherRequest = new AddWatchRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, addWatcherRequest); zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode()); rsp = new ErrorResponse(0); break; } case OpCode.getACL: { lastOp = "GETA"; GetACLRequest getACLRequest = new GetACLRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest); path = getACLRequest.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { throw new KeeperException.NonodeException(); } zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN, request.authInfo, path, null); Stat stat = new Stat(); List acl = zks.getZKDatabase().getACL(path, stat); requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath()); try { zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.ADMIN, request.authInfo, path, null); rsp = new GetACLResponse(acl, stat); } catch (KeeperException.NoAuthException e) { List acl1 = new ArrayList(acl.size()); for (ACL a : acl) { if ("digest".equals(a.getId().getScheme())) { Id id = a.getId(); Id id1 = new Id(id.getScheme(), id.getId().replaceAll(":.*", ":x")); acl1.add(new ACL(a.getPerms(), id1)); } else { acl1.add(a); } } rsp = new GetACLResponse(acl1, stat); } break; } case OpCode.getChildren: { lastOp = "GETC"; GetChildrenRequest getChildrenRequest = new GetChildrenRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest); path = getChildrenRequest.getPath(); rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo); requestPathMetricsCollector.registerRequest(request.type, path); break; } case OpCode.getAllChildrenNumber: { lastOp = "GETACN"; 
GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest); path = getAllChildrenNumberRequest.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { throw new KeeperException.NonodeException(); } zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo, path, null); int number = zks.getZKDatabase().getAllChildrenNumber(path); rsp = new GetAllChildrenNumberResponse(number); break; } case OpCode.getChildren2: { lastOp = "GETC"; GetChildren2Request getChildren2Request = new GetChildren2Request(); ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request); Stat stat = new Stat(); path = getChildren2Request.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { throw new KeeperException.NonodeException(); } zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo, path, null); ListapplyRequestchildren = zks.getZKDatabase().getChildren(path, stat, getChildren2Request.getWatch() ? 
cnxn : null); rsp = new GetChildren2Response(children, stat); requestPathMetricsCollector.registerRequest(request.type, path); break; } case OpCode.checkWatches: { lastOp = "CHKW"; CheckWatchesRequest checkWatches = new CheckWatchesRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches); WatcherType type = WatcherType.fromInt(checkWatches.getType()); path = checkWatches.getPath(); boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn); if (!containsWatcher) { String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type); throw new KeeperException.NoWatcherException(msg); } requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath()); break; } case OpCode.removeWatches: { lastOp = "REMW"; RemoveWatchesRequest removeWatches = new RemoveWatchesRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches); WatcherType type = WatcherType.fromInt(removeWatches.getType()); path = removeWatches.getPath(); boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn); if (!removed) { String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type); throw new KeeperException.NoWatcherException(msg); } requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath()); break; } case OpCode.whoAmI: { lastOp = "HOMI"; rsp = new WhoAmIResponse(AuthUtil.getClientInfos(request.authInfo)); break; } case OpCode.getEphemerals: { lastOp = "GETE"; GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals); String prefixPath = getEphemerals.getPrefixPath(); Set allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId); List ephemerals = new ArrayList<>(); if (prefixPath == null || prefixPath.trim().isEmpty() || "/".equals(prefixPath.trim())) { ephemerals.addAll(allEphems); } else { for (String p : allEphems) { if (p.startsWith(prefixPath)) { ephemerals.add(p); 
} } } rsp = new GetEphemeralsResponse(ephemerals); break; } } } catch (SessionMovedException e) { cnxn.sendCloseSession(); return; } catch (KeeperException e) { err = e.code(); } catch (Exception e) { LOG.error("Failed to process {}", request, e); StringBuilder sb = new StringBuilder(); ByteBuffer bb = request.request; bb.rewind(); while (bb.hasRemaining()) { sb.append(Integer.toHexString(bb.get() & 0xff)); } LOG.error("Dumping request buffer: 0x{}", sb.toString()); err = Code.MARSHALLINGERROR; } ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue()); updateStats(request, lastOp, lastZxid); try { if (path == null || rsp == null) { responseSize = cnxn.sendResponse(hdr, rsp, "response"); } else { int opCode = request.type; Stat stat = null; switch (opCode) { case OpCode.getdata: { GetDataResponse getDataResponse = (GetDataResponse) rsp; stat = getDataResponse.getStat(); responseSize = cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode); break; } case OpCode.getChildren2 : { GetChildren2Response getChildren2Response = (GetChildren2Response) rsp; stat = getChildren2Response.getStat(); responseSize = cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode); break; } default: responseSize = cnxn.sendResponse(hdr, rsp, "response"); } } if (request.type == OpCode.closeSession) { cnxn.sendCloseSession(); } } catch (IOException e) { LOG.error("FIXMSG", e); } finally { ServerMetrics.getMetrics().RESPONSE_BYTES.add(responseSize); }
// 对请求进行处理 ProcessTxnResult rc = zks.processTxn(request); // 关闭会话 if (request.type == OpCode.closeSession && connClosedByClient(request)) { if (closeSession(zks.serverCnxnFactory, request.sessionId) || closeSession(zks.secureServerCnxnFactory, request.sessionId)) { return rc; } } // 请求是写请求 if (request.getHdr() != null) { // 统计 long propagationLatency = Time.currentWallTime() - request.getHdr().getTime(); if (propagationLatency >= 0) { ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency); } } return rc;finishSessionInit
// Step 1: for a valid session, register the connection with whichever factory owns it.
// A failure here is only logged.
try {
    if (valid && serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
        serverCnxnFactory.registerConnection(cnxn);
    } else if (valid && secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
        secureServerCnxnFactory.registerConnection(cnxn);
    }
} catch (Exception e) {
    LOG.warn("Failed to register with JMX", e);
}

// Step 2: serialize the ConnectResponse and send it to the client.
try {
    // For an invalid session the timeout and session id are 0 and the password is 16 zero bytes.
    ConnectResponse connectResponse = new ConnectResponse(
        0,
        valid ? cnxn.getSessionTimeout() : 0,
        valid ? cnxn.getSessionId() : 0,
        valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);

    ByteArrayOutputStream rawOut = new ByteArrayOutputStream();
    BinaryOutputArchive archive = BinaryOutputArchive.getArchive(rawOut);
    // Placeholder for the length prefix; overwritten once the payload size is known.
    archive.writeInt(-1, "len");
    connectResponse.serialize(archive, "connect");
    if (!cnxn.isOldClient) {
        archive.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
    }
    rawOut.close();

    ByteBuffer frame = ByteBuffer.wrap(rawOut.toByteArray());
    // Overwrite the first four bytes with the payload length (total minus the prefix itself),
    // then reset the position so the whole frame is sent.
    int payloadLength = frame.remaining() - 4;
    frame.putInt(payloadLength);
    frame.rewind();
    cnxn.sendBuffer(frame);

    if (!valid) {
        LOG.info(
            "Invalid session 0x{} for client {}, probably expired",
            Long.toHexString(cnxn.getSessionId()),
            cnxn.getRemoteSocketAddress());
        // Send the connection-close marker.
        cnxn.sendBuffer(ServerCnxnFactory.closeConn);
    } else {
        LOG.debug(
            "Established session 0x{} with negotiated timeout {} for client {}",
            Long.toHexString(cnxn.getSessionId()),
            cnxn.getSessionTimeout(),
            cnxn.getRemoteSocketAddress());
        // Start receiving data on the connection.
        cnxn.enableRecv();
    }
} catch (Exception e) {
    LOG.warn("Exception while establishing session, closing", e);
    cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT);
}
processTxn
// 获取请求的事务头 TxnHeader hdr = request.getHdr(); // 针对创建会话&关闭会话进行处理 processTxnForSessionEvents(request, hdr, request.getTxn()); // 写请求 final boolean writeRequest = (hdr != null); // 集群请求 final boolean quorumRequest = request.isQuorum(); // 不是写请求 // 且不是集群请求 if (!writeRequest && !quorumRequest) { // 直接返回处理结果 return new ProcessTxnResult(); } synchronized (outstandingChanges) { ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest()); if (hdr == null) { return new ProcessTxnResult(); } else { return getZKDatabase().processTxn(hdr, txn, digest); dataTree.processTxn(hdr, txn, digest); ProcessTxnResult result = processTxn(header, txn); this.processTxn(header, txn, false); compareDigest(header, txn, digest); return result; } // 如果是写请求 if (writeRequest) { // 获取zxid long zxid = hdr.getZxid(); // 如果 存在处理中ChangeRecord // 且 首个ChangeRecord的zxid小于这里请求的zxid while (!outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid) { // 移除 ChangeRecord cr = outstandingChanges.remove(); ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1); if (cr.zxid < zxid) { LOG.warn("Zxid outstanding 0x{} is less than current 0x{}", Long.toHexString(cr.zxid), Long.toHexString(zxid)); } // 处理路径的最新状态信息 if (outstandingChangesForPath.get(cr.path) == cr) { // path的关联项从outstandingChangesForPath移除 // 因为path的关联项已经持久化到数据实体对象中了 outstandingChangesForPath.remove(cr.path); } } } // 集群请求 if (quorumRequest) { getZKDatabase().addCommittedProposal(request); } return rc; }addCommittedProposal
对最近一定数量的已经完成最后处理的请求对象,
构造关联提议对象,
将其存储起来,以备需要时使用
// 写锁 WriteLock wl = logLock.writeLock(); try { // 锁定 wl.lock(); // 容器中提议对象数量超过commitLogCount if (committedLog.size() > commitLogCount) { // 从容器移除一个元素 committedLog.remove(); // 获取首个提议对象对应集群包中的zxid---容器中所有提议中最小的zxid minCommittedLog = committedLog.peek().packet.getZxid(); } // committedLog变空了 if (committedLog.isEmpty()) { // 容器中所有提议中最小的zxid,最大的zxid此时均为处理掉请求的zxid minCommittedLog = request.zxid; maxCommittedLog = request.zxid; } // 请求对象序列化 byte[] data = SerializeUtils.serializeRequest(request); // 构造集群包 // 包类型Leader.PROPOSAL // 包zxid为处理掉对象的zxid // 包数据为对象序列化所得 QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null); // 动态构造新的提议对象 Proposal p = new Proposal(); p.packet = pp; p.request = request; // 将提议对象加入committedLog committedLog.add(p); // 更新committedLog中所有提议的最大的zxid maxCommittedLog = p.packet.getZxid(); } finally { wl.unlock(); }processTxnForSessionEvents(request, hdr, request.getTxn());
// 请求类型 int opCode = (request == null) ? hdr.getType() : request.type; // 客户端id long sessionId = (request == null) ? hdr.getClientId() : request.sessionId; // 创建会话 if (opCode == OpCode.createSession) { // 事务头非空 // 且 事务体为CreateSessionTxn if (hdr != null && txn instanceof CreateSessionTxn) { CreateSessionTxn cst = (CreateSessionTxn) txn; // 向会话追踪提交 会话id--超时设置 sessionTracker.commitSession(sessionId, cst.getTimeOut()); } // 请求为空 // 且 请求不是本地会话 else if (request == null || !request.isLocalSession()) { LOG.warn("*****>>>>> Got {} {}", txn.getClass(), txn.toString()); } } // 关闭会话 else if (opCode == OpCode.closeSession) { // 向会话追踪移除会话 sessionTracker.removeSession(sessionId); }集群请求判断
/**
 * Tells whether this request is a quorum (cluster) request, based on its op code.
 *
 * @return {@code false} for the read operations listed below and for any unlisted op code;
 *         {@code true} for the write operations listed below; for session create/close,
 *         {@code true} only when the request is not a local session
 */
public boolean isQuorum() {
    final boolean quorum;
    switch (this.type) {
        // Session create/close: decided by whether the session is local.
        case OpCode.createSession:
        case OpCode.closeSession:
            quorum = !this.isLocalSession;
            break;

        // Write operations.
        case OpCode.create:
        case OpCode.create2:
        case OpCode.createTTL:
        case OpCode.createContainer:
        case OpCode.error:
        case OpCode.delete:
        case OpCode.deleteContainer:
        case OpCode.setACL:
        case OpCode.setdata:
        case OpCode.check:
        case OpCode.multi:
        case OpCode.reconfig:
            quorum = true;
            break;

        // Read operations, listed explicitly, plus every other op code.
        case OpCode.exists:
        case OpCode.getACL:
        case OpCode.getChildren:
        case OpCode.getAllChildrenNumber:
        case OpCode.getChildren2:
        case OpCode.getdata:
        case OpCode.getEphemerals:
        case OpCode.multiRead:
        case OpCode.whoAmI:
        default:
            quorum = false;
            break;
    }
    return quorum;
}
processTxn
利用事务头,事务体,实际处理写请求
// 动态构造ProcessTxnResult对象 ProcessTxnResult rc = new ProcessTxnResult(); try { // 事务头包含客户端id rc.clientId = header.getClientId(); // 事务头包含cxid rc.cxid = header.getCxid(); // 事务头包含zxid rc.zxid = header.getZxid(); // 事务头包含类别 rc.type = header.getType(); // 事务头包含错误码 rc.err = 0; // 事务头包含multiResult rc.multiResult = null; switch (header.getType()) { // 创建节点 case OpCode.create: // 事务体 CreateTxn createTxn = (CreateTxn) txn; // 节点路径 rc.path = createTxn.getPath(); createNode( // 路径 createTxn.getPath(), // 数据 createTxn.getData(), // 权限 createTxn.getAcl(), // 临时节点包含所属客户端id createTxn.getEphemeral() ? header.getClientId() : 0, // 父节点累计改变版本 createTxn.getParentCVersion(), // zxid header.getZxid(), // 时间 header.getTime(), null); break; case OpCode.create2: CreateTxn create2Txn = (CreateTxn) txn; rc.path = create2Txn.getPath(); Stat stat = new Stat(); createNode( create2Txn.getPath(), create2Txn.getData(), create2Txn.getAcl(), create2Txn.getEphemeral() ? header.getClientId() : 0, create2Txn.getParentCVersion(), header.getZxid(), header.getTime(), stat); // 获取节点状态 rc.stat = stat; break; case OpCode.createTTL: CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn; rc.path = createTtlTxn.getPath(); stat = new Stat(); createNode( createTtlTxn.getPath(), createTtlTxn.getData(), createTtlTxn.getAcl(), EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()), createTtlTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat); // 获取节点状态 rc.stat = stat; break; case OpCode.createContainer: CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn; rc.path = createContainerTxn.getPath(); stat = new Stat(); createNode( createContainerTxn.getPath(), createContainerTxn.getData(), createContainerTxn.getAcl(), EphemeralType.CONTAINER_EPHEMERAL_OWNER, createContainerTxn.getParentCVersion(), header.getZxid(), header.getTime(), stat); rc.stat = stat; break; case OpCode.delete: case OpCode.deleteContainer: DeleteTxn deleteTxn = (DeleteTxn) txn; rc.path = deleteTxn.getPath(); 
deleteNode(deleteTxn.getPath(), header.getZxid()); break; case OpCode.reconfig: case OpCode.setdata: SetDataTxn setDataTxn = (SetDataTxn) txn; rc.path = setDataTxn.getPath(); rc.stat = setData( setDataTxn.getPath(), setDataTxn.getData(), setDataTxn.getVersion(), header.getZxid(), header.getTime()); break; case OpCode.setACL: SetACLTxn setACLTxn = (SetACLTxn) txn; rc.path = setACLTxn.getPath(); rc.stat = setACL( setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion()); break; case OpCode.closeSession: long sessionId = header.getClientId(); if (txn != null) { killSession( sessionId, header.getZxid(), ephemerals.remove(sessionId), ((CloseSessionTxn) txn).getPaths2Delete()); } else { killSession(sessionId, header.getZxid()); } break; case OpCode.error: ErrorTxn errTxn = (ErrorTxn) txn; // 错误码 rc.err = errTxn.getErr(); break; case OpCode.check: // 检查 CheckVersionTxn checkTxn = (CheckVersionTxn) txn; rc.path = checkTxn.getPath(); break; case OpCode.multi: // 复合 *** 作 // 复合 *** 作的事务体 MultiTxn multiTxn = (MultiTxn) txn; // 取出复合的多个Txn Listtxns = multiTxn.getTxns(); // 动态构造用于存储多个处理结果的容器 rc.multiResult = new ArrayList (); boolean failed = false; // 依次处理每个子事务 for (Txn subtxn : txns) { // 一旦有一个子事务之前已经判定为失败 if (subtxn.getType() == OpCode.error) { // 整体失败,后续不处理 failed = true; break; } } boolean post_failed = false; // 对每个子事务依次处理 for (Txn subtxn : txns) { // 字节流 ByteBuffer bb = ByteBuffer.wrap(subtxn.getData()); Record record = null; // 子事务类型 switch (subtxn.getType()) { // 创建 case OpCode.create: // 动态构造 record = new CreateTxn(); break; case OpCode.createTTL: record = new CreateTTLTxn(); break; case OpCode.createContainer: record = new CreateContainerTxn(); break; case OpCode.delete: case OpCode.deleteContainer: record = new DeleteTxn(); break; case OpCode.setdata: record = new SetDataTxn(); break; case OpCode.error: record = new ErrorTxn(); post_failed = true; break; case OpCode.check: record = new CheckVersionTxn(); break; default: throw new IOException("Invalid type of 
op: " + subtxn.getType()); } assert (record != null); // 反向序列化得到请求实体 ByteBufferInputStream.byteBuffer2Record(bb, record); // 如果某个子事务判断为失败 // 且 当前子事务非失败 if (failed && subtxn.getType() != OpCode.error) { // 对此子事务,OK int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() : Code.OK.intValue(); // 设置子事务类型为失败 subtxn.setType(OpCode.error); // 动态构造ErrorTxn record = new ErrorTxn(ec); } assert !failed || (subtxn.getType() == OpCode.error); // 动态构造事务头 TxnHeader subHdr = new TxnHeader( // 客户端id header.getClientId(), // cxid header.getCxid(), // zxid header.getZxid(), // 时间 header.getTime(), // 类型 subtxn.getType()); // 对子事务进行处理---若存在某个子事务为失败,所有其他子事务的事务体会设置为ErrorTxn ProcessTxnResult subRc = processTxn(subHdr, record, true); // 加入处理结果 rc.multiResult.add(subRc); // 子事务结果为失败 // 整体结果不为失败 if (subRc.err != 0 && rc.err == 0) { // 设置整体结果为失败的那个子事务的结果 rc.err = subRc.err; } } break; } } catch (KeeperException e) { LOG.debug("Failed: {}:{}", header, txn, e); rc.err = e.code().intValue(); } catch (IOException e) { LOG.debug("Failed: {}:{}", header, txn, e); } // 类型为创建 // 失败原因为节点不存在 if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) { LOG.debug("Adjusting parent cversion for Txn: {} path: {} err: {}", header.getType(), rc.path, rc.err); int lastSlash = rc.path.lastIndexOf('/'); String parentName = rc.path.substring(0, lastSlash); CreateTxn cTxn = (CreateTxn) txn; try { setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid()); } catch (KeeperException.NonodeException e) { LOG.error("Failed to set parent cversion for: {}", parentName, e); rc.err = e.code().intValue(); } } else if (rc.err != Code.OK.intValue()) { LOG.debug("Ignoring processTxn failure hdr: {} : error: {}", header.getType(), rc.err); } if (!isSubTxn) { if (rc.zxid > lastProcessedZxid) { lastProcessedZxid = rc.zxid; } if (digestFromLoadedSnapshot != null) { compareSnapshotDigests(rc.zxid); } else { logZxidDigest(rc.zxid, getTreeDigest()); } } return rc;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)