九、zookeeper:服务器动态上下线监听案例

九、zookeeper:服务器动态上下线监听案例,第1张

zk目录


文章目录 9.1、需求9.2、需求分析-服务器动态上下线9.3、具体实现9.3.1、服务器端代码9.3.2、客户端代码 9.4、测试9.4.1、在 Linux 命令行上 *** 作增加减少服务器9.4.2、在 Idea 上 *** 作增加减少服务器


9.1、需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

9.2、需求分析-服务器动态上下线

以下是我个人的理解:

对于zookeeper集群来说,图中服务器和客户端都是 “客户端“,图中的服务器做的是创建节点的动作,图中的客户端执行的监听节点的动作。而只有客户端才能执行这些 *** 作,就像之前直接用命令行 *** 作,都要先启动客户端:[root@wlw102 zookeeper-3.5.7]$ bin/zkCli.sh -server wlw102:2181

不同的是,图中的服务器(也就是这一台电脑)是安装了zookeeper 的,这些安装了zookeeper的机器组成了一个集群,当然这些服务器也可以作为客户端去访问zookeeper集群,所创建的数据也就这些机器的硬盘中的某个文件里存放着。图中的客户端通过域名+端口来访问已启动的zookeeper,从而监控某个节点。

步骤:

服务器上线的时候其实就是服务器启动时去注册信息(创建的都是临时节点)客户端获取到当前在线的服务器列表后服务器节点下线后给集群管理集群管理服务器节点的下件时间通知给客户端客户端通过获取服务器列表重选选择服务器

9.3、具体实现

步骤:

(1)先在集群上创建/servers 节点,在leader上 *** 作

(2)服务器端向 Zookeeper 注册代码(DistributeServer.java):

(3)客户端监听代码(DistributeClient.java):

(1)先在集群上创建/servers 节点,在leader上 *** 作

[zk: localhost:2181(CONNECTED) 10] create /servers "servers"
Created /servers
9.3.1、服务器端代码

(2)服务器端向 Zookeeper 注册代码(DistributeServer.java):

package com.wlw.case1;

import org.apache.zookeeper.*;

import java.io.IOException;

/**
 * 服务器端代码——注册节点
 */
public class DistributeServer {

    // 注意:逗号左右不能有空格
    private String connectString = "wlw102:2181,wlw103:2181,wlw104:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zk;

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {

        DistributeServer server = new DistributeServer();
        // 1 获取zk连接
        server.getConnect();

        // 2 注册服务器到zk集群
        server.regist(args[0]);

        // 3 启动业务逻辑(睡觉)
        server.business();
    }
    /**
     * 获取zk连接
     */
    private void getConnect() throws IOException {

        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }

    /**
     * 注册服务器到zk集群 (创建临时节点)
     */
    private void regist(String hostname) throws KeeperException, InterruptedException {
        String create = zk.create("/servers/"+hostname, hostname.getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println(hostname +" is working") ;
    }

    /**
     * 启动业务逻辑(睡觉)
     */
    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

}

9.3.2、客户端代码

(3)客户端监听代码(DistributeClient.java):

package com.wlw.case1;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * 客户端代码——监听节点
 */
public class DistributeClient {

    // 注意:逗号左右不能有空格
    private String connectString = "wlw102:2181,wlw103:2181,wlw104:2181";
    private int sessionTimeout = 2000;
    private ZooKeeper zk;

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        DistributeClient client = new DistributeClient();

        // 1 获取zk连接
        client.getConnect();

        // 2 监听/servers下面子节点的增加和删除
        client.getServerList();

        // 3 业务逻辑(睡觉)
        client.business();

    }

    /**
     * 获取zk连接
     */
    private void getConnect() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 保证时刻监控
                try {
                    getServerList();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * 监听/servers下面子节点的增加和删除
     */
    private void getServerList() throws KeeperException, InterruptedException {
        List<String> children = zk.getChildren("/servers", true);

        ArrayList<String> servers = new ArrayList<>();

        for (String child : children) {

            byte[] data = zk.getData("/servers/" + child, false, null);

            servers.add(new String(data));
        }

        // 打印节点
        System.out.println(servers);
    }

    /**
     * 启动业务逻辑(睡觉)
     */
    private void business() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }
}

9.4、测试 9.4.1、在 Linux 命令行上 *** 作增加减少服务器

(1)启动 DistributeClient 客户端

(2)在 wlw102 上 zk 的客户端/servers 目录上创建临时带序号节点

[zk: localhost:2181(CONNECTED) 1] create -e -s /servers/wlw102 "wlw102"
[zk: localhost:2181(CONNECTED) 2] create -e -s /servers/wlw103 "wlw103"

(3)观察 Idea 控制台变化

[wlw102, wlw103]

(4)执行删除 *** 作(我们创建的是临时有序号节点)

[zk: localhost:2181(CONNECTED) 8] delete /servers/wlw1020000000000

(5)观察 Idea 控制台变化

[wlw103]
9.4.2、在 Idea 上 *** 作增加减少服务器

(1)启动 DistributeClient 客户端(如果已经启动过,不需要重启)

(2)启动 DistributeServer 服务

①点击IDEA上方的 Edit Configurations②在d出的窗口中(Program arguments)输入想启动的主机,例如,wlw102\③回到 DistributeServer 的 main 方 法 , 右 键 , 在 d 出 的 窗 口 中 点 击 Run DistributeServer.main()④观察 DistributeServer 控制台,提示wlw102 is working⑤观察 DistributeClient 控制台,看节点

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/1294972.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-10
下一篇 2022-06-10

发表评论

登录后才能评论

评论列表(0条)

保存