Maxwell 源码部分解读

Maxwell 源码部分解读,第1张

Maxwell 源码部分解读

全个人理解-mark 有错误望指出

当我们部署完成一个maxwell服务 我们想要监听某个mysql表。其官方为我们提供的脚本

 

可以看到是使用 bin 目录下面maxwell 这个脚本进行启动的, 接着我们来看这个脚本。

可以看到其都是通过调用 com.zendesk.maxwell.Maxwell 进行的 java 嘛 我们都知道调用这个类肯定执行的为它的main方法

所以我们源码的解读从这个类的main方法开始

Maxwell

整个main方法中 比较关键的三个步骤

  • 构建maxwell配置
  • 通过配置来创建 Maxwell对象 -- 没什么重要的事情
  • maxwell.start()启动

主要看maxwell.start() 启动,在其内部又调用了 startInner()方法

startInner()

主要的步骤-中间细节忽略

// 1. binlog format row 确保binlog为 row格式
MaxwellMysqlStatus.ensureReplicationMysqlState(connection);
// 2. 仅读模式
MaxwellMysqlStatus.ensureMaxwellMysqlState(rawConnection);
// 3. gtid 全局唯一事务id
if (config.gtidMode) {
   // gtid
   MaxwellMysqlStatus.ensureGtidMysqlState(connection);
}
// 4. 确保 maxwell相关元数据表已经创建完成- 如果未创建则通过sql文件初始化创建
SchemaStoreSchema.ensureMaxwellSchema(rawConnection, this.config.databaseName)
       
// 5. 确保 相关表中有关键字段
SchemaStoreSchema.upgradeSchemaStoreSchema(schemaConnection);

// 6. 获取 producer 默认没有 会通过我们传递的参数来选择 --producer=kafka
// 这边在创建完以后就会直接进行启动
AbstractProducer producer = this.context.getProducer();
    case "kafka":
    this.producer = new MaxwellKafkaProducer(this, this.config.getKafkaProperties(), this.config.kafkaTopic);
    // 以线程方式启动
    然后start() 中会不断的从queue中拿取数据,并发送
// 7. 获取初始化位置
Position initPosition = getInitialPosition();

// 8. 创建binlog连接复制器
this.replicator = new BinlogConnectorReplicator(xxx,x,x,x,x,,x,x,x,)
// 9. 偏移量存储服务启动
this.context.start();
    getPositionStoreThread()->this.positionStoreThread.start()->
    this.thread = new Thread(this, "Position Flush Thread")->run()->runLoop()->work() [找到实现类]
    

// 10. 并启动服务 其调用了 BinaryLogClient.connect
this.replicator.startReplicator() -->> this.client.connect(5000); 
// 11. 复制服务真正的启动
replicator.runLoop();

步骤详解 1. MaxwellMysqlStatus.ensureReplicationMysqlState(connection);

binlog format row 确保binlog为 row格式

m.ensureVariableState("log_bin", "ON");

m.ensureVariableState("binlog_format", "ROW");
6.获取 producer 默认没有 会通过我们传递的参数来选择 --producer=kafka

// 这边在创建完以后就会直接进行启动

AbstractProducer producer = this.context.getProducer();

-- 通过我们脚本传输的命令 进行判断选用了哪种Producer

然后进入 new MaxwellKafkaProducer的构造方法可以看到, 初始化以后就直接进行了启动

对于java来说 其thread启动以后 真正运行的是它的run方法 我们紧着看run方法实现

可以看到是不断的从queue中拿取消息并发送, 那queue里面的消息 其实就是我们下面binlogConnectorReplicator 塞进去的。

7. 获取初始化位置

Position initPosition = getInitialPosition();

其实就是获取我们初始偏移量位置,会有几种方式

  • 我们是否自己初始化了位置
  • 正在从 主从交换中恢复嘛
  • 以前是否有cliend_id, 如果是这样我们必须从这里恢复
  • 抓取当前master的position
8.this.replicator = new BinlogConnectorReplicator

创建binlog连接复制器,来到此类的构造方法,我们查看几个关键步骤逻辑

// 1. 这一步是关键,通过构建binaryLogClient 可以去拉取 mysql log相关信息 一个开源的组件
this.client = new BinaryLogClient(mysqlConfig.host, mysqlConfig.port, mysqlConfig.user, mysqlConfig.password);
// 2. binlog断点续传关键
BinlogPosition startBinlog = start.getBinlogPosition();
// 3. 反序列化
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(
   EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG_MICRO,
   EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY,
   EventDeserializer.CompatibilityMode.INVALID_DATE_AND_TIME_AS_MIN_VALUE
);
// 4. 设置监听者 当运行后会调用其内部的 onEvent()方法来真正的去处理 事件
this.binlogEventListener = new BinlogConnectorEventListener(client, queue, metrics, outputConfig);
// 5. 再调用 client.connect() 的时候 会去实际启动运行

BinlogConnectorEventListener-onEvent()
while (mustStop.get() != true) {
   try {
      // 向队列插入数据
      if ( queue.offer(ep, 100, TimeUnit.MILLISEConDS ) ) {
         break;
      }
   } catch (InterruptedException e) {
      return;
   }
}

9.偏移量存储服务启动

this.content.start()

this.content.start()-getPositionStoreThread()->this.positionStoreThread.start()->
    this.thread = new Thread(this, "Position Flush Thread")->run()->runLoop()->work()
    
真正运行的是其 PositionStoreThread-work() 方法

10.this.replicator.startReplicator() 

启动服务 其调用了 BinaryLogClient.connect 在这时 其复制抓取服务才真正的启动起来

11. replicator.runLoop();

runLoop() -> work() -> BinlogConnectorReplicator - work()

processRow(row) -> producer.push(row)

未命名文件 | ProcessOn免费在线作图,在线流程图,在线思维导图 |

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5689012.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存