在上一篇博客中给大家介绍了 Curator框架的LeaderLatch,它是一种Leader选举实现,本篇博客介绍Curator框架的另一种Leader选举实现LeaderSelector。
- ZooKeeper : Curator框架之Leader选举LeaderLatch
这里不再赘述Leader选举的概念。
测试代码CuratorframeworkProperties类(提供Curatorframework需要的一些配置信息):
package com.kaven.zookeeper; import org.apache.curator.RetryPolicy; import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorframeworkProperties { // 连接地址 public static final String CONNECT_ADDRESS = "192.168.31.175:9000"; // 连接超时时间 public static final int CONNECTION_TIMEOUT_MS = 40000; // Session超时时间 public static final int SESSION_TIMEOUT_MS = 10000; // 命名空间 public static final String NAMESPACE = "MyNamespace"; // 重试策略 public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3); }
LeaderSelectorRunnable类(实现了Runnable接口,模拟分布式服务节点参与Leader选举):
package com.kaven.zookeeper; import lombok.Setter; import lombok.SneakyThrows; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.CuratorframeworkFactory; import org.apache.curator.framework.imps.CuratorframeworkState; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class LeaderSelectorRunnable implements Runnable{ private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool(); @SneakyThrows @Override public void run() { // 使用不同的Curatorframework实例,表示不同的分布式服务节点 Curatorframework curator = getCuratorframework(); curator.start(); assert curator.getState().equals(CuratorframeworkState.STARTED); // 模拟随机加入的分布式服务节点 int randomSleep = new Random().nextInt(1000); Thread.sleep(randomSleep); // 创建监听器 LeaderSelectorListenerImpl listener = new LeaderSelectorListenerImpl(); // 创建LeaderSelector实例(用于Leader选举) // curator是Curatorframework实例,用于与ZooKeeper交互 // "/services/leader"是leaderPath,Leader节点会成功创建该节点(其他节点则会失败) // EXECUTOR_SERVICE是用于执行业务的Executor实例 // listener是该实例的监听器 LeaderSelector selector = new LeaderSelector(curator, "/services/leader", EXECUTOR_SERVICE, listener); // 给自定义的监听器设置LeaderSelector实例 listener.setSelector(selector); // 将线程名(Thread.currentThread().getName())作为分布式服务节点的id selector.setId(Thread.currentThread().getName()); System.out.println(selector.getId() + "准备好了!"); // 开始Leader选举 selector.start(); System.out.println(selector.getId() + "开始Leader选举!"); } private Curatorframework getCuratorframework() { // 创建Curatorframework实例 return CuratorframeworkFactory.builder() .connectString(CuratorframeworkProperties.CONNECT_ADDRESS) .retryPolicy(CuratorframeworkProperties.RETRY_POLICY) .connectionTimeoutMs(CuratorframeworkProperties.CONNECTION_TIMEOUT_MS) .sessionTimeoutMs(CuratorframeworkProperties.SESSION_TIMEOUT_MS) .namespace(CuratorframeworkProperties.NAMESPACE) .build(); } @Setter private static class LeaderSelectorListenerImpl implements LeaderSelectorListener { private LeaderSelector selector; // 被选举为Leader时调用 // 该方法结束后会释放领导权,即重新进行Leader选举(还有节点的情况下) @Override public void takeLeadership(Curatorframework client) throws Exception { System.out.println(selector.getId() + "被选举为Leader"); selector.getParticipants().forEach(System.out::println); // 模拟业务处理 Thread.sleep(5000); } // 当连接状态发生变化时调用 @Override public void stateChanged(Curatorframework client, ConnectionState newState) { System.out.println(selector.getId() + " : " + newState.name()); } } }
启动类;
package com.kaven.zookeeper; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Application { private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool(); public static void main(String[] args) throws Exception { for (int i = 0; i < 7; i++) { EXECUTOR_SERVICE.execute(new LeaderSelectorRunnable()); } Thread.sleep(10000000); } }
模拟7个分布式服务节点进行Leader选举,输出如下所示:
pool-1-thread-2准备好了! pool-1-thread-3准备好了! pool-1-thread-3开始Leader选举! pool-1-thread-2开始Leader选举! pool-1-thread-4准备好了! pool-1-thread-4开始Leader选举! pool-1-thread-7准备好了! pool-1-thread-7开始Leader选举! pool-1-thread-6准备好了! pool-1-thread-6开始Leader选举! pool-1-thread-5准备好了! pool-1-thread-5开始Leader选举! pool-1-thread-1准备好了! pool-1-thread-1开始Leader选举! pool-1-thread-6 : ConNECTED pool-1-thread-1 : ConNECTED pool-1-thread-5 : ConNECTED pool-1-thread-2 : ConNECTED pool-1-thread-7 : ConNECTED pool-1-thread-3 : ConNECTED pool-1-thread-4 : ConNECTED pool-1-thread-1被选举为Leader Participant{id='pool-1-thread-1', isLeader=true} Participant{id='pool-1-thread-6', isLeader=false} Participant{id='pool-1-thread-7', isLeader=false} Participant{id='pool-1-thread-5', isLeader=false} Participant{id='pool-1-thread-3', isLeader=false} Participant{id='pool-1-thread-4', isLeader=false} Participant{id='pool-1-thread-2', isLeader=false} pool-1-thread-6被选举为Leader Participant{id='pool-1-thread-6', isLeader=true} Participant{id='pool-1-thread-7', isLeader=false} Participant{id='pool-1-thread-5', isLeader=false} Participant{id='pool-1-thread-3', isLeader=false} Participant{id='pool-1-thread-4', isLeader=false} Participant{id='pool-1-thread-2', isLeader=false} pool-1-thread-7被选举为Leader Participant{id='pool-1-thread-7', isLeader=true} Participant{id='pool-1-thread-5', isLeader=false} Participant{id='pool-1-thread-3', isLeader=false} Participant{id='pool-1-thread-4', isLeader=false} Participant{id='pool-1-thread-2', isLeader=false} pool-1-thread-5被选举为Leader Participant{id='pool-1-thread-5', isLeader=true} Participant{id='pool-1-thread-3', isLeader=false} Participant{id='pool-1-thread-4', isLeader=false} Participant{id='pool-1-thread-2', isLeader=false} pool-1-thread-3被选举为Leader Participant{id='pool-1-thread-3', isLeader=true} Participant{id='pool-1-thread-4', isLeader=false} Participant{id='pool-1-thread-2', isLeader=false} pool-1-thread-4被选举为Leader Participant{id='pool-1-thread-4', isLeader=true} Participant{id='pool-1-thread-2', isLeader=false} pool-1-thread-2被选举为Leader Participant{id='pool-1-thread-2', isLeader=true}重新参与Leader选举
被选举为Leader的LeaderSelector实例并没有调用close方法,却重新进行了Leader选举,这其实是监听器的takeLeadership方法结束后会释放领导权,即重新进行Leader选举(还有节点的情况下)。
相关方法:
@VisibleForTesting void doWork() throws Exception { hasLeadership = false; try { // 尝试获取锁,即成为Leader mutex.acquire(); // 成功获取锁,成为Leader hasLeadership = true; try { // 调用监听器的takeLeadership方法 listener.takeLeadership(client); } finally { // 设置isQueued属性为false // 即不再参入Leader选举 clearIsQueued(); } } finally { if ( hasLeadership ) { // 失去领导权 hasLeadership = false; // 清除任何中断的状态,以便 mutex.release() 立即工作 boolean wasInterrupted = Thread.interrupted(); try { // 释放锁 mutex.release(); } catch ( Exception e ) {} } } }
因此监听器的takeLeadership方法就是用来处理业务逻辑的,处理完就会释放锁,即退出Leader选举,如果处理完业务后还想再次参与Leader选举,可以调用LeaderSelector实例的autoRequeue方法(将autoRequeue属性设置为true,即自动重新排队,再次参与Leader选举):
// 默认情况下,当LeaderSelectorListener.takeLeadership(Curatorframework)返回时,此实例不会重新排队 // 调用此方法会将实例置于一种自动重新排队的模式 public void autoRequeue() { autoRequeue.set(true); }
相关方法:
private synchronized boolean internalRequeue() { // 没有在排队,并且状态为STARTED if ( !isQueued && (state.get() == State.STARTED) ) { // 设置排队状态为true isQueued = true; // 向Executor实例(创建LeaderSelector实例时传入的)提交任务 Futuretask = executorService.submit(new Callable () { @Override public Void call() throws Exception { try { // 执行业务,会间接调用上面介绍的doWork方法 doWorkLoop(); } finally { // 设置isQueued属性为false // 即不再参入Leader选举 clearIsQueued(); // 是否自动重新入队 if ( autoRequeue.get() ) { // 自动重新排队 // 继续参与Leader选举 internalRequeue(); } } return null; } }); // 设置任务 ourTask.set(task); return true; } return false; }
将autoRequeue属性设置为true,每次失去领导权(之前是Leader的情况下),都会自动重新排队,再次参与Leader选举。
修改代码:
// 开始Leader选举 selector.start(); System.out.println(selector.getId() + "开始Leader选举!"); selector.autoRequeue();
输出如下所示:
pool-1-thread-5被选举为Leader Participant{id='pool-1-thread-5', isLeader=true} Participant{id='pool-1-thread-7', isLeader=false} Participant{id='pool-1-thread-1', isLeader=false} Participant{id='pool-1-thread-6', isLeader=false} Participant{id='pool-1-thread-3', isLeader=false} Participant{id='pool-1-thread-4', isLeader=false} Participant{id='pool-1-thread-2', isLeader=false} pool-1-thread-7被选举为Leader Participant{id='pool-1-thread-7', isLeader=true} Participant{id='pool-1-thread-1', isLeader=false} Participant{id='pool-1-thread-6', isLeader=false} Participant{id='pool-1-thread-3', isLeader=false} Participant{id='pool-1-thread-4', isLeader=false} Participant{id='pool-1-thread-2', isLeader=false} Participant{id='pool-1-thread-5', isLeader=false} pool-1-thread-1被选举为Leader Participant{id='pool-1-thread-1', isLeader=true} Participant{id='pool-1-thread-6', isLeader=false} Participant{id='pool-1-thread-3', isLeader=false} Participant{id='pool-1-thread-4', isLeader=false} Participant{id='pool-1-thread-2', isLeader=false} Participant{id='pool-1-thread-5', isLeader=false} Participant{id='pool-1-thread-7', isLeader=false} pool-1-thread-6被选举为Leader Participant{id='pool-1-thread-6', isLeader=true} Participant{id='pool-1-thread-3', isLeader=false} Participant{id='pool-1-thread-4', isLeader=false} Participant{id='pool-1-thread-2', isLeader=false} Participant{id='pool-1-thread-5', isLeader=false} Participant{id='pool-1-thread-7', isLeader=false} Participant{id='pool-1-thread-1', isLeader=false} pool-1-thread-3被选举为Leader Participant{id='pool-1-thread-3', isLeader=true} Participant{id='pool-1-thread-4', isLeader=false} Participant{id='pool-1-thread-2', isLeader=false} Participant{id='pool-1-thread-5', isLeader=false} Participant{id='pool-1-thread-7', isLeader=false} Participant{id='pool-1-thread-1', isLeader=false} Participant{id='pool-1-thread-6', isLeader=false} pool-1-thread-4被选举为Leader Participant{id='pool-1-thread-4', isLeader=true} Participant{id='pool-1-thread-2', isLeader=false} Participant{id='pool-1-thread-5', isLeader=false} Participant{id='pool-1-thread-7', isLeader=false} Participant{id='pool-1-thread-1', isLeader=false} Participant{id='pool-1-thread-6', isLeader=false} Participant{id='pool-1-thread-3', isLeader=false} pool-1-thread-2被选举为Leader Participant{id='pool-1-thread-2', isLeader=true} Participant{id='pool-1-thread-5', isLeader=false} Participant{id='pool-1-thread-7', isLeader=false} Participant{id='pool-1-thread-1', isLeader=false} Participant{id='pool-1-thread-6', isLeader=false} Participant{id='pool-1-thread-3', isLeader=false} Participant{id='pool-1-thread-4', isLeader=false} ...
每个参与Leader选举的LeaderSelector实例在失去领导权后都会自动重新排队,即每次参与Leader选举的Participant列表都包含7个LeaderSelector实例。
需要时才重新排队,而不是每次都自动重新排队,可以调用requeue方法(实际上还是调用internalRequeue方法):
// 重新排队争取领导权 // 如果此实例已排队,则不会发生任何事情并返回false // 如果实例未排队,则重新排队并返回 true public boolean requeue() { Preconditions.checkState(state.get() == State.STARTED, "close() has already been called"); return internalRequeue(); }
Curator框架的Leader选举实现LeaderSelector就介绍到这里,源码以后会进行分析介绍,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)