zookeeper分布式锁

zookeeper分布式锁,第1张

zookeeper分布式锁 分布式锁

准备工作 导入依赖
	 
	 	junit 
	 	junit 
	 	RELEASE 
	  

	 
		org.apache.logging.log4j 
	 	log4j-core 
	 	2.8.2
	 

	  
	 	org.apache.zookeeper 
	 	zookeeper 
	 	3.5.7 
	  
方法实现
import org.apache.zookeeper.*; 
import org.apache.zookeeper.data.Stat; 

import java.io.IOException; 
import java.util.Collections; 
import java.util.List; 
import java.util.concurrent.CountDownLatch; 

public class DistributedLock { 

 // zookeeper server 列表 
 private String connectString = 
"hadoop102:2181,hadoop103:2181,hadoop104:2181"; 
 // 超时时间 
 private int sessionTimeout = 2000; 

 private ZooKeeper zk; 

 private String rootNode = "locks"; 
 private String subNode = "seq-"; 
 // 当前client 等待的子节点 
 private String waitPath; 

 //ZooKeeper 连接 
 private CountDownLatch connectLatch = new CountDownLatch(1);
   //ZooKeeper 节点等待 
 private CountDownLatch waitLatch = new CountDownLatch(1); 

 // 当前client 创建的子节点 
 private String currentNode; 

 // 和zk 服务建立连接,并创建根节点 
 public DistributedLock() throws IOException, 
InterruptedException, KeeperException { 

     zk  =  new  ZooKeeper(connectString,  sessionTimeout,  new 
Watcher() { 
         @Override 
         public void process(WatchedEvent event) { 
             // 连接建立时, 打开latch, 唤醒wait 在该latch 上的线程 
             if (event.getState() == 
Event.KeeperState.SyncConnected) { 
                 connectLatch.countDown(); 
             } 

             // 发生了waitPath 的删除事件 
             if (event.getType() == 
Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) 
{ 
                 waitLatch.countDown(); 
             } 
         } 
     }); 

     // 等待连接建立 
     connectLatch.await(); 

     //获取根节点状态 
     Stat stat = zk.exists("/" + rootNode, false); 

     //如果根节点不存在,则创建根节点,根节点类型为永久节点 
     if (stat == null) { 
         System.out.println("根节点不存在"); 
         zk.create("/" + rootNode, new byte[0], 
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 
     } 
 } 

 // 加锁方法 
 public void zkLock() { 

     try { 
         //在根节点下创建临时顺序节点,返回值为创建的节点路径 
         currentNode = zk.create("/" + rootNode + "/" + subNode, 
null, ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                 CreateMode.EPHEMERAL_SEQUENTIAL); 

         // wait 一小会, 让结果更清晰一些 
         Thread.sleep(10); 

         // 注意, 没有必要监听"/locks"的子节点的变化情况
         List  childrenNodes  =  zk.getChildren("/"  + 
rootNode, false); 

         //  列表中只有一个子节点,  那肯定就是 currentNode  ,  说明
client 获得锁 
         if (childrenNodes.size() == 1) { 
             return; 
         } else { 
             //对根节点下的所有临时顺序节点进行从小到大排序 
             Collections.sort(childrenNodes); 

             //当前节点名称 
             String  thisNode  =  currentNode.substring(("/"  + 
rootNode + "/").length()); 
             //获取当前节点的位置 
             int index = childrenNodes.indexOf(thisNode); 

             if (index == -1) { 
                 System.out.println("数据异常"); 
             } else if (index == 0) { 
                 //  index  ==  0,  说明 thisNode 在列表中最小,  当前
client 获得锁 
                 return; 
             } else { 
                 // 获得排名比currentNode 前1 位的节点 
                 this.waitPath  =  "/"  +  rootNode  +  "/"  + 
childrenNodes.get(index - 1); 

                 //  在 waitPath 上注册监听器,  当 waitPath 被删除时, zookeeper 会回调监听器的process 方法 
                 zk.getData(waitPath, true, new Stat()); 
                 //进入等待锁状态 
                 waitLatch.await(); 

                 return; 
             } 
         } 
     } catch (KeeperException e) { 
         e.printStackTrace(); 
     } catch (InterruptedException e) { 
         e.printStackTrace(); 
     } 
 } 

 // 解锁方法 
 public void zkUnlock() { 
     try { 
         zk.delete(this.currentNode, -1); 
     } catch (InterruptedException | KeeperException e) { 
         e.printStackTrace(); 
     } 
 } 
}
测试
import org.apache.zookeeper.KeeperException; 

import java.io.IOException; 

public class DistributedLockTest { 

 public static void main(String[] args) throws 
InterruptedException, IOException, KeeperException { 

     // 创建分布式锁1 
     final DistributedLock lock1 = new DistributedLock(); 
     // 创建分布式锁2 
     final DistributedLock lock2 = new DistributedLock(); 

     new Thread(new Runnable() { 
         @Override 
         public void run() { 
             // 获取锁对象 
             try { 
                 lock1.zkLock(); 
                 System.out.println("线程1 获取锁"); 
                 Thread.sleep(5 * 1000); 

                 lock1.zkUnlock(); 
                 System.out.println("线程1 释放锁"); 
             } catch (Exception e) { 
                 e.printStackTrace(); 
             } 
         } 
     }).start(); 

     new Thread(new Runnable() { 
         @Override 
         public void run() { 
             // 获取锁对象 
             try { 
                 lock2.zkLock(); 
                 System.out.println("线程2 获取锁"); 
                 Thread.sleep(5 * 1000); 

                 lock2.zkUnlock(); 
                 System.out.println("线程2 释放锁"); 
             } catch (Exception e) { 
                 e.printStackTrace(); 
             } 
         } 
     }).start(); 
 } 
}
方法讲解

当线程获取到zookeeper得节点创建一个临时节点,作为锁标志,并获取当前节点在zookeeper得位置,如果不为0,会监听这个节点得前置节点,并进入等待状态,直到获取锁。如果为-1 则出现错误,其他状态会等待锁。当方法完成之后会进行删除当前线程持有的节点,以达到释放锁的目的。

通过Curator 框架实现分布式锁

原生的Java API 开发存在的问题

(1)会话连接是异步的,需要自己去处理。比如使用CountDownLatch

(2)Watch 需要重复注册,不然就不能生效

(3)开发的复杂性还是比较高的

(4)不支持多节点删除和创建。需要自己去递归

导入依赖
 
 org.apache.curator 
 curator-framework 
 4.3.0 
 
 
 org.apache.curator 
 curator-recipes 
 4.3.0 
 
 
 org.apache.curator 
 curator-client 
 4.3.0 

代码演示

import org.apache.curator.RetryPolicy; 
import org.apache.curator.framework.Curatorframework; 
import org.apache.curator.framework.CuratorframeworkFactory; 
import org.apache.curator.framework.recipes.locks.InterProcessLock; 
import org.apache.curator.framework.recipes.locks.InterProcessMutex; 
import org.apache.curator.retry.ExponentialBackoffRetry; 

public class CuratorLockTest { 

 private String rootNode = "/locks";
 // zookeeper server 列表 
 private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; 

 // connection 超时时间 
 private int connectionTimeout = 2000; 

 // session 超时时间 
 private int sessionTimeout = 2000; 

 public static void main(String[] args) { 

     new CuratorLockTest().test(); 
 } 

 // 测试 
 private void test() { 

     // 创建分布式锁1 
     final InterProcessLock lock1 = new InterProcessMutex(getCuratorframework(), rootNode); 

     // 创建分布式锁2 
     final InterProcessLock lock2 = new InterProcessMutex(getCuratorframework(), rootNode); 

     new Thread(new Runnable() { 
         @Override 
         public void run() { 
             // 获取锁对象 
             try { 
                 lock1.acquire(); 
                 System.out.println("线程1 获取锁"); 
                 // 测试锁重入 
                 lock1.acquire(); 
                 System.out.println("线程1 再次获取锁"); 
                 Thread.sleep(5 * 1000); 
                 lock1.release(); 
                 System.out.println("线程1 释放锁"); 
                 lock1.release(); 
                 System.out.println("线程1 再次释放锁"); 
             } catch (Exception e) { 
                 e.printStackTrace(); 
             } 
         } 
     }).start(); 
     new Thread(new Runnable() { 
         @Override 
         public void run() { 
             // 获取锁对象 
             try { 
                 lock2.acquire(); 
                 System.out.println("线程2 获取锁"); 
                 // 测试锁重入 
                 lock2.acquire();
                  System.out.println("线程2 再次获取锁"); 
                 Thread.sleep(5 * 1000); 
                 lock2.release(); 
                 System.out.println("线程2 释放锁"); 
                 lock2.release(); 
                 System.out.println("线程2 再次释放锁"); 
             } catch (Exception e) { 
                 e.printStackTrace(); 
             } 
         } 
     }).start(); 
 } 

 // 分布式锁初始化 
 public Curatorframework getCuratorframework (){ 

     //重试策略,初试时间3 秒,重试3 次 
     RetryPolicy policy = new ExponentialBackoffRetry(3000, 3); 

     //通过工厂创建Curator 
     Curatorframework client = 
CuratorframeworkFactory.builder() 
             .connectString(connectString) 
             .connectionTimeoutMs(connectionTimeout) 
             .sessionTimeoutMs(sessionTimeout) 
             .retryPolicy(policy).build(); 

     //开启连接 
     client.start(); 
     System.out.println("zookeeper 初始化完成..."); 
     return client; 
 } 

}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5619432.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存