全个人理解-mark 有错误望指出
当我们部署完成一个maxwell服务 我们想要监听某个mysql表。其官方为我们提供的脚本
可以看到是使用 bin 目录下面maxwell 这个脚本进行启动的, 接着我们来看这个脚本。
可以看到其都是通过调用 com.zendesk.maxwell.Maxwell 进行的 java 嘛 我们都知道调用这个类肯定执行的为它的main方法。
所以我们源码的解读从这个类的main方法开始
Maxwell整个main方法中 比较关键的三个步骤
- 构建maxwell配置
- 通过配置来创建 Maxwell对象 -- 没什么重要的事情
- maxwell.start()启动
主要看maxwell.start() 启动,在其内部又调用了 startInner()方法
startInner()主要的步骤-中间细节忽略
// 1. binlog format row 确保binlog为 row格式 MaxwellMysqlStatus.ensureReplicationMysqlState(connection); // 2. 仅读模式 MaxwellMysqlStatus.ensureMaxwellMysqlState(rawConnection); // 3. gtid 全局唯一事务id if (config.gtidMode) { // gtid MaxwellMysqlStatus.ensureGtidMysqlState(connection); } // 4. 确保 maxwell相关元数据表已经创建完成- 如果未创建则通过sql文件初始化创建 SchemaStoreSchema.ensureMaxwellSchema(rawConnection, this.config.databaseName) // 5. 确保 相关表中有关键字段 SchemaStoreSchema.upgradeSchemaStoreSchema(schemaConnection); // 6. 获取 producer 默认没有 会通过我们传递的参数来选择 --producer=kafka // 这边在创建完以后就会直接进行启动 AbstractProducer producer = this.context.getProducer(); case "kafka": this.producer = new MaxwellKafkaProducer(this, this.config.getKafkaProperties(), this.config.kafkaTopic); // 以线程方式启动 然后start() 中会不断的从queue中拿取数据,并发送 // 7. 获取初始化位置 Position initPosition = getInitialPosition(); // 8. 创建binlog连接复制器 this.replicator = new BinlogConnectorReplicator(xxx,x,x,x,x,,x,x,x,) // 9. 偏移量存储服务启动 this.context.start(); getPositionStoreThread()->this.positionStoreThread.start()-> this.thread = new Thread(this, "Position Flush Thread")->run()->runLoop()->work() [找到实现类] // 10. 并启动服务 其调用了 BinaryLogClient.connect this.replicator.startReplicator() -->> this.client.connect(5000); // 11. 复制服务真正的启动 replicator.runLoop();步骤详解 1. MaxwellMysqlStatus.ensureReplicationMysqlState(connection);
binlog format row 确保binlog为 row格式
m.ensureVariableState("log_bin", "ON"); m.ensureVariableState("binlog_format", "ROW");6.获取 producer 默认没有 会通过我们传递的参数来选择 --producer=kafka
// 这边在创建完以后就会直接进行启动
AbstractProducer producer = this.context.getProducer();
-- 通过我们脚本传输的命令 进行判断选用了哪种Producer
然后进入 new MaxwellKafkaProducer的构造方法可以看到, 初始化以后就直接进行了启动
对于java来说 其thread启动以后 真正运行的是它的run方法 我们紧着看run方法实现
可以看到是不断的从queue中拿取消息并发送, 那queue里面的消息 其实就是我们下面binlogConnectorReplicator 塞进去的。
7. 获取初始化位置Position initPosition = getInitialPosition();
其实就是获取我们初始偏移量位置,会有几种方式
- 我们是否自己初始化了位置
- 正在从 主从交换中恢复嘛
- 以前是否有cliend_id, 如果是这样我们必须从这里恢复
- 抓取当前master的position
创建binlog连接复制器,来到此类的构造方法,我们查看几个关键步骤逻辑
// 1. 这一步是关键,通过构建binaryLogClient 可以去拉取 mysql log相关信息 一个开源的组件 this.client = new BinaryLogClient(mysqlConfig.host, mysqlConfig.port, mysqlConfig.user, mysqlConfig.password); // 2. binlog断点续传关键 BinlogPosition startBinlog = start.getBinlogPosition(); // 3. 反序列化 EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setCompatibilityMode( EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG_MICRO, EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY, EventDeserializer.CompatibilityMode.INVALID_DATE_AND_TIME_AS_MIN_VALUE ); // 4. 设置监听者 当运行后会调用其内部的 onEvent()方法来真正的去处理 事件 this.binlogEventListener = new BinlogConnectorEventListener(client, queue, metrics, outputConfig); // 5. 再调用 client.connect() 的时候 会去实际启动运行BinlogConnectorEventListener-onEvent()
while (mustStop.get() != true) { try { // 向队列插入数据 if ( queue.offer(ep, 100, TimeUnit.MILLISEConDS ) ) { break; } } catch (InterruptedException e) { return; } }9.偏移量存储服务启动
this.content.start()
this.content.start()-getPositionStoreThread()->this.positionStoreThread.start()-> this.thread = new Thread(this, "Position Flush Thread")->run()->runLoop()->work() 真正运行的是其 PositionStoreThread-work() 方法10.this.replicator.startReplicator()
启动服务 其调用了 BinaryLogClient.connect 在这时 其复制抓取服务才真正的启动起来
11. replicator.runLoop();runLoop() -> work() -> BinlogConnectorReplicator - work()
processRow(row) -> producer.push(row)
未命名文件 | ProcessOn免费在线作图,在线流程图,在线思维导图 |
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)