zookeeper实现配置中心

zookeeper实现配置中心,第1张

zookeeper实现配置中心

准备工作:zookeeper环境(单机、集群都可)

一、实现思路

因为zookeeper是通过zab协议实现的,能够保证数据在分布式系统中的一致性,可以实现配置中心的需求。

具体实现思路
    创建连接;通过watch去监听各个事件的发生,从而做出对应的 *** 作,如:监听到配置被修改,则同步修改本地服务的配置;通过回调函数去查看节点的状态、类型 和 结果。一般zookeeper会使用异步 *** 作,所以需要使用到回调函数来获取一些返回结果,如:新增一条配置,被监听到,在回调函数可以获得其节点的状态 和 新增的值。
watch流程图

分布式配置

二、具体实现 获取连接
public class ZkConfig {

	// 自己的zookeeper环境,可单机或集群,集群配置的地址通过逗号隔开
	//private static String addr = "127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181";
    private static String addr = "127.0.0.1:2181/testConfig";
    private static Integer timeOut = 5000;

    static CountDownLatch countDownLatch = new CountDownLatch(1);
	
    public static ZooKeeper getCon(){
        System.out.println("   获取连接 ");
        // 创建连接默认的watch
        DefortWatch defortWatch = new DefortWatch();
        defortWatch.setCountDownLatch(countDownLatch);
        ZooKeeper zooKeeper = null;
        try {
            zooKeeper = new ZooKeeper(addr,timeOut,defortWatch);
            // 解决异步问题,连接成功后再往下执行
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return zooKeeper;
    }
}
准备连接默认的watch
public class DefortWatch implements Watcher {

    CountDownLatch countDownLatch;

    public CountDownLatch getCountDownLatch() {
        return countDownLatch;
    }

    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    // 通过watch去监听事件,发现状态为 SyncConnected ,说明已连接,释放等待
    @Override
    public void process(WatchedEvent watchedEvent) {
        switch (watchedEvent.getState()) {
            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                System.out.println("》》》》》》 zookeeper node create.... 《《《《《《");
                countDownLatch.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
            case Closed:
                break;
        }
    }
}
准备监听配置状态的watch和callback
public class WatchCallBack implements Watcher , AsyncCallback.StatCallback, AsyncCallback.DataCallback {

    private ZooKeeper zooKeeper;
    private CountDownLatch countDownLatch;
    private MyConf myConf;

    public MyConf getMyConf() {
        return myConf;
    }

    public void setMyConf(MyConf myConf) {
        this.myConf = myConf;
    }

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public CountDownLatch getCountDownLatch() {
        return countDownLatch;
    }

    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    public void await(){
        // 判断节点是否存在,会触发watch 和 节点状态的回调
        zooKeeper.exists("/myconf",this,this,"test");
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // watcher
    // 根据监听zookeeper不同的命令,去做出对应的 *** 作
    @Override
    public void process(WatchedEvent watchedEvent) {
        switch (watchedEvent.getType()) {
            case None:
                break;
            case NodeCreated:
            	// getData 会触发 watch 和 DataCallback
            	// 创建节点,通过DataCallback刚创建的配置同步到本地
                System.out.println("========== 创建myconf节点 ==========");
                zooKeeper.getData("/myconf",this,this,"test");
                break;
            case NodeDeleted:
                // 容错性
                System.out.println("========== 删除myconf节点 ==========");
                myConf.setConfVal("");
                countDownLatch = new CountDownLatch(1);
                break;
            case NodeDataChanged:
            	// 修改节点,通过DataCallback刚创建的配置同步到本地
                System.out.println("========== 修改myconf节点 ==========");
                zooKeeper.getData("/myconf",this,this,"test");
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }

    //stateCallback
    @Override
    public void processResult(int i, String s, Object o, Stat stat) {
        if(stat != null){
            zooKeeper.getData("/myconf",this,this,"test");
        }else{
            System.out.println("》》》》 this node is not exist 《《《《");
        }
    }

    //dataCallback
    @Override
    public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) 			    {
        if(bytes != null){
            String data = new String(bytes);
            myConf.setConfVal(data);
            countDownLatch.countDown();
        }
    }
}
本地配置类
public class MyConf {
	
    String confVal;

    public String getConfVal() {
        return confVal;
    }

    public void setConfVal(String confVal) {
        this.confVal = confVal;
    }
}
测试类
public class ZkTest {

    ZooKeeper zooKeeper;

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void before(){
        zooKeeper = ZkConfig.getCon();

    }

    public void after(){
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void test(){
        // 获取连接
        this.before();
		
        WatchCallBack watchCallBack = new WatchCallBack();
        MyConf myConf = new MyConf();
		
		// 将 需要的属性 传递过去
        watchCallBack.setZooKeeper(zooKeeper);
        watchCallBack.setCountDownLatch(countDownLatch);
        watchCallBack.setMyConf(myConf);
		
		// 去watch数据
        watchCallBack.await();

		// 测试
        while (true){
            try {
            	// 模拟: 在监听事件时,节点被删除,则将本地配置设置为"",以此判断被删除
                if("".equals(myConf.getConfVal())){
                    System.out.println("is delete ");
                    countDownLatch.await();
                }else{
                    System.out.println("my confVal :::"+myConf.getConfVal());
                }
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

}
测试结果

》》》》》》 zookeeper node create.... 《《《《《《
========== 创建myconf节点 ==========
my confVal :::111

========== 修改myconf节点 ==========
my confVal :::222

========== 删除myconf节点 ==========
is delete 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5716672.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存