使用的项目构建工具为Maven,使用坐标如下:
org.apache.curator curator-recipes2.13.0 org.apache.curator curator-framework2.13.0 com.google.collections google-collections1.0 org.slf4j slf4j-api1.7.25 junit junit4.12 test org.apache.maven.plugins maven-compiler-plugin3.2 1.8 1.8 UTF-8
Curator包含的几个包:
- curator-framework:对zookeeper的底层api的一些封装curator-client:提供一些客户端的 *** 作,例如重试策略等urator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
使用静态方法创建客户端
RetryPolicy retry = new ExponentialBackoffRetry(1000, 1); Curatorframework client = CuratorframeworkFactory.newClient(connect, 5000, 2000, retry);
newClient包含四个主要的参数:
Untitled
使用功Fluent风格的Api创建会话
Curatorframework client = CuratorframeworkFactory .builder() .connectString(connect) .sessionTimeoutMs(5000) .connectionTimeoutMs(3000) .retryPolicy(retry) .build();
创建包含隔离命名空间的会话
为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径,例如(下面的例子)当客户端指定了独立命名空间为“/mybase”,那么该客户端对Zookeeper上的数据节点的 *** 作都是基于该目录进行的
RetryPolicy retry = new ExponentialBackoffRetry(1000, 1); Curatorframework client = CuratorframeworkFactory .builder() .connectString(connect) .sessionTimeoutMs(5000) .connectionTimeoutMs(3000) .retryPolicy(retry) .namespace("mybase") .build();
启动客户端
当创建会话成功,得到client实例然后可以直接调用其的start()方法打开客户端
client.start();
数据节点 *** 作
创建数据节点
Zookeeper的节点创建模式:
- PERSISTENT:持久化PERSISTENT_SQUENTIAL:持久化并带有序列号EPHEMERAL:临时EPHEMERAL_SQUENTIAL:临时并带有序列号
创建一个节点,初始化内容为空
client.create().forPath("path"); //如果没有指定节点属性,节点创建模式默认为持久化节点,内容默认为空
创建一个节点,附带初始化内容
client.create().forPath("path","init".getBytes());
创建一个节点,指定创建模式(临时节点),内容为空
client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
创建一个节点,指定创建模式(临时节点),附带初始化内容
client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes);
创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点。
client.create() .creatingParentContainersIfNeeded() .withMode(CraeteMOde.EPHEMERAL) .forPath("path","init".getBytes);
这个creatingParentContainersIfNeeded()接口非常有用,因为一般情况开发人员在创建一个子节点必须判断它的父节点是否存在,如果不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
删除数据节点
删除一个节点
client.delete().forPath("path"); //注意:此方法只能删除叶子节点,否则会抛出异常
删除一个节点,并且递归删除其他所有的子节点
client.delete().deletingChildrenIfNeeded().forPath("path);
删除一个节点,强制指定版本进行删除
client.delete().withVersion(10086).forPath("path");
删除一个节点,强制保证删除
client.delete().guaranteed().forPath("path");
guaranteed()接口是一个保障措施,只要客户端会话有效,那么Curator会在后台持续进行删除 *** 作,直到删除节点成功。
读取数据节点数据
读取一个数据节点的数据内容
client.getData().forPath("path");
注意,此方法返回值是byte[]
读取一个节点的数据内容,同时获取到该节点的stat
Stat stat=new Stat(); Client.getData().storingStatIn(stat).forPath("path");
更新数据节点
更新一个节点的数据内容
client.setData().forPath("path","data".getBytes());
注意:该接口会返回一个Stat实例
更新一个节点的数据内容,强制指定版本进行更新
client.setData().withVersion(10086).forPath("path","data".getBytes());
检查节点时候存在
client.checkExists().forPath("path");
注意:该方法返回一个Stat实例,用于检查Znode是否存在的 *** 作,可以协调额外的方法监控或者后台处理)并在最后调用forPath()指定要 *** 作的Znode
获取某个节点的所有子节点路径
client.getChildren.forPath("path");
注意:该方法的返回值为List 事务 Curatorframework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等 *** 作然后调用commit()作为一个原子 *** 作提交 异步接口 创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息 响应吗(#getResultCode()) 异步节点创建法: 注意:如果#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。 实验democlient.inTransaction().check().forPath("path")
.and()
.create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
.and()
.setData().withVersion(10086).forPath("path","data2".getBytes())
.and()
.commit();
Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorframework, curatorEvent) -> { System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
},executor)
.forPath("path");
private final String connect="192.168.123.72:2180,192.168.123.73:2180,192.168.123.74:2180";
@Test
public void writeZookeeperAPITest() throws Exception {
RetryPolicy retry = new ExponentialBackoffRetry(1000, 1);
Curatorframework client = CuratorframeworkFactory
.builder()
.connectString(connect)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(retry)
.namespace("mybase")
.build();
client.start();
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/zoomdem"+new SimpleDateFormat("hh:mm:ss").format(new Date()),new SimpleDateFormat("yyyy-MM-dd").format(new Date())
.getBytes());
client.close();
}
@Test
public void readZookeeperAPITest() throws Exception {
RetryPolicy Retry = new ExponentialBackoffRetry(2000,3);
Curatorframework client
= CuratorframeworkFactory
.builder()
.connectString(connect)
.sessionTimeoutMs(3000)
.connectionTimeoutMs(4000)
.namespace("thisday15")
.retryPolicy(Retry)
.build();
client.start();
byte[] thisday15s = client.getData().forPath("/hellozoom");
System.out.println(new String(thisday15s));
client.close();
}
@Test
public void deleteZookeeperAPITest() throws Exception{
RetryPolicy Retry = new ExponentialBackoffRetry(5000, 2200);
Curatorframework client = CuratorframeworkFactory
.builder()
.namespace("thisday15")
.retryPolicy(Retry)
.connectString(connect)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.build();
client.start();
System.out.println(client.checkExists().forPath("/hellozoom").toString());
client.delete().forPath("/hellozoom");
System.out.println(client.checkExists().forPath("/hellozoom").toString());
client.close();
}
@Test
public void createZookeeperAPITest(){
RetryPolicy Retry = new ExponentialBackoffRetry(5000, 5000);
Curatorframework client = CuratorframeworkFactory
.builder()
.connectionTimeoutMs(5000)
.sessionTimeoutMs(3000)
.connectString(connect)
.retryPolicy(Retry)
.build();
client.start();
try {
client
.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/code3","init0".getBytes());
} catch (Exception e) {
e.printStackTrace();
}finally {
client.close();
}
}
@Test
public void backgroundZookeeperAPITest(){
RetryPolicy retry = new ExponentialBackoffRetry(5000, 5000);
// 创建异步线程池
Executor executor = Executors.newFixedThreadPool(2);
Curatorframework client = CuratorframeworkFactory
.builder()
.connectString(connect)
.retryPolicy(retry)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.build();
client.start();
try {
client
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.inBackground(((curatorframework, curatorEvent) -> {
System.out.println(String.format("eventType:%s;\n eventResult:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
}),executor)
.forPath("/code888","init889".getBytes());
} catch (Exception e) {
e.printStackTrace();
}finally {
client.close();
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)