参考:
【RocketMQ】学习RocketMQ必须要知道的主从同步原理_Mr.ZhuRunHua的博客-CSDN博客_rocketmq主从同步原理
RocketMq 高可用实现逻辑_风火1989的博客-CSDN博客
rocketmq的broker如何同步信息的? - notlate - 博客园 (cnblogs.com)
源码版本:RocketMQ 4.9.3
文章目录
- 主从同步流程
- 提出问题
- 一、Slave 定时同步 Master 元数据
- 二、Slave 同步 Master 消息数据
- HA 相关类介绍
- 实例化 HAService
- Master端
- 1)监听端口
- 2)建立连接
- 3)读取 slave 汇报的当前 maxOffset
- 4)传输该偏移量后的所有消息数据给 slave
- Slave端
- 1)连接master
- 2)定时发送maxOffset给master
- 3)接收master传输来的消息数据
上次RocketMQ源码解析(三)——HA机制之读写分离说到:Master 、Slave 都在运行时,默认情况下,Consumer 从 Master 拉取消息,当 Master 积压的消息超过了物理内存的40%时,则建议从 Slave 拉取。
那么 当出现以上这种情况或者 Master 宕机了,Slave 接管消息消费。那当 Master 恢复正常后,Master 重新接管回消息消费(这个Master可能不是原来的 Master,可能是人工介入或者自动主从切换产生的 Master),该怎么知道最新的消息消费进度呢?这篇就来聊聊RocketMQ的主从同步机制。
主从同步流程Broker 主从同步流程如下:
- Slave 开启一个定时任务,每 10s 从 Master 同步元数据;
- Master 启动 HAService,监听指定端口(默认为 10912),等待与 Slave 建立连接;
- Slave 启动 HAService,主动连接Master,Master 接收 Slave 的连接并建立TCP连接;
- Slave 每 5s 发送汇报本地 commitLog 的最大偏移量的请求;
- Master 解析请求,如果是首次传输则传输最后一个 commitLog 的起始位置后所有消息,不是的话传输汇报的偏移量后的所有消息给 Slave;
- Slave 收到 Master 发送的消息后,将消息写入本地 commitlog 文件中,然后向 Master 反馈拉取进度,并更新下一次待汇报偏移量;
- 重复第4~6步。
看完上面的流程,问题来了:
- 为什么要分成两次同步,即分别同步元数据和消息?一起同步不行吗?
- Master 如何处理多个 Slave 汇报的 maxOffset 呢?如何保证一个Slave多次上报的 maxOffset 的顺序是正确的呢?
这里先说第一个问题,Master和Slave之间同步消息体,也就是同步CommitLog内容。CommitLog和元数据信息不同:
- 首先,CommitLog的数据量比元数据要大;
- 其次,对实时性和可靠性要求也不一样。元数据信息是定时同步的,在两次同步的时间差里,如果出现异常可能会造成 Master上的元数据内容和Slave上的元数据内容不一致,不过这种情况还可以补救 (手动调整Offset,重启Consumer等)。 CommitLog在高可靠性场景下如果没有及时同步,一旦Master机器出故障,消息就彻底丢失了。所以有专门的代码来实现Master和Slave之间消息体内容的同步。CommitLog的同步,不是经过netty command的方式,而是直接进行TCP连接,这样效率更高。
剩下的问题我们接下来边看源码边解决吧!
一、Slave 定时同步 Master 元数据如果 Broker 角色为 Slave,会通过定时任务调用slaveSynchronize.syncAll()
,定时(每10s)同步一次topic路由信息、消息消费进度、延迟队列处理进度、消费组订阅信息。
BrokerController启动时,调用handleSlaveSynchronize
。
org/apache/rocketmq/broker/BrokerController#start
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}
org/apache/rocketmq/broker/BrokerController#handleSlaveSynchronize
private void handleSlaveSynchronize(BrokerRole role) {
// 只有slave节点,才进行同步 *** 作
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
// 设置master地址为空,避免一开始就进行同步
// 后续在registerBrokerAll() 的时候,再设置master相关信息
this.slaveSynchronize.setMasterAddr(null);
// 3s后开始,每10秒钟同步一次slave数据
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}
org/apache/rocketmq/broker/slave/SlaveSynchronize#syncAll
public void syncAll() {
// 同步topic路由信息
this.syncTopicConfig();
// 同步消息消费进度
this.syncConsumerOffset();
// 同步延迟队列处理进度
this.syncDelayOffset();
// 同步消费组信息
this.syncSubscriptionGroupConfig();
}
但是这个定时任务只同步了元数据信息,而真正的消息数据的同步是怎么执行的?
二、Slave 同步 Master 消息数据消息同步,与存储相关,这些是由org/apache/rocketmq/store/ha
下的一系列类来实现的(在enableDLegerCommitLog
默认为 false 的情况下是这样的,为 true 时使用 Dledger 框架,此篇不涉及)。
HA相关类一共有下面这些类,UML类图如下:
HAService:HA核心类。其有三个内部类:
- HAClient:HA客户端,用于向 Master 请求数据;
- AcceptSocketService:负责监听指定端口(默认为 10912),建立与HAClient的连接;HAConection
- GroupTransferService:负责当主从同步复制结束后通知由于等待 HA 同步结果而阻塞的消息发送者线程。
HAConnection:负责ServerSocketChannel的初始化并对这个通道进行读写。其有两个内部类:
- ReadSocketService:负责创建一个Selector,监听给定通道上的读事件;
- WriteSocketService:负责向通道中写入当前commitlog的数据。
具体的类方法的介绍可以看这个👉RocketMq 高可用实现逻辑_风火1989的博客-CSDN博客,写的挺不错的。
WaitNotifyObject:管理以上类的线程的等待和唤醒。
在具体说明 Master 和 Slave 端是如何配合实现主从同步之前,先简单地说说 HAService 的实例化。
实例化 HAService实例化 BrokerController 时,调用
org/apache/rocketmq/store/DefaultMessageStore#DefaultMessageStore
// enableDLegerCommitLog默认为false,触发HAService初始化
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService = new HAService(this);
} else {
this.haService = null;
}
org/apache/rocketmq/store/ha/HAService#HAService
public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
// 初始化server端服务
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
// 初始化主从同步通知服务
this.groupTransferService = new GroupTransferService();
// 初始化client端服务
this.haClient = new HAClient();
}
Master端
在主从同步中,Master 端有以下几个动作:
- 监听端口
- 建立连接
- 读取 slave 汇报的当前 maxOffset
- 传输消息给 slave
BrokerController 启动时,调用:
org/apache/rocketmq/store/DefaultMessageStore#start
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// 启动HAService
this.haService.start();
// 如果broker是master,启动延迟消息服务,这里这不是重点
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
this.haService.start()
调用:
org/apache/rocketmq/store/ha/HAService#start
public void start() throws Exception {
// 1.监听端口
this.acceptSocketService.beginAccept();
// 2.建立连接
this.acceptSocketService.start();
// 数据中转服务,它会接收用户的写请求,然后吐数据给到各slave节点
this.groupTransferService.start();
// 客户端请求服务,由slave节点发起
this.haClient.start();
}
1)监听端口
使用NIO的接口,建立一个ServerSocketChannel,并且在对应的端口上监听接入请求。
org/apache/rocketmq/store/ha/HAService$AcceptSocketService#beginAccept
/**
* Starts listening to slave connections.
*
* @throws Exception If fails.
*/
public void beginAccept() throws Exception {
// 创建ServerSocketChannel
this.serverSocketChannel = ServerSocketChannel.open();
// 创建Selector
this.selector = RemotingUtil.openSelector();
// 设置TCP reuseAddress
this.serverSocketChannel.socket().setReuseAddress(true);
// 绑定监昕端口
this.serverSocketChannel.socket().bind(this.socketAddressListen);
// 设置为非阻塞模式
this.serverSocketChannel.configureBlocking(false);
// 注册OP_ACCEPT (连接事件)
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
2)建立连接
之前的问题二:Master 如何处理多个 Slave 汇报的 maxOffset 呢?如何保证一个Slave多次上报的 maxOffset 的顺序0…是正确的呢?0
这里AcceptSocketService
是负责监听服务通道的accept事件并建立连接的类,继承ServiceThread
,是一个线程类,调用start()
时执行run()
。
一个AcceptSocketService
线程就专门负责连接一个 Slave,处理其汇报的 commitLog 并向 Slave 传输数据。这样Master 就可以处理多个 Slave 汇报的commitLog了。同理,HAConnection$ReadSocketService
也是继承了ServiceThread
,Slave 仅使用一条线程进行数据同步,即拉取到的数据顺序是一致的,写入commitlog也是用同一条线程进行写入,自然就不会存在乱序问题了。这可能也是主从同步不能使用netty这种通信框架的原因,没必要也不能做。主从同步要求保证严格的顺序性,而无需过多考虑并发性。就像redis的单线程,同样撑起超高的性能。rocketmq主从同步基于原生 nio, 加上pagecache, mmap 同样实现了超高的性能。也就无需单线程同步会导致很大延迟了。
此处来自:RocketMQ(九):主从同步的实现 - 等你归去来 - 博客园 (cnblogs.com)
现在继续看 Master 建立连接的过程:
org/apache/rocketmq/store/ha/HAService$AcceptSocketService#run
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// selector每1s处理一次连接就绪事件。
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
// 连接事件就绪
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
// 调用ServerSocketChannel的accept()方法创建SocketChannel
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
// 为每一个连接创建一个HAConnection对象
HAConnection conn = new HAConnection(HAService.this, sc);
// accept 接入后,开启另外的线程负责主从数据同步逻辑
conn.start();
// 将该HaConnection对象加入到HAService#connectionList中
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
3)读取 slave 汇报的当前 maxOffset
读事件有两个作用:
- 读 Slave 请求拉取的消息偏移量;
- 读 Slave 的消息同步ACK确认消息。
上面的方法中的conn.start()
:
org/apache/rocketmq/store/ha/HAConnection#start
public void start() {
// 这里启动两个线程
// 1.创建一个Selector,监听给定通道上的读事件
this.readSocketService.start();
// 2.向通道中写入当前commitlog的数据
this.writeSocketService.start();
}
这里的this.readSocketService.start()
org/apache/rocketmq/store/ha/HAConnection$ReadSocketService#run
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// selector每1s处理一次读事件
this.selector.select(1000);
// 处理读事件
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
// 设置停止标志位
this.makeStop();
writeSocketService.makeStop();
// 将连接删除
haService.removeConnection(HAConnection.this);
// 属性HAService#connectionCount减1
HAConnection.this.haService.getConnectionCount().decrementAndGet();
// 取消selector
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}
// 关闭selector、通道
try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}
HAConnection.log.info(this.getServiceName() + " service end");
}
org/apache/rocketmq/store/ha/HAConnection$ReadSocketService#processReadEvent
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
//byteBufferRead中没有剩余数据
if (!this.byteBufferRead.hasRemaining()) {
// 恢复到初始可写状态
this.byteBufferRead.flip();
this.processPosition = 0;
}
//byteBufferRead中有剩余数据
while (this.byteBufferRead.hasRemaining()) {
try {
// 读取数据到byteBufferRead,并且接收读取到的字节数给readSize
// byteBufferRead里的是slave累计汇报的当前maxOffset,maxOffset是一个long型变量
int readSize = this.socketChannel.read(this.byteBufferRead);
// 如果readSize大于0,则意味着有数据可以读取并且处理
if (readSize > 0) {
// 读到字节数为0的次数readSizeZeroTimes重新赋值为0
readSizeZeroTimes = 0;
// 将上一次读取数据时间lastReadTimestamp赋值为当前时间
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
// 如果slave汇报的最新maxOffset与processPosition差值小于8,则下一次循环再一起处理
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
// 比slave汇报的最新maxOffset小的且最接近8的整倍数的值,声明为pos
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
// 在byteBufferRead的pos - 8的位置读取一个long变量,声明为readOffset,即要
long readOffset = this.byteBufferRead.getLong(pos - 8);
// 记录已经处理到哪个位置了
this.processPosition = pos;
// slave确认已经读好的maxOffset
HAConnection.this.slaveAckOffset = readOffset;
// 第一次收到slave请求的待拉取的Offset
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
} else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {
// slave请求的待拉取的Offset还没有master本地的maxOffset大,不更新了
log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",
HAConnection.this.clientAddr,
HAConnection.this.slaveAckOffset,
HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());
return false;
}
// 唤醒GroupTransferService中的等待线程
// 通过CAS *** 作,将入参的offset的值尝试写入push2SlaveMaxOffset,前提是入参的值大于当前值
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
// 如果readSize为0,则读到字节数为0的次数readSizeZeroTimes加1
// 如果readSizeZeroTimes大于等于3,则结束循环,否则继续循环
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
// 如果readSize为-1,说明通道关闭,返回false给方法调用者
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
4)传输该偏移量后的所有消息数据给 slave
org/apache/rocketmq/store/ha/HAConnection$WriteSocketService#run
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// selector每1s处理一次写事件
this.selector.select(1000);
//如果slave请求的待拉取的Offset为-1,说明没有消息要同步,休眠10ms
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
// 下一次要传输的commitLog上的开始偏移量是不是等于-1
// 如果等于-1说明之前没有传输过,则需要首先确定从哪一个位置开始启动传输
if (-1 == this.nextTransferFromWhere) {
// slave没有本地存储的消息,master推送给slave的起始位置是存储消息的最后一个commitLog开始位置
if (0 == HAConnection.this.slaveRequestOffset) {
// 获取master当前commitLog的maxOffset
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
// 将masterOffset修正到最接近mappedFileSizeCommitLog(1G)整倍数的数字
// 即最后一个commitLog的起始位置
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
// 下一次要传输的commitLog上的开始偏移量更新为masterOffset
this.nextTransferFromWhere = masterOffset;
} else {
// 如果不等于-1,则直接从上次slave汇报的偏移量开始传输即可
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
// 上一次写事件是否已将信息全部写到slave
if (this.lastWriteOver) {
// 当前时间和上一次数据写出时间的间隔
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
// 如果interval大于HA心跳发送间隔(5s),则发送一个心跳包,避免长连接由于空闲被关闭
// 心跳包头中包含slave汇报的偏移量和消息长度,消息长度默认为0
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
// 调用方法transferData发送数据,并且将结果赋值给lastWriteOver
this.lastWriteOver = this.transferData();
// 还没写出完,继续循环写
if (!this.lastWriteOver)
continue;
}
} else {
// 如果上一次socket写出数据没有完毕,执行transferData发送数据
this.lastWriteOver = this.transferData();
// 还没写出完,继续循环写
if (!this.lastWriteOver)
continue;
}
// 准备好发送本次要传输的commitLog内容
// selectResult中包含了从偏移量nextTransferFromWhere开始的commitLog内容的ByteBuffer对象
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
// 如果selectResult不为null(只有在线程关闭或者偏移量超出文件本身的时候才会是null)
if (selectResult != null) {
// 获取selectResult中的byteBuffer的大小,声明为size
int size = selectResult.getSize();
// 如果size大于HA传输一次同步任务最大传输的字节数(32KB),则修正到这个值
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// 构建传输头数据,偏移量是thisOffset,也就是上一次的nextTransferFromWhere,传输长度是size
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
// 调用transferData方法传输数据,并且将结果赋值给lastWriteOver属性
this.lastWriteOver = this.transferData();
} else {
// 如果selectResult为null,则通知所有等待线程继续等待1OOms
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
// 从HaService的waitNotifyObject中移除自身线程
HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
// 释放可能没有写完的SelectMappedBufferResult
if (this.selectMappedBufferResult != null) {
this.selectMappedBufferResult.release();
}
// 停止自身
this.makeStop();
// 停止读线程
readSocketService.makeStop();
// 从HaService中移除当前的HaConnection
haService.removeConnection(HAConnection.this);
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}
try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}
HAConnection.log.info(this.getServiceName() + " service end");
}
Slave端
- 连接master
- 定时报告maxOffset给master
- 接收master传输来的数据
启动HClient服务:
org/apache/rocketmq/store/ha/HAService#start
this.haClient.start();
HAClient
继承ServiceThread
,调用start()
时执行run()
。
org/apache/rocketmq/store/ha/HAService$HAClient#run
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 使用原生nio, 尝试连接至master
if (this.connectMaster()) {
// 判断是否需要发送报告
if (this.isTimeToReportOffset()) {
// 隔一段时间向master汇报一次本slave的maxOffset
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
// 如果连接无效,则关闭,下次再循环周期将会重新发起连接
if (!result) {
this.closeMaster();
continue;
}
}
// selector每1s处理一次读事件
this.selector.select(1000);
// 读Master传输的数据
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
continue;
}
// 反馈拉取进度给master
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
// 未连接成功,5秒后重试,可能会一直无用
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
1)连接master
org/apache/rocketmq/store/ha/HAService$HAClient#connectMaster
private boolean connectMaster() throws ClosedChannelException {
// 如果socketChannel为空, 则尝试连接Master
if (null == socketChannel) {
String addr = this.masterAddress.get();
// 如果Master 地址不为空
if (addr != null) {
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
// 建立到Master的TCP连接
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
// 注册OP_READ (网络读事件)
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
// 初始化currentReportedOffset为commitlog文件的最大偏移量
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// lastWriteTimestamp上次写入时间戳为当前时间戳
this.lastWriteTimestamp = System.currentTimeMillis();
}
// 返回是否成功连接上Master。
return this.socketChannel != null;
}
2)定时发送maxOffset给master
org/apache/rocketmq/store/ha/HAService$HAClient#isTimeToReportOffset
private boolean isTimeToReportOffset() {
long interval =
HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
// 是否需要向Master发送当前待拉取偏移量
// 如果离上次向master同步的时间差大于HA心跳发送间隔(默认为5s),则发送
// 也就是说就算目前的偏移量没更新,也发送请求,起到一个心跳包的作用
boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
.getHaSendHeartbeatInterval();
return needHeart;
}
org/apache/rocketmq/store/ha/HAService$HAClient#reportSlaveMaxOffsetPlus
private boolean reportSlaveMaxOffsetPlus() {
boolean result = true;
// 获取当前的偏移量
long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// 当前的偏移量大于已经汇报的偏移量
if (currentPhyOffset > this.currentReportedOffset) {
// 更新已经汇报的偏移量
this.currentReportedOffset = currentPhyOffset;
// 汇报salve偏移量
result = this.reportSlaveMaxOffset(this.currentReportedOffset);
// 如果汇报失败,则调用closeMaster方法关闭连接
if (!result) {
this.closeMaster();
log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset);
}
}
// 如果汇报失败返回false,其余情况均返回true。
return result;
}
org/apache/rocketmq/store/ha/HAService$HAClient#reportSlaveMaxOffset
private boolean reportSlaveMaxOffset(final long maxOffset) {
// 清空reportOffset
this.reportOffset.position(0);
this.reportOffset.limit(8);
// 写入long整型数字,也就是当前slave的偏移量
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
this.reportOffset.limit(8);
// 尝试将reportOffset的内容写入通道中,最多尝试3次
// 如果在写入通道过程中出现异常,则返回false
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
// 返回reportOffset是否已经完全写入的结果
return !this.reportOffset.hasRemaining();
}
3)接收master传输来的消息数据
org/apache/rocketmq/store/ha/HAService$HAClient#processReadEvent
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
// readByteBuffer是否还有剩余空间
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
// 将读取到的所有消息全部追加到消息内存映射文件中,然后再次反馈拉取进度给master
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
// 如果连续3次从网络通道读取到0个字节,则结束本次读,返回true
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)