分布式项目的配置总管理处
1.2 分布式锁
对于分布式项目修改共享数据时加入锁管理(同一时间只能有一个服务对数据进更改)
1.3 集群管理最常见的功能,作为注册中心使用.
2.zookeeper命令 *** 作 2.1 zookeeper数据模型
-
Zookeeper是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构
-
这里面的每一个结点都被称为ZNode,每个节点都会保存自己的数据和节点信息
-
节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下
-
节点可以分为四大类
-
PERSISTENT 持久化节点
-
EPHEMRAL 临时节点 : -e
-
PERSISTENT_SENQUENTIAL 持久化顺序节点 : -s
-
EPHEMRAL_SEQUENTIAL 临时顺序节点 : -es
-
2.2 zookeeper 服务端常用命令
在安装目录的bin目录下
-
启动Zookeeper服务 : ./zkServer.sh start
-
查看Zookeeper服务状态 : ./zkServer.sh statu
-
停止Zookeeper服务 : ./zkServer.sh stop
-
重启Zookeeper服务 : ./zkServer.sh restart
-
连接ZooKeeper服务端
-
./zkCli.sh -server ip:port
-
-
断开客户端连接
-
quit
-
-
设置节点的值
-
set /节点path value
-
-
查看帮助命令
-
help
-
-
删除单个节点
-
delete /节点path
-
-
显示指定目录下的节点
-
ls 目录
-
-
删除带有子节点的节点
-
deleteall /节点path
-
-
创建节点
-
create /节点path value
-
-
获取节点的值
-
get /节点path
-
-
创建临时节点
-
create -e /节点path value
-
-
创建顺序节点
-
create -s /节点path value
-
-
查询节点详细信息
-
ls -s /节点path
-
-
节点详细信息
-
czxid : 节点被创建的事务ID
-
dataversion : 数据版本号
-
ctime : 创建时间
-
aclversion : 权限版本号
-
mzxid : 最后一次被更新的事务ID
-
ephemeralOwner : 用于临时节点 ,代表临时节点的事务ID,如果为持久节点则为0
-
pzxid : 子节点列表最后一次被更新的事务ID
-
dataLength : 节点存储的数据长度
-
cversion : 子节点的版本号
-
numChildren : 当前节点的子节点数
-
-
Curator是Apache ZooKeeper的Java客户端库
-
建立连接
-
添加节点
-
删除节点
-
修改节点
-
查询节点
-
Watch时间监听
-
分布式锁实现
//重试策略1 每间隔x一共重试x次 每隔3秒连接一次一共连接10次 RetryPolicy r = new ExponentialBackoffRetry(3000, 10); //第一种方式 Curatorframework zookeeperClient = CuratorframeworkFactory.newClient("116.62.71.234:2181", 60 * 1000, 15 * 1000, r); //开启连接 zookeeperClient.start();4.1.2 方式二 : 链式编程
//第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.234:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start();
-
namespace : 创建根节点itheima(为了方便,不用之后每次进行客户端 *** 作都写/根节点path)
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.235:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); String path = zookeeperClient.create().forPath("/app1"); System.out.println(path); //关闭连接 zookeeperClient.close(); }4.2.2 创建节点带有数据
-
如果没有节点信息,则信息默认为客户端ip
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.235:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); String path = zookeeperClient.create().forPath("/app2","hehe".getBytes()); System.out.println(path); //关闭连接 zookeeperClient.close(); }4.2.3 创建节点,设置节点类型
-
默认为持久化类型
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.235:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); String path = zookeeperClient.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","hehe".getBytes()); System.out.println(path); //关闭连接 app3就会被释放 zookeeperClient.close(); }4.2.4 创建多级节点
-
ZooKeeper如果父节点不存在无法创建子节点
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.235:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); String path = zookeeperClient.create().creatingParentsIfNeeded().forPath("/app4/p1","hehe".getBytes()); System.out.println(path); //关闭连接 app3就会被释放 zookeeperClient.close(); }4.3 查询节点 4.3.1 查询节点数据
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.235:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); byte[] bytes = zookeeperClient.getData().forPath("/app1"); System.out.println(new String(bytes)); //关闭连接 zookeeperClient.close(); }4.3.2 查询节点子节点
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.235:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); List4.3.3 查询节点详细信息 2.3详见详细信息strings = zookeeperClient.getChildren().forPath("/app1"); //关闭连接 zookeeperClient.close(); }
-
返回的结果需要封装到一个JavaBean中
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.235:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); Stat status = new Stat(); zookeeperClient.getData().storingStatIn(status).forPath("/app1"); //关闭连接 zookeeperClient.close(); }4.4 修改节点信息 4.4.1 基本修改
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.235:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); zookeeperClient.setData().forPath("/app1","itcast".getBytes()); //关闭连接 zookeeperClient.close(); }4.4.2 根据版本修改节点信息(推荐)
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.234:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); Stat status = new Stat(); zookeeperClient.getData().storingStatIn(status).forPath("/app1"); //查询出版本 int version = status.getVersion(); System.out.println(version); zookeeperClient.setData().withVersion(3).forPath("/app1","itcast".getBytes()); //关闭连接 zookeeperClient.close(); }4.5 删除节点 4.5.1 基本删除
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.234:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); zookeeperClient.delete().forPath("/app1"); //关闭连接 zookeeperClient.close(); }4.5.2 带有子节点的递归删除
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.234:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); zookeeperClient.delete().deletingChildrenIfNeeded().forPath("/app1"); //关闭连接 zookeeperClient.close(); }4.5.3 必须删除成功
-
guaranteed();
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.234:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); zookeeperClient.delete().guaranteed().forPath("/app1"); //关闭连接 zookeeperClient.close(); }4.5.4 删除回调
-
inBackground(()->{})
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.234:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); zookeeperClient.delete().guaranteed().inBackground(new BackgroundCallback(){ @Override public void processResult(Curatorframework client, CuratorEvent event) throws Exception { System.out.println("我被删除了"); System.out.println(event); } }).forPath("/app1"); //关闭连接 zookeeperClient.close(); }5.Java API 高级 *** 作 5.1 Watch事件监听 5.1.1 基本概念
-
ZooKeeper允许用户在指定节点上注册一些Watchr,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制时ZooKeeper实现分布式协调服务的重要特性.
-
ZooKeeper中引入了Watcher机制来实现了发布/订阅功能.能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化的时候,会通知所有订阅者.
-
ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便需要开发人员自己反复注册Watcher,比较繁琐.
-
Curator引入了Cache来实现对ZooKeeper服务端事件的监听.
-
ZooKeeper提供了三种Watcher;
-
NodeCache : 只是监听了某一特定节点
-
PathChildrenCache : 监控一个ZNode的子节点
-
TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache组合.
-
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.234:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); //1.创建NodeCache对象 NodeCache nodeCache = new NodeCache(zookeeperClient, "/app1"); //2.注册监听 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点变化了"); //获取修改节点后的数据 byte[] data = nodeCache.getCurrentData().getData(); System.out.println(new String(data)); } }); //3.开启监听,如果设置为true,则开启监听时,加载缓存数据 nodeCache.start();; //while (true){} //保证线程不结束 , web环境不需要加,这里是主方法演示 //关闭连接 zookeeperClient.close(); }5.1.3 PathChildrenCache
-
参数event可以获得修改的详细信息
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.234:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); //1.创建PathChildrenCache对象 PathChildrenCache p = new PathChildrenCache(zookeeperClient,"/app2",true); //2.注册监听 p.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(Curatorframework client, PathChildrenCacheEvent event) throws Exception { System.out.println("子节点发送变化了"); System.out.println(event); //监听子节点的数据变更,并且拿到变更后的数据 //1.获取类型 PathChildrenCacheEvent.Type type = event.getType(); //2.判断类型是否为update if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ System.out.println("数据变了"); byte[] data = event.getData().getData(); System.out.println(new String(data)); } } }); //3.开启监听,如果设置为true,则开启监听时,加载缓存数据 p.start(); //while (true){} //保证线程不结束 , web环境不需要加,这里是主方法演示 //关闭连接 zookeeperClient.close(); }5.1.4 TreeCache
public static void main(String[] args) throws Exception { //第二种连接方式:链式编程 Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.234:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). namespace("itheima").build(); //开启连接 zookeeperClient.start(); //1.创建NodeCache对象 TreeCache treeCache = new TreeCache(zookeeperClient, "/app1"); //2.注册监听 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(Curatorframework client, TreeCacheEvent event) throws Exception { System.out.println("结点变化了"); System.out.println(event); } }); //3.开启监听,如果设置为true,则开启监听时,加载缓存数据 treeCache.start(); //关闭连接 zookeeperClient.close(); //while (true){} //保证线程不结束 , web环境不需要加,这里是主方法演示 }6.分布式锁 6.1 分布式锁
-
在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者lock的方式来解决多线程见的代码同步问题,这时多线程的运行都是运行在同一 JVM下,没有任何问题
-
但当我们的应用时分布式集群工作的情况下,属于JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题
-
那么久需要一种更加高级的锁机制,来处理跨机器的进程之间的数据同步问题---这就是分布式锁.
-
核心思想 : 当客户端要获取锁,则创建节点,使用完锁则删除该节点
-
客户端获取锁时,在lock节点下创建临时顺序节点
-
然后分别获取lock下面的所有子节点,客户端获取到所有子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁.
-
如果发现自己创建的节点并非lock所有子节点中最小的,说明还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件.
-
如果发现比自己小得那个节点被删除,则客户端的Watcher会受到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取比自己小的一个节点并注册监听.
-
-
在Curator中有五种锁方案
-
InterProcessSemaphoreMutex : 分布式排它锁 (非可重入锁)
-
InterProcessMutex : 分布式可重入排它锁
-
InterProcessReadWriteLock : 分布式读写锁
-
InterProcessMultiLock : 将多个锁作为单个实体管理的容器
-
InterProcessSemaphoreV2 : 共享信号量
-
Runnable
public class Ticket12306 implements Runnable { private int tickets = 10; //第二种连接方式:链式编程 private static Curatorframework zookeeperClient = CuratorframeworkFactory.builder(). connectString("116.62.71.235:2181"). sessionTimeoutMs(60 * 1000). connectionTimeoutMs(15 * 1000). retryPolicy(new ExponentialBackoffRetry(3000, 10)). build(); private static InterProcessMutex lock = new InterProcessMutex(zookeeperClient,"/lock") ; @Override public void run() { while (true){ //获取锁 try { lock.acquire(3, TimeUnit.SECONDS); if(tickets > 0){ System.out.println(Thread.currentThread().getName()+":"+tickets); Thread.sleep(100); tickets--; } } catch (Exception e) { e.printStackTrace(); }finally { //释放锁 try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } } }
Main
public class Lock12306 { public static void main(String[] args) { Ticket12306 t = new Ticket12306(); Thread t1 = new Thread(t,"携程"); Thread t2 = new Thread(t,"飞猪"); t1.start(); t2.start(); } }7 ZooKeeper集群 7.1 集群简介
-
Leader选举 :
-
Serverid ; 服务器ID
-
比如有三台机器,编号为1,2,3.编号越大在选择算法中权重越大.
-
-
Zxid : 数据ID
-
服务器中存放的最大数据ID,值越大说明数据越新,在选举算法中数据越新则权重越大
-
-
在Leader选举的过程中,如果某台ZooKeeper获得了超过半数的选票,则此ZooKeeper就可以成为Leader了
-
-
与创建单机环境类似
-
修改conf目录下的zoo_sample.cfg为zoo.cfg
-
修改data目录为指定目录,每个集群成员分别配置
-
在data目录下创建myid文件,内容分别为1,2,3
-
分别在conf目录下的zoo.cfg文件加入如下内容
server.1=192.168.149.135:2881:3881 server.2=192.168.149.135:2882:3882 server.3=192.168.149.135:2883:3883
-
解释:server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口,搭建伪集群,端口可以写成127.0.0.1
-
Leader领导者
-
处理事务请求(增删改)
-
集群内部各个服务器的调度者
-
-
Follower跟随者
-
处理客户端非事务请求(查),转发事务请求给Leader服务器.
-
参数Leader选举投票
-
-
Observer观察者
-
处理客户端非事务请求,转发事务请求给Leader服务器.
-
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)