在之前的博客中,博主介绍了Curator框架的重试策略和Session API,并且对namespace进行了原理分析:
- ZooKeeper : Curator框架重试策略和Session API介绍
- ZooKeeper : Curator框架namespace原理分析
博主使用的Curator框架版本是5.2.0,ZooKeeper版本是3.6.3(5.2.0版本的Curator使用3.6.3版本的ZooKeeper,因此它们是兼容的)。
Znodeorg.apache.curator curator-recipes5.2.0
在ZooKeeper中,Znode是存储数据、事件监听以及权限控制的主要对象,ZooKeeper的数据模型是一棵树,由斜杠/分割,类似Linux的文件系统,如下图所示(图来自ZooKeeper官网):
Znode有如下类型:
- 持久、临时:持久是默认的Znode类型,临时Znode相较于持久Znode来说就是它会随着客户端会话结束而被删除,通常可以用在一些特定的场景,如分布式锁释放、健康检查等。
- 持久顺序、临时顺序:在上面两种Znode类型的基础上,ZooKeeper会自动在这两种Znode类型的节点名后加一个数字后缀(保证唯一),这数字后缀的应用场景可以实现诸如分布式队列,分布式公平锁等。
- 容器:容器Znode类型是 3.5 以后新增的节点类型,容器Znode和持久Znode类似,但是区别是服务端启动后,会有一个单独的线程去扫描所有的容器Znode,当发现容器Znode的子节点数量为0时,会自动删除该容器Znode(删除时机,留到以后分析ZooKeeper源码时再进行介绍)。
- TTL、顺序TTL:这两种Znode类型重点是TTL(time to live,存活时间,单位为秒) ,当该节点没有子节点并且超过了指定的TTL时间后就会被自动删除,和容器Znode类似,只是容器Znode没有超时时间,使用TTL Znode需要配置extendedTypesEnabled=true,不然创建TTL Znode时会收到Unimplemented的报错。
每个Znode除了可以存储数据外,其本身还存储了数据节点相关的一些状态信息。Znode的状态信息如下表所示:
在ZooKeeper提供的客户端中,使用stat命令可以查看Znode的状态信息(如下图所示,根Znode的状态信息):
- ZooKeeper :重要概念 & 客户端命令介绍
创建Znode需要使用create方法,但Curator框架提供的create方法,比Java原生客户端提供的create方法更方便、强大和易用,可以通过Builder的方式来组装需要创建的Znode,并且当需要创建的节点的父节点不存在时,可以先创建其父节点(不是默认行为,需要通过方法指定)。
package com.kaven.zookeeper; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.CuratorframeworkFactory; import org.apache.curator.framework.imps.CuratorframeworkState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; public class Application{ private static final String SERVER_PROXY = "192.168.1.184:9000"; private static final int CONNECTION_TIMEOUT_MS = 40000; private static final int SESSION_TIMEOUT_MS = 10000; private static final String NAMESPACE = "namespace"; public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); Curatorframework curator = CuratorframeworkFactory.builder() .connectString(SERVER_PROXY) .retryPolicy(retryPolicy) .connectionTimeoutMs(CONNECTION_TIMEOUT_MS) .sessionTimeoutMs(SESSION_TIMEOUT_MS) .namespace(NAMESPACE) .build(); curator.start(); assert curator.getState().equals(CuratorframeworkState.STARTED); curator.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .inBackground((client, event) -> { int resultCode = event.getResultCode(); System.out.println(KeeperException.Code.get(resultCode).name()); }) .forPath("/father/son/grandson", "data".getBytes()); Thread.sleep(10000000); } }
192.168.1.184:9000是ZooKeeper服务端的套接字(用于客户端的连接,博主修改了客户端连接的默认端口),create方法会返回CreateBuilder接口实现类的一个实例,用于通过Builder的方式来组装需要创建的Znode,如下图所示(很显然其他 *** 作也是如此):
调用creatingParentsIfNeeded方法后,如果需要创建的节点的父节点不存在时(默认会报错),会以持久节点类型创建其不存在的父节点,而调用creatingParentContainersIfNeeded方法后,会以容器节点类型创建其不存在的父节点。调用withMode方法指定需要创建的节点的类型,所有的节点类型在CreateMode枚举类中定义:
package org.apache.zookeeper; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @InterfaceAudience.Public public enum CreateMode { PERSISTENT(0, false, false, false, false), PERSISTENT_SEQUENTIAL(2, false, true, false, false), EPHEMERAL(1, true, false, false, false), EPHEMERAL_SEQUENTIAL(3, true, true, false, false), CONTAINER(4, false, false, true, false), PERSISTENT_WITH_TTL(5, false, false, false, true), PERSISTENT_SEQUENTIAL_WITH_TTL(6, false, true, false, true); private static final Logger LOG = LoggerFactory.getLogger(CreateMode.class); private boolean ephemeral; private boolean sequential; private final boolean isContainer; private int flag; private boolean isTTL; CreateMode(int flag, boolean ephemeral, boolean sequential, boolean isContainer, boolean isTTL) { this.flag = flag; this.ephemeral = ephemeral; this.sequential = sequential; this.isContainer = isContainer; this.isTTL = isTTL; } ... }
调用inBackground方法会指定该 *** 作以异步的方式进行,当异步 *** 作完成后需要调用回调方法,因此需要将回调实例传入inBackground方法中,这里使用lambda表达式表示该回调实例,它其实应该是BackgroundCallback接口实现类的实例,因为该接口只有一个未实现的方法,因此可以通过lambda表达式表示,类似函数式接口。
public interface BackgroundCallback { public void processResult(Curatorframework client, CuratorEvent event) throws Exception; }
而调用forPath方法,就是指定该 *** 作的节点路径,并且还可以指定该节点存储的数据(也可以不指定)。所有节点都创建成功。
创建TTL节点,需要添加配置(extendedTypesEnabled=true):
添加配置后再重启ZooKeeper服务即可。
/usr/local/apache-zookeeper-3.6.3-bin/bin/zkServer.sh restart
CountDownLatch latch = new CountDownLatch(1); Stat stat = new Stat(); curator.create() .orSetData() .withTtl(10000) .storingStatIn(stat) .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT_WITH_TTL) .inBackground((client, event) -> { int resultCode = event.getResultCode(); System.out.println(KeeperException.Code.get(resultCode).name()); latch.countDown(); }) .forPath("/father/son/grandson", "data".getBytes()); latch.await(); System.out.println(stat);
- orSetData:如果节点存在,就设置节点数据。
- withTtl:指定TTL节点的TTL时间,必须是创建TTL节点。
- storingStatIn:将节点的状态信息存储于传入的Stat实例中(以填充的形式)。
- CountDownLatch :线程协调工具,因为上面创建节点的 *** 作是异步的,因此需要等回调方法被调用后(创建完成,不管创建失败还是成功),才能输出stat实例,不然stat实例还未被填充(Java并发编程一CountDownLatch、CyclicBarrier、Semaphore初使用)。
而这个TTL节点只能存活10秒钟,因为它没有子节点。
删除节点需要调用delete方法,如果需要删除的节点存在子节点,可以先删除其存在的子节点(不是默认行为,需要通过方法指定)。
CountDownLatch latch = new CountDownLatch(1); curator.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .inBackground((client, event) -> { int resultCode = event.getResultCode(); System.out.println(KeeperException.Code.get(resultCode).name()); latch.countDown(); }) .forPath("/father/son/grandson", "data".getBytes()); latch.await(); Stat stat = curator.setData() .forPath("/father", "father".getBytes()); System.out.println(stat.getVersion()); byte[] data = curator.getData() .storingStatIn(stat) .forPath("/father"); System.out.println(new String(data)); System.out.println(stat.getVersion()); curator.delete() .guaranteed() .deletingChildrenIfNeeded() .withVersion(0) .inBackground((client, event) -> { int resultCode = event.getResultCode(); System.out.println(KeeperException.Code.get(resultCode).name()); }) .forPath("/father");
- setData:设置节点存储的数据。
- getData:获取节点存储的数据。
- guaranteed:解决了在服务端 *** 作可能成功,但在响应返回给客户端之前发生连接失败的边缘情况。
- deletingChildrenIfNeeded:如果需要删除的节点存在子节点,可以先删除其存在的子节点。
- withVersion:指定删除节点的版本号,如果与ZooKeeper服务端该节点的版本号不匹配,则删除 *** 作会失败。
存在的子节点也不会被删除。
正确的版本号,删除 *** 作会成功。
.withVersion(stat.getVersion())获取子节点列表
获取子节点列表需要使用getChildren方法。
curator.getChildren() .forPath("/") .forEach(System.out::println); curator.getChildren() .forPath("/father") .forEach(System.out::println);
输出如下图所示:
Curator框架基本上使用了ZooKeeper提供的ACL定义,只是改变了使用方式,ZooKeeper的Java客户端对ACL的定义体现在Perms接口(授权权限)和Id类(授权策略和授权对象)中:
@InterfaceAudience.Public public interface Perms { int READ = 1 << 0; int WRITE = 1 << 1; int CREATE = 1 << 2; int DELETE = 1 << 3; int ADMIN = 1 << 4; int ALL = READ | WRITE | CREATE | DELETE | ADMIN; }
在Perms接口中授权权限被定义成一个整数,每个授权权限的二进制表示只有一个位置为1,并且这个位置各不相同,因此,可以通过按位或的方式实现授权权限的叠加。
@Public public class Id implements Record { private String scheme; private String id; ... }
授权策略(Scheme):
- world:开放模式,world表示任意客户端都可以访问(默认设置)。
- ip:限定客户端IP防问。
- auth:只有在会话中通过了认证才可以访问(通过addauth命令)。
- digest:与auth类似,区别在于auth用明文密码,而digest用SHA1+base64加密后的密码(通过addauth命令,实际场景中digest更常见)。
授权对象(ID)就是指定的授权策略(Scheme)的内容,比如world:anyone中的anyone、ip:192.168.1.189中的192.168.1.189、auth:username:password中的username:password(明文密码)、digest:username:password_digest中的username:password_digest(用SHA1+base64加密后的密码)。
ZooKeeper的Java客户端中内置了一些ACL定义(Ids接口中):
@InterfaceAudience.Public public interface Ids { Id ANYONE_ID_UNSAFE = new Id("world", "anyone"); Id AUTH_IDS = new Id("auth", ""); @SuppressFBWarnings(value = "MS_MUTABLE_COLLECTION", justification = "Cannot break API") ArrayList OPEN_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE))); @SuppressFBWarnings(value = "MS_MUTABLE_COLLECTION", justification = "Cannot break API") ArrayList CREATOR_ALL_ACL = new ArrayList(Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS))); @SuppressFBWarnings(value = "MS_MUTABLE_COLLECTION", justification = "Cannot break API") ArrayList READ_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE))); }
授权IP地址为192.168.1.100的客户端READ和WRITE权限可以定义如下:
new ACL(Perms.READ | Perms.WRITE, new Id("ip", "192.168.1.100"));
其他ACL定义以此类推。
客户端添加验证客户端添加验证其实很简单,在创建Curatorframework实例时,通过authorization方法指定:
Curatorframework curator = CuratorframeworkFactory.builder() .connectString(SERVER_PROXY) .retryPolicy(retryPolicy) .connectionTimeoutMs(CONNECTION_TIMEOUT_MS) .sessionTimeoutMs(SESSION_TIMEOUT_MS) .namespace(NAMESPACE) .authorization("digest", "kaven:itkaven".getBytes()) .build();设置ACL
通过withACL方法,可以指定节点的ACL。
ACL acl1 = new ACL(ZooDefs.Perms.CREATE | ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ | ZooDefs.Perms.WRITE, new Id("digest", DigestAuthenticationProvider.generateDigest("kaven:itkaven"))); ACL acl2 = new ACL(ZooDefs.Perms.ALL ^ ZooDefs.Perms.DELETE, new Id("digest", DigestAuthenticationProvider.generateDigest("kaven:itkaven"))); CountDownLatch latch = new CountDownLatch(1); curator.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .withACL(Collections.singletonList(acl2), true) .inBackground((client, event) -> { int resultCode = event.getResultCode(); System.out.println(KeeperException.Code.get(resultCode).name()); latch.countDown(); }) .forPath("/father/son/grandson", "data".getBytes()); latch.await(); curator.delete() .inBackground((client, event) -> { int resultCode = event.getResultCode(); System.out.println(KeeperException.Code.get(resultCode).name()); }) .forPath("/father/son/grandson");
这两个ACL实例是一样的功能(使用二进制的按位或和异或的 *** 作,得到的结果是一样的)。
ACL acl1 = new ACL(ZooDefs.Perms.CREATE | ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ | ZooDefs.Perms.WRITE, new Id("digest", DigestAuthenticationProvider.generateDigest("kaven:itkaven"))); ACL acl2 = new ACL(ZooDefs.Perms.ALL ^ ZooDefs.Perms.DELETE, new Id("digest", DigestAuthenticationProvider.generateDigest("kaven:itkaven")));
withACL方法的第二个参数如果为true(可以不指定,默认为false,博主只是为了演示,一般情况下不使用),则指定的ACL也应用于创建的父节点(即不存在的父节点),因此删除/father/son/grandson节点返回NOAUTH(没有权限),因为/father/son节点也具有指定的ACL,因此不能删除它的子节点(没有DELETE权限)。
而创建子节点的权限是有的(同理,其他具有权限的 *** 作也可以顺利完成)。
curator.create() .inBackground((client, event) -> { int resultCode = event.getResultCode(); System.out.println(KeeperException.Code.get(resultCode).name()); }) .forPath("/father/son/grandson_");
还可以通过setACL方法(也可以指定节点的版本号,这里就不演示了)来设置节点的ACL,而getACL方法用来获取节点的ACL。
curator.setACL() .withACL(Collections.singletonList(acl1)) .forPath("/father/son/grandson"); System.out.println(curator.getACL() .forPath("/father/son/grandson"));
输出如下图所示:
23表示10111(二进制表示的权限)。
Curator框架Znode和ACL API就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)