前期回顾:
- ZooKeeper :Shell脚本搭建单机版ZooKeeper
- ZooKeeper :重要概念 & 客户端命令介绍
- ZooKeeper :搭建ZooKeeper集群
- ZooKeeper :Nginx基于TCP协议代理ZooKeeper集群
本篇博客博主将会给大家介绍ZooKeeper提供的Java客户端API,以后还会介绍Curator的使用。
先创建一个maven项目:
pom.xml如下所示:
4.0.0 com.kaven zookeeper1.0-SNAPSHOT 8 8 org.apache.zookeeper zookeeper3.6.3
ZooKeeper依赖包的版本最好要和ZooKeeper服务端的版本一致。
启动ZooKeeper集群和Nginx TCP连接代理。
Java客户端与ZooKeeper服务端的连接、断连、重连以及Watcher事件监听等都会触发WatchedEvent。
WatchedEvent类:
package org.apache.zookeeper; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.proto.WatcherEvent; @InterfaceAudience.Public public class WatchedEvent { // ZooKeeper的当前状态 private final KeeperState keeperState; // 事件类型 private final EventType eventType; // 事件涉及的znode路径 private String path; ... }
Application类(实现了Watcher接口,通过process方法,可以监听WatchedEvent的触发):
package com.kaven.zookeeper; import org.apache.zookeeper.*; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class Application implements Watcher { private static CountDownLatch latch; // Nginx代理ZooKeeper集群后的连接套接字 private static final String SERVER_PROXY = "192.168.1.200:9999"; // ZooKeeper集群的连接套接字 private static final String SERVER_CLUSTER = "192.168.1.199:9000,192.168.1.200:9000,192.168.1.201:9000"; private static final int TIMEOUT = 40000; private static long time; public static void main(String[] args) throws IOException, InterruptedException { Watcher watcher = new Application(); latch = new CountDownLatch(1); time = System.currentTimeMillis(); ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher); latch.await(); System.out.println("Connection complete!"); } @Override public void process(WatchedEvent watchedEvent) { System.out.println("-----------------WatchedEvent------------------"); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getState().name()); System.out.println("time use(ms):" + (System.currentTimeMillis() - time)); time = System.currentTimeMillis(); System.out.println("-----------------WatchedEvent------------------"); if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) { latch.countDown(); } } }
输出:
-----------------WatchedEvent------------------ None SyncConnected time use(ms):9672 -----------------WatchedEvent------------------ Connection complete!
不了解多线程协调工具CountDownLatch可以看下面这篇博客:
- Java并发编程一CountDownLatch、CyclicBarrier、Semaphore初使用
与ZooKeeper服务端的交互,核心是ZooKeeper类,这是ZooKeeper客户端库的主要类。要使用ZooKeeper服务,应用程序必须首先实例化ZooKeeper类的对象。所有的 *** 作都将通过调用ZooKeeper类的方法来完成。除非另有说明,否则此类的方法是线程安全的。一旦与服务端建立连接,就会为客户端分配一个Session ID。客户端将定期向服务端发送心跳以保持会话有效。只要客户端的Session ID保持有效,应用程序就可以通过客户端调用ZooKeeper API。
如果由于某种原因,客户端长时间未能向服务端发送心跳(例如超过sessionTimeout值),服务端将使会话过期,Session ID将失效。 客户端对象将不再可用。 要进行ZooKeeper API调用,应用程序必须创建一个新的客户端对象。如果客户端当前连接的ZooKeeper服务端出现故障或没有响应,客户端将在其Session ID到期之前自动尝试连接其他服务端(集群)。 如果成功,应用程序可以继续使用客户端。
ZooKeeper API方法有同步方法和异步方法。同步方法阻塞,直到服务端响应。异步方法只是将请求发送并立即返回(请求会排队)。调用异步方法时,会将一个回调实例以参数的方式传给它,该实例将在请求执行成功或错误时被执行,并带有指示错误的状态码(回调方法的rc参数,该状态码在Code枚举类中进行了定义,调用ZooKeeper API的异步方法失败时,可以查看状态码,去Code枚举类中查询相关原因)。
OK(Ok), SYSTEMERROR(SystemError), RUNTIMEINCONSISTENCY(RuntimeInconsistency), DATAINCONSISTENCY(DataInconsistency), CONNECTIONLOSS(ConnectionLoss), MARSHALLINGERROR(MarshallingError), UNIMPLEMENTED(Unimplemented), OPERATIONTIMEOUT(OperationTimeout), BADARGUMENTS(BadArguments), NEWCONFIGNOQUORUM(NewConfigNoQuorum), RECONFIGINPROGRESS(ReconfigInProgress), UNKNOWNSESSION(UnknownSession), APIERROR(APIError), NONODE(NoNode), NOAUTH(NoAuth), BADVERSION(BadVersion), NOCHILDRENFOREPHEMERALS(NoChildrenForEphemerals), NODEEXISTS(NodeExists), NOTEMPTY(NotEmpty), SESSIONEXPIRED(SessionExpired), INVALIDCALLBACK(InvalidCallback), INVALIDACL(InvalidACL), AUTHFAILED(AuthFailed), SESSIONMOVED(-118), NOTREADONLY(-119), EPHEMERALONLOCALSESSION(EphemeralOnLocalSession), NOWATCHER(-121), REQUESTTIMEOUT(-122), RECONFIGDISABLED(-123), SESSIONCLOSEDREQUIRESASLAUTH(-124);
ZooKeeper API调用成功后可以在ZooKeeper服务端的数据节点上留下Watcher。 其他成功的 ZooKeeper API调用可以触发这些Watcher。 一旦Watcher被触发,一个事件将被传递给Watcher的处理方法(以回调的形式,如process方法)。 每个Watcher只能触发一次。
因此,连接ZooKeeper服务端,直接使用ZooKeeper类的构造函数即可。
构造函数部分参数如下:
- connectString:逗号分隔的host:port对,每个host:port对对应一个ZooKeeper服务端,如"192.168.1.199:9000,192.168.1.200:9000,192.168.1.201:9000"。如果需要使用chroot后缀,直接将chroot后缀拼接在host:port对的后面即可,如:"192.168.1.199:9000,192.168.1.200:9000,192.168.1.201:9000/kaven",之后客户端将基于"/kaven"进行 *** 作( *** 作"/elasticjob/job"将导致 *** 作在"/kaven/elasticjob/job"上)。
- sessionTimeout:以毫秒为单位的Session超时时间。
- watcher:一个Watcher实例,它会收到ZooKeeper状态变化的通知,也可能会收到节点事件的通知。
- sessionId:重新连接时使用的Session ID。
- sessionPasswd:重新连接时该Session的密码。
重新连接
连接成功后,ZooKeeper服务端会给客户端分配一个Session ID和该Session的密码,用于该客户端重新连接。
public static void main(String[] args) throws IOException, InterruptedException { Watcher watcher = new Application(); latch = new CountDownLatch(1); time = System.currentTimeMillis(); ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher); latch.await(); System.out.println("Connection complete!"); long sessionId = zk.getSessionId(); byte[] sessionPasswd = zk.getSessionPasswd(); latch = new CountDownLatch(1); time = System.currentTimeMillis(); zk = new ZooKeeper(SERVER_CLUSTER, TIMEOUT, watcher, sessionId, sessionPasswd); latch.await(); System.out.println("Reconnection complete!"); }
输出:
-----------------WatchedEvent------------------ None SyncConnected time use(ms):9671 -----------------WatchedEvent------------------ Connection complete! -----------------WatchedEvent------------------ None SyncConnected time use(ms):9015 -----------------WatchedEvent------------------ Reconnection complete!
很显然使用Nginx代理ZooKeeper集群后的连接套接字SERVER_PROXY和ZooKeeper集群的连接套接字SERVER_CLUSTER都可以连接ZooKeeper集群。
// Nginx代理ZooKeeper集群后的连接套接字 private static final String SERVER_PROXY = "192.168.1.200:9999"; // ZooKeeper集群的连接套接字 private static final String SERVER_CLUSTER = "192.168.1.199:9000,192.168.1.200:9000,192.168.1.201:9000";
断开连接
public static void main(String[] args) throws IOException, InterruptedException { Watcher watcher = new Application(); latch = new CountDownLatch(1); time = System.currentTimeMillis(); ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher); latch.await(); System.out.println("Connection complete!"); latch = new CountDownLatch(1); time = System.currentTimeMillis(); zk.close(); latch.await(); System.out.println("Disconnect complete!"); } @Override public void process(WatchedEvent watchedEvent) { System.out.println("-----------------WatchedEvent------------------"); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getState().name()); System.out.println("time use(ms):" + (System.currentTimeMillis() - time)); time = System.currentTimeMillis(); System.out.println("-----------------WatchedEvent------------------"); if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) { latch.countDown(); } else if(watchedEvent.getState().equals(Event.KeeperState.Closed)) { latch.countDown(); } }
输出:
-----------------WatchedEvent------------------ None SyncConnected time use(ms):9665 -----------------WatchedEvent------------------ Connection complete! -----------------WatchedEvent------------------ None Closed time use(ms):116 -----------------WatchedEvent------------------ Disconnect complete!
获取状态
public static void main(String[] args) throws IOException, InterruptedException { Watcher watcher = new Application(); latch = new CountDownLatch(1); time = System.currentTimeMillis(); ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher); latch.await(); System.out.println(zk.getState()); System.out.println("Connection complete!"); latch = new CountDownLatch(1); time = System.currentTimeMillis(); zk.close(); latch.await(); System.out.println(zk.getState()); System.out.println("Disconnect complete!"); }
输出:
-----------------WatchedEvent------------------ None SyncConnected time use(ms):9689 -----------------WatchedEvent------------------ ConNECTED Connection complete! -----------------WatchedEvent------------------ None Closed time use(ms):116 -----------------WatchedEvent------------------ CLOSED Disconnect complete!
状态是枚举类型(States枚举类):
@InterfaceAudience.Public public enum States { CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED, NOT_CONNECTED; ... }ACL
ZooKeeper的Java客户端对ACL的定义体现在Perms接口(授权权限)和Id类(授权策略和授权对象)中,对ZooKeeper ACL不太了解可以看下面这篇博客。
- ZooKeeper :重要概念 & 客户端命令介绍
@InterfaceAudience.Public public interface Perms { int READ = 1 << 0; int WRITE = 1 << 1; int CREATE = 1 << 2; int DELETE = 1 << 3; int ADMIN = 1 << 4; int ALL = READ | WRITE | CREATE | DELETE | ADMIN; }
在Perms接口中授权权限被定义成一个整数,每个授权权限的二进制表示只有一个位置为1,并且这个位置各不相同,因此,可以通过按位或的方式实现授权权限的叠加。
@Public public class Id implements Record { private String scheme; private String id; ... }
授权策略(Scheme):
- world:开放模式,world表示任意客户端都可以访问(默认设置)。
- ip:限定客户端IP防问。
- auth:只有在会话中通过了认证才可以访问(通过addauth命令)。
- digest:与auth类似,区别在于auth用明文密码,而digest用SHA1+base64加密后的密码(通过addauth命令,实际场景中digest更常见)。
授权对象(ID)就是指定的授权策略(Scheme)的内容,比如world:anyone中的anyone、ip:192.168.1.189中的192.168.1.189、auth:username:password中的username:password(明文密码)、digest:username:password_digest中的username:password_digest(用SHA1+base64加密后的密码)。
ZooKeeper的Java客户端中内置了一些ACL定义(Ids接口中):
@InterfaceAudience.Public public interface Ids { Id ANYONE_ID_UNSAFE = new Id("world", "anyone"); Id AUTH_IDS = new Id("auth", ""); @SuppressFBWarnings(value = "MS_MUTABLE_COLLECTION", justification = "Cannot break API") ArrayList OPEN_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE))); @SuppressFBWarnings(value = "MS_MUTABLE_COLLECTION", justification = "Cannot break API") ArrayList CREATOR_ALL_ACL = new ArrayList(Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS))); @SuppressFBWarnings(value = "MS_MUTABLE_COLLECTION", justification = "Cannot break API") ArrayList READ_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE))); }
授权IP地址为192.168.1.100的客户端READ和WRITE权限可以定义如下:
new ACL(Perms.READ | Perms.WRITE, new Id("ip", "192.168.1.100"));
其他ACL定义以此类推。
设置和获取节点ACL
setACL和getACL两个方法都存在同步版本与异步版本,setACL方法在设置节点的ACL后还会返回节点的状态信息,getACL方法返回给定路径下的节点的ACL和状态信息。异步版本的方法基本上都是在调用异步回调方法时,将需要返回的信息作为参数传进来,实现信息的获取。
// 如果存在这样的节点并且给定的aclVersion与节点的aclVersion匹配 // 则为给定路径的节点设置ACL并且返回节点的状态信息 // 如果不存在给定路径的节点,将抛出错误代码为KeeperException.NoNode的KeeperException // 如果给定的aclVersion与节点的aclVersion不匹配,则会抛出错误代码为 KeeperException.BadVersion的KeeperException Stat setACL(final String path, List acl, int aclVersion) // setACL的异步版本 void setACL(final String path, List acl, int version, StatCallback cb, Object ctx) // 返回给定路径的节点的ACL和状态信息 // 如果不存在给定路径的节点,将抛出错误代码为KeeperException.NoNode的KeeperException // 如果stat参数不为空,则会将节点的状态信息复制到此参数中,其他方法也是类似的方式来完成状态信息的返回 List getACL(final String path, Stat stat) // getACL的异步版本 void getACL(final String path, Stat stat, ACLCallback cb, Object ctx)
添加认证
addAuthInfo方法将指定的scheme:auth信息添加到客户端与ZooKeeper服务端建立的连接中(此方法不是线程安全的),使用方法如下所示:
zk.addAuthInfo("digest", "kaven:itkaven".getBytes()); zk.addAuthInfo("auth", "kaven:itkaven".getBytes());Znode
创建节点
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException { Watcher watcher = new Application(); latch = new CountDownLatch(1); time = System.currentTimeMillis(); ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher); latch.await(); System.out.println(zk.getState()); System.out.println("Connection complete!"); String message = "success"; ACL acl = new ACL( ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest("kaven:itkaven")) ); latch = new CountDownLatch(1); zk.create("/itkaven", "hello kaven".getBytes(), new ArrayList<>(Collections.singletonList(acl)), CreateMode.EPHEMERAL, (rc, path, ctx, name, s) -> { System.out.println(rc); System.out.println(path); System.out.println(name); System.out.println(name.equals(path) ? ctx : "error"); System.out.println(s.getDataLength()); latch.countDown(); }, message); latch.await(); zk.addAuthInfo("digest", "kaven:itkaven".getBytes()); Stat stat = new Stat(); byte[] data = zk.getData("/itkaven", false, stat); System.out.println("data: " + new String(data)); Thread.sleep(1000000); }
输出:
-----------------WatchedEvent------------------ None SyncConnected time use(ms):9696 -----------------WatchedEvent------------------ ConNECTED Connection complete! 0 /itkaven /itkaven success 11 data: hello kaven
在其他客户端上 *** 作该节点,就会出现权限问题:
DigestAuthenticationProvider.generateDigest("kaven:itkaven")用于生成加密后的密码(digest使用SHA1+base64加密后的密码):
public static String generateDigest(String idPassword) throws NoSuchAlgorithmException { String[] parts = idPassword.split(":", 2); byte[] digest = MessageDigest.getInstance("SHA1").digest(idPassword.getBytes()); return parts[0] + ":" + base64Encode(digest); }
使用create方法创建节点(异步创建,ZooKeeper的Java客户端方法大部分都有异步版本,所以需要使用CountDownLatch来协调多线程执行顺序),部分参数如下:
- path:该节点的路径。
- data:该节点存储的数据。
- acl:该节点的ACL列表。
- createMode:该节点的类型(持久类型、临时类型、容器类型等)。
- cb:创建该节点成功或者失败后的回调(可通过Lambda表达式指定)。
- ctx:传递给异步调用的上下文对象。
- ttl:创建TTL节点时,用于指定TTL节点的TTL时间,创建其他类型的节点时,指定该参数会导致节点创建不成功(除非指定为-1)。
节点类型(7种,容器节点和TTL节点在重要概念 & 客户端命令介绍这篇博客中已经介绍过了):
PERSISTENT(0, false, false, false, false), PERSISTENT_SEQUENTIAL(2, false, true, false, false), EPHEMERAL(1, true, false, false, false), EPHEMERAL_SEQUENTIAL(3, true, true, false, false), CONTAINER(4, false, false, true, false), PERSISTENT_WITH_TTL(5, false, false, false, true), PERSISTENT_SEQUENTIAL_WITH_TTL(6, false, true, false, true);
回调有两种方式,第一种方式(Create2Callback接口):
@InterfaceAudience.Public interface Create2Callback extends AsyncCallback { void processResult(int rc, String path, Object ctx, String name, Stat stat); }
- rc:返回码或调用结果。
- path:传递给异步调用的路径。
- ctx:传递给异步调用的上下文对象。
- name:创建的节点的名称。 节点创建成功时,name和path通常相等,除非创建顺序节点。
- stat :给定path上的节点的Stat对象。
Stat对象就是节点的状态信息:
@Public public class Stat implements Record { private long czxid; private long mzxid; private long ctime; private long mtime; private int version; private int cversion; private int aversion; private long ephemeralOwner; private int dataLength; private int numChildren; private long pzxid; ... }
第二种方式(StringCallback接口,与第一种方式的差别,只是processResult方法没有stat参数):
@InterfaceAudience.Public interface StringCallback extends AsyncCallback { void processResult(int rc, String path, Object ctx, String name); }
删除节点
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException { Watcher watcher = new Application(); latch = new CountDownLatch(1); time = System.currentTimeMillis(); ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher); latch.await(); System.out.println(zk.getState()); System.out.println("Connection complete!"); String message = "success"; ACL acl = new ACL( ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest("kaven:itkaven")) ); latch = new CountDownLatch(1); zk.create("/itkaven", "hello kaven".getBytes(), new ArrayList<>(Collections.singletonList(acl)), CreateMode.EPHEMERAL, (rc, path, ctx, name, s) -> { System.out.println(rc); System.out.println(path); System.out.println(name); System.out.println(name.equals(path) ? ctx : "error"); System.out.println(s.getDataLength()); latch.countDown(); }, message); latch.await(); zk.addAuthInfo("digest", "kaven:itkaven".getBytes()); Stat stat = new Stat(); byte[] data = zk.getData("/itkaven", false, stat); System.out.println("data: " + new String(data)); String deleteMessage = "Delete complete!"; zk.delete("/itkaven", stat.getVersion(), (rc, path, ctx) -> { System.out.println(rc); System.out.println(path); System.out.println(ctx); }, deleteMessage); Thread.sleep(1000000); }
输出:
-----------------WatchedEvent------------------ None SyncConnected time use(ms):9669 -----------------WatchedEvent------------------ ConNECTED Connection complete! 0 /itkaven /itkaven success 11 data: hello kaven 0 /itkaven Delete complete!
delete方法删除给定路径的节点。 如果存在这样的节点,并且给定的版本与节点的版本匹配(如果给定的版本为-1,则它与节点的任何版本匹配),则调用将成功。这里使用的是delete方法的异步调用。
节点数据和状态信息
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException { Watcher watcher = new Application(); latch = new CountDownLatch(1); time = System.currentTimeMillis(); ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher); latch.await(); System.out.println(zk.getState()); System.out.println("Connection complete!"); String message = "success"; ACL acl = new ACL( ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest("kaven:itkaven")) ); latch = new CountDownLatch(1); zk.create("/itkaven", "hello kaven".getBytes(), new ArrayList<>(Collections.singletonList(acl)), CreateMode.EPHEMERAL, (rc, path, ctx, name, s) -> { System.out.println(rc); System.out.println(path); System.out.println(name); System.out.println(name.equals(path) ? ctx : "error"); System.out.println(s.getDataLength()); latch.countDown(); }, message); latch.await(); zk.addAuthInfo("digest", "kaven:itkaven".getBytes()); zk.setData("/itkaven", "new data".getBytes(), -1); Stat stat = new Stat(); byte[] data = zk.getData("/itkaven", false, stat); System.out.println("data: " + new String(data)); String statMessage = "stat success"; AtomicReferencenewStat = new AtomicReference<>(null); latch = new CountDownLatch(1); zk.exists("/itkaven", watcher, (rc, path, ctx, s) -> { if(rc == KeeperException.Code.OK.intValue()) { System.out.println(ctx); newStat.set(s); } latch.countDown(); }, statMessage); latch.await(); if(newStat.get() != null) { System.out.println(stat.equals(newStat.get())); } Thread.sleep(1000000); }
输出:
-----------------WatchedEvent------------------ None SyncConnected time use(ms):9697 -----------------WatchedEvent------------------ ConNECTED Connection complete! 0 /itkaven /itkaven success 11 data: new data stat success true
- setData:如果存在这样的节点并且给定版本与节点的版本匹配(如果给定版本为-1,则它与节点的任何版本都匹配),则为给定路径的节点设置数据,并且返回节点的状态信息。此 *** 作如果成功,将触发getData调用留下的该路径节点上的所有watch,该方法存在异步版本(通过异步方法的回调将请求的信息返回给客户端)。
- getData:方法返回给定路径的节点的数据和状态信息(类似ZooKeeper客户端的get -s命令)。如果watch为true并且调用成功(也可以传入一个Watcher实例),则watch将留在给定路径的节点上。watch将由在节点上设置数据或删除节点的成功 *** 作触发,该方法也存在异步版本。
- exists:方法返回给定路径的节点的状态(类似ZooKeeper客户端的stat命令)。如果不存在这样的节点,则返回null。如果watch为 true并且调用成功(也可以传入一个Watcher实例),则watch将留在给定路径的节点上。 watch将由创建、删除节点或在节点上设置数据的成功 *** 作触发,该方法也存在异步版本。
- AtomicReference:可参考这篇博客Java并发编程一引用类型、升级类型原子类初使用加源码分析。
获取子节点
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException { Watcher watcher = new Application(); latch = new CountDownLatch(1); time = System.currentTimeMillis(); ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher); latch.await(); System.out.println(zk.getState()); System.out.println("Connection complete!"); String message = "success"; ACL acl = new ACL( ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest("kaven:itkaven")) ); latch = new CountDownLatch(1); zk.create("/itkaven", "hello kaven".getBytes(), new ArrayList<>(Collections.singletonList(acl)), CreateMode.EPHEMERAL, (rc, path, ctx, name, s) -> { System.out.println(rc); System.out.println(path); System.out.println(name); System.out.println(name.equals(path) ? ctx : "error"); System.out.println(s.getDataLength()); latch.countDown(); }, message); latch.await(); // zk.addAuthInfo("digest", "kaven:itkaven".getBytes()); zk.getChildren("/", false).forEach(System.out::println); System.out.println(zk.getAllChildrenNumber("/")); System.out.println(zk.getEphemerals("/")); System.out.println(zk.getEphemerals()); Thread.sleep(1000000); }
输出:
-----------------WatchedEvent------------------ None SyncConnected time use(ms):9711 -----------------WatchedEvent------------------ ConNECTED Connection complete! 0 /itkaven /itkaven success 11 kaven zookeeper nginx itkaven lalal 9 [/itkaven] [/itkaven]
- getChildren:返回给定路径的节点的子节点列表。如果watch为true并且调用成功(也可以传入一个Watcher实例),则watch将留在给定路径的节点上。 删除给定路径的节点或在节点下创建、删除子节点的成功 *** 作将触发watch。返回的子节点列表未排序,该方法存在异步版本。
- getAllChildrenNumber:同步或者异步获取特定路径下所有子节点的数量(包括子节点的子节点,以此类推)。
- getEphemerals:同步或者异步获取所有在此会话创建的与prefixPath路径匹配的临时节点(不使用prefixPath路径参数就是获取此会话创建的所有临时节点 )。 如果prefixPath 是"/"那么它会返回所有的临时节点(不包括其他客户端上创建的临时节点)。
ZooKeeper的Java客户端中关于Session、ACL以及Znode API的介绍就到这里,光看不练假把式,编程需要多练习和实战才能得到提升,只有使用的多了,才能快速排查出问题所在。如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)