ignite 2.11.0 节点发现原理及源码分析

ignite 2.11.0 节点发现原理及源码分析,第1张

ignite 2.11.0 节点发现原理及源码分析

ignite 2.11.0 节点发现原理
    • 节点发现介绍
    • 环形拓扑
    • 节点加入过程
      • 概述
      • 创建连接
      • TcpDiscoveryJoinRequestMessage
      • TcpDiscoveryNodeAddedMessage
      • TcpDiscoveryNodeAddFinishedMessage

节点发现介绍

发现机制的主要目标是创建 Ignite 节点的拓扑结构,并在每个节点上构建并维护一致的内存视图。 例如,此视图包含集群中的节点数及节点顺序。

发现机制由 DiscoverySpi 接口表示,默认实现是TcpDiscoverySpi。 其他实现,如 ZookeeperDiscoverySpi,本文重点描述的TcpDiscoverySpi。

拓扑结构由特定的 DiscoverySpi 实现定义,例如,TcpDiscoverySpi 定义了环形拓扑。

在描述集群拓扑时,我们谈论的是仅存在于“发现”级别的逻辑布局。 例如,当查询驻留在缓存中的数据时,集群可能使用与本文描述的拓扑不同的拓扑。

环形拓扑

TcpDiscoverySpi 将集群的所有服务器节点组织成环形结构,其中每个节点只能向单个节点(称为“邻居”)发送发现消息。客户端节点位于环之外,并连接到一个服务端。 此代码逻辑分别包含在服务器节点ServerImpl类和客户端节点ClientImpl 类中。

节点加入过程 概述

当一个新节点启动时,它尝试通过探测TcpDiscoveryIpFinder提供的地址列表来找到一个现有的集群。如果所有地址都不可用,则节点认为自己是唯一的节点,从自己形成一个集群,并成为此集群的协调者。否则,将执行如下节点加入过程。

节点加入过程包括以下几个阶段:

  1. 加入节点向集群中的随机节点发送TcpDiscoveryJoinRequestMessage,该节点会把消息被转发到协调器。
  2. 协调器将新节点放置在最后一个节点和自身之间,并通过在环中发送TcpDiscoveryNodeAddedMessage来传播拓扑更改消息。
  3. 在集群的所有成员接收到TcpDiscoveryNodeAddedMessage 之后,发送TcpDiscoveryNodeAddFinishedMessage 来完成更改。

创建连接

客户端创建连接代码跟踪如下:
Ignite ignite = Ignition.start(cfg);//客户端启动代码 -->IgnitionEx$IgniteNamedInstance grid0.start(2112行) --> … -->ClientImpl spiStart方法启动IgniteSpiThread线程

IgniteSpiThread线程,run:58, IgniteSpiThread, body();方法–>…–>joinTopology:629, ClientImpl–>sendJoinRequest:734, ClientImpl–>…–>TcpDiscoverySpi opensocket()方法,openSocket:1592, TcpDiscoverySpi

opensocket方法里,创建socket连接,并发0x00004747到服务端

    
    public static final byte[] IGNITE_HEADER = intToBytes(0x00004747);

    protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
        throws IOException, IgniteSpiOperationTimeoutException {
            ...
            writeToSocket(sock, null, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
            ...

Wireshark抓包如下:

TcpDiscoveryJoinRequestMessage

节点通过调用 ServerImpl#joinTopology(对于服务器节点)或 ClientImpl#joinTopology(对于客户端节点)开始节点加入过程,然后调用 TcpDiscoverySpi#collectExchangeData 来收集所有必要的
discovery data(例如来自 GridCacheProcessor 的缓存配置,请参阅不同的 GridComponent# collectJoiningNodeData 实现)。该数据被打包到加入请求(TcpDiscoveryJoinRequestMessage)中并发送给协调器。

以客户端ClientImpl作为代码示例:
ClientImpl类,joinTopology方法里调用sendJoinRequest方法,如下
(代码路径:run:58, IgniteSpiThread–>body:317, ClientImpl–>…–>tryJoin:2108, ClientImpl$MessageWorker–>…–>joinTopology:629, ClientImpl–>sendJoinRequest, ClientImpl)

    @Nullable private T3 sendJoinRequest(boolean recon,
        InetSocketAddress addr) {
                ...
                // 向服务端发送TcpDiscoveryHandshakeRequest 消息
                TcpDiscoveryHandshakeRequest req = new TcpDiscoveryHandshakeRequest(locNodeId);
                req.client(true);
                spi.writeToSocket(sock, req, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
                  ...       
                    // collectExchangeData 来收集所有必要的discovery data
                    if (discoveryData == null)
                        discoveryData = spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId()));
                    // 该数据被打包到加入请求中并发送给协调器
                    TcpDiscoveryJoinRequestMessage joinReqMsg = new TcpDiscoveryJoinRequestMessage(node, discoveryData);

discovery data结构如下:

当协调器收到请求时,它会验证消息并生成 TcpDiscoveryNodeAddedMessage,如果验证成功(请参阅 ServerImpl.RingMessageWorker#processJoinRequestMessage)。这条信息随后通过环发送出去。

服务端ClientImpl 处理TcpDiscoveryJoinRequestMessage代码示例:

        @Override protected void body() throws InterruptedException {
                       ...
                        else if (msg instanceof TcpDiscoveryJoinRequestMessage) {
                            TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg;

                            if (!req.responded()) {
                                boolean ok = processJoinRequestMessage(req, clientMsgWrk);

                                if (clientMsgWrk != null && ok)
                                    continue;
                                else
                                    // Direct join request - no need to handle this socket anymore.
                                    break;
                            }
                        }
                        ...

生成 TcpDiscoveryNodeAddedMessage代码示例:

        private void processJoinRequestMessage(final TcpDiscoveryJoinRequestMessage msg) {
                ...
                //生成 TcpDiscoveryNodeAddedMessage
                TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
                    node, data, spi.gridStartTime);

                nodeAddedMsg = tracing.messages().branch(nodeAddedMsg, msg);

                nodeAddedMsg.client(msg.client());

                processNodeAddedMessage(nodeAddedMsg);

                tracing.messages().finishProcessing(nodeAddedMsg);
                ...
TcpDiscoveryNodeAddedMessage

在处理 TcpDiscoveryNodeAddedMessage 时,集群中的每个节点将加入节点的discovery data应用于component,收集其本地discovery data,并将其添加到消息中(详情请参阅ServerImpl.RingMessageWorker#processNodeAddedMessage)。然后通过调用ServerImpl.RingMessageWorker#sendMessageAcrossRing 将消息进一步传播到环上。
当 TcpDiscoveryNodeAddedMessage 完成整个循环并再次到达协调器时,它将被协调器消费并且协调器发出TcpDiscoveryNodeAddFinishedMessage消息。

TcpDiscoveryNodeAddedMessage 也被传递到加入节点,它在所有其他节点已经处理它之后,在最后接收消息。

代码示例:

        private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
       				 ...
                    DiscoveryDataPacket dataPacket = msg.gridDiscoveryData();
      				  ...          
                    if (dataPacket.hasJoiningNodeData()) {
                        if (spiState == CONNECTED) {
                            // Node already connected to the cluster can apply joining nodes' disco data immediately
                            //加入节点的discovery data应用于component
                            spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));

                            if (!node.isDaemon())
                                spi.collectExchangeData(dataPacket);
                        } 
 					...
TcpDiscoveryNodeAddFinishedMessage

TcpDiscoveryNodeAddFinishedMessage 完成节点加入过程。 收到此消息时,每个节点都会触发 NODE_JOINED 事件以通知discovery manager有关新加入的节点的信息。

NodeAddFinished 和额外加入请求
如果加入节点没有及时收到TcpDiscoveryNodeAddFinishedMessage,将发送一个额外的加入请求。 该时间由 TcpDiscoverySpi#networkTimeout 定义,默认值为 5 秒 (TcpDiscoverySpi#DFLT_NETWORK_TIMEOUT)。

        private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg) {
                ...
                if (state == CONNECTED) {
                //触发 NODE_JOINED 事件
                    boolean notified = notifyDiscovery(EVT_NODE_JOINED, topVer, node, msg.spanContainer());
                ...

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4873792.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-11
下一篇 2022-11-11

发表评论

登录后才能评论

评论列表(0条)

保存