Zookeeper--06---Curator客户端的使⽤、zk的watch机制

Zookeeper--06---Curator客户端的使⽤、zk的watch机制,第1张

Zookeeper--06---Curator客户端的使⽤、zk的watch机制

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录
  • Zookeeper客户端(zkCli)的使⽤
      • Zookeeper--03--常用Shell命令
  • Curator客户端的使⽤
    • Curator介绍
    • 1.引⼊Curator
    • 2.创建节点
    • 3.获得节点数据
    • 4.修改节点数据
    • 5.创建节点同时创建⽗节点
    • 6.删除节点
  • zk的watch机制
    • 1.Watch机制介绍
      • 具体交互过程如下:
    • 2.监听器原理
    • 3.zkCli客户端使⽤watch
        • get -w /test ==⼀次性监听节点==
    • 4.curator客户端使⽤watch


Zookeeper客户端(zkCli)的使⽤
Zookeeper–03–常用Shell命令




Curator客户端的使⽤ Curator介绍
  • Curator是Netflix公司开源的⼀套zookeeper客户端框架,Curator是对Zookeeper⽀持最好 的客户端框架。
  • Curator封装了⼤部分Zookeeper的功能,⽐如Leader选举、分布式锁等,减少了技术⼈员在使⽤Zookeeper时的底层细节开发⼯作。
1.引⼊Curator

    org.apache.curator
    curator-framework
    2.12.0


    org.apache.curator
    curator-recipes
    2.12.0




    org.apache.zookeeper
    zookeeper
    3.7.14

application.properties配置⽂件

curator.retryCount=5
curator.elapsedTimeMs=5000
curator.connectString=172.16.253.35:2181
curator.sessionTimeoutMs=60000
curator.connectionTimeoutMs=5000

注⼊配置Bean

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Data
@Component
@ConfigurationProperties(prefix = "curator")
public class WrapperZK {

  private int retryCount;

  private int elapsedTimeMs;

  private String connectString;

  private int sessionTimeoutMs;

  private int connectionTimeoutMs;
}

注⼊Curatorframework

import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CuratorConfig {

    @Autowired
    WrapperZK wrapperZk;

    @Bean(initMethod = "start")
    public Curatorframework curatorframework() {
      return CuratorframeworkFactory.newClient(
        wrapperZk.getConnectString(),
        wrapperZk.getSessionTimeoutMs(),
        wrapperZk.getConnectionTimeoutMs(),
        new RetryNTimes(wrapperZk.getRetryCount(), wrapperZk.getElapsedTimeMs()));
    }
}
2.创建节点
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.zookeeper.CreateMode;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.logging.Logger;

@Slf4j
@SpringBootTest
class BootZkClientApplicationTests {

  @Autowired
  Curatorframework curatorframework;


  @Test
  void createNode() throws Exception {

    //添加持久节点
    String path = curatorframework.create().forPath("/curator-node");
    //添加临时序号节点
    String path1 = curatorframework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-node", "some-data".getBytes());
   
    System.out.println(String.format("curator create node :%s  successfully.",path));
    System.in.read();

  }

}

3.获得节点数据
  @Test
  public void testGetData() throws Exception {
    byte[] bytes = curatorframework.getData().forPath("/curator-node");
  
    System.out.println(new String(bytes));
  }
  
4.修改节点数据
  @Test
  public void testSetData() throws Exception {
    curatorframework.setData().forPath("/curator-node","changed!".getBytes());
    byte[] bytes = curatorframework.getData().forPath("/curator-node");
   
    System.out.println(new String(bytes));
  }

5.创建节点同时创建⽗节点
  @Test
  public void testCreateWithParent() throws Exception {
    String pathWithParent="/node-parent/sub-node-1";
    String path = curatorframework.create().creatingParentsIfNeeded().forPath(pathWithParent);
    
    System.out.println(String.format("curator create node :%s  successfully.",path));
  }

6.删除节点
  @Test
  public void testDelete() throws Exception {
    String pathWithParent="/node-parent";
    curatorframework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
  }
  
zk的watch机制 1.Watch机制介绍
  • 我们可以把 Watch 理解成是注册在特定 Znode 上的触发器。
  • 当这个 Znode 发⽣改变,也就是调⽤了 create , delete , setData ⽅法的时候,将会触发 Znode上注册的对应事件,请求 Watch 的客户端会接收到异步通知
具体交互过程如下:
  • 客户端调⽤ getData ⽅法, watch 参数是 true 。服务端接到请求,返回节点数据,并且在对应的哈希表⾥插⼊被 Watch 的 Znode 路径,以及 Watcher 列表。
  • 当被 Watch 的 Znode 已删除,服务端会查找哈希表,找到该 Znode 对应的所有Watcher,异步通知客户端,并且删除哈希表中对应的 Key-Value。

    客户端使⽤了NIO通信模式监听服务端的调⽤
2.监听器原理

3.zkCli客户端使⽤watch
get -w /test ⼀次性监听节点

4.curator客户端使⽤watch
  @Test
  public void addNodeListener() throws Exception {

    NodeCache nodeCache = new NodeCache(curatorframework, "/curator-node");
    nodeCache.getListenable().addListener(new NodeCacheListener() {
      @Override
      public void nodeChanged() throws Exception {
        log.info("{} path nodeChanged: ","/curator-node");
        printNodeData();
      }
    });

    nodeCache.start();

    System.in.read();

  }

  public void printNodeData() throws Exception {
    byte[] bytes = curatorframework.getData().forPath("/curator-node");
    log.info("data: {}",new String(bytes));
  }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5695876.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存