启动日志
2022-01-17 15:30:38,815 [myid:] - INFO [main:ZooKeeperServer@836] - tickTime set to 15000 2022-01-17 15:30:38,815 [myid:] - INFO [main:ZooKeeperServer@845] - minSessionTimeout set to -1 2022-01-17 15:30:38,815 [myid:] - INFO [main:ZooKeeperServer@854] - maxSessionTimeout set to -1
set to -1 含义
zookeeper配置
[combscheduler@host1 conf]$ more zoo.cfg # The number of milliseconds of each tick tickTime=15000 # The number of ticks that the initial # synchronization phase can take initLimit=20 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=10 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. #dataDir=/tmp/zookeeper # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 #最大的客户端session超时时间,默认值为20个tickTime,单位是毫秒 #maxSessionTimeout=300000 #最小的客户端session超时时间,默认值为2个tickTime,单位是毫秒 #minSessionTimeout=10000
源码
package org.apache.zookeeper.server; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.PrintWriter; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.linkedList; import java.util.List; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.security.sasl.SaslException; import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.Environment; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.StatPersisted; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.proto.AuthPacket; import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ConnectResponse; import org.apache.zookeeper.proto.GetSASLRequest; import org.apache.zookeeper.proto.ReplyHeader; import org.apache.zookeeper.proto.RequestHeader; import org.apache.zookeeper.proto.SetSASLResponse; import org.apache.zookeeper.server.DataTree.ProcessTxnResult; import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException; import org.apache.zookeeper.server.ServerCnxn.CloseRequestException; import org.apache.zookeeper.server.SessionTracker.Session; import org.apache.zookeeper.server.SessionTracker.SessionExpirer; import org.apache.zookeeper.server.auth.AuthenticationProvider; import org.apache.zookeeper.server.auth.ProviderRegistry; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer; import org.apache.zookeeper.txn.CreateSessionTxn; import org.apache.zookeeper.txn.TxnHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { protected static final Logger LOG; static { LOG = LoggerFactory.getLogger(ZooKeeperServer.class); Environment.logEnv("Server environment:", LOG); } protected ZooKeeperServerBean jmxServerBean; protected DataTreeBean jmxDataTreeBean; public interface DataTreeBuilder { public DataTree build(); } static public class BasicDataTreeBuilder implements DataTreeBuilder { public DataTree build() { return new DataTree(); } } public static final int DEFAULT_TICK_TIME = 3000; protected int tickTime = DEFAULT_TICK_TIME; protected int minSessionTimeout = -1; protected int maxSessionTimeout = -1; protected SessionTracker sessionTracker; private FileTxnSnapLog txnLogFactory = null; private ZKDatabase zkDb; private final AtomicLong hzxid = new AtomicLong(0); public final static Exception ok = new Exception("No prob"); protected RequestProcessor firstProcessor; protected volatile State state = State.INITIAL; protected enum State { INITIAL, RUNNING, SHUTDOWN, ERROR; } static final private long superSecret = 0xB3415C00L; private final AtomicInteger requestsInProcess = new AtomicInteger(0); final ListoutstandingChanges = new ArrayList (); // this data structure must be accessed under the outstandingChanges lock final HashMap outstandingChangesForPath = new HashMap (); private ServerCnxnFactory serverCnxnFactory; private final ServerStats serverStats; private final ZooKeeperServerListener listener; private ZooKeeperServerShutdownHandler zkShutdownHandler; void removeCnxn(ServerCnxn cnxn) { zkDb.removeCnxn(cnxn); } public ZooKeeperServer() { serverStats = new ServerStats(this); listener = new ZooKeeperServerListenerImpl(this); } public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb) { serverStats = new ServerStats(this); this.txnLogFactory = txnLogFactory; this.txnLogFactory.setServerStats(this.serverStats); this.zkDb = zkDb; this.tickTime = tickTime; this.minSessionTimeout = minSessionTimeout; this.maxSessionTimeout = maxSessionTimeout; listener = new ZooKeeperServerListenerImpl(this); LOG.info("Created server with tickTime " + tickTime + " minSessionTimeout " + getMinSessionTimeout() + " maxSessionTimeout " + getMaxSessionTimeout() + " datadir " + txnLogFactory.getDataDir() + " snapdir " + txnLogFactory.getSnapDir()); } public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder) throws IOException { this(txnLogFactory, tickTime, -1, -1, treeBuilder, new ZKDatabase(txnLogFactory)); } public ServerStats serverStats() { return serverStats; } public void dumpConf(PrintWriter pwriter) { pwriter.print("clientPort="); pwriter.println(getClientPort()); pwriter.print("dataDir="); pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath()); pwriter.print("dataLogDir="); pwriter.println(zkDb.snapLog.getDataDir().getAbsolutePath()); pwriter.print("tickTime="); pwriter.println(getTickTime()); pwriter.print("maxClientCnxns="); pwriter.println(serverCnxnFactory.getMaxClientCnxnsPerHost()); pwriter.print("minSessionTimeout="); pwriter.println(getMinSessionTimeout()); pwriter.print("maxSessionTimeout="); pwriter.println(getMaxSessionTimeout()); pwriter.print("serverId="); pwriter.println(getServerId()); } public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException { this( new FileTxnSnapLog(snapDir, logDir), tickTime, new BasicDataTreeBuilder()); } public ZooKeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException { this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, treeBuilder, new ZKDatabase(txnLogFactory)); } public ZKDatabase getZKDatabase() { return this.zkDb; } public void setZKDatabase(ZKDatabase zkDb) { this.zkDb = zkDb; } public void loadData() throws IOException, InterruptedException { if(zkDb.isInitialized()){ setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { setZxid(zkDb.loadDatabase()); } // Clean up dead sessions linkedList deadSessions = new linkedList (); for (Long session : zkDb.getSessions()) { if (zkDb.getSessionWithTimeOuts().get(session) == null) { deadSessions.add(session); } } zkDb.setDataTreeInit(true); for (long session : deadSessions) { // XXX: Is lastProcessedZxid really the best thing to use? killSession(session, zkDb.getDataTreeLastProcessedZxid()); } } public void takeSnapshot(){ try { txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts()); } catch (IOException e) { LOG.error("Severe unrecoverable error, exiting", e); // This is a severe error that we cannot recover from, // so we need to exit System.exit(10); } } public long getZxid() { return hzxid.get(); } long getNextZxid() { return hzxid.incrementAndGet(); } public void setZxid(long zxid) { hzxid.set(zxid); } private void close(long sessionId) { submitRequest(null, sessionId, OpCode.closeSession, 0, null, null); } public void closeSession(long sessionId) { LOG.info("Closing session 0x" + Long.toHexString(sessionId)); // we do not want to wait for a session close. send it as soon as we // detect it! close(sessionId); } protected void killSession(long sessionId, long zxid) { zkDb.killSession(sessionId, zxid); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId)); } if (sessionTracker != null) { sessionTracker.removeSession(sessionId); } } public void expire(Session session) { long sessionId = session.getSessionId(); LOG.info("Expiring session 0x" + Long.toHexString(sessionId) + ", timeout of " + session.getTimeout() + "ms exceeded"); close(sessionId); } public static class MissingSessionException extends IOException { private static final long serialVersionUID = 7467414635467261007L; public MissingSessionException(String msg) { super(msg); } } void touch(ServerCnxn cnxn) throws MissingSessionException { if (cnxn == null) { return; } long id = cnxn.getSessionId(); int to = cnxn.getSessionTimeout(); if (!sessionTracker.touchSession(id, to)) { throw new MissingSessionException( "No session with sessionid 0x" + Long.toHexString(id) + " exists, probably expired and removed"); } } protected void registerJMX() { // register with JMX try { jmxServerBean = new ZooKeeperServerBean(this); MBeanRegistry.getInstance().register(jmxServerBean, null); try { jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree()); MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxDataTreeBean = null; } } catch (Exception e) { LOG.warn("Failed to register with JMX", e); jmxServerBean = null; } } public void startdata() throws IOException, InterruptedException { //check to see if zkDb is not null if (zkDb == null) { zkDb = new ZKDatabase(this.txnLogFactory); } if (!zkDb.isInitialized()) { loadData(); } } public synchronized void startup() { if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); setupRequestProcessors(); registerJMX(); setState(State.RUNNING); notifyAll(); } protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start(); } public ZooKeeperServerListener getZooKeeperServerListener() { return listener; } protected void createSessionTracker() { sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, 1, getZooKeeperServerListener()); } protected void startSessionTracker() { ((SessionTrackerImpl)sessionTracker).start(); } protected void setState(State state) { this.state = state; // Notify server state changes to the registered shutdown handler, if any. if (zkShutdownHandler != null) { zkShutdownHandler.handle(state); } else { LOG.debug("ZKShutdownHandler is not registered, so ZooKeeper server " + "won't take any action on ERROR or SHUTDOWN server state changes"); } } protected boolean canShutdown() { return state == State.RUNNING || state == State.ERROR; } public boolean isRunning() { return state == State.RUNNING; } public void shutdown() { shutdown(false); } public synchronized void shutdown(boolean fullyShutDown) { if (!canShutdown()) { LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); return; } LOG.info("shutting down"); // new RuntimeException("Calling shutdown").printStackTrace(); setState(State.SHUTDOWN); // Since sessionTracker and syncThreads poll we just have to // set running to false and they will detect it during the poll // interval. if (sessionTracker != null) { sessionTracker.shutdown(); } if (firstProcessor != null) { firstProcessor.shutdown(); } if (zkDb != null) { if (fullyShutDown) { zkDb.clear(); } else { // else there is no need to clear the database // * When a new quorum is established we can still apply the diff // on top of the same zkDb data // * If we fetch a new snapshot from leader, the zkDb will be // cleared anyway before loading the snapshot try { //This will fast forward the database to the latest recorded transactions zkDb.fastForwardDatabase(); } catch (IOException e) { LOG.error("Error updating DB", e); zkDb.clear(); } } } unregisterJMX(); } protected void unregisterJMX() { // unregister from JMX try { if (jmxDataTreeBean != null) { MBeanRegistry.getInstance().unregister(jmxDataTreeBean); } } catch (Exception e) { LOG.warn("Failed to unregister with JMX", e); } try { if (jmxServerBean != null) { MBeanRegistry.getInstance().unregister(jmxServerBean); } } catch (Exception e) { LOG.warn("Failed to unregister with JMX", e); } jmxServerBean = null; jmxDataTreeBean = null; } public void incInProcess() { requestsInProcess.incrementAndGet(); } public void decInProcess() { requestsInProcess.decrementAndGet(); } public int getInProcess() { return requestsInProcess.get(); } static class ChangeRecord { ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List acl) { this.zxid = zxid; this.path = path; this.stat = stat; this.childCount = childCount; this.acl = acl; } long zxid; String path; StatPersisted stat; int childCount; List acl; @SuppressWarnings("unchecked") ChangeRecord duplicate(long zxid) { StatPersisted stat = new StatPersisted(); if (this.stat != null) { DataTree.copyStatPersisted(this.stat, stat); } return new ChangeRecord(zxid, path, stat, childCount, acl == null ? new ArrayList() : new ArrayList(acl)); } } byte[] generatePasswd(long id) { Random r = new Random(id ^ superSecret); byte p[] = new byte[16]; r.nextBytes(p); return p; } protected boolean checkPasswd(long sessionId, byte[] passwd) { return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId)); } long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { long sessionId = sessionTracker.createSession(timeout); Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); ByteBuffer to = ByteBuffer.allocate(4); to.putInt(timeout); cnxn.setSessionId(sessionId); submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null); return sessionId; } public void setOwner(long id, Object owner) throws SessionExpiredException { sessionTracker.setOwner(id, owner); } protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException { boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK, "Session 0x" + Long.toHexString(sessionId) + " is valid: " + rc); } finishSessionInit(cnxn, rc); } public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException { if (!checkPasswd(sessionId, passwd)) { finishSessionInit(cnxn, false); } else { revalidateSession(cnxn, sessionId, sessionTimeout); } } public void finishSessionInit(ServerCnxn cnxn, boolean valid) { // register with JMX try { if (valid) { serverCnxnFactory.registerConnection(cnxn); } } catch (Exception e) { LOG.warn("Failed to register with JMX", e); } try { ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout() : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no // longer valid valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]); ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); bos.writeInt(-1, "len"); rsp.serialize(bos, "connect"); if (!cnxn.isOldClient) { bos.writeBool( this instanceof ReadOnlyZooKeeperServer, "readOnly"); } baos.close(); ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); bb.putInt(bb.remaining() - 4).rewind(); cnxn.sendBuffer(bb); if (!valid) { LOG.info("Invalid session 0x" + Long.toHexString(cnxn.getSessionId()) + " for client " + cnxn.getRemoteSocketAddress() + ", probably expired"); cnxn.sendBuffer(ServerCnxnFactory.closeConn); } else { LOG.info("Established session 0x" + Long.toHexString(cnxn.getSessionId()) + " with negotiated timeout " + cnxn.getSessionTimeout() + " for client " + cnxn.getRemoteSocketAddress()); cnxn.enableRecv(); } } catch (Exception e) { LOG.warn("Exception while establishing session, closing", e); cnxn.close(); } } public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) { closeSession(cnxn.getSessionId()); } public long getServerId() { return 0; } private void submitRequest(ServerCnxn cnxn, long sessionId, int type, int xid, ByteBuffer bb, List authInfo) { Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo); submitRequest(si); } public void submitRequest(Request si) { if (firstProcessor == null) { synchronized (this) { try { // Since all requests are passed to the request // processor it should wait for setting up the request // processor chain. The state will be updated to RUNNING // after the setup. while (state == State.INITIAL) { wait(1000); } } catch (InterruptedException e) { LOG.warn("Unexpected interruption", e); } if (firstProcessor == null || state != State.RUNNING) { throw new RuntimeException("Not started"); } } } try { touch(si.cnxn); boolean validpacket = Request.isValid(si.type); if (validpacket) { firstProcessor.processRequest(si); if (si.cnxn != null) { incInProcess(); } } else { LOG.warn("Received packet at server of unknown type " + si.type); new UnimplementedRequestProcessor().processRequest(si); } } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); } } public static int getSnapCount() { String sc = System.getProperty("zookeeper.snapCount"); try { int snapCount = Integer.parseInt(sc); // snapCount must be 2 or more. See org.apache.zookeeper.server.SyncRequestProcessor if( snapCount < 2 ) { LOG.warn("SnapCount should be 2 or more. Now, snapCount is reset to 2"); snapCount = 2; } return snapCount; } catch (Exception e) { return 100000; } } public int getGlobalOutstandingLimit() { String sc = System.getProperty("zookeeper.globalOutstandingLimit"); int limit; try { limit = Integer.parseInt(sc); } catch (Exception e) { limit = 1000; } return limit; } public void setServerCnxnFactory(ServerCnxnFactory factory) { serverCnxnFactory = factory; } public ServerCnxnFactory getServerCnxnFactory() { return serverCnxnFactory; } public long getLastProcessedZxid() { return zkDb.getDataTreeLastProcessedZxid(); } public long getOutstandingRequests() { return getInProcess(); } public void truncateLog(long zxid) throws IOException { this.zkDb.truncateLog(zxid); } public int getTickTime() { return tickTime; } public void setTickTime(int tickTime) { LOG.info("tickTime set to " + tickTime); this.tickTime = tickTime; } public int getMinSessionTimeout() { return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout; } public void setMinSessionTimeout(int min) { LOG.info("minSessionTimeout set to " + min); this.minSessionTimeout = min; } public int getMaxSessionTimeout() { return maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout; } public void setMaxSessionTimeout(int max) { LOG.info("maxSessionTimeout set to " + max); this.maxSessionTimeout = max; } public int getClientPort() { return serverCnxnFactory != null ? serverCnxnFactory.getLocalPort() : -1; } public void setTxnLogFactory(FileTxnSnapLog txnLog) { this.txnLogFactory = txnLog; } public FileTxnSnapLog getTxnLogFactory() { return this.txnLogFactory; } public String getState() { return "standalone"; } public void dumpEphemerals(PrintWriter pwriter) { zkDb.dumpEphemerals(pwriter); } public int getNumAliveConnections() { return serverCnxnFactory.getNumAliveConnections(); } public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); ConnectRequest connReq = new ConnectRequest(); connReq.deserialize(bia, "connect"); if (LOG.isDebugEnabled()) { LOG.debug("Session establishment request from client " + cnxn.getRemoteSocketAddress() + " client's lastZxid is 0x" + Long.toHexString(connReq.getLastZxidSeen())); } boolean readonly = false; try { readonly = bia.readBool("readOnly"); cnxn.isOldClient = false; } catch (IOException e) { // this is ok -- just a packet from an old client which // doesn't contain readonly field LOG.warn("Connection request from old client " + cnxn.getRemoteSocketAddress() + "; will be dropped if server is in r-o mode"); } if (readonly == false && this instanceof ReadOnlyZooKeeperServer) { String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress(); LOG.info(msg); throw new CloseRequestException(msg); } if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { String msg = "Refusing session request for client " + cnxn.getRemoteSocketAddress() + " as it has seen zxid 0x" + Long.toHexString(connReq.getLastZxidSeen()) + " our last zxid is 0x" + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid()) + " client must try another server"; LOG.info(msg); throw new CloseRequestException(msg); } int sessionTimeout = connReq.getTimeOut(); byte passwd[] = connReq.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } cnxn.setSessionTimeout(sessionTimeout); // We don't want to receive any packets until we are sure that the // session is setup cnxn.disableRecv(); long sessionId = connReq.getSessionId(); if (sessionId != 0) { long clientSessionId = connReq.getSessionId(); LOG.info("Client attempting to renew session 0x" + Long.toHexString(clientSessionId) + " at " + cnxn.getRemoteSocketAddress()); serverCnxnFactory.closeSession(sessionId); cnxn.setSessionId(sessionId); reopenSession(cnxn, sessionId, passwd, sessionTimeout); } else { LOG.info("Client attempting to establish new session at " + cnxn.getRemoteSocketAddress()); createSession(cnxn, passwd, sessionTimeout); } } public boolean shouldThrottle(long outStandingCount) { if (getGlobalOutstandingLimit() < getInProcess()) { return outStandingCount > 0; } return false; } public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { // We have the request, now process and setup for next InputStream bais = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); RequestHeader h = new RequestHeader(); h.deserialize(bia, "header"); // Through the magic of byte buffers, txn will not be // pointing // to the start of the txn incomingBuffer = incomingBuffer.slice(); if (h.getType() == OpCode.auth) { LOG.info("got auth packet " + cnxn.getRemoteSocketAddress()); AuthPacket authPacket = new AuthPacket(); ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket); String scheme = authPacket.getScheme(); AuthenticationProvider ap = ProviderRegistry.getProvider(scheme); Code authReturn = KeeperException.Code.AUTHFAILED; if(ap != null) { try { authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth()); } catch(RuntimeException e) { LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e); authReturn = KeeperException.Code.AUTHFAILED; } } if (authReturn!= KeeperException.Code.OK) { if (ap == null) { LOG.warn("No authentication provider for scheme: " + scheme + " has " + ProviderRegistry.listProviders()); } else { LOG.warn("Authentication failed for scheme: " + scheme); } // send a response... ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue()); cnxn.sendResponse(rh, null, null); // ... and close connection cnxn.sendBuffer(ServerCnxnFactory.closeConn); cnxn.disableRecv(); } else { if (LOG.isDebugEnabled()) { LOG.debug("Authentication succeeded for scheme: " + scheme); } LOG.info("auth success " + cnxn.getRemoteSocketAddress()); ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); cnxn.sendResponse(rh, null, null); } return; } else { if (h.getType() == OpCode.sasl) { Record rsp = processSasl(incomingBuffer,cnxn); ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it? return; } else { Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); si.setOwner(ServerCnxn.me); submitRequest(si); } } cnxn.incrOutstandingRequests(h); } private Record processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn) throws IOException { LOG.debug("Responding to client SASL token."); GetSASLRequest clientTokenRecord = new GetSASLRequest(); ByteBufferInputStream.byteBuffer2Record(incomingBuffer,clientTokenRecord); byte[] clientToken = clientTokenRecord.getToken(); LOG.debug("Size of client SASL token: " + clientToken.length); byte[] responseToken = null; try { ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer; try { // note that clientToken might be empty (clientToken.length == 0): // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the // SASL negotiation process. responseToken = saslServer.evaluateResponse(clientToken); if (saslServer.isComplete() == true) { String authorizationID = saslServer.getAuthorizationID(); LOG.info("adding SASL authorization for authorizationID: " + authorizationID); cnxn.addAuthInfo(new Id("sasl",authorizationID)); } } catch (SaslException e) { LOG.warn("Client failed to SASL authenticate: " + e, e); if ((System.getProperty("zookeeper.allowSaslFailedClients") != null) && (System.getProperty("zookeeper.allowSaslFailedClients").equals("true"))) { LOG.warn("Maintaining client connection despite SASL authentication failure."); } else { LOG.warn("Closing client connection due to SASL authentication failure."); cnxn.close(); } } } catch (NullPointerException e) { LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly."); } if (responseToken != null) { LOG.debug("Size of server SASL response: " + responseToken.length); } // wrap SASL response token to client inside a Response object. return new SetSASLResponse(responseToken); } public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { ProcessTxnResult rc; int opCode = hdr.getType(); long sessionId = hdr.getClientId(); rc = getZKDatabase().processTxn(hdr, txn); if (opCode == OpCode.createSession) { if (txn instanceof CreateSessionTxn) { CreateSessionTxn cst = (CreateSessionTxn) txn; sessionTracker.addSession(sessionId, cst .getTimeOut()); } else { LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString()); } } else if (opCode == OpCode.closeSession) { sessionTracker.removeSession(sessionId); } return rc; } void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) { this.zkShutdownHandler = zkShutdownHandler; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)