文章目录提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
- 前言
- 1. 导入依赖
- 2. 客户端API使用
- 2.1 连接zookeeper
- 2.2 添加节点数据
- 2.3 获取加入的数据并添加事件
- 2.4 异步读取数据
- 总结
前言
使用Java在编写客户端代码,连接Zookeeper集群
1. 导入依赖
我的zookeeper服务集群使用的3.6.3,客户端也使用相同版本
<dependency>
<groupId>org.apache.zookeepergroupId>
<artifactId>zookeeperartifactId>
<version>3.6.3version>
dependency>
2. 客户端API使用
2.1 连接zookeeper搭建含有三个结点的集群
public static void main( String[] args ) throws IOException, InterruptedException {
/*
参数:1. 连接的集群ip
2. session超时时间
3. watcher的回调函数
*/
CountDownLatch cdl = new CountDownLatch(1);
String connectionGroups = "192.168.52.130,192.168.52.131,192.138.52.132";
ZooKeeper zk = new ZooKeeper(connectionGroups, 3000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 获取当前状态
Event.KeeperState state = watchedEvent.getState();
System.out.println(watchedEvent.getPath());
switch (state) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
System.out.println("connected ...");
cdl.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
}
});
// 1. 此时进入connecting ... 的状态; 原因客户端请求后,集群以异步的方式执行
// 2. 为了达到同步的状态,可以使用CountDownLatch
cdl.await();
ZooKeeper.States state = zk.getState();
switch (state) {
case CONNECTING:
System.out.println("connecting ...");
break;
case ASSOCIATING:
break;
case CONNECTED:
System.out.println("end ...");
break;
case CONNECTEDREADONLY:
break;
case CLOSED:
break;
case AUTH_FAILED:
break;
case NOT_CONNECTED:
break;
}
}
ZK中,每一次客户端的连接会创建一个临时session用于标记连接状态;当连接失效后,可以根据设置的timeout然后剔除集群中的节点数据
集群中出现日志信息:
// 在集群中添加节点, 节点中的数据是二进制安全的,需要使用字节流; 最后的节点模式选择临时节点
String pathName = zk.create("/ooxx", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("createNodePath:" + pathName);
2.3 获取加入的数据并添加事件
byte[] data = zk.getData("/ooxx", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 在获取节点数据的时候可以绑定watch, 在下次节点状态发生变化的时候产生回调
System.out.println("getData回调函数开始。。。");
}
}, new Stat());
System.out.println(new String(data));
Stat stat = zk.setData("/ooxx", "hello_newData".getBytes(), 0);
System.out.println("stat:" + stat);
Stat stat1 = zk.setData("/ooxx", "hello_newData2".getBytes(), stat.getVersion());
System.out.println("stat1:" + stat1);
// 延缓主进程结束时间
Thread.sleep(1000);
- 主进程结束前后集群节点的状态:
2.4 异步读取数据原因:在添加数据的时候节点模式采用EPHEMERAL,表示临时;每一次连接的创建都会产生一个session, 当连接断开后,等待初始设置的3000ms后会将session失效
在前面的getData中,使用的是同步阻塞的形式,所有代码必须从上到下依次执行并完全获取数据;下面是异步的形式:
// finally: 上述的create是同步阻塞的,下面尝试下异步的
System.out.println("async start ...");
zk.getData("/ooxx", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
System.out.println("async wait data ...");
System.out.println("传入的数据:" + o.toString());
System.out.println("节点数据:" + new String(bytes));
}
}, "righteye");
System.out.println("async end ...");
System.out.println("后续执行代码:aaa");
// 延缓主进程结束时间
Thread.sleep(2222222);
运行结果:
- 整理下Java程序编写ZK客户端代码
- 怎么去连接ZK,如何获取数据;了解getData, setData的监听机制,学会了怎么进行同步和异步的获取数据
- 节点状态, 每一次连接都对应一个session,由于session和连接状态的绑定,配合监听事件可以做分布式锁
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)