https://www.bilibili.com/video/BV1bi4y1R78E?from=search&seid=15041406188452749491&spm_id_from=333.337.0.0
关注公众号 学习更多精彩课程 使用三大核心语言Java、Python、Golang玩转zookeeper 使用Java原生api *** 作zookeeper 创建一个maven项目创建一个maven项目
添加项目依赖org.apache.zookeeper zookeeper3.4.8
实现代码https://mvnrepository.com/
package com.duoke360; import org.apache.zookeeper.*; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class Test { public static void main(String[] args) throws IOException, InterruptedException, KeeperException { String connStr = "192.168.18.128:2181"; CountDownLatch countDown = new CountDownLatch(1); Watcher watcher= event -> { if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { System.err.println("eventType:"+event.getType()); if(event.getType()== Watcher.Event.EventType.None){ countDown.countDown(); }else if(event.getType()== Watcher.Event.EventType.NodeCreated){ System.out.println("listen:节点创建"); }else if(event.getType()== Watcher.Event.EventType.NodeChildrenChanged){ System.out.println("listen:子节点修改"); } } }; ZooKeeper zookeeper = new ZooKeeper(connStr, 5000,watcher); countDown.await(); //注册监听,每次都要重新注册,否则监听不到 // 先创建一个根节点root zookeeper.exists("/root/ghz", watcher); // 创建节点 String result = zookeeper.create("/root/ghz", "老郭".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(result); Thread.sleep(10); // 获取节点 byte[] bs = zookeeper.getData("/root/ghz", true, null); result = new String(bs); System.out.println("创建节点后的数据是:" + result); // 修改节点 zookeeper.setData("/root/ghz", "多课网-老郭".getBytes(), -1); Thread.sleep(10); bs = zookeeper.getData("/root/ghz", true, null); result = new String(bs); System.out.println("修改节点后的数据是:" + result); // 删除节点 zookeeper.delete("/root/ghz", -1); System.out.println("节点删除成功"); } }运行结果
eventType:None eventType:NodeCreated listen:节点创建 /root/ghz 创建节点后的数据是:老郭 eventType:NodeDataChanged 修改节点后的数据是:多课网-老郭 节点删除成功 eventType:NodeDeleted使用java zkclient库 *** 作zookeeper.md 创建一个maven项目
创建一个maven项目
添加依赖实现代码com.101tec zkclient0.10
package com.duoke360; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.apache.zookeeper.*; import java.util.List; public class Test { public static void main(String[] args) throws InterruptedException { String connStr = "192.168.18.128:2181"; ZkClient zk = new ZkClient(connStr); // 注册【数据】事件 zk.subscribeDataChanges("/root/ghz", new IZkDataListener() { @Override public void handleDataDeleted(String arg0) throws Exception { System.err.println("数据删除:" + arg0); } @Override public void handleDataChange(String arg0, Object arg1) throws Exception { System.err.println("数据修改:" + arg0 + "------" + arg1); } }); zk.subscribeChildChanges("/root", (arg0, arg1) -> { System.err.println("子节点发生变化:" + arg0); arg1.forEach(f -> { System.out.println("content:" + f); }); }); List运行结果list = zk.getChildren("/"); list.forEach(e -> { System.out.println(e); }); String res = zk.create("/root/ghz", "多课网-老郭", CreateMode.PERSISTENT); System.out.println("创建节点/root/ghz成功:" + res); zk.writeData("/root/ghz", "多课网-zookeeper"); System.out.println("修改节点/root/ghz数据成功"); res = zk.readData("/root/ghz"); System.out.println("节点数据:" + res); Thread.sleep(1000); zk.delete("/root/ghz"); System.out.println("删除节点/root/ghz成功"); Thread.sleep(1000); System.out.println("------------------------------------------------"); for (int i = 0; i < 10; i++) { zk.create("/root/ghz", "多课网-老郭", CreateMode.PERSISTENT); Thread.sleep(1000); zk.delete("/root/ghz"); Thread.sleep(1000); } } }
node20000000002 zookeeper node10000000001 root 创建节点/root/ghz成功:/root/ghz 修改节点/root/ghz数据成功 节点数据:多课网-zookeeper content:ghz 数据修改:/root/ghz------多课网-zookeeper 子节点发生变化:/root 数据修改:/root/ghz------多课网-zookeeper 删除节点/root/ghz成功 数据删除:/root/ghz 子节点发生变化:/root ------------------------------------------------ 数据修改:/root/ghz------多课网-老郭 子节点发生变化:/root content:ghz 数据删除:/root/ghz 子节点发生变化:/root 数据修改:/root/ghz------多课网-老郭 子节点发生变化:/root content:ghz 数据删除:/root/ghz 子节点发生变化:/root 数据修改:/root/ghz------多课网-老郭 子节点发生变化:/root使用Java curator库 *** 作zookeeper 创建一个maven项目
创建一个maven项目
添加依赖实现代码org.apache.curator curator-recipes4.2.0
package com.duoke360; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.CuratorframeworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; public class Test { public static void main(String[] args) throws Exception { String connStr = "192.168.18.128:2181"; Curatorframework cur= CuratorframeworkFactory.builder() .connectString(connStr) .connectionTimeoutMs(5000) .retryPolicy(new ExponentialBackoffRetry(1000,3)) .build(); cur.start();//连接 //创建监听 PathChildrenCache cache=new PathChildrenCache(cur,"/root",true); cache.start(); cache.rebuild(); cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(Curatorframework framwork, PathChildrenCacheEvent event) throws Exception { System.err.println("节点发生变化:"+event.getType()); } }); Stat stat=cur.checkExists().forPath("/root/ghz"); if(stat!=null){ System.out.println("/root/ghz 节点存在,直接删除"); cur.delete().forPath("/root/ghz"); } System.in.read(); System.out.println("准备创建 /root/ghz"); cur.create().withMode(CreateMode.PERSISTENT) .forPath("/root/ghz", "多课网-老郭".getBytes()); System.out.println("节点 /root/ghz 创建成功"); Thread.sleep(1000); byte[] bs=cur.getData().forPath("/root/ghz"); System.out.println("数据:"+new String(bs)); Thread.sleep(1000); cur.delete().forPath("/root/ghz"); Thread.sleep(1000); } }运行结果
test 准备创建 /root/ghz 节点 /root/ghz 创建成功 节点发生变化:CHILD_ADDED 数据:多课网-老郭 节点发生变化:CHILD_REMOVED使用Python *** 作zookeeper 安装
pip install kazoo连接 ZooKeeper
可通过 KazooClient 类直接连接 ZooKeeper ,支持多个 host ,端口默认 2181。
import json from kazoo.client import KazooClient zk = KazooClient(hosts='192.168.18.128:2181') zk.start()创建节点
先看下 create() 方法定义
def create(self, path, value=b"", acl=None, ephemeral=False, sequence=False, makepath=False): :param path: Path of node. :param value: Initial bytes value of node. :param acl: :class:`~kazoo.security.ACL` list. :param ephemeral: Boolean indicating whether node is ephemeral (tied to this session). :param sequence: Boolean indicating whether path is suffixed with a unique index. :param makepath: Whether the path should be created if it doesn't exist.
我们来解释下这些参数:
- path: 节点路径
- value: 节点对应的值,注意值的类型是 bytes
- ephemeral: 若为 True 则创建一个临时节点,session 中断后自动删除该节点。默认 False
- sequence: 若为 True 则在你创建节点名后面增加10位数字(例如:你创建一个 testplatform/test 节点,实际创建的是 testplatform/test0000000003,这串数字是顺序递增的)。默认 False
- makepath: 若为 False 父节点不存在时抛 NoNodeError。若为 True 父节点不存在则创建父节点。默认 False
实例:
from kazoo.client import KazooClient zk = KazooClient(hosts='192.168.18.128:2181') zk.start() # 创建节点:makepath 设置为 True ,父节点不存在则创建,其他参数不填均为默认 zk.create('/root/ghz',b'',makepath=True) # *** 作完后,别忘了关闭zk连接 zk.stop()查看节点
KazooClient 类用提供 get_children() 和 get() 方法获取 子节点 和 节点对应的值
from kazoo.client import KazooClient zk = KazooClient(hosts='192.168.18.128:2181') zk.start() # 获取某个节点下所有子节点 node = zk.get_children('/root') # 获取某个节点对应的值 value = zk.get('/root/ghz') # *** 作完后,别忘了关闭zk连接 zk.stop() print(node,value)更改节点
更改上文创建的 node 值,使用 set() 方法
from kazoo.client import KazooClient zk = KazooClient(hosts='192.168.18.128:2181') zk.start() # 更改节点对应的value zk.set('/root/ghz', b'duoke-ghz') # 获取某个节点对应的值 value = zk.get('/root/ghz') zk.stop() print(value)删除节点
删除上文创建的节点,使用 delete() 方法
from kazoo.client import KazooClient zk = KazooClient(hosts='192.168.18.128:2181') zk.start() # 删除节点对应的value zk.delete('/root/ghz',recursive=False) zk.stop()
watches 事件参数 recursive:若为 False,当需要删除的节点存在子节点,会抛异常 NotEmptyError 。若为True,则删除 此节点 以及 删除该节点的所有子节点
zookeeper 所有读 *** 作都有设置 watch 选项(get_children() 、get() 和 exists())。watch 是一个触发器,当检测到 zookeeper 有子节点变动 或者 节点value发生变动时触发。下面以 get() 方法为例。
from kazoo.client import KazooClient zk = KazooClient(hosts='192.168.18.128:2181') zk.start() def test(event): print('触发事件') if __name__ == "__main__": # 节点必须存在,否则保存 zk.get('/root/ghz', watch=test) print("第一次获取value") zk.set('/root/ghz', b'duoke-ghz') zk.get('/root/ghz', watch=test) zk.set('/root/ghz', b'duoke-ghz2') print("第二次获取value") zk.stop()运行结果
第一次获取value 触发事件 第二次获取value使用golang *** 作zookeeper 下载包
go get github.com/samuel/go-zookeeper连接到Server
package main import ( "fmt" "time" "github.com/samuel/go-zookeeper/zk" ) func conn() *zk.Conn { var hosts = []string{"192.168.18.128:2181"} conn, _, err := zk.Connect(hosts, time.Second*5) defer conn.Close() if err != nil { fmt.Println(err) return nil } else { fmt.Println("连接成功!") return conn } } func main() { conn() }创建节点
func create() { var conn *zk.Conn = conn() defer conn.Close() var path = "/home" var data = []byte("多课网") var flags int32 = 0 //flags有4种取值: //0:永久,除非手动删除 //zk.FlagEphemeral = 1:短暂,session断开则改节点也被删除 //zk.FlagSequence = 2:会自动在节点后面添加序号 //3:Ephemeral和Sequence,即,短暂且自动添加序号 var acls = zk.WorldACL(zk.PermAll) //控制访问权限模式 p, err_create := conn.Create(path, data, flags, acls) if err_create != nil { fmt.Println(err_create) return } fmt.Println("create:", p) }修改节点
func set() { var conn *zk.Conn = conn() defer conn.Close() var path = "/home" var data = []byte("多课网-老郭") conn.Set(path, data, -1) b, _, _ := conn.Get(path) fmt.Println("数据:" + string(b)) }删除节点
func del() { var conn *zk.Conn = conn() defer conn.Close() var path = "/home" err := conn.Delete(path, -1) if err != nil { fmt.Println("删除失败!") } else { fmt.Println("删除成功!") } }watch
func callback(event zk.Event) { fmt.Println("*******************") fmt.Println("path:", event.Path) fmt.Println("type:", event.Type.String()) fmt.Println("state:", event.State.String()) fmt.Println("*******************") } func watch() { var hosts = []string{"192.168.18.128:2181"} option := zk.WithEventCallback(callback) conn, _, err := zk.Connect(hosts, time.Second*5, option) defer conn.Close() if err != nil { fmt.Println(err) return } var path = "/home" _, _, _, err = conn.ExistsW(path) if err != nil { fmt.Println(err) return } // 创建 create(conn) time.Sleep(time.Second * 2) _, _, _, err = conn.ExistsW(path) if err != nil { fmt.Println(err) return } // 删除 del(conn) }关注公众号 学习更多精彩课程
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)