ZooKeeper :Java客户端Session、ACL、Znode API介绍

ZooKeeper :Java客户端Session、ACL、Znode API介绍,第1张

ZooKeeper :Java客户端Session、ACL、Znode API介绍 ZooKeeper :Java客户端Session、ACL、Znode API介绍

前期回顾:

  • ZooKeeper :Shell脚本搭建单机版ZooKeeper
  • ZooKeeper :重要概念 & 客户端命令介绍
  • ZooKeeper :搭建ZooKeeper集群
  • ZooKeeper :Nginx基于TCP协议代理ZooKeeper集群

本篇博客博主将会给大家介绍ZooKeeper提供的Java客户端API,以后还会介绍Curator的使用。

先创建一个maven项目:

pom.xml如下所示:



    4.0.0

    com.kaven
    zookeeper
    1.0-SNAPSHOT

    
        8
        8
    

    
        
            org.apache.zookeeper
            zookeeper
            3.6.3
        
    

ZooKeeper依赖包的版本最好要和ZooKeeper服务端的版本一致。

启动ZooKeeper集群和Nginx TCP连接代理。


Session

Java客户端与ZooKeeper服务端的连接、断连、重连以及Watcher事件监听等都会触发WatchedEvent。

WatchedEvent类:

package org.apache.zookeeper;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.proto.WatcherEvent;


@InterfaceAudience.Public
public class WatchedEvent {
    
    // ZooKeeper的当前状态
    private final KeeperState keeperState;
    // 事件类型
    private final EventType eventType;
    // 事件涉及的znode路径
    private String path;
    ...
}

Application类(实现了Watcher接口,通过process方法,可以监听WatchedEvent的触发):

package com.kaven.zookeeper;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;



public class Application implements Watcher {

    private static CountDownLatch latch;
    // Nginx代理ZooKeeper集群后的连接套接字
    private static final String SERVER_PROXY = "192.168.1.200:9999";
    // ZooKeeper集群的连接套接字
    private static final String SERVER_CLUSTER = "192.168.1.199:9000,192.168.1.200:9000,192.168.1.201:9000";
    private static final int TIMEOUT = 40000;
    private static long time;

    public static void main(String[] args) throws IOException, InterruptedException {
        Watcher watcher = new Application();
        latch = new CountDownLatch(1);
        time = System.currentTimeMillis();
        ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher);
        latch.await();

        System.out.println("Connection complete!");
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("-----------------WatchedEvent------------------");
        System.out.println(watchedEvent.getType());
        System.out.println(watchedEvent.getState().name());
        System.out.println("time use(ms):" + (System.currentTimeMillis() - time));
        time = System.currentTimeMillis();
        System.out.println("-----------------WatchedEvent------------------");
        if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
            latch.countDown();
        }
    }
}

输出:

-----------------WatchedEvent------------------
None
SyncConnected
time use(ms):9672
-----------------WatchedEvent------------------
Connection complete!

不了解多线程协调工具CountDownLatch可以看下面这篇博客:

  • Java并发编程一CountDownLatch、CyclicBarrier、Semaphore初使用

与ZooKeeper服务端的交互,核心是ZooKeeper类,这是ZooKeeper客户端库的主要类。要使用ZooKeeper服务,应用程序必须首先实例化ZooKeeper类的对象。所有的 *** 作都将通过调用ZooKeeper类的方法来完成。除非另有说明,否则此类的方法是线程安全的。一旦与服务端建立连接,就会为客户端分配一个Session ID。客户端将定期向服务端发送心跳以保持会话有效。只要客户端的Session ID保持有效,应用程序就可以通过客户端调用ZooKeeper API。

如果由于某种原因,客户端长时间未能向服务端发送心跳(例如超过sessionTimeout值),服务端将使会话过期,Session ID将失效。 客户端对象将不再可用。 要进行ZooKeeper API调用,应用程序必须创建一个新的客户端对象。如果客户端当前连接的ZooKeeper服务端出现故障或没有响应,客户端将在其Session ID到期之前自动尝试连接其他服务端(集群)。 如果成功,应用程序可以继续使用客户端。

ZooKeeper API方法有同步方法和异步方法。同步方法阻塞,直到服务端响应。异步方法只是将请求发送并立即返回(请求会排队)。调用异步方法时,会将一个回调实例以参数的方式传给它,该实例将在请求执行成功或错误时被执行,并带有指示错误的状态码(回调方法的rc参数,该状态码在Code枚举类中进行了定义,调用ZooKeeper API的异步方法失败时,可以查看状态码,去Code枚举类中查询相关原因)。

        OK(Ok),
        SYSTEMERROR(SystemError),
        RUNTIMEINCONSISTENCY(RuntimeInconsistency),
        DATAINCONSISTENCY(DataInconsistency),
        CONNECTIONLOSS(ConnectionLoss),
        MARSHALLINGERROR(MarshallingError),
        UNIMPLEMENTED(Unimplemented),
        OPERATIONTIMEOUT(OperationTimeout),
        BADARGUMENTS(BadArguments),
        NEWCONFIGNOQUORUM(NewConfigNoQuorum),
        RECONFIGINPROGRESS(ReconfigInProgress),
        UNKNOWNSESSION(UnknownSession),
        APIERROR(APIError),
        NONODE(NoNode),
        NOAUTH(NoAuth),
        BADVERSION(BadVersion),
        NOCHILDRENFOREPHEMERALS(NoChildrenForEphemerals),
        NODEEXISTS(NodeExists),
        NOTEMPTY(NotEmpty),
        SESSIONEXPIRED(SessionExpired),
        INVALIDCALLBACK(InvalidCallback),
        INVALIDACL(InvalidACL),
        AUTHFAILED(AuthFailed),
        SESSIONMOVED(-118),
        NOTREADONLY(-119),
        EPHEMERALONLOCALSESSION(EphemeralOnLocalSession),
        NOWATCHER(-121),
        REQUESTTIMEOUT(-122),
        RECONFIGDISABLED(-123),
        SESSIONCLOSEDREQUIRESASLAUTH(-124);

ZooKeeper API调用成功后可以在ZooKeeper服务端的数据节点上留下Watcher。 其他成功的 ZooKeeper API调用可以触发这些Watcher。 一旦Watcher被触发,一个事件将被传递给Watcher的处理方法(以回调的形式,如process方法)。 每个Watcher只能触发一次。

因此,连接ZooKeeper服务端,直接使用ZooKeeper类的构造函数即可。

构造函数部分参数如下:

  • connectString:逗号分隔的host:port对,每个host:port对对应一个ZooKeeper服务端,如"192.168.1.199:9000,192.168.1.200:9000,192.168.1.201:9000"。如果需要使用chroot后缀,直接将chroot后缀拼接在host:port对的后面即可,如:"192.168.1.199:9000,192.168.1.200:9000,192.168.1.201:9000/kaven",之后客户端将基于"/kaven"进行 *** 作( *** 作"/elasticjob/job"将导致 *** 作在"/kaven/elasticjob/job"上)。
  • sessionTimeout:以毫秒为单位的Session超时时间。
  • watcher:一个Watcher实例,它会收到ZooKeeper状态变化的通知,也可能会收到节点事件的通知。
  • sessionId:重新连接时使用的Session ID。
  • sessionPasswd:重新连接时该Session的密码。

重新连接

连接成功后,ZooKeeper服务端会给客户端分配一个Session ID和该Session的密码,用于该客户端重新连接。

    public static void main(String[] args) throws IOException, InterruptedException {
        Watcher watcher = new Application();
        latch = new CountDownLatch(1);
        time = System.currentTimeMillis();
        ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher);
        latch.await();

        System.out.println("Connection complete!");

        long sessionId = zk.getSessionId();
        byte[] sessionPasswd = zk.getSessionPasswd();
        latch = new CountDownLatch(1);
        time = System.currentTimeMillis();
        zk = new ZooKeeper(SERVER_CLUSTER, TIMEOUT, watcher, sessionId, sessionPasswd);
        latch.await();

        System.out.println("Reconnection complete!");
    }

输出:

-----------------WatchedEvent------------------
None
SyncConnected
time use(ms):9671
-----------------WatchedEvent------------------
Connection complete!
-----------------WatchedEvent------------------
None
SyncConnected
time use(ms):9015
-----------------WatchedEvent------------------
Reconnection complete!

很显然使用Nginx代理ZooKeeper集群后的连接套接字SERVER_PROXY和ZooKeeper集群的连接套接字SERVER_CLUSTER都可以连接ZooKeeper集群。

    // Nginx代理ZooKeeper集群后的连接套接字
    private static final String SERVER_PROXY = "192.168.1.200:9999";
    // ZooKeeper集群的连接套接字
    private static final String SERVER_CLUSTER = "192.168.1.199:9000,192.168.1.200:9000,192.168.1.201:9000";

断开连接

    public static void main(String[] args) throws IOException, InterruptedException {
        Watcher watcher = new Application();
        latch = new CountDownLatch(1);
        time = System.currentTimeMillis();
        ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher);
        latch.await();

        System.out.println("Connection complete!");

        latch = new CountDownLatch(1);
        time = System.currentTimeMillis();
        zk.close();
        latch.await();

        System.out.println("Disconnect complete!");
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("-----------------WatchedEvent------------------");
        System.out.println(watchedEvent.getType());
        System.out.println(watchedEvent.getState().name());
        System.out.println("time use(ms):" + (System.currentTimeMillis() - time));
        time = System.currentTimeMillis();
        System.out.println("-----------------WatchedEvent------------------");
        if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
            latch.countDown();
        }
        else if(watchedEvent.getState().equals(Event.KeeperState.Closed)) {
            latch.countDown();
        }
    }

输出:

-----------------WatchedEvent------------------
None
SyncConnected
time use(ms):9665
-----------------WatchedEvent------------------
Connection complete!
-----------------WatchedEvent------------------
None
Closed
time use(ms):116
-----------------WatchedEvent------------------
Disconnect complete!

获取状态

    public static void main(String[] args) throws IOException, InterruptedException {
        Watcher watcher = new Application();
        latch = new CountDownLatch(1);
        time = System.currentTimeMillis();
        ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher);
        latch.await();
        System.out.println(zk.getState());

        System.out.println("Connection complete!");

        latch = new CountDownLatch(1);
        time = System.currentTimeMillis();
        zk.close();
        latch.await();
        System.out.println(zk.getState());

        System.out.println("Disconnect complete!");
    }

输出:

-----------------WatchedEvent------------------
None
SyncConnected
time use(ms):9689
-----------------WatchedEvent------------------
ConNECTED
Connection complete!
-----------------WatchedEvent------------------
None
Closed
time use(ms):116
-----------------WatchedEvent------------------
CLOSED
Disconnect complete!

状态是枚举类型(States枚举类):

    @InterfaceAudience.Public
    public enum States {
        CONNECTING,
        ASSOCIATING,
        CONNECTED,
        CONNECTEDREADONLY,
        CLOSED,
        AUTH_FAILED,
        NOT_CONNECTED;
        ...
    }
ACL

ZooKeeper的Java客户端对ACL的定义体现在Perms接口(授权权限)和Id类(授权策略和授权对象)中,对ZooKeeper ACL不太了解可以看下面这篇博客。

  • ZooKeeper :重要概念 & 客户端命令介绍
   @InterfaceAudience.Public
    public interface Perms {

        int READ = 1 << 0;

        int WRITE = 1 << 1;

        int CREATE = 1 << 2;

        int DELETE = 1 << 3;

        int ADMIN = 1 << 4;

        int ALL = READ | WRITE | CREATE | DELETE | ADMIN;

    }

在Perms接口中授权权限被定义成一个整数,每个授权权限的二进制表示只有一个位置为1,并且这个位置各不相同,因此,可以通过按位或的方式实现授权权限的叠加。

权限值(只看后5位)权限描述00001READ可以读取节点数据及显示子节点列表00010WRITE可以设置节点数据00100CREATE可以创建子节点01000DELETE可以删除子节点(仅直接子节点)10000ADMIN可以设置节点访问控制列表权限
@Public
public class Id implements Record {
    private String scheme;
    private String id;
    ...
}

授权策略(Scheme):

  1. world:开放模式,world表示任意客户端都可以访问(默认设置)。
  2. ip:限定客户端IP防问。
  3. auth:只有在会话中通过了认证才可以访问(通过addauth命令)。
  4. digest:与auth类似,区别在于auth用明文密码,而digest用SHA1+base64加密后的密码(通过addauth命令,实际场景中digest更常见)。

授权对象(ID)就是指定的授权策略(Scheme)的内容,比如world:anyone中的anyone、ip:192.168.1.189中的192.168.1.189、auth:username:password中的username:password(明文密码)、digest:username:password_digest中的username:password_digest(用SHA1+base64加密后的密码)。

ZooKeeper的Java客户端中内置了一些ACL定义(Ids接口中):

    @InterfaceAudience.Public
    public interface Ids {

        Id ANYONE_ID_UNSAFE = new Id("world", "anyone");

        Id AUTH_IDS = new Id("auth", "");

        @SuppressFBWarnings(value = "MS_MUTABLE_COLLECTION", justification = "Cannot break API")
        ArrayList OPEN_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));

        @SuppressFBWarnings(value = "MS_MUTABLE_COLLECTION", justification = "Cannot break API")
        ArrayList CREATOR_ALL_ACL = new ArrayList(Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));

        @SuppressFBWarnings(value = "MS_MUTABLE_COLLECTION", justification = "Cannot break API")
        ArrayList READ_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));
    }

授权IP地址为192.168.1.100的客户端READ和WRITE权限可以定义如下:

new ACL(Perms.READ | Perms.WRITE, new Id("ip", "192.168.1.100"));

其他ACL定义以此类推。

设置和获取节点ACL

setACL和getACL两个方法都存在同步版本与异步版本,setACL方法在设置节点的ACL后还会返回节点的状态信息,getACL方法返回给定路径下的节点的ACL和状态信息。异步版本的方法基本上都是在调用异步回调方法时,将需要返回的信息作为参数传进来,实现信息的获取。

// 如果存在这样的节点并且给定的aclVersion与节点的aclVersion匹配
// 则为给定路径的节点设置ACL并且返回节点的状态信息
// 如果不存在给定路径的节点,将抛出错误代码为KeeperException.NoNode的KeeperException
// 如果给定的aclVersion与节点的aclVersion不匹配,则会抛出错误代码为 KeeperException.BadVersion的KeeperException
Stat setACL(final String path, List acl, int aclVersion)
// setACL的异步版本
void setACL(final String path, List acl, int version, StatCallback cb, Object ctx)
// 返回给定路径的节点的ACL和状态信息
// 如果不存在给定路径的节点,将抛出错误代码为KeeperException.NoNode的KeeperException
// 如果stat参数不为空,则会将节点的状态信息复制到此参数中,其他方法也是类似的方式来完成状态信息的返回
List getACL(final String path, Stat stat)
// getACL的异步版本
void getACL(final String path, Stat stat, ACLCallback cb, Object ctx)

添加认证

addAuthInfo方法将指定的scheme:auth信息添加到客户端与ZooKeeper服务端建立的连接中(此方法不是线程安全的),使用方法如下所示:

zk.addAuthInfo("digest", "kaven:itkaven".getBytes());
zk.addAuthInfo("auth", "kaven:itkaven".getBytes());
Znode

创建节点

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
        Watcher watcher = new Application();
        latch = new CountDownLatch(1);
        time = System.currentTimeMillis();
        ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher);
        latch.await();
        System.out.println(zk.getState());

        System.out.println("Connection complete!");

        String message = "success";
        ACL acl = new ACL(
                ZooDefs.Perms.ALL,
                new Id("digest",
                        DigestAuthenticationProvider.generateDigest("kaven:itkaven"))
        );
        latch = new CountDownLatch(1);
        zk.create("/itkaven",
                "hello kaven".getBytes(),
                new ArrayList<>(Collections.singletonList(acl)),
                CreateMode.EPHEMERAL,
                (rc, path, ctx, name, s) -> {
                    System.out.println(rc);
                    System.out.println(path);
                    System.out.println(name);
                    System.out.println(name.equals(path) ? ctx : "error");
                    System.out.println(s.getDataLength());
                    latch.countDown();
                },
                message);

        latch.await();
        zk.addAuthInfo("digest", "kaven:itkaven".getBytes());
        Stat stat = new Stat();
        byte[] data = zk.getData("/itkaven", false, stat);
        System.out.println("data: " + new String(data));

        Thread.sleep(1000000);
    }

输出:

-----------------WatchedEvent------------------
None
SyncConnected
time use(ms):9696
-----------------WatchedEvent------------------
ConNECTED
Connection complete!
0
/itkaven
/itkaven
success
11
data: hello kaven

在其他客户端上 *** 作该节点,就会出现权限问题:

DigestAuthenticationProvider.generateDigest("kaven:itkaven")用于生成加密后的密码(digest使用SHA1+base64加密后的密码):

    public static String generateDigest(String idPassword) throws NoSuchAlgorithmException {
        String[] parts = idPassword.split(":", 2);
        byte[] digest = MessageDigest.getInstance("SHA1").digest(idPassword.getBytes());
        return parts[0] + ":" + base64Encode(digest);
    }

使用create方法创建节点(异步创建,ZooKeeper的Java客户端方法大部分都有异步版本,所以需要使用CountDownLatch来协调多线程执行顺序),部分参数如下:

  • path:该节点的路径。
  • data:该节点存储的数据。
  • acl:该节点的ACL列表。
  • createMode:该节点的类型(持久类型、临时类型、容器类型等)。
  • cb:创建该节点成功或者失败后的回调(可通过Lambda表达式指定)。
  • ctx:传递给异步调用的上下文对象。
  • ttl:创建TTL节点时,用于指定TTL节点的TTL时间,创建其他类型的节点时,指定该参数会导致节点创建不成功(除非指定为-1)。

节点类型(7种,容器节点和TTL节点在重要概念 & 客户端命令介绍这篇博客中已经介绍过了):

    PERSISTENT(0, false, false, false, false),
    PERSISTENT_SEQUENTIAL(2, false, true, false, false),
    EPHEMERAL(1, true, false, false, false),
    EPHEMERAL_SEQUENTIAL(3, true, true, false, false),
    CONTAINER(4, false, false, true, false),
    PERSISTENT_WITH_TTL(5, false, false, false, true),
    PERSISTENT_SEQUENTIAL_WITH_TTL(6, false, true, false, true);

回调有两种方式,第一种方式(Create2Callback接口):

    @InterfaceAudience.Public
    interface Create2Callback extends AsyncCallback {
        void processResult(int rc, String path, Object ctx, String name, Stat stat);
    }
  • rc:返回码或调用结果。
  • path:传递给异步调用的路径。
  • ctx:传递给异步调用的上下文对象。
  • name:创建的节点的名称。 节点创建成功时,name和path通常相等,除非创建顺序节点。
  • stat :给定path上的节点的Stat对象。

Stat对象就是节点的状态信息:

@Public
public class Stat implements Record {
    private long czxid;
    private long mzxid;
    private long ctime;
    private long mtime;
    private int version;
    private int cversion;
    private int aversion;
    private long ephemeralOwner;
    private int dataLength;
    private int numChildren;
    private long pzxid;
    ...
}

第二种方式(StringCallback接口,与第一种方式的差别,只是processResult方法没有stat参数):

    @InterfaceAudience.Public
    interface StringCallback extends AsyncCallback {
        void processResult(int rc, String path, Object ctx, String name);
    }

删除节点

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
        Watcher watcher = new Application();
        latch = new CountDownLatch(1);
        time = System.currentTimeMillis();
        ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher);
        latch.await();
        System.out.println(zk.getState());

        System.out.println("Connection complete!");

        String message = "success";
        ACL acl = new ACL(
                ZooDefs.Perms.ALL,
                new Id("digest",
                        DigestAuthenticationProvider.generateDigest("kaven:itkaven"))
        );
        latch = new CountDownLatch(1);
        zk.create("/itkaven",
                "hello kaven".getBytes(),
                new ArrayList<>(Collections.singletonList(acl)),
                CreateMode.EPHEMERAL,
                (rc, path, ctx, name, s) -> {
                    System.out.println(rc);
                    System.out.println(path);
                    System.out.println(name);
                    System.out.println(name.equals(path) ? ctx : "error");
                    System.out.println(s.getDataLength());
                    latch.countDown();
                },
                message);

        latch.await();
        zk.addAuthInfo("digest", "kaven:itkaven".getBytes());
        Stat stat = new Stat();
        byte[] data = zk.getData("/itkaven", false, stat);
        System.out.println("data: " + new String(data));

        String deleteMessage = "Delete complete!";
        zk.delete("/itkaven",
                stat.getVersion(),
                (rc, path, ctx) -> {
                    System.out.println(rc);
                    System.out.println(path);
                    System.out.println(ctx);
                },
                deleteMessage);

        Thread.sleep(1000000);
    }

输出:

-----------------WatchedEvent------------------
None
SyncConnected
time use(ms):9669
-----------------WatchedEvent------------------
ConNECTED
Connection complete!
0
/itkaven
/itkaven
success
11
data: hello kaven
0
/itkaven
Delete complete!

delete方法删除给定路径的节点。 如果存在这样的节点,并且给定的版本与节点的版本匹配(如果给定的版本为-1,则它与节点的任何版本匹配),则调用将成功。这里使用的是delete方法的异步调用。

节点数据和状态信息

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
        Watcher watcher = new Application();
        latch = new CountDownLatch(1);
        time = System.currentTimeMillis();
        ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher);
        latch.await();
        System.out.println(zk.getState());

        System.out.println("Connection complete!");

        String message = "success";
        ACL acl = new ACL(
                ZooDefs.Perms.ALL,
                new Id("digest",
                        DigestAuthenticationProvider.generateDigest("kaven:itkaven"))
        );
        latch = new CountDownLatch(1);
        zk.create("/itkaven",
                "hello kaven".getBytes(),
                new ArrayList<>(Collections.singletonList(acl)),
                CreateMode.EPHEMERAL,
                (rc, path, ctx, name, s) -> {
                    System.out.println(rc);
                    System.out.println(path);
                    System.out.println(name);
                    System.out.println(name.equals(path) ? ctx : "error");
                    System.out.println(s.getDataLength());
                    latch.countDown();
                },
                message);

        latch.await();
        zk.addAuthInfo("digest", "kaven:itkaven".getBytes());
        zk.setData("/itkaven", "new data".getBytes(), -1);
        
        Stat stat = new Stat();
        byte[] data = zk.getData("/itkaven", false, stat);
        System.out.println("data: " + new String(data));

        String statMessage = "stat success";
        AtomicReference newStat = new AtomicReference<>(null);
        latch = new CountDownLatch(1);
        zk.exists("/itkaven",
                watcher,
                (rc, path, ctx, s) -> {
                    if(rc == KeeperException.Code.OK.intValue()) {
                        System.out.println(ctx);
                        newStat.set(s);
                    }
                    latch.countDown();
                },
                statMessage);
        latch.await();
        if(newStat.get() != null) {
            System.out.println(stat.equals(newStat.get()));
        }
        Thread.sleep(1000000);
    }

输出:

-----------------WatchedEvent------------------
None
SyncConnected
time use(ms):9697
-----------------WatchedEvent------------------
ConNECTED
Connection complete!
0
/itkaven
/itkaven
success
11
data: new data
stat success
true
  • setData:如果存在这样的节点并且给定版本与节点的版本匹配(如果给定版本为-1,则它与节点的任何版本都匹配),则为给定路径的节点设置数据,并且返回节点的状态信息。此 *** 作如果成功,将触发getData调用留下的该路径节点上的所有watch,该方法存在异步版本(通过异步方法的回调将请求的信息返回给客户端)。
  • getData:方法返回给定路径的节点的数据和状态信息(类似ZooKeeper客户端的get -s命令)。如果watch为true并且调用成功(也可以传入一个Watcher实例),则watch将留在给定路径的节点上。watch将由在节点上设置数据或删除节点的成功 *** 作触发,该方法也存在异步版本。
  • exists:方法返回给定路径的节点的状态(类似ZooKeeper客户端的stat命令)。如果不存在这样的节点,则返回null。如果watch为 true并且调用成功(也可以传入一个Watcher实例),则watch将留在给定路径的节点上。 watch将由创建、删除节点或在节点上设置数据的成功 *** 作触发,该方法也存在异步版本。
  • AtomicReference:可参考这篇博客Java并发编程一引用类型、升级类型原子类初使用加源码分析。

获取子节点

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
        Watcher watcher = new Application();
        latch = new CountDownLatch(1);
        time = System.currentTimeMillis();
        ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, watcher);
        latch.await();
        System.out.println(zk.getState());

        System.out.println("Connection complete!");

        String message = "success";
        ACL acl = new ACL(
                ZooDefs.Perms.ALL,
                new Id("digest",
                        DigestAuthenticationProvider.generateDigest("kaven:itkaven"))
        );
        latch = new CountDownLatch(1);
        zk.create("/itkaven",
                "hello kaven".getBytes(),
                new ArrayList<>(Collections.singletonList(acl)),
                CreateMode.EPHEMERAL,
                (rc, path, ctx, name, s) -> {
                    System.out.println(rc);
                    System.out.println(path);
                    System.out.println(name);
                    System.out.println(name.equals(path) ? ctx : "error");
                    System.out.println(s.getDataLength());
                    latch.countDown();
                },
                message);

        latch.await();
//        zk.addAuthInfo("digest", "kaven:itkaven".getBytes());
        zk.getChildren("/", false).forEach(System.out::println);
        System.out.println(zk.getAllChildrenNumber("/"));
        System.out.println(zk.getEphemerals("/"));
        System.out.println(zk.getEphemerals());

        Thread.sleep(1000000);
    }

输出:

-----------------WatchedEvent------------------
None
SyncConnected
time use(ms):9711
-----------------WatchedEvent------------------
ConNECTED
Connection complete!
0
/itkaven
/itkaven
success
11
kaven
zookeeper
nginx
itkaven
lalal
9
[/itkaven]
[/itkaven]
  • getChildren:返回给定路径的节点的子节点列表。如果watch为true并且调用成功(也可以传入一个Watcher实例),则watch将留在给定路径的节点上。 删除给定路径的节点或在节点下创建、删除子节点的成功 *** 作将触发watch。返回的子节点列表未排序,该方法存在异步版本。
  • getAllChildrenNumber:同步或者异步获取特定路径下所有子节点的数量(包括子节点的子节点,以此类推)。
  • getEphemerals:同步或者异步获取所有在此会话创建的与prefixPath路径匹配的临时节点(不使用prefixPath路径参数就是获取此会话创建的所有临时节点 )。 如果prefixPath 是"/"那么它会返回所有的临时节点(不包括其他客户端上创建的临时节点)。

ZooKeeper的Java客户端中关于Session、ACL以及Znode API的介绍就到这里,光看不练假把式,编程需要多练习和实战才能得到提升,只有使用的多了,才能快速排查出问题所在。如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5573396.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存