ZooKeeper : Curator框架之Leader选举LeaderSelector

ZooKeeper : Curator框架之Leader选举LeaderSelector,第1张

ZooKeeper : Curator框架之Leader选举LeaderSelector

在上一篇博客中给大家介绍了 Curator框架的LeaderLatch,它是一种Leader选举实现,本篇博客介绍Curator框架的另一种Leader选举实现LeaderSelector。

  • ZooKeeper : Curator框架之Leader选举LeaderLatch

这里不再赘述Leader选举的概念。

测试代码

CuratorframeworkProperties类(提供Curatorframework需要的一些配置信息):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorframeworkProperties {
    // 连接地址
    public static final String CONNECT_ADDRESS = "192.168.31.175:9000";
    // 连接超时时间
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session超时时间
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空间
    public static final String NAMESPACE = "MyNamespace";
    // 重试策略
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
}

LeaderSelectorRunnable类(实现了Runnable接口,模拟分布式服务节点参与Leader选举):

package com.kaven.zookeeper;

import lombok.Setter;
import lombok.SneakyThrows;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.imps.CuratorframeworkState;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LeaderSelectorRunnable implements Runnable{
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的Curatorframework实例,表示不同的分布式服务节点
        Curatorframework curator = getCuratorframework();
        curator.start();
        assert curator.getState().equals(CuratorframeworkState.STARTED);

        // 模拟随机加入的分布式服务节点
        int randomSleep = new Random().nextInt(1000);
        Thread.sleep(randomSleep);

        // 创建监听器
        LeaderSelectorListenerImpl listener = new LeaderSelectorListenerImpl();
        // 创建LeaderSelector实例(用于Leader选举)
        // curator是Curatorframework实例,用于与ZooKeeper交互
        // "/services/leader"是leaderPath,Leader节点会成功创建该节点(其他节点则会失败)
        // EXECUTOR_SERVICE是用于执行业务的Executor实例
        // listener是该实例的监听器
        LeaderSelector selector = new LeaderSelector(curator, "/services/leader",
                EXECUTOR_SERVICE, listener);
        // 给自定义的监听器设置LeaderSelector实例
        listener.setSelector(selector);
        // 将线程名(Thread.currentThread().getName())作为分布式服务节点的id
        selector.setId(Thread.currentThread().getName());

        System.out.println(selector.getId() + "准备好了!");
        // 开始Leader选举
        selector.start();
        System.out.println(selector.getId() + "开始Leader选举!");
    }

    private Curatorframework getCuratorframework() {

        // 创建Curatorframework实例
        return CuratorframeworkFactory.builder()
                .connectString(CuratorframeworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorframeworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorframeworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorframeworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorframeworkProperties.NAMESPACE)
                .build();
    }

    @Setter
    private static class LeaderSelectorListenerImpl implements LeaderSelectorListener {

        private LeaderSelector selector;

        // 被选举为Leader时调用
        // 该方法结束后会释放领导权,即重新进行Leader选举(还有节点的情况下)
        @Override
        public void takeLeadership(Curatorframework client) throws Exception {
            System.out.println(selector.getId() + "被选举为Leader");
            selector.getParticipants().forEach(System.out::println);
            // 模拟业务处理
            Thread.sleep(5000);
        }

        // 当连接状态发生变化时调用
        @Override
        public void stateChanged(Curatorframework client, ConnectionState newState) {
            System.out.println(selector.getId() + " : " + newState.name());
        }
    }
}

启动类;

package com.kaven.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 7; i++) {
            EXECUTOR_SERVICE.execute(new LeaderSelectorRunnable());
        }
        Thread.sleep(10000000);
    }
}

模拟7个分布式服务节点进行Leader选举,输出如下所示:

pool-1-thread-2准备好了!
pool-1-thread-3准备好了!
pool-1-thread-3开始Leader选举!
pool-1-thread-2开始Leader选举!
pool-1-thread-4准备好了!
pool-1-thread-4开始Leader选举!
pool-1-thread-7准备好了!
pool-1-thread-7开始Leader选举!
pool-1-thread-6准备好了!
pool-1-thread-6开始Leader选举!
pool-1-thread-5准备好了!
pool-1-thread-5开始Leader选举!
pool-1-thread-1准备好了!
pool-1-thread-1开始Leader选举!
pool-1-thread-6 : ConNECTED
pool-1-thread-1 : ConNECTED
pool-1-thread-5 : ConNECTED
pool-1-thread-2 : ConNECTED
pool-1-thread-7 : ConNECTED
pool-1-thread-3 : ConNECTED
pool-1-thread-4 : ConNECTED
pool-1-thread-1被选举为Leader
Participant{id='pool-1-thread-1', isLeader=true}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-6被选举为Leader
Participant{id='pool-1-thread-6', isLeader=true}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-7被选举为Leader
Participant{id='pool-1-thread-7', isLeader=true}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-5被选举为Leader
Participant{id='pool-1-thread-5', isLeader=true}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-3被选举为Leader
Participant{id='pool-1-thread-3', isLeader=true}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-4被选举为Leader
Participant{id='pool-1-thread-4', isLeader=true}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-2被选举为Leader
Participant{id='pool-1-thread-2', isLeader=true}
重新参与Leader选举

被选举为Leader的LeaderSelector实例并没有调用close方法,却重新进行了Leader选举,这其实是监听器的takeLeadership方法结束后会释放领导权,即重新进行Leader选举(还有节点的情况下)。

相关方法:

    @VisibleForTesting
    void doWork() throws Exception
    {
        hasLeadership = false;
        try
        {
            // 尝试获取锁,即成为Leader
            mutex.acquire();
            // 成功获取锁,成为Leader
            hasLeadership = true;
            try
            {             
                // 调用监听器的takeLeadership方法
                listener.takeLeadership(client);
            }
            finally
            { 
                // 设置isQueued属性为false
                // 即不再参入Leader选举
                clearIsQueued();
            }
        }
        finally
        {
            if ( hasLeadership )
            {
                // 失去领导权
                hasLeadership = false;
                // 清除任何中断的状态,以便 mutex.release() 立即工作
                boolean wasInterrupted = Thread.interrupted();  
                try
                {
                    // 释放锁
                    mutex.release();
                }
                catch ( Exception e )
                {}
            }
        }
    }

因此监听器的takeLeadership方法就是用来处理业务逻辑的,处理完就会释放锁,即退出Leader选举,如果处理完业务后还想再次参与Leader选举,可以调用LeaderSelector实例的autoRequeue方法(将autoRequeue属性设置为true,即自动重新排队,再次参与Leader选举):

    // 默认情况下,当LeaderSelectorListener.takeLeadership(Curatorframework)返回时,此实例不会重新排队
    // 调用此方法会将实例置于一种自动重新排队的模式
    public void autoRequeue()
    {
        autoRequeue.set(true);
    }

相关方法:

    private synchronized boolean internalRequeue()
    {
        // 没有在排队,并且状态为STARTED
        if ( !isQueued && (state.get() == State.STARTED) )
        {
            // 设置排队状态为true
            isQueued = true;
            // 向Executor实例(创建LeaderSelector实例时传入的)提交任务
            Future task = executorService.submit(new Callable()
            {
                @Override
                public Void call() throws Exception
                {
                    try
                    {
                        // 执行业务,会间接调用上面介绍的doWork方法
                        doWorkLoop();
                    }
                    finally
                    {
                        // 设置isQueued属性为false
                        // 即不再参入Leader选举
                        clearIsQueued();
                        // 是否自动重新入队
                        if ( autoRequeue.get() )
                        {
                            // 自动重新排队
                            // 继续参与Leader选举
                            internalRequeue();
                        }
                    }
                    return null;
                }
            });
            // 设置任务
            ourTask.set(task);

            return true;
        }
        return false;
    }

将autoRequeue属性设置为true,每次失去领导权(之前是Leader的情况下),都会自动重新排队,再次参与Leader选举。

修改代码:

        // 开始Leader选举
        selector.start();
        System.out.println(selector.getId() + "开始Leader选举!");
        selector.autoRequeue();

输出如下所示:

pool-1-thread-5被选举为Leader
Participant{id='pool-1-thread-5', isLeader=true}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-1', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
pool-1-thread-7被选举为Leader
Participant{id='pool-1-thread-7', isLeader=true}
Participant{id='pool-1-thread-1', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
pool-1-thread-1被选举为Leader
Participant{id='pool-1-thread-1', isLeader=true}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
pool-1-thread-6被选举为Leader
Participant{id='pool-1-thread-6', isLeader=true}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-1', isLeader=false}
pool-1-thread-3被选举为Leader
Participant{id='pool-1-thread-3', isLeader=true}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-1', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
pool-1-thread-4被选举为Leader
Participant{id='pool-1-thread-4', isLeader=true}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-1', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
pool-1-thread-2被选举为Leader
Participant{id='pool-1-thread-2', isLeader=true}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-1', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
...

每个参与Leader选举的LeaderSelector实例在失去领导权后都会自动重新排队,即每次参与Leader选举的Participant列表都包含7个LeaderSelector实例。

需要时才重新排队,而不是每次都自动重新排队,可以调用requeue方法(实际上还是调用internalRequeue方法):

    // 重新排队争取领导权
    // 如果此实例已排队,则不会发生任何事情并返回false
    // 如果实例未排队,则重新排队并返回 true
    public boolean requeue()
    {
        Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
        return internalRequeue();
    }

Curator框架的Leader选举实现LeaderSelector就介绍到这里,源码以后会进行分析介绍,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5698535.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存