RocketMQ源码解析(四)——HA机制之主从同步

RocketMQ源码解析(四)——HA机制之主从同步,第1张

参考:

RocketMQ】学习RocketMQ必须要知道的主从同步原理_Mr.ZhuRunHua的博客-CSDN博客_rocketmq主从同步原理

RocketMq 高可用实现逻辑_风火1989的博客-CSDN博客

rocketmq的broker如何同步信息的? - notlate - 博客园 (cnblogs.com)

源码版本:RocketMQ 4.9.3


文章目录
  • 主从同步流程
  • 提出问题
  • 一、Slave 定时同步 Master 元数据
  • 二、Slave 同步 Master 消息数据
    • HA 相关类介绍
    • 实例化 HAService
    • Master端
      • 1)监听端口
      • 2)建立连接
      • 3)读取 slave 汇报的当前 maxOffset
      • 4)传输该偏移量后的所有消息数据给 slave
    • Slave端
      • 1)连接master
      • 2)定时发送maxOffset给master
      • 3)接收master传输来的消息数据


上次RocketMQ源码解析(三)——HA机制之读写分离说到:Master 、Slave 都在运行时,默认情况下,Consumer 从 Master 拉取消息,当 Master 积压的消息超过了物理内存的40%时,则建议从 Slave 拉取。

那么 当出现以上这种情况或者 Master 宕机了,Slave 接管消息消费。那当 Master 恢复正常后,Master 重新接管回消息消费(这个Master可能不是原来的 Master,可能是人工介入或者自动主从切换产生的 Master),该怎么知道最新的消息消费进度呢?这篇就来聊聊RocketMQ的主从同步机制。

主从同步流程

Broker 主从同步流程如下:

  1. Slave 开启一个定时任务,每 10s 从 Master 同步元数据;
  2. Master 启动 HAService,监听指定端口(默认为 10912),等待与 Slave 建立连接;
  3. Slave 启动 HAService,主动连接Master,Master 接收 Slave 的连接并建立TCP连接;
  4. Slave 每 5s 发送汇报本地 commitLog 的最大偏移量的请求;
  5. Master 解析请求,如果是首次传输则传输最后一个 commitLog 的起始位置后所有消息,不是的话传输汇报的偏移量后的所有消息给 Slave;
  6. Slave 收到 Master 发送的消息后,将消息写入本地 commitlog 文件中,然后向 Master 反馈拉取进度,并更新下一次待汇报偏移量;
  7. 重复第4~6步。
提出问题

看完上面的流程,问题来了:

  1. 为什么要分成两次同步,即分别同步元数据和消息?一起同步不行吗?
  2. Master 如何处理多个 Slave 汇报的 maxOffset 呢?如何保证一个Slave多次上报的 maxOffset 的顺序是正确的呢?

这里先说第一个问题,Master和Slave之间同步消息体,也就是同步CommitLog内容。CommitLog和元数据信息不同:

  • 首先,CommitLog的数据量比元数据要大;
  • 其次,对实时性和可靠性要求也不一样。元数据信息是定时同步的,在两次同步的时间差里,如果出现异常可能会造成 Master上的元数据内容和Slave上的元数据内容不一致,不过这种情况还可以补救 (手动调整Offset,重启Consumer等)。 CommitLog在高可靠性场景下如果没有及时同步,一旦Master机器出故障,消息就彻底丢失了。所以有专门的代码来实现Master和Slave之间消息体内容的同步。CommitLog的同步,不是经过netty command的方式,而是直接进行TCP连接,这样效率更高。

剩下的问题我们接下来边看源码边解决吧!

一、Slave 定时同步 Master 元数据

如果 Broker 角色为 Slave,会通过定时任务调用slaveSynchronize.syncAll(),定时(每10s)同步一次topic路由信息、消息消费进度、延迟队列处理进度、消费组订阅信息。

BrokerController启动时,调用handleSlaveSynchronize

org/apache/rocketmq/broker/BrokerController#start

if (!messageStoreConfig.isEnableDLegerCommitLog()) {
    startProcessorByHa(messageStoreConfig.getBrokerRole());
    handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
    this.registerBrokerAll(true, false, true);
}

org/apache/rocketmq/broker/BrokerController#handleSlaveSynchronize

private void handleSlaveSynchronize(BrokerRole role) {
    // 只有slave节点,才进行同步 *** 作
    if (role == BrokerRole.SLAVE) {
        if (null != slaveSyncFuture) {
            slaveSyncFuture.cancel(false);
        }
        // 设置master地址为空,避免一开始就进行同步
        // 后续在registerBrokerAll() 的时候,再设置master相关信息
        this.slaveSynchronize.setMasterAddr(null);
        // 3s后开始,每10秒钟同步一次slave数据
        slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.slaveSynchronize.syncAll();
                }
                catch (Throwable e) {
                    log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                }
            }
        }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
    } else {
        //handle the slave synchronise
        if (null != slaveSyncFuture) {
            slaveSyncFuture.cancel(false);
        }
        this.slaveSynchronize.setMasterAddr(null);
    }
}

org/apache/rocketmq/broker/slave/SlaveSynchronize#syncAll

public void syncAll() {
    // 同步topic路由信息
    this.syncTopicConfig();
    // 同步消息消费进度
    this.syncConsumerOffset();	
    // 同步延迟队列处理进度
    this.syncDelayOffset();
    // 同步消费组信息
    this.syncSubscriptionGroupConfig();
}

但是这个定时任务只同步了元数据信息,而真正的消息数据的同步是怎么执行的?

二、Slave 同步 Master 消息数据

消息同步,与存储相关,这些是由org/apache/rocketmq/store/ha下的一系列类来实现的(在enableDLegerCommitLog默认为 false 的情况下是这样的,为 true 时使用 Dledger 框架,此篇不涉及)。

HA 相关类介绍

HA相关类一共有下面这些类,UML类图如下:

HAService:HA核心类。其有三个内部类:

  • HAClient:HA客户端,用于向 Master 请求数据;
  • AcceptSocketService:负责监听指定端口(默认为 10912),建立与HAClient的连接;HAConection
  • GroupTransferService:负责当主从同步复制结束后通知由于等待 HA 同步结果而阻塞的消息发送者线程。

HAConnection:负责ServerSocketChannel的初始化并对这个通道进行读写。其有两个内部类:

  • ReadSocketService:负责创建一个Selector,监听给定通道上的读事件;
  • WriteSocketService:负责向通道中写入当前commitlog的数据。

具体的类方法的介绍可以看这个👉RocketMq 高可用实现逻辑_风火1989的博客-CSDN博客,写的挺不错的。

WaitNotifyObject:管理以上类的线程的等待和唤醒。

在具体说明 Master 和 Slave 端是如何配合实现主从同步之前,先简单地说说 HAService 的实例化。

实例化 HAService

实例化 BrokerController 时,调用

org/apache/rocketmq/store/DefaultMessageStore#DefaultMessageStore

// enableDLegerCommitLog默认为false,触发HAService初始化
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
    this.haService = new HAService(this);
} else {
    this.haService = null;
}

org/apache/rocketmq/store/ha/HAService#HAService

public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
    this.defaultMessageStore = defaultMessageStore;
    // 初始化server端服务
    this.acceptSocketService =
        new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
    // 初始化主从同步通知服务
    this.groupTransferService = new GroupTransferService();
    // 初始化client端服务
    this.haClient = new HAClient();
}
Master端

在主从同步中,Master 端有以下几个动作:

  1. 监听端口
  2. 建立连接
  3. 读取 slave 汇报的当前 maxOffset
  4. 传输消息给 slave

BrokerController 启动时,调用:

org/apache/rocketmq/store/DefaultMessageStore#start

if (!messageStoreConfig.isEnableDLegerCommitLog()) {
    // 启动HAService
    this.haService.start();
    // 如果broker是master,启动延迟消息服务,这里这不是重点
    this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}

this.haService.start()调用:

org/apache/rocketmq/store/ha/HAService#start

public void start() throws Exception {
    // 1.监听端口
    this.acceptSocketService.beginAccept();
    // 2.建立连接
    this.acceptSocketService.start();
    // 数据中转服务,它会接收用户的写请求,然后吐数据给到各slave节点
    this.groupTransferService.start();
    // 客户端请求服务,由slave节点发起
    this.haClient.start();
}
1)监听端口

使用NIO的接口,建立一个ServerSocketChannel,并且在对应的端口上监听接入请求。

org/apache/rocketmq/store/ha/HAService$AcceptSocketService#beginAccept

/**
 * Starts listening to slave connections.
 *
 * @throws Exception If fails.
 */
public void beginAccept() throws Exception {
    // 创建ServerSocketChannel
    this.serverSocketChannel = ServerSocketChannel.open();
    // 创建Selector
    this.selector = RemotingUtil.openSelector();
    // 设置TCP reuseAddress
    this.serverSocketChannel.socket().setReuseAddress(true);
    // 绑定监昕端口
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    // 设置为非阻塞模式
    this.serverSocketChannel.configureBlocking(false);
    // 注册OP_ACCEPT (连接事件)
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
2)建立连接

之前的问题二:Master 如何处理多个 Slave 汇报的 maxOffset 呢?如何保证一个Slave多次上报的 maxOffset 的顺序0…是正确的呢?0

这里AcceptSocketService是负责监听服务通道的accept事件并建立连接的类,继承ServiceThread,是一个线程类,调用start()时执行run()

一个AcceptSocketService线程就专门负责连接一个 Slave,处理其汇报的 commitLog 并向 Slave 传输数据。这样Master 就可以处理多个 Slave 汇报的commitLog了。同理,HAConnection$ReadSocketService也是继承了ServiceThread,Slave 仅使用一条线程进行数据同步,即拉取到的数据顺序是一致的,写入commitlog也是用同一条线程进行写入,自然就不会存在乱序问题了。这可能也是主从同步不能使用netty这种通信框架的原因,没必要也不能做。主从同步要求保证严格的顺序性,而无需过多考虑并发性。就像redis的单线程,同样撑起超高的性能。rocketmq主从同步基于原生 nio, 加上pagecache, mmap 同样实现了超高的性能。也就无需单线程同步会导致很大延迟了。

此处来自:RocketMQ(九):主从同步的实现 - 等你归去来 - 博客园 (cnblogs.com)

现在继续看 Master 建立连接的过程:

org/apache/rocketmq/store/ha/HAService$AcceptSocketService#run

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // selector每1s处理一次连接就绪事件。
            this.selector.select(1000);
            Set<SelectionKey> selected = this.selector.selectedKeys();

            // 连接事件就绪
            if (selected != null) {
                for (SelectionKey k : selected) {
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        // 调用ServerSocketChannel的accept()方法创建SocketChannel
                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

                        if (sc != null) {
                            HAService.log.info("HAService receive new connection, "
                                + sc.socket().getRemoteSocketAddress());

                            try {
                                // 为每一个连接创建一个HAConnection对象
                                HAConnection conn = new HAConnection(HAService.this, sc);
                                // accept 接入后,开启另外的线程负责主从数据同步逻辑
                                conn.start();
                                // 将该HaConnection对象加入到HAService#connectionList中
                                HAService.this.addConnection(conn);
                            } catch (Exception e) {
                                log.error("new HAConnection exception", e);
                                sc.close();
                            }
                        }
                    } else {
                        log.warn("Unexpected ops in select " + k.readyOps());
                    }
                }

                selected.clear();
            }
        } catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
3)读取 slave 汇报的当前 maxOffset

读事件有两个作用:

  • 读 Slave 请求拉取的消息偏移量;
  • 读 Slave 的消息同步ACK确认消息。

上面的方法中的conn.start()

org/apache/rocketmq/store/ha/HAConnection#start

public void start() {
    // 这里启动两个线程
    // 1.创建一个Selector,监听给定通道上的读事件
    this.readSocketService.start();
    // 2.向通道中写入当前commitlog的数据
    this.writeSocketService.start();
}

这里的this.readSocketService.start()

org/apache/rocketmq/store/ha/HAConnection$ReadSocketService#run

@Override
public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // selector每1s处理一次读事件
            this.selector.select(1000);
            // 处理读事件
            boolean ok = this.processReadEvent();
            if (!ok) {
                HAConnection.log.error("processReadEvent error");
                break;
            }

            long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
            if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
                break;
            }
        } catch (Exception e) {
            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }

    // 设置停止标志位
    this.makeStop();

    writeSocketService.makeStop();
	// 将连接删除
    haService.removeConnection(HAConnection.this);
	// 属性HAService#connectionCount减1
    HAConnection.this.haService.getConnectionCount().decrementAndGet();

    // 取消selector
    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }
	// 关闭selector、通道
    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }

    HAConnection.log.info(this.getServiceName() + " service end");
}

org/apache/rocketmq/store/ha/HAConnection$ReadSocketService#processReadEvent

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;

    //byteBufferRead中没有剩余数据
    if (!this.byteBufferRead.hasRemaining()) {
        // 恢复到初始可写状态
        this.byteBufferRead.flip();
        this.processPosition = 0;
    }

    //byteBufferRead中有剩余数据
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // 读取数据到byteBufferRead,并且接收读取到的字节数给readSize
            // byteBufferRead里的是slave累计汇报的当前maxOffset,maxOffset是一个long型变量
            int readSize = this.socketChannel.read(this.byteBufferRead);
            // 如果readSize大于0,则意味着有数据可以读取并且处理
            if (readSize > 0) {
                // 读到字节数为0的次数readSizeZeroTimes重新赋值为0
                readSizeZeroTimes = 0;
                // 将上一次读取数据时间lastReadTimestamp赋值为当前时间
                this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                // 如果slave汇报的最新maxOffset与processPosition差值小于8,则下一次循环再一起处理
                if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                    // 比slave汇报的最新maxOffset小的且最接近8的整倍数的值,声明为pos
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    // 在byteBufferRead的pos - 8的位置读取一个long变量,声明为readOffset,即要
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    // 记录已经处理到哪个位置了
                    this.processPosition = pos;

                    // slave确认已经读好的maxOffset
                    HAConnection.this.slaveAckOffset = readOffset;
                    // 第一次收到slave请求的待拉取的Offset
                    if (HAConnection.this.slaveRequestOffset < 0) {
                        HAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                    } else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {
                        // slave请求的待拉取的Offset还没有master本地的maxOffset大,不更新了
                        log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",
                                HAConnection.this.clientAddr,
                                HAConnection.this.slaveAckOffset,
                                HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());
                        return false;
                    }
                    
				  // 唤醒GroupTransferService中的等待线程
                    // 通过CAS *** 作,将入参的offset的值尝试写入push2SlaveMaxOffset,前提是入参的值大于当前值
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                // 如果readSize为0,则读到字节数为0的次数readSizeZeroTimes加1
                // 如果readSizeZeroTimes大于等于3,则结束循环,否则继续循环
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                // 如果readSize为-1,说明通道关闭,返回false给方法调用者
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }

    return true;
}
4)传输该偏移量后的所有消息数据给 slave

org/apache/rocketmq/store/ha/HAConnection$WriteSocketService#run

@Override
public void run() {
    HAConnection.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // selector每1s处理一次写事件
            this.selector.select(1000);

            //如果slave请求的待拉取的Offset为-1,说明没有消息要同步,休眠10ms
            if (-1 == HAConnection.this.slaveRequestOffset) {
                Thread.sleep(10);
                continue;
            }
            
		   // 下一次要传输的commitLog上的开始偏移量是不是等于-1
            // 如果等于-1说明之前没有传输过,则需要首先确定从哪一个位置开始启动传输
            if (-1 == this.nextTransferFromWhere) {
                // slave没有本地存储的消息,master推送给slave的起始位置是存储消息的最后一个commitLog开始位置
                if (0 == HAConnection.this.slaveRequestOffset) {
                    // 获取master当前commitLog的maxOffset
                    long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                    // 将masterOffset修正到最接近mappedFileSizeCommitLog(1G)整倍数的数字
                    // 即最后一个commitLog的起始位置
                    masterOffset =
                        masterOffset
                            - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getMappedFileSizeCommitLog());

                    if (masterOffset < 0) {
                        masterOffset = 0;
                    }
				  // 下一次要传输的commitLog上的开始偏移量更新为masterOffset
                    this.nextTransferFromWhere = masterOffset;
                } else {
                    // 如果不等于-1,则直接从上次slave汇报的偏移量开始传输即可
                    this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                }

                log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                    + "], and slave request " + HAConnection.this.slaveRequestOffset);
            }

            // 上一次写事件是否已将信息全部写到slave
            if (this.lastWriteOver) {
                // 当前时间和上一次数据写出时间的间隔
                long interval =
                    HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
			   // 如果interval大于HA心跳发送间隔(5s),则发送一个心跳包,避免长连接由于空闲被关闭
                // 心跳包头中包含slave汇报的偏移量和消息长度,消息长度默认为0
                if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaSendHeartbeatInterval()) {

                    // Build Header
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(headerSize);
                    this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                    this.byteBufferHeader.putInt(0);
                    this.byteBufferHeader.flip();
				   // 调用方法transferData发送数据,并且将结果赋值给lastWriteOver
                    this.lastWriteOver = this.transferData();
                    // 还没写出完,继续循环写
                    if (!this.lastWriteOver)
                        continue;
                }
            } else {
                // 如果上一次socket写出数据没有完毕,执行transferData发送数据
                this.lastWriteOver = this.transferData();
                // 还没写出完,继续循环写
                if (!this.lastWriteOver)
                    continue;
            }

            // 准备好发送本次要传输的commitLog内容
            // selectResult中包含了从偏移量nextTransferFromWhere开始的commitLog内容的ByteBuffer对象
            SelectMappedBufferResult selectResult =
                HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
            // 如果selectResult不为null(只有在线程关闭或者偏移量超出文件本身的时候才会是null)
            if (selectResult != null) {
                // 获取selectResult中的byteBuffer的大小,声明为size
                int size = selectResult.getSize();
                // 如果size大于HA传输一次同步任务最大传输的字节数(32KB),则修正到这个值
                if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                    size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                }
			   
                long thisOffset = this.nextTransferFromWhere;
                this.nextTransferFromWhere += size;

                selectResult.getByteBuffer().limit(size);
                this.selectMappedBufferResult = selectResult;

                // 构建传输头数据,偏移量是thisOffset,也就是上一次的nextTransferFromWhere,传输长度是size
                // Build Header
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(headerSize);
                this.byteBufferHeader.putLong(thisOffset);
                this.byteBufferHeader.putInt(size);
                this.byteBufferHeader.flip();

                // 调用transferData方法传输数据,并且将结果赋值给lastWriteOver属性
                this.lastWriteOver = this.transferData();
            } else {
			  // 如果selectResult为null,则通知所有等待线程继续等待1OOms
                HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        } catch (Exception e) {

            HAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }
    
	// 从HaService的waitNotifyObject中移除自身线程
    HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();

    // 释放可能没有写完的SelectMappedBufferResult
    if (this.selectMappedBufferResult != null) {
        this.selectMappedBufferResult.release();
    }

    // 停止自身
    this.makeStop();

    // 停止读线程
    readSocketService.makeStop();

    // 从HaService中移除当前的HaConnection
    haService.removeConnection(HAConnection.this);

    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }

    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        HAConnection.log.error("", e);
    }

    HAConnection.log.info(this.getServiceName() + " service end");
}
Slave端
  1. 连接master
  2. 定时报告maxOffset给master
  3. 接收master传输来的数据

启动HClient服务:

org/apache/rocketmq/store/ha/HAService#start

this.haClient.start();

HAClient继承ServiceThread,调用start()时执行run()

org/apache/rocketmq/store/ha/HAService$HAClient#run

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // 使用原生nio, 尝试连接至master
            if (this.connectMaster()) {
			   // 判断是否需要发送报告
                if (this.isTimeToReportOffset()) {
                    // 隔一段时间向master汇报一次本slave的maxOffset
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    // 如果连接无效,则关闭,下次再循环周期将会重新发起连接
                    if (!result) {
                        this.closeMaster();
                        continue;
                    }
                }
			   // selector每1s处理一次读事件
                this.selector.select(1000);
                
			   // 读Master传输的数据
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                    continue;
                }

                // 反馈拉取进度给master
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }

                long interval =
                    HAService.this.getDefaultMessageStore().getSystemClock().now()
                        - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                        + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                // 未连接成功,5秒后重试,可能会一直无用
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }

    log.info(this.getServiceName() + " service end");
}
1)连接master

org/apache/rocketmq/store/ha/HAService$HAClient#connectMaster

private boolean connectMaster() throws ClosedChannelException {
    // 如果socketChannel为空, 则尝试连接Master
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        // 如果Master 地址不为空
        if (addr != null) {

            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                // 建立到Master的TCP连接
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    // 注册OP_READ (网络读事件)
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        
	    // 初始化currentReportedOffset为commitlog文件的最大偏移量
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        // lastWriteTimestamp上次写入时间戳为当前时间戳
        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    // 返回是否成功连接上Master。
    return this.socketChannel != null;
}
2)定时发送maxOffset给master

org/apache/rocketmq/store/ha/HAService$HAClient#isTimeToReportOffset

private boolean isTimeToReportOffset() {
    long interval =
        HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
    // 是否需要向Master发送当前待拉取偏移量
    // 如果离上次向master同步的时间差大于HA心跳发送间隔(默认为5s),则发送
    // 也就是说就算目前的偏移量没更新,也发送请求,起到一个心跳包的作用
    boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
        .getHaSendHeartbeatInterval();

    return needHeart;
}

org/apache/rocketmq/store/ha/HAService$HAClient#reportSlaveMaxOffsetPlus

private boolean reportSlaveMaxOffsetPlus() {
    boolean result = true;
    // 获取当前的偏移量
    long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    // 当前的偏移量大于已经汇报的偏移量
    if (currentPhyOffset > this.currentReportedOffset) {
        // 更新已经汇报的偏移量
        this.currentReportedOffset = currentPhyOffset;
        // 汇报salve偏移量
        result = this.reportSlaveMaxOffset(this.currentReportedOffset);
        // 如果汇报失败,则调用closeMaster方法关闭连接
        if (!result) {
            this.closeMaster();
            log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset);
        }
    }

    // 如果汇报失败返回false,其余情况均返回true。
    return result;
}

org/apache/rocketmq/store/ha/HAService$HAClient#reportSlaveMaxOffset

private boolean reportSlaveMaxOffset(final long maxOffset) {
    // 清空reportOffset
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    // 写入long整型数字,也就是当前slave的偏移量
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.position(0);
    this.reportOffset.limit(8);

    // 尝试将reportOffset的内容写入通道中,最多尝试3次
    // 如果在写入通道过程中出现异常,则返回false
    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }

    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    // 返回reportOffset是否已经完全写入的结果
    return !this.reportOffset.hasRemaining();
}
3)接收master传输来的消息数据

org/apache/rocketmq/store/ha/HAService$HAClient#processReadEvent

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    // readByteBuffer是否还有剩余空间
    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                // 将读取到的所有消息全部追加到消息内存映射文件中,然后再次反馈拉取进度给master
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    // 如果连续3次从网络通道读取到0个字节,则结束本次读,返回true
                    break;
                }
            } else {
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }

    return true;
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/916499.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存