zookeeper源码解析--请求处理--FinalRequestProcessor

zookeeper源码解析--请求处理--FinalRequestProcessor,第1张

zookeeper源码解析--请求处理--FinalRequestProcessor todo
OpCode.createSession&zks.finishSessionInit(request.cnxn, true);
FinalRequestProcessor
LOG.debug("Processing request:: {}", request);
// Trace logging: pings are logged under their own mask, everything else under the client-request mask.
if (LOG.isTraceEnabled()) {
    final long mask = (request.type == OpCode.ping)
            ? ZooTrace.SERVER_PING_TRACE_MASK
            : ZooTrace.CLIENT_REQUEST_TRACE_MASK;
    ZooTrace.logRequest(LOG, mask, 'E', request, "");
}

// Throttled requests are not applied; their result stays null.
ProcessTxnResult rc = request.isThrottled() ? null : applyRequest(request);

// Only requests that carry a connection get a reply; nothing more to do otherwise.
if (request.cnxn == null) {
    return;
}

// Connection used to send the reply.
ServerCnxn cnxn = request.cnxn;
// zxid reported in the reply header.
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
String lastOp = "NA";

// Server-side bookkeeping: one fewer request in flight, and the request is finished.
zks.decInProcess();
zks.requestFinished(request);

// -1 means the request was not accounted as a large request.
int largeBytes = request.getLargeRequestSize();
if (largeBytes != -1) {
    currentLargeRequestBytes.addAndGet(-largeBytes);
}

// Reply state filled in by the per-opcode handling that follows.
Code err = Code.OK;
Record rsp = null;
String path = null;
int responseSize = 0;
// Builds the reply (rsp / err / path / lastOp) for the request according to its opcode.
// ping and createSession reply directly and return; a SessionMovedException closes the
// session and returns; any other KeeperException is turned into the reply error code.
try
{
    // A transaction header of type `error` means the request already failed earlier.
    if (request.getHdr() != null && request.getHdr().getType() == OpCode.error)
    {
        AuditHelper.addAuditLog(request, rc, true);
        if (request.getException() != null)
        {
            // Re-throw the exception recorded on the request.
            throw request.getException();
        }
        else
        {
            // No exception recorded: build one from the error code stored in the ErrorTxn.
            throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));
        }
    }

    KeeperException ke = request.getException();
    // A session-moved failure is always propagated, even for multi requests.
    if (ke instanceof SessionMovedException)
    {
        throw ke;
    }

    // Any other recorded exception is propagated unless the request is a multi.
    if (ke != null && request.type != OpCode.multi)
    {
        throw ke;
    }

    LOG.debug("{}", request);
    // Stale requests are only counted here; processing continues.
    if (request.isStale())
    {
        ServerMetrics.getMetrics().STALE_REPLIES.add(1);
    }

    // Throttled requests are answered with THROTTLEDOP.
    if (request.isThrottled())
    {
        throw KeeperException.create(Code.THROTTLEDOP);
    }

    AuditHelper.addAuditLog(request, rc);
    switch (request.type)
    {
        case OpCode.ping:
        {
            lastOp = "PING";
            updateStats(request, lastOp, lastZxid);
            // Pings are answered immediately with the dedicated ping xid.
            responseSize = cnxn.sendResponse(
                new ReplyHeader(ClientCnxn.PING_XID, lastZxid, 0), null, "response");
            return;
        }
        case OpCode.createSession:
        {
            lastOp = "SESS";
            updateStats(request, lastOp, lastZxid);
            // The connect response is sent by finishSessionInit, not by the common reply path.
            zks.finishSessionInit(request.cnxn, true);
            return;
        }
        case OpCode.multi:
        {
            lastOp = "MULT";
            rsp = new MultiResponse();
            // Convert each sub-transaction result into the matching OpResult.
            for (ProcessTxnResult subTxnResult : rc.multiResult)
            {
                OpResult subResult;
                switch (subTxnResult.type)
                {
                case OpCode.check:
                    subResult = new CheckResult();
                    break;
                case OpCode.create:
                    subResult = new CreateResult(subTxnResult.path);
                    break;
                case OpCode.create2:
                case OpCode.createTTL:
                case OpCode.createContainer:
                    subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
                    break;
                case OpCode.delete:
                case OpCode.deleteContainer:
                    subResult = new DeleteResult();
                    break;
                case OpCode.setdata:
                    subResult = new SetDataResult(subTxnResult.stat);
                    break;
                case OpCode.error:
                    subResult = new ErrorResult(subTxnResult.err);
                    if (subTxnResult.err == Code.SESSIONMOVED.intValue())
                    {
                        throw new SessionMovedException();
                    }
                    break;
                default:
                    throw new IOException("Invalid type of op");
                }

                ((MultiResponse) rsp).add(subResult);
            }

            break;
        }
        case OpCode.multiRead:
        {
            lastOp = "MLTR";
            MultiOperationRecord multiReadRecord = new MultiOperationRecord();
            ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord);
            rsp = new MultiResponse();
            OpResult subResult;
            for (Op readOp : multiReadRecord)
            {
                try
                {
                    Record rec;
                    switch (readOp.getType())
                    {
                    case OpCode.getChildren:
                        rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
                        subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren());
                        break;
                    case OpCode.getdata:
                        rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
                        GetDataResponse gdr = (GetDataResponse) rec;
                        subResult = new GetDataResult(gdr.getData(), gdr.getStat());
                        break;
                    default:
                        throw new IOException("Invalid type of readOp");
                    }
                }
                catch (KeeperException e)
                {
                    // A failing read becomes an ErrorResult for that entry; the other reads still run.
                    subResult = new ErrorResult(e.code().intValue());
                }

                ((MultiResponse) rsp).add(subResult);
            }
            break;
        }
        case OpCode.create:
        {
            lastOp = "CREA";
            rsp = new CreateResponse(rc.path);
            err = Code.get(rc.err);
            requestPathMetricsCollector.registerRequest(request.type, rc.path);
            break;
        }
        case OpCode.create2:
        case OpCode.createTTL:
        case OpCode.createContainer:
        {
            lastOp = "CREA";
            rsp = new Create2Response(rc.path, rc.stat);
            err = Code.get(rc.err);
            requestPathMetricsCollector.registerRequest(request.type, rc.path);
            break;
        }
        case OpCode.delete:
        case OpCode.deleteContainer:
        {
            lastOp = "DELE";
            err = Code.get(rc.err);
            requestPathMetricsCollector.registerRequest(request.type, rc.path);
            break;
        }
        case OpCode.setdata:
        {
            lastOp = "SETD";
            rsp = new SetDataResponse(rc.stat);
            err = Code.get(rc.err);
            requestPathMetricsCollector.registerRequest(request.type, rc.path);
            break;
        }
        case OpCode.reconfig:
        {
            lastOp = "RECO";
            // The reply body carries the string form of the current quorum verifier.
            rsp = new GetDataResponse(((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(UTF_8), rc.stat);
            err = Code.get(rc.err);
            break;
        }
        case OpCode.setACL:
        {
            lastOp = "SETA";
            rsp = new SetACLResponse(rc.stat);
            err = Code.get(rc.err);
            requestPathMetricsCollector.registerRequest(request.type, rc.path);
            break;
        }
        case OpCode.closeSession:
        {
            lastOp = "CLOS";
            err = Code.get(rc.err);
            break;
        }
        case OpCode.sync:
        {
            lastOp = "SYNC";
            SyncRequest syncRequest = new SyncRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest);
            rsp = new SyncResponse(syncRequest.getPath());
            requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
            break;
        }
        case OpCode.check:
        {
            lastOp = "CHEC";
            rsp = new SetDataResponse(rc.stat);
            err = Code.get(rc.err);
            break;
        }
        case OpCode.exists:
        {
            lastOp = "EXIS";
            ExistsRequest existsRequest = new ExistsRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
            path = existsRequest.getPath();
            // Reject paths containing a NUL character. The escape was lost in the article,
            // leaving an empty (uncompilable) char literal.
            if (path.indexOf('\0') != -1)
            {
                throw new KeeperException.BadArgumentsException();
            }

            // The connection is passed as the watcher only when the client asked for a watch.
            Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
            rsp = new ExistsResponse(stat);
            requestPathMetricsCollector.registerRequest(request.type, path);
            break;
        }
        case OpCode.getdata:
        {
            lastOp = "GETD";
            GetDataRequest getDataRequest = new GetDataRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
            path = getDataRequest.getPath();
            rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
            requestPathMetricsCollector.registerRequest(request.type, path);
            break;
        }
        case OpCode.setWatches:
        {
            lastOp = "SETW";
            SetWatches setWatches = new SetWatches();
            // Rewind before decoding so the record is read from the start of the buffer.
            request.request.rewind();
            ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
            long relativeZxid = setWatches.getRelativeZxid();
            // This request type carries no persistent watches, so empty lists are passed for them.
            zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), setWatches.getChildWatches(), Collections.emptyList(), Collections.emptyList(), cnxn);
            break;
        }
        case OpCode.setWatches2:
        {
            lastOp = "STW2";
            SetWatches2 setWatches = new SetWatches2();
            request.request.rewind();
            ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
            long relativeZxid = setWatches.getRelativeZxid();
            zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), setWatches.getChildWatches(), setWatches.getPersistentWatches(), setWatches.getPersistentRecursiveWatches(), cnxn);
            break;
        }
        case OpCode.addWatch:
        {
            lastOp = "ADDW";
            AddWatchRequest addWatcherRequest = new AddWatchRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, addWatcherRequest);
            zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
            rsp = new ErrorResponse(0);
            break;
        }
        case OpCode.getACL:
        {
            lastOp = "GETA";
            GetACLRequest getACLRequest = new GetACLRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest);
            path = getACLRequest.getPath();
            DataNode n = zks.getZKDatabase().getNode(path);
            if (n == null)
            {
                throw new KeeperException.NonodeException();
            }

            // First check: the READ | ADMIN permission mask is required to see the ACL at all.
            zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN, request.authInfo, path, null);
            Stat stat = new Stat();
            List<ACL> acl = zks.getZKDatabase().getACL(path, stat);
            requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath());

            try
            {
                // Second check: callers passing the ADMIN check receive the ACL unmodified.
                zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.ADMIN, request.authInfo, path, null);
                rsp = new GetACLResponse(acl, stat);
            }
            catch (KeeperException.NoAuthException e)
            {
                // Otherwise return a copy in which, for "digest" ids, everything after the
                // first ':' is replaced by "x".
                List<ACL> acl1 = new ArrayList<ACL>(acl.size());
                for (ACL a : acl)
                {
                    if ("digest".equals(a.getId().getScheme()))
                    {
                        Id id = a.getId();
                        Id id1 = new Id(id.getScheme(), id.getId().replaceAll(":.*", ":x"));
                        acl1.add(new ACL(a.getPerms(), id1));
                    }
                    else
                    {
                        acl1.add(a);
                    }
                }

                rsp = new GetACLResponse(acl1, stat);
            }
            break;
        }
        case OpCode.getChildren:
        {
            lastOp = "GETC";
            GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
            path = getChildrenRequest.getPath();
            rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
            requestPathMetricsCollector.registerRequest(request.type, path);
            break;
        }
        case OpCode.getAllChildrenNumber:
        {
            lastOp = "GETACN";
            GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest);
            path = getAllChildrenNumberRequest.getPath();
            DataNode n = zks.getZKDatabase().getNode(path);
            if (n == null)
            {
                throw new KeeperException.NonodeException();
            }

            zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo, path, null);
            int number = zks.getZKDatabase().getAllChildrenNumber(path);
            rsp = new GetAllChildrenNumberResponse(number);
            break;
        }
        case OpCode.getChildren2:
        {
            lastOp = "GETC";
            GetChildren2Request getChildren2Request = new GetChildren2Request();
            ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request);
            Stat stat = new Stat();
            path = getChildren2Request.getPath();
            DataNode n = zks.getZKDatabase().getNode(path);
            if (n == null)
            {
                throw new KeeperException.NonodeException();
            }

            zks.checkACL(request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo, path, null);
            List<String> children = zks.getZKDatabase().getChildren(path, stat, getChildren2Request.getWatch() ? cnxn : null);
            rsp = new GetChildren2Response(children, stat);
            requestPathMetricsCollector.registerRequest(request.type, path);
            break;
        }
        case OpCode.checkWatches:
        {
            lastOp = "CHKW";
            CheckWatchesRequest checkWatches = new CheckWatchesRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches);
            WatcherType type = WatcherType.fromInt(checkWatches.getType());
            path = checkWatches.getPath();
            boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
            if (!containsWatcher)
            {
                String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
                throw new KeeperException.NoWatcherException(msg);
            }

            requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath());
            break;
        }
        case OpCode.removeWatches:
        {
            lastOp = "REMW";
            RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
            WatcherType type = WatcherType.fromInt(removeWatches.getType());
            path = removeWatches.getPath();
            boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
            if (!removed)
            {
                String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type);
                throw new KeeperException.NoWatcherException(msg);
            }

            requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath());
            break;
        }
        case OpCode.whoAmI:
        {
            lastOp = "HOMI";
            rsp = new WhoAmIResponse(AuthUtil.getClientInfos(request.authInfo));
            break;
        }
        case OpCode.getEphemerals:
        {
            lastOp = "GETE";
            GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);
            String prefixPath = getEphemerals.getPrefixPath();
            Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
            List<String> ephemerals = new ArrayList<>();
            // A missing, blank or "/" prefix selects every ephemeral of the session.
            if (prefixPath == null || prefixPath.trim().isEmpty() || "/".equals(prefixPath.trim()))
            {
                ephemerals.addAll(allEphems);
            }
            else
            {
                for (String p : allEphems)
                {
                    if (p.startsWith(prefixPath))
                    {
                        ephemerals.add(p);
                    }
                }
            }

            rsp = new GetEphemeralsResponse(ephemerals);
            break;
        }
    }
}
catch (SessionMovedException e)
{
    // The session moved: close it on this connection and send no normal reply.
    cnxn.sendCloseSession();
    return;
}
catch (KeeperException e)
{
    // Expected failures are reported to the client through the reply header's error code.
    err = e.code();
}
catch (Exception e)
{
    // Anything else is treated as a marshalling problem; the raw request is dumped as hex.
    LOG.error("Failed to process {}", request, e);
    StringBuilder sb = new StringBuilder();
    ByteBuffer bb = request.request;
    bb.rewind();
    while (bb.hasRemaining())
    {
        // NOTE(review): bytes below 0x10 are appended as a single hex digit (no zero padding).
        sb.append(Integer.toHexString(bb.get() & 0xff));
    }

    LOG.error("Dumping request buffer: 0x{}", sb.toString());
    err = Code.MARSHALLINGERROR;
}

// Common reply path: build the header from the collected error code, update the
// statistics, send the response and always record its size in the metrics.
ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
updateStats(request, lastOp, lastZxid);
try {
    final int opCode = request.type;
    // The path/stat-aware send is used only when both a path and a response body exist.
    final boolean hasPathAndBody = (path != null) && (rsp != null);

    if (hasPathAndBody && opCode == OpCode.getdata) {
        Stat dataStat = ((GetDataResponse) rsp).getStat();
        responseSize = cnxn.sendResponse(hdr, rsp, "response", path, dataStat, opCode);
    } else if (hasPathAndBody && opCode == OpCode.getChildren2) {
        Stat childrenStat = ((GetChildren2Response) rsp).getStat();
        responseSize = cnxn.sendResponse(hdr, rsp, "response", path, childrenStat, opCode);
    } else {
        responseSize = cnxn.sendResponse(hdr, rsp, "response");
    }

    // A closeSession reply is followed by the close-session marker.
    if (request.type == OpCode.closeSession) {
        cnxn.sendCloseSession();
    }
} catch (IOException e) {
    LOG.error("FIXMSG", e);
} finally {
    ServerMetrics.getMetrics().RESPONSE_BYTES.add(responseSize);
}
applyRequest
// Apply the request through the server and keep its result.
final ProcessTxnResult result = zks.processTxn(request);

// For a closeSession on a connection the client already closed, return as soon as
// either connection factory reports the session closed (skipping the latency metric).
final boolean closedByClient = request.type == OpCode.closeSession && connClosedByClient(request);
if (closedByClient
        && (closeSession(zks.serverCnxnFactory, request.sessionId)
            || closeSession(zks.secureServerCnxnFactory, request.sessionId))) {
    return result;
}

// Requests without a transaction header contribute no propagation latency.
if (request.getHdr() == null) {
    return result;
}

// Propagation latency = now - time stamped in the transaction header; negative values are dropped.
final long latency = Time.currentWallTime() - request.getHdr().getTime();
if (latency >= 0) {
    ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(latency);
}
return result;
finishSessionInit
   // Step 1: for a valid session, register the connection with whichever connection
   // factory currently holds it. A failure here is only logged and does not abort the
   // session setup.
   try 
   {
       if (valid) 
       {
           if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) 
           {
               serverCnxnFactory.registerConnection(cnxn);
           } 
           else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) 
           {
               secureServerCnxnFactory.registerConnection(cnxn);
           }
       }
   } 
   catch (Exception e) 
   {
       LOG.warn("Failed to register with JMX", e);
   }

   // Step 2: serialize the ConnectResponse and send it to the client.
   try 
   {
       // For an invalid session the timeout and session id are 0 and the password is
       // 16 zero bytes. NOTE(review): the leading 0 is presumably the protocol
       // version -- confirm against ConnectResponse.
       ConnectResponse rsp = new ConnectResponse(
			0, 
			valid ? cnxn.getSessionTimeout() : 0, 
			valid ? cnxn.getSessionId() : 0, 
			valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
       // Placeholder for the length prefix; patched below once the size is known.
       bos.writeInt(-1, "len");
       rsp.serialize(bos, "connect");
       // The read-only flag is appended only for clients that are not "old" clients.
       if (!cnxn.isOldClient) 
       {
           bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
       }
       
       baos.close();
       ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
       // Overwrite the placeholder with the payload length (total minus the 4-byte
       // prefix), then rewind so the whole buffer is sent.
       bb.putInt(bb.remaining() - 4).rewind();
       cnxn.sendBuffer(bb);
       if (valid) 
       {
           LOG.debug("Established session 0x{} with negotiated timeout {} for client {}", 
           		Long.toHexString(cnxn.getSessionId()), cnxn.getSessionTimeout(), 
           		cnxn.getRemoteSocketAddress());
           // Start receiving data on the connection.
           cnxn.enableRecv();
       } 
       else 
       {
           LOG.info("Invalid session 0x{} for client {}, probably expired", 
           		Long.toHexString(cnxn.getSessionId()), cnxn.getRemoteSocketAddress());
           // Send the close-connection marker buffer.
           cnxn.sendBuffer(ServerCnxnFactory.closeConn);
       }
   } 
   catch (Exception e) 
   {
       // Any failure while answering the connect request closes the connection.
       LOG.warn("Exception while establishing session, closing", e);
       cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT);
   }
processTxn
// Transaction header of the request; null when the request carries no transaction.
TxnHeader hdr = request.getHdr();
// Session bookkeeping for createSession / closeSession.
processTxnForSessionEvents(request, hdr, request.getTxn());
// A request with a transaction header is a write request.
final boolean writeRequest = (hdr != null);
// Whether the request is a quorum request (see Request.isQuorum()).
final boolean quorumRequest = request.isQuorum();
// Neither a write nor a quorum request: nothing to apply, return an empty result.
if (!writeRequest && !quorumRequest)
{
    return new ProcessTxnResult();
}

synchronized (outstandingChanges)
{
    ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
    // Call chain behind processTxnInDB, kept from the article for reference only
    // (it is not code of this method):
    //   processTxnInDB(hdr, txn, digest)
    //     if (hdr == null) return new ProcessTxnResult();
    //     else             return getZKDatabase().processTxn(hdr, txn, digest);
    //       -> dataTree.processTxn(hdr, txn, digest)
    //            ProcessTxnResult result = processTxn(header, txn);
    //              -> this.processTxn(header, txn, false);
    //            compareDigest(header, txn, digest);
    //            return result;

    if (writeRequest)
    {
        long zxid = hdr.getZxid();
        // Drop every outstanding ChangeRecord whose zxid is <= the zxid just applied.
        while (!outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid)
        {
            ChangeRecord cr = outstandingChanges.remove();
            ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
            if (cr.zxid < zxid)
            {
                LOG.warn("Zxid outstanding 0x{} is less than current 0x{}",
                    Long.toHexString(cr.zxid), Long.toHexString(zxid));
            }

            // Remove the per-path entry only if it still points at this record;
            // a different record for the same path stays in the map.
            if (outstandingChangesForPath.get(cr.path) == cr)
            {
                outstandingChangesForPath.remove(cr.path);
            }
        }
    }

    // Quorum requests are also recorded in the committed-proposal log.
    if (quorumRequest)
    {
        getZKDatabase().addCommittedProposal(request);
    }

    return rc;
}
addCommittedProposal

对最近一定数量的已经完成最后处理的请求对象,
构造关联提议对象,
将其存储起来,以备需要时使用

// Appends the request to committedLog as a PROPOSAL packet, evicting the oldest entry
// when the log is over capacity and keeping minCommittedLog / maxCommittedLog in sync.
WriteLock wl = logLock.writeLock();
// Acquire the lock BEFORE entering the try block: if acquisition fails, the finally
// block must not call unlock() on a lock this thread does not hold.
wl.lock();
try
{
    // More than commitLogCount entries: evict the oldest one.
    if (committedLog.size() > commitLogCount)
    {
        committedLog.remove();
        // The new head now carries the smallest zxid in the log.
        minCommittedLog = committedLog.peek().packet.getZxid();
    }

    // Empty log: the incoming request defines both bounds.
    if (committedLog.isEmpty())
    {
        minCommittedLog = request.zxid;
        maxCommittedLog = request.zxid;
    }

    // Wrap the serialized request in a PROPOSAL packet carrying the request's zxid.
    byte[] data = SerializeUtils.serializeRequest(request);
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;
    committedLog.add(p);
    // The entry just appended carries the largest zxid in the log.
    maxCommittedLog = p.packet.getZxid();
}
finally
{
    wl.unlock();
}
processTxnForSessionEvents(request, hdr, request.getTxn());
// Without a request object, both the opcode and the session id come from the transaction header.
final int opCode = (request == null) ? hdr.getType() : request.type;
final long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;

switch (opCode) {
case OpCode.createSession:
    if (hdr != null && txn instanceof CreateSessionTxn) {
        // Commit the session to the tracker with the timeout carried by the transaction.
        sessionTracker.commitSession(sessionId, ((CreateSessionTxn) txn).getTimeOut());
    } else if (request == null || !request.isLocalSession()) {
        // Unexpected payload: warn when there is no request, or the request is not a local session.
        LOG.warn("*****>>>>> Got {} {}",  txn.getClass(), txn.toString());
    }
    break;
case OpCode.closeSession:
    // Drop the session from the tracker.
    sessionTracker.removeSession(sessionId);
    break;
default:
    // Other opcodes have no session side effects here.
    break;
}
集群请求判断
/**
 * Tells whether this request is a quorum request.
 *
 * @return {@code true} for the write-type opcodes listed below; for
 *         createSession/closeSession, {@code true} unless the session is local;
 *         {@code false} for every other opcode
 */
public boolean isQuorum() {
    final int op = this.type;

    // Session create/close is a quorum request only when the session is not local.
    if (op == OpCode.createSession || op == OpCode.closeSession) {
        return !this.isLocalSession;
    }

    switch (op) {
    // Write-type operations.
    case OpCode.create:
    case OpCode.create2:
    case OpCode.createTTL:
    case OpCode.createContainer:
    case OpCode.error:
    case OpCode.delete:
    case OpCode.deleteContainer:
    case OpCode.setACL:
    case OpCode.setdata:
    case OpCode.check:
    case OpCode.multi:
    case OpCode.reconfig:
        return true;
    // Read-type operations (exists, getACL, getChildren, getAllChildrenNumber,
    // getChildren2, getdata, getEphemerals, multiRead, whoAmI) and anything unlisted.
    default:
        return false;
    }
}
processTxn

利用事务头,事务体,实际处理写请求

// 动态构造ProcessTxnResult对象
ProcessTxnResult rc = new ProcessTxnResult();
try 
{
	// 事务头包含客户端id
    rc.clientId = header.getClientId();
    // 事务头包含cxid
    rc.cxid = header.getCxid();
    // 事务头包含zxid
    rc.zxid = header.getZxid();
    // 事务头包含类别
    rc.type = header.getType();
    // 事务头包含错误码
    rc.err = 0;
    // 事务头包含multiResult
    rc.multiResult = null;
    switch (header.getType()) 
    {
    // 创建节点
    case OpCode.create:
    	// 事务体
        CreateTxn createTxn = (CreateTxn) txn;
        // 节点路径
        rc.path = createTxn.getPath();
        createNode(
        	// 路径
        	createTxn.getPath(), 
        	// 数据
        	createTxn.getData(), 
        	// 权限
        	createTxn.getAcl(), 
        	// 临时节点包含所属客户端id
        	createTxn.getEphemeral() ? header.getClientId() : 0, 
        	// 父节点累计改变版本
        	createTxn.getParentCVersion(),
        	// zxid 
        	header.getZxid(), 
        	// 时间
        	header.getTime(), 
        	null);
        break;
    case OpCode.create2:
        CreateTxn create2Txn = (CreateTxn) txn;
        rc.path = create2Txn.getPath();
        Stat stat = new Stat();
        createNode(
        	create2Txn.getPath(), 
        	create2Txn.getData(), 
        	create2Txn.getAcl(), 
        	create2Txn.getEphemeral() ? header.getClientId() : 0, 
        	create2Txn.getParentCVersion(), 
        	header.getZxid(), 
        	header.getTime(), stat);
        // 获取节点状态
        rc.stat = stat;
        break;
    case OpCode.createTTL:
        CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
        rc.path = createTtlTxn.getPath();
        stat = new Stat();
        createNode(
        	createTtlTxn.getPath(), 
        	createTtlTxn.getData(), 
        	createTtlTxn.getAcl(), 
        	EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()), 
        	createTtlTxn.getParentCVersion(), 
        	header.getZxid(), 
        	header.getTime(), 
        	stat);
        // 获取节点状态
        rc.stat = stat;
        break;
    case OpCode.createContainer:
        CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
        rc.path = createContainerTxn.getPath();
        stat = new Stat();
        createNode(
        	createContainerTxn.getPath(), 
        	createContainerTxn.getData(), 
        	createContainerTxn.getAcl(), 
        	EphemeralType.CONTAINER_EPHEMERAL_OWNER, 
        	createContainerTxn.getParentCVersion(), 
        	header.getZxid(), 
        	header.getTime(), 
        	stat);
        rc.stat = stat;
        break;
    case OpCode.delete:
    case OpCode.deleteContainer:
        DeleteTxn deleteTxn = (DeleteTxn) txn;
        rc.path = deleteTxn.getPath();
        deleteNode(deleteTxn.getPath(), header.getZxid());
        break;
    case OpCode.reconfig:
    case OpCode.setdata:
        SetDataTxn setDataTxn = (SetDataTxn) txn;
        rc.path = setDataTxn.getPath();
        rc.stat = setData(
        	setDataTxn.getPath(), 
        	setDataTxn.getData(), 
        	setDataTxn.getVersion(), 
        	header.getZxid(), 
        	header.getTime());
        break;
    case OpCode.setACL:
        SetACLTxn setACLTxn = (SetACLTxn) txn;
        rc.path = setACLTxn.getPath();
        rc.stat = setACL(
        	setACLTxn.getPath(), 
        	setACLTxn.getAcl(), 
        	setACLTxn.getVersion());
        break;
    case OpCode.closeSession:
        long sessionId = header.getClientId();
        if (txn != null) 
        {
            killSession(
            	sessionId, 
            	header.getZxid(), 
            	ephemerals.remove(sessionId), 
            	((CloseSessionTxn) txn).getPaths2Delete());
        } 
        else 
        {
            killSession(sessionId, header.getZxid());
        }
        break;
    case OpCode.error:
        ErrorTxn errTxn = (ErrorTxn) txn;
        // 错误码
        rc.err = errTxn.getErr();
        break;
    case OpCode.check:
    	// 检查
        CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
        rc.path = checkTxn.getPath();
        break;
    case OpCode.multi:
    	// 复合 *** 作
    	// 复合 *** 作的事务体
        MultiTxn multiTxn = (MultiTxn) txn;
        // 取出复合的多个Txn
        List txns = multiTxn.getTxns();
        // 动态构造用于存储多个处理结果的容器
        rc.multiResult = new ArrayList();
        boolean failed = false;
        // 依次处理每个子事务
        for (Txn subtxn : txns) 
        {
        	// 一旦有一个子事务之前已经判定为失败
            if (subtxn.getType() == OpCode.error) 
            {
            	// 整体失败,后续不处理
                failed = true;
                break;
            }
        }
		
        boolean post_failed = false;
        // 对每个子事务依次处理
        for (Txn subtxn : txns) 
        {
        	// 字节流
            ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
            Record record = null;
            // 子事务类型
            switch (subtxn.getType()) 
            {
            // 创建
            case OpCode.create:
            	// 动态构造
                record = new CreateTxn();
                break;
            case OpCode.createTTL:
                record = new CreateTTLTxn();
                break;
            case OpCode.createContainer:
                record = new CreateContainerTxn();
                break;
            case OpCode.delete:
            case OpCode.deleteContainer:
                record = new DeleteTxn();
                break;
            case OpCode.setdata:
                record = new SetDataTxn();
                break;
            case OpCode.error:
                record = new ErrorTxn();
                post_failed = true;
                break;
            case OpCode.check:
                record = new CheckVersionTxn();
                break;
            default:
                throw new IOException("Invalid type of op: " + subtxn.getType());
            }
            
            assert (record != null);
            // 反向序列化得到请求实体
            ByteBufferInputStream.byteBuffer2Record(bb, record);
            // 如果某个子事务判断为失败
            // 且 当前子事务非失败
            if (failed && subtxn.getType() != OpCode.error) 
            {
            	// 对此子事务,OK
                int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() : Code.OK.intValue();
                // 设置子事务类型为失败
                subtxn.setType(OpCode.error);
                // 动态构造ErrorTxn
                record = new ErrorTxn(ec);
            }

            assert !failed || (subtxn.getType() == OpCode.error);
            // 动态构造事务头
            TxnHeader subHdr = new TxnHeader(
            	// 客户端id
            	header.getClientId(),
            	// cxid 
            	header.getCxid(),
            	// zxid 
            	header.getZxid(),
            	// 时间 
            	header.getTime(),
            	// 类型 
            	subtxn.getType());
            // 对子事务进行处理---若存在某个子事务为失败,所有其他子事务的事务体会设置为ErrorTxn
            ProcessTxnResult subRc = processTxn(subHdr, record, true);
            // 加入处理结果
            rc.multiResult.add(subRc);
            // 子事务结果为失败
            // 整体结果不为失败
            if (subRc.err != 0 && rc.err == 0) 
            {
            	// 设置整体结果为失败的那个子事务的结果
                rc.err = subRc.err;
            }
        }
        break;
    }
} 
catch (KeeperException e) 
{
    LOG.debug("Failed: {}:{}", header, txn, e);
    rc.err = e.code().intValue();
} 
catch (IOException e) 
{
    LOG.debug("Failed: {}:{}", header, txn, e);
}

// 类型为创建
// 失败原因为节点不存在
if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) 
{
    LOG.debug("Adjusting parent cversion for Txn: {} path: {} err: {}", header.getType(), rc.path, rc.err);
    int lastSlash = rc.path.lastIndexOf('/');
    String parentName = rc.path.substring(0, lastSlash);
    CreateTxn cTxn = (CreateTxn) txn;
    try 
    {
        setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid());
    } 
    catch (KeeperException.NonodeException e) 
    {
        LOG.error("Failed to set parent cversion for: {}", parentName, e);
        rc.err = e.code().intValue();
    }
} 
else if (rc.err != Code.OK.intValue()) 
{
    LOG.debug("Ignoring processTxn failure hdr: {} : error: {}", header.getType(), rc.err);
}

if (!isSubTxn) 
{
    if (rc.zxid > lastProcessedZxid) 
    {
        lastProcessedZxid = rc.zxid;
    }

    if (digestFromLoadedSnapshot != null) 
    {
        compareSnapshotDigests(rc.zxid);
    } 
    else 
    {
        logZxidDigest(rc.zxid, getTreeDigest());
    }
}

return rc;

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5710535.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存