The previous post ended with the leader election completing, at which point each node's role is determined. In this post, let's look at what the follower and leader roles each do, starting with the leader. Once the role is known, the server's main loop dispatches on it:

case FOLLOWING:
    try {
        LOG.info("FOLLOWING");
        setFollower(makeFollower(logFactory));
        follower.followLeader();
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        follower.shutdown();
        setFollower(null);
        updateServerState();
    }
    break;
case LEADING:
    LOG.info("LEADING");
    try {
        setLeader(makeLeader(logFactory));
        leader.lead();
        setLeader(null);
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        if (leader != null) {
            leader.shutdown("Forcing shutdown");
            setLeader(null);
        }
        updateServerState();
    }
If the server ends up in the leader role, it calls the makeLeader method, which constructs a new Leader. Let's go straight to the Leader constructor.
public Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
    this.self = self;
    this.proposalStats = new BufferStats();

    Set<InetSocketAddress> addresses;
    if (self.getQuorumListenOnAllIPs()) {
        addresses = self.getQuorumAddress().getWildcardAddresses();
    } else {
        addresses = self.getQuorumAddress().getAllAddresses();
    }

    addresses.stream()
             .map(address -> createServerSocket(address, self.shouldUsePortUnification(), self.isSslQuorum()))
             .filter(Optional::isPresent)
             .map(Optional::get)
             .forEach(serverSockets::add);

    if (serverSockets.isEmpty()) {
        throw new IOException("Leader failed to initialize any of the following sockets: " + addresses);
    }

    this.zk = zk;
}
As you can see, a ServerSocket is created for each quorum address, which tells us that communication between the servers uses blocking I/O (BIO). Why BIO here? Because there are not many ZooKeeper servers; a cluster tops out at maybe 10 to 30 machines, so blocking I/O is more than enough.
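To make that concrete, here is a minimal, self-contained sketch of the same pattern: bind one ServerSocket per address and run a blocking accept loop that hands each connection to its own handler thread. The port, address list, and handler body are placeholders, not ZooKeeper's actual classes.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;

// Sketch of the blocking-I/O (BIO) pattern used by the Leader: one ServerSocket
// per bound address, and a blocking accept() loop that hands each connection to
// a handler thread. Addresses and the handler are hypothetical placeholders.
public class BioAcceptorSketch {
    public static void main(String[] args) throws IOException {
        List<InetSocketAddress> addresses = List.of(new InetSocketAddress("0.0.0.0", 2888));
        for (InetSocketAddress addr : addresses) {
            ServerSocket serverSocket = new ServerSocket();
            serverSocket.bind(addr);
            new Thread(() -> acceptLoop(serverSocket), "acceptor-" + addr).start();
        }
    }

    private static void acceptLoop(ServerSocket serverSocket) {
        while (true) {
            try {
                Socket socket = serverSocket.accept(); // blocks until a peer connects
                // In ZooKeeper this would become a LearnerHandler; here it is a placeholder thread.
                new Thread(() -> handle(socket)).start();
            } catch (IOException e) {
                e.printStackTrace();
                return;
            }
        }
    }

    private static void handle(Socket socket) {
        try (Socket s = socket) {
            // read/write with blocking streams here
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

With only a handful of peers, one thread per connection is simple and cheap, which is why a non-blocking design is not needed here.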
After all the ServerSockets are created, the lead method is called; lead is where the leader carries out its duties. There is quite a lot of code in it, but to summarize, it does the following things.
1. zk.loadData()
2. Start the LearnerCnxAcceptor thread
3. Set up the QuorumVerifier (what is this thing for, anyway? see the sketch right after this list)
4. Wait for acks from the other nodes
5. startZkServer()
6. Report heartbeats
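To answer the question in item 3 briefly: the QuorumVerifier encodes the cluster membership and the rule for deciding whether a set of servers forms a quorum; the default rule is a simple majority. Below is a minimal sketch of just that majority rule, assuming a flat set of server ids (the class name and shape are illustrative, not ZooKeeper's actual API).

import java.util.Set;

// A hedged sketch of what a majority-based QuorumVerifier boils down to: given
// the set of voting server ids, a set of acks "has a quorum" when it contains
// more than half of the voters. The real QuorumVerifier also carries the
// membership/addresses and supports hierarchical quorums; this only shows the rule.
public class MajorityQuorumSketch {
    private final Set<Long> votingMembers;

    public MajorityQuorumSketch(Set<Long> votingMembers) {
        this.votingMembers = votingMembers;
    }

    public boolean containsQuorum(Set<Long> ackedServerIds) {
        return ackedServerIds.size() > votingMembers.size() / 2;
    }

    public static void main(String[] args) {
        MajorityQuorumSketch q = new MajorityQuorumSketch(Set.of(1L, 2L, 3L, 4L, 5L));
        System.out.println(q.containsQuorum(Set.of(1L, 2L)));     // false: 2 of 5
        System.out.println(q.containsQuorum(Set.of(1L, 2L, 3L))); // true: 3 of 5
    }
}

Steps 4 and 6 both lean on this rule: the leader only proceeds, and only stays leader, while it can gather acks or heartbeats from a quorum of servers.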
Of these six steps, the most important is the LearnerCnxAcceptor thread, so let's look at what it does.
As the name suggests, this thread accepts connection requests from follower nodes.
public void run() {
    try {
        Thread.currentThread().setName("LearnerCnxAcceptorHandler-" + serverSocket.getLocalSocketAddress());

        while (!stop.get()) {
            acceptConnections();
        }
    } catch (Exception e) {
        LOG.warn("Exception while accepting follower", e);
        if (fail.compareAndSet(false, true)) {
            handleException(getName(), e);
            halt();
        }
    } finally {
        latch.countDown();
    }
}
private void acceptConnections() throws IOException {
    Socket socket = null;
    boolean error = false;
    try {
        socket = serverSocket.accept();

        // start with the initLimit, once the ack is processed
        // in LearnerHandler switch to the syncLimit
        socket.setSoTimeout(self.tickTime * self.initLimit);
        socket.setTcpNoDelay(nodelay);

        BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
        LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
        fh.start();
    } catch (SocketException e) {
        error = true;
        if (stop.get()) {
            LOG.warn("Exception while shutting down acceptor.", e);
        } else {
            throw e;
        }
    } catch (SaslException e) {
        LOG.error("Exception while connecting to quorum learner", e);
        error = true;
    } catch (Exception e) {
        error = true;
        throw e;
    } finally {
        // Don't leak sockets on errors
        if (error && socket != null && !socket.isClosed()) {
            try {
                socket.close();
            } catch (IOException e) {
                LOG.warn("Error closing socket: " + socket, e);
            }
        }
    }
}
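Note the setSoTimeout call above: the read timeout starts at tickTime * initLimit and, as the inline comment says, is switched to the sync timeout once the learner's ack has been processed. A tiny sketch of that arithmetic follows; the config values are hypothetical, and it assumes the sync timeout is tickTime * syncLimit, which is how those config options are documented.

// Hypothetical config values, only to make the timeout arithmetic concrete.
public class QuorumTimeoutSketch {
    public static void main(String[] args) {
        int tickTime = 2000;  // ms per tick
        int initLimit = 10;   // ticks allowed for the initial connect/sync
        int syncLimit = 5;    // ticks allowed once the follower is in sync

        int initialSoTimeout = tickTime * initLimit; // 20000 ms, used until the first ack
        int steadySoTimeout = tickTime * syncLimit;  // 10000 ms, used afterwards

        System.out.println("initial=" + initialSoTimeout + "ms, steady=" + steadySoTimeout + "ms");
    }
}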
The input stream of the accepted connection is handed off to a LearnerHandler, which encapsulates the communication logic between ZooKeeper servers, so it is worth reading carefully; when it gets hard, that is exactly the time to slow down and dig in. The handler is itself a thread, so let's look at its run method.
@Override
public void run() {
    try {
        learnerMaster.addLearnerHandler(this);
        tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline();

        ia = BinaryInputArchive.getArchive(bufferedInput);
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        oa = BinaryOutputArchive.getArchive(bufferedOutput);

        QuorumPacket qp = new QuorumPacket();
        ia.readRecord(qp, "packet");

        messageTracker.trackReceived(qp.getType());
        if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
            LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString());
            return;
        }

        if (learnerMaster instanceof ObserverMaster && qp.getType() != Leader.OBSERVERINFO) {
            throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + qp.getType());
        }
        byte[] learnerInfoData = qp.getData();
        if (learnerInfoData != null) {
            ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
            if (learnerInfoData.length >= 8) {
                this.sid = bbsid.getLong();
            }
            if (learnerInfoData.length >= 12) {
                this.version = bbsid.getInt(); // protocolVersion
            }
            if (learnerInfoData.length >= 20) {
                long configVersion = bbsid.getLong();
                if (configVersion > learnerMaster.getQuorumVerifierVersion()) {
                    throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                }
            }
        } else {
            this.sid = learnerMaster.getAndDecrementFollowerCounter();
        }

        String followerInfo = learnerMaster.getPeerInfo(this.sid);
        if (followerInfo.isEmpty()) {
            LOG.info(
                "Follower sid: {} not in the current config {}",
                this.sid,
                Long.toHexString(learnerMaster.getQuorumVerifierVersion()));
        } else {
            LOG.info("Follower sid: {} : info : {}", this.sid, followerInfo);
        }

        if (qp.getType() == Leader.OBSERVERINFO) {
            learnerType = LearnerType.OBSERVER;
        }

        learnerMaster.registerLearnerHandlerBean(this, sock);

        long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

        long peerLastZxid;
        StateSummary ss = null;

        long zxid = qp.getZxid();
        long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
        long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            learnerMaster.waitForEpochAck(this.getSid(), ss);
        } else {
            byte[] ver = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
            oa.writeRecord(newEpochPacket, "packet");
            messageTracker.trackSent(Leader.LEADERINFO);
            bufferedOutput.flush();

            QuorumPacket ackEpochPacket = new QuorumPacket();
            ia.readRecord(ackEpochPacket, "packet");
            messageTracker.trackReceived(ackEpochPacket.getType());
            if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
                return;
            }
            ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
            ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
            learnerMaster.waitForEpochAck(this.getSid(), ss);
        }
        peerLastZxid = ss.getLastZxid();

        // Take any necessary action if we need to send TRUNC or DIFF
        // startForwarding() will be called in all cases
        boolean needSnap = syncFollower(peerLastZxid, learnerMaster);

        // syncs between followers and the leader are exempt from throttling because it
        // is important to keep the state of quorum servers up-to-date. The exempted syncs
        // are counted as concurrent syncs though
        boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
        if (needSnap) {
            syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
            syncThrottler.beginSync(exemptFromThrottle);
            ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
            try {
                long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
                oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                messageTracker.trackSent(Leader.SNAP);
                bufferedOutput.flush();

                LOG.info(
                    "Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
                        + "send zxid of db as 0x{}, {} concurrent snapshot sync, "
                        + "snapshot sync was {} from throttle",
                    Long.toHexString(peerLastZxid),
                    Long.toHexString(leaderLastZxid),
                    Long.toHexString(zxidToSend),
                    syncThrottler.getSyncInProgress(),
                    exemptFromThrottle ? "exempt" : "not exempt");
                // Dump data to peer
                learnerMaster.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
                bufferedOutput.flush();
            } finally {
                ServerMetrics.getMetrics().SNAP_COUNT.add(1);
            }
        } else {
            syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
            syncThrottler.beginSync(exemptFromThrottle);
            ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
            ServerMetrics.getMetrics().DIFF_COUNT.add(1);
        }

        LOG.debug("Sending NEWLEADER message to {}", sid);
        // the version of this quorumVerifier will be set by leader.lead() in case
        // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
        // we got here, so the version was set
        if (getVersion() < 0x10000) {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
            oa.writeRecord(newLeaderQP, "packet");
        } else {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
            queuedPackets.add(newLeaderQP);
        }
        bufferedOutput.flush();

        // Start thread that blast packets in the queue to learner
        startSendingPackets();

        qp = new QuorumPacket();
        ia.readRecord(qp, "packet");

        messageTracker.trackReceived(qp.getType());
        if (qp.getType() != Leader.ACK) {
            LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
            return;
        }

        LOG.debug("Received NEWLEADER-ACK message from {}", sid);
        learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());

        syncLimitCheck.start();
        // sync ends when NEWLEADER-ACK is received
        syncThrottler.endSync();
        if (needSnap) {
            ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
        } else {
            ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
        }
        syncThrottler = null;

        // now that the ack has been processed expect the syncLimit
        sock.setSoTimeout(learnerMaster.syncTimeout());

        learnerMaster.waitForStartup();

        // Mutation packets will be queued during the serialize,
        // so we need to mark when the peer can actually start
        // using the data
        //
        LOG.debug("Sending UPTODATE message to {}", sid);
        queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

        while (true) {
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            messageTracker.trackReceived(qp.getType());

            if (LOG.isTraceEnabled()) {
                long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
                if (qp.getType() == Leader.PING) {
                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
                }
                ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
            }
            tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline();

            packetsReceived.incrementAndGet();

            ByteBuffer bb;
            long sessionId;
            int cxid;
            int type;

            switch (qp.getType()) {
            case Leader.ACK:
                if (this.learnerType == LearnerType.OBSERVER) {
                    LOG.debug("Received ACK from Observer {}", this.sid);
                }
                syncLimitCheck.updateAck(qp.getZxid());
                learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                break;
            case Leader.PING:
                // Process the touches
                ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
                DataInputStream dis = new DataInputStream(bis);
                while (dis.available() > 0) {
                    long sess = dis.readLong();
                    int to = dis.readInt();
                    learnerMaster.touch(sess, to);
                }
                break;
            case Leader.REVALIDATE:
                ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
                learnerMaster.revalidateSession(qp, this);
                break;
            case Leader.REQUEST:
                bb = ByteBuffer.wrap(qp.getData());
                sessionId = bb.getLong();
                cxid = bb.getInt();
                type = bb.getInt();
                bb = bb.slice();
                Request si;
                if (type == OpCode.sync) {
                    si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                } else {
                    si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                }
                si.setOwner(this);
                learnerMaster.submitLearnerRequest(si);
                requestsReceived.incrementAndGet();
                break;
            default:
                LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
                break;
            }
        }
    } catch (IOException e) {
        LOG.error("Unexpected exception in LearnerHandler: ", e);
        closeSocket();
    } catch (InterruptedException e) {
        LOG.error("Unexpected exception in LearnerHandler.", e);
    } catch (SyncThrottleException e) {
        LOG.error("too many concurrent sync.", e);
        syncThrottler = null;
    } catch (Exception e) {
        LOG.error("Unexpected exception in LearnerHandler.", e);
        throw e;
    } finally {
        if (syncThrottler != null) {
            syncThrottler.endSync();
            syncThrottler = null;
        }
        String remoteAddr = getRemoteAddress();
        LOG.warn("******* GOODBYE {} ********", remoteAddr);
        messageTracker.dumpToLog(remoteAddr);
        shutdown();
    }
}
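One piece of the method above that is easy to skim past is the parsing of the FOLLOWERINFO payload near the top: a long sid, optionally followed by an int protocol version and a long config version, read positionally from a ByteBuffer. Here is a standalone sketch of that layout; the values are made up purely to exercise the parsing.

import java.nio.ByteBuffer;

// Sketch of the FOLLOWERINFO payload layout parsed in LearnerHandler.run():
// a long sid, then (for newer learners) an int protocol version, then a long
// config version. The values below are illustrative only.
public class LearnerInfoLayoutSketch {
    public static void main(String[] args) {
        long sid = 3L;
        int protocolVersion = 0x10000;
        long configVersion = 42L;

        byte[] learnerInfoData = ByteBuffer.allocate(8 + 4 + 8)
                .putLong(sid)
                .putInt(protocolVersion)
                .putLong(configVersion)
                .array();

        ByteBuffer bb = ByteBuffer.wrap(learnerInfoData);
        System.out.println("sid=" + bb.getLong());              // first 8 bytes
        if (learnerInfoData.length >= 12) {
            System.out.println("version=0x" + Integer.toHexString(bb.getInt()));
        }
        if (learnerInfoData.length >= 20) {
            System.out.println("configVersion=" + bb.getLong());
        }
    }
}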
That is a lot of code and it is easy to get lost; be patient and work through it.
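One branch worth slowing down on is Leader.PING in the receive loop: the learner packs (sessionId, timeout) pairs into the packet data, and the leader reads them back to "touch" each session so it does not expire. Below is a standalone sketch of that payload; the session ids and timeouts are made up.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the Leader.PING payload handled above: pairs of (sessionId, timeout),
// written by the learner and read back by the leader to keep sessions alive.
public class PingPayloadSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeLong(0x100000001L); // sessionId (illustrative)
        dos.writeInt(30000);         // session timeout in ms
        dos.writeLong(0x100000002L);
        dos.writeInt(15000);
        dos.flush();

        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        while (dis.available() > 0) {
            long sess = dis.readLong();
            int to = dis.readInt();
            // The real handler calls learnerMaster.touch(sess, to); here we just print.
            System.out.println("touch session 0x" + Long.toHexString(sess) + " timeout=" + to);
        }
    }
}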
Now let's look at the comment that ZooKeeper itself gives for this class.
It says that this thread receives and processes packets from the other nodes, and also listens for new connection requests.
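Beyond the comment, one detail from the run method that is worth internalizing is how epochs and zxids relate: ZxidUtils.getEpochFromZxid and ZxidUtils.makeZxid treat the high 32 bits of a zxid as the epoch and the low 32 bits as a per-epoch counter. The standalone sketch below mirrors the real method names, but it is an illustration, not the ZooKeeper class itself.

// Sketch of the zxid <-> epoch bit packing: high 32 bits = epoch, low 32 bits =
// per-epoch counter. Method names mirror ZxidUtils, but this is a standalone demo.
public class ZxidPackingSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 0);                        // e.g. the NEWLEADER zxid for epoch 5
        System.out.println(Long.toHexString(zxid));        // 500000000
        System.out.println(getEpochFromZxid(zxid));        // 5
        System.out.println(getCounterFromZxid(zxid + 3));  // 3
    }
}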
In the next post, this very long method will be split into two parts for analysis:
1. receiving and processing request packets
2. listening for new connection requests