Curator-framework是ZooKeeper Client更高的抽象API,最佳核心的功能就是自动连接管理:
- 当ZooKeeper客户端内部出现异常, 将自动进行重连或重试, 该过程对外几乎完全透明监控节点数据变化事件NodeDataChanged,需要时调用updateServerList()方法Curator recipes自动移除监控
目前Curator有2.x.x和3.x.x两个系列的版本,支持不同版本的Zookeeper。其中Curator 2.x.x兼容Zookeeper的3.4.x和3.5.x。而Curator 3.x.x只兼容Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch删除等新特性。
更加清晰的API简化了ZooKeeper原生的方法, 事件等, 提供流式fluent的接口,提供Recipes实现 : 选举,共享锁, 路径cache, 分布式队列,分布式优先队列等。
maven配置依赖事务管理org.apache.curator curator-recipes2.12.0
public void testTransaction() throws Exception{ //定义几个基本 *** 作 CuratorOp createOp = client.transactionOp().create() .forPath("/curator/one_path","some data".getBytes()); CuratorOp setDataOp = client.transactionOp().setData() .forPath("/curator","other data".getBytes()); CuratorOp deleteOp = client.transactionOp().delete() .forPath("/curator"); //事务执行结果 List监听器results = client.transaction() .forOperations(createOp,setDataOp,deleteOp); //遍历输出结果 for(CuratorTransactionResult result : results){ System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType()); } } //因为节点“/curator”存在子节点,所以在删除的时候将会报错,事务回滚
Curator提供了三种Watcher(Cache)来监听结点的变化:
Path Cache:监视一个路径下
1)孩子结点的创建2)删除3)以及结点数据的更新。
产生的事件会传递给注册的PathChildrenCacheListener。
Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
ExecutorService pool = Executors.newFixedThreadPool(2); final NodeCache nodeCache = new NodeCache(client, "/zk-huey/cnode", false); nodeCache.start(true); nodeCache.getListenable().addListener( new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("Node data is changed, new data: " + new String(nodeCache.getCurrentData().getData())); }}, pool); final PathChildrenCache childrenCache = new PathChildrenCache(client, "/zk-huey", true); childrenCache.start(StartMode.POST_INITIALIZED_EVENT); childrenCache.getListenable().addListener( new PathChildrenCacheListener() { @Override public void childEvent(Curatorframework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED: " + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED: " + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED: " + event.getData().getPath()); break; default: break; } } }, pool ); client.setData().forPath("/zk-huey/cnode", "world".getBytes()); Thread.sleep(10 * 1000); pool.shutdown(); client.close();分布式锁思路
最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。
下面的程序会启动两个线程t1和t2去争夺锁,拿到锁的线程会占用5秒。运行多次可以观察到,有时是t1先拿到锁而t2等待,有时又会反过来。Curator会用我们提供的lock路径的结点作为全局锁,每次获得锁时会生成这种串,释放锁时清空数据。
import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.CuratorframeworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.RetryNTimes; import java.util.concurrent.TimeUnit; public class CuratorDistrLockTest { private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_LOCK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { // 1.Connect to zk Curatorframework client = CuratorframeworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); Thread t1 = new Thread(() -> { doWithLock(client); }, "t1"); Thread t2 = new Thread(() -> { doWithLock(client); }, "t2"); t1.start(); t2.start(); } private static void doWithLock(Curatorframework client) { InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH); try { if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + " hold lock"); Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " release lock"); } } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }Leader选举
当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。
import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.CuratorframeworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.EnsurePath; public class CuratorLeaderTest { private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { LeaderSelectorListener listener = new LeaderSelectorListener() { @Override public void takeLeadership(Curatorframework client) throws Exception { System.out.println(Thread.currentThread().getName() + " take leadership!"); // takeLeadership() method should only return when leadership is being relinquished. Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " relinquish leadership!"); } @Override public void stateChanged(Curatorframework client, ConnectionState state) { } }; new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); Thread.sleep(Integer.MAX_VALUE); } private static void registerListener(LeaderSelectorListener listener) { // 1.Connect to zk Curatorframework client = CuratorframeworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); // 2.Ensure path try { new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient()); } catch (Exception e) { e.printStackTrace(); } // 3.Register listener LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener); selector.autoRequeue(); selector.start(); } }
参考资料注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。
https://www.cnblogs.com/qingyunzong/p/8666288.html
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)