zookeeper分布式锁的实现

zookeeper分布式锁的实现,第1张

zookeeper分布式锁的实现 实现原理

当一个客户端成功创建一个节点,另外一个客户端是无法创建同名的节点(达到互斥的效果)我们注册该节点的监听时间,当节点删除,会通知其他的客户端,这个时候其他的客户端可以重新去创建该节点(可以认为时拿到锁的客户端释放锁,其他的客户端可以抢锁)创建的节点应该时临时节点,这样保证我们在已经拿到锁的客户端挂掉了会自动释放锁 zk锁基类

package zklock;
 
import org.I0Itec.zkclient.ZkClient;
 
public abstract class AbstractLock {
 
	//zk地址和端口
	public static final String ZK_ADDR = "192.168.0.230:2181";
	//超时时间
	public static final int SESSION_TIMEOUT = 10000;
	//创建zk
	protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT);
	
	
	
	public void getLock() {
		String threadName = Thread.currentThread().getName();
		if (tryLock()) {
			System.out.println(threadName+"-获取锁成功");
		}else {
			System.out.println(threadName+"-获取锁失败,进行等待...");
			waitLock();
			//递归重新获取锁
			getLock();
		}
	}
	
	//释放锁
	public abstract void releaseLock();
	
	//尝试加锁
	public abstract boolean tryLock();
	
	//等待锁
	public abstract void waitLock();
}
图解(A)

实现(A)
package zklock;
 
import java.util.concurrent.CountDownLatch;
 
import org.I0Itec.zkclient.IZkDataListener;
 

public class SimpleZkLock extends AbstractLock {
 
	private static final String NODE_NAME = "/test_simple_lock";
	
	private CountDownLatch countDownLatch;
	
	@Override
	public void releaseLock() {
		if (null != zkClient) {
			//删除节点
			zkClient.delete(NODE_NAME);
			zkClient.close();
			System.out.println(Thread.currentThread().getName()+"-释放锁成功");
		}
		
	}
 
	//直接创建临时节点,如果创建成功,则表示获取了锁,创建不成功则处理异常
	@Override
	public boolean tryLock() {
		if (null == zkClient) return false;
		try {
			zkClient.createEphemeral(NODE_NAME);
			return true;
		} catch (Exception e) {
			return false;
		}
	}
 
	@Override
	public void waitLock() {
		//监听器
		IZkDataListener iZkDataListener = new IZkDataListener() {
			//节点被删除回调
			@Override
			public void handleDataDeleted(String dataPath) throws Exception {
				if (countDownLatch != null) {
					countDownLatch.countDown();
				}
			}
			//节点改变被回调
			@Override
			public void handleDataChange(String dataPath, Object data) throws Exception {
				// TODO Auto-generated method stub
				
			}
		};
		zkClient.subscribeDataChanges(NODE_NAME, iZkDataListener);
		//如果存在则阻塞
		if (zkClient.exists(NODE_NAME)) {
			countDownLatch = new CountDownLatch(1);
			try {
				countDownLatch.await();
				System.out.println(Thread.currentThread().getName()+" 等待获取锁...");
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		//删除监听
		zkClient.unsubscribeDataChanges(NODE_NAME, iZkDataListener);
	}
 
}
测试部分
package zklock;
 
public class LockTest {
	public static void main(String[] args) {
		//模拟多个10个客户端
		for (int i=0;i<10;i++) {
			Thread thread = new Thread(new LockRunnable());
			thread.start();
		}
		
	}
	
	static class LockRunnable implements Runnable{
 
		@Override
		public void run() {
			AbstractLock zkLock = new SimpleZkLock();
			//AbstractLock zkLock = new HighPerformanceZkLock();
			zkLock.getLock();
			//模拟业务操作
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			zkLock.releaseLock();
		}
		
	}
}

测试结果分析:性能较低
每一次客户端释放锁的时候,其他的客户端都会去抢锁,这就造成了不必要的浪费

图解(B)

实现(B)
package zklock;
 
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
 
import org.I0Itec.zkclient.IZkDataListener;
 

public class HighPerformanceZkLock extends AbstractLock {
 
	private static final String PATH = "/highPerformance_zklock";
	//当前节点路径
	private String currentPath;
	//前一个节点的路径
	private String beforePath;
	
	private CountDownLatch countDownLatch = null;
	
	public HighPerformanceZkLock() {
		//如果不存在这个节点,则创建持久节点
		if (!zkClient.exists(PATH)) {		
			zkClient.createPersistent(PATH);
		}
	}
	
	@Override
	public void releaseLock() {
		if (null != zkClient) {
			zkClient.delete(currentPath);
	        zkClient.close();
		}
 
	}
 
	@Override
	public boolean tryLock() {
		//如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
		if (null == currentPath || "".equals(currentPath)) {
			//在path下创建一个临时的顺序节点
			currentPath = zkClient.createEphemeralSequential(PATH+"/", "lock");
		}
		//获取所有的临时节点,并排序
		List childrens = zkClient.getChildren(PATH);
		Collections.sort(childrens);
		if (currentPath.equals(PATH+"/"+childrens.get(0))) {
			return true;
		}else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
			int pathLength = PATH.length();
			int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1));
			beforePath = PATH+"/"+childrens.get(wz-1);
		}
		return false;
	}
 
	@Override
	public void waitLock() {
		IZkDataListener lIZkDataListener = new IZkDataListener() {
			
			@Override
			public void handleDataDeleted(String dataPath) throws Exception {
				if (null != countDownLatch){
					countDownLatch.countDown();
				}
			}
			
			@Override
			public void handleDataChange(String dataPath, Object data) throws Exception {
				
			}
		};
		//监听前一个节点的变化
		zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
		if (zkClient.exists(beforePath)) {
			countDownLatch = new CountDownLatch(1);
			try {
				countDownLatch.await();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener);
	}
 
}

实现思路:
客户端在抢锁的时候进行排队,客户端只要监听它前一个节点的变化就行,如果前一个节点释放了锁,客户端才去进行抢锁 *** 作,这个时候我们就需要创建顺序节点了

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5701122.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存