一、实现思路准备工作:zookeeper环境(单机、集群都可)
因为zookeeper是通过zab协议实现的,能够保证数据在分布式系统中的一致性,可以实现配置中心的需求。
具体实现思路- 创建连接;通过watch去监听各个事件的发生,从而做出对应的 *** 作,如:监听到配置被修改,则同步修改本地服务的配置;通过回调函数去查看节点的状态、类型 和 结果。一般zookeeper会使用异步 *** 作,所以需要使用到回调函数来获取一些返回结果,如:新增一条配置,被监听到,在回调函数可以获得其节点的状态 和 新增的值。
public class ZkConfig { // 自己的zookeeper环境,可单机或集群,集群配置的地址通过逗号隔开 //private static String addr = "127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181"; private static String addr = "127.0.0.1:2181/testConfig"; private static Integer timeOut = 5000; static CountDownLatch countDownLatch = new CountDownLatch(1); public static ZooKeeper getCon(){ System.out.println(" 获取连接 "); // 创建连接默认的watch DefortWatch defortWatch = new DefortWatch(); defortWatch.setCountDownLatch(countDownLatch); ZooKeeper zooKeeper = null; try { zooKeeper = new ZooKeeper(addr,timeOut,defortWatch); // 解决异步问题,连接成功后再往下执行 countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } return zooKeeper; } }准备连接默认的watch
public class DefortWatch implements Watcher { CountDownLatch countDownLatch; public CountDownLatch getCountDownLatch() { return countDownLatch; } public void setCountDownLatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } // 通过watch去监听事件,发现状态为 SyncConnected ,说明已连接,释放等待 @Override public void process(WatchedEvent watchedEvent) { switch (watchedEvent.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: System.out.println("》》》》》》 zookeeper node create.... 《《《《《《"); countDownLatch.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; case Closed: break; } } }准备监听配置状态的watch和callback
public class WatchCallBack implements Watcher , AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zooKeeper; private CountDownLatch countDownLatch; private MyConf myConf; public MyConf getMyConf() { return myConf; } public void setMyConf(MyConf myConf) { this.myConf = myConf; } public ZooKeeper getZooKeeper() { return zooKeeper; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } public CountDownLatch getCountDownLatch() { return countDownLatch; } public void setCountDownLatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public void await(){ // 判断节点是否存在,会触发watch 和 节点状态的回调 zooKeeper.exists("/myconf",this,this,"test"); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // watcher // 根据监听zookeeper不同的命令,去做出对应的 *** 作 @Override public void process(WatchedEvent watchedEvent) { switch (watchedEvent.getType()) { case None: break; case NodeCreated: // getData 会触发 watch 和 DataCallback // 创建节点,通过DataCallback刚创建的配置同步到本地 System.out.println("========== 创建myconf节点 =========="); zooKeeper.getData("/myconf",this,this,"test"); break; case NodeDeleted: // 容错性 System.out.println("========== 删除myconf节点 =========="); myConf.setConfVal(""); countDownLatch = new CountDownLatch(1); break; case NodeDataChanged: // 修改节点,通过DataCallback刚创建的配置同步到本地 System.out.println("========== 修改myconf节点 =========="); zooKeeper.getData("/myconf",this,this,"test"); break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; case PersistentWatchRemoved: break; } } //stateCallback @Override public void processResult(int i, String s, Object o, Stat stat) { if(stat != null){ zooKeeper.getData("/myconf",this,this,"test"); }else{ System.out.println("》》》》 this node is not exist 《《《《"); } } //dataCallback @Override public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) { if(bytes != null){ String data = new String(bytes); myConf.setConfVal(data); countDownLatch.countDown(); } } }本地配置类
public class MyConf { String confVal; public String getConfVal() { return confVal; } public void setConfVal(String confVal) { this.confVal = confVal; } }测试类
public class ZkTest { ZooKeeper zooKeeper; private CountDownLatch countDownLatch = new CountDownLatch(1); public void before(){ zooKeeper = ZkConfig.getCon(); } public void after(){ try { zooKeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } } @Test public void test(){ // 获取连接 this.before(); WatchCallBack watchCallBack = new WatchCallBack(); MyConf myConf = new MyConf(); // 将 需要的属性 传递过去 watchCallBack.setZooKeeper(zooKeeper); watchCallBack.setCountDownLatch(countDownLatch); watchCallBack.setMyConf(myConf); // 去watch数据 watchCallBack.await(); // 测试 while (true){ try { // 模拟: 在监听事件时,节点被删除,则将本地配置设置为"",以此判断被删除 if("".equals(myConf.getConfVal())){ System.out.println("is delete "); countDownLatch.await(); }else{ System.out.println("my confVal :::"+myConf.getConfVal()); } Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } }测试结果
》》》》》》 zookeeper node create.... 《《《《《《 ========== 创建myconf节点 ========== my confVal :::111
========== 修改myconf节点 ========== my confVal :::222
========== 删除myconf节点 ========== is delete
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)