简介
MM2.0主要是基于Kafka Connect实现的跨集群同步工具
主要的功能有:
- 同步Topic数据到目标集群,并保持分区信息
- 同步Topic配置到目标集群,与源集群保持一致
- 同步Topic ACL到目标集群,与源集群保持一致(没有WRITE权限)
- 自动感知新的Topic和分区并同步到目标集群
- 同步组Offset到目标集群
- 基于Connect实现,特性:高可用、水平扩展
- 指定Topic、Group同步规则
- 自定义目标集群Topic名称规则
支持的模式:
双活:A->B, B->A
主备:A->B
聚合(多对一):A->K, B->K, C->K
分发(一对多):K->A, K->B, K->C
转发:A->B, B->C, C→D
限制:
为保证Offset同步,源集群Topic的分区数必须和目标集群Topic的分区数保持一致(目标Topic分区数>=源Topic分区数),不能出现源120分区目标80分区情况,这个分区数MM任务会同步
为保证Offset同步,Topic数据同步必须是一对一同步
对于双活、聚合这种模式,每个集群存在同一个Topic的多个镜像Topic
依赖:源端最低要求2.0.0版本,目标端最低要求2.5.0版本
KIP-545: support automated consumer offset sync across clusters in MM 2.0 2.7.0
KIP-396: Add Commit/List Offsets Operations to AdminClient 2.5.0
KIP-382: MirrorMaker 2.0 2.4.0
KIP-222 - Add Consumer Group operations to Admin API 2.0.0
实现
MM2.0主要有三个子任务,同步数据任务、同步Offset任务、心跳任务。
同步数据任务
多实例,按topic partition分配任务,同步topic partition数据,并产生上下游offset对应关系
MirrorSourceConnector 初始化工作,同步创建Topic以及配置、ACL,分配任务信息
start(config):
- 创建OffsetSyncsTopic,单分区、Compact(在源集群创建) "mm2-offset-syncs." + targetClusterAlias() + ".internal"
- 加载源集群、目标集群Topic Partiton信息,Config: topics、topics.exclude
- 创建目标集群Topic,如果不存在就创建,存在保持分区数一致(目标集群Topic分区数不少于源集群)
- 刷新目标集群Topic Partiton信息
- 同步Topic ACL到目标集群(定时任务,默认间隔10min),这里不同步对Topic的WRITE权限,也就是目标集群Topic一般用户不具备写权限只有MM可以写入(这里考虑切换的时候需要关注)
- 同步Topic配置到目标集群(定时任务,默认间隔10min),配置过滤,Config: config.properties.exclude
- 刷新源集群Topic Partiton信息并同步到目标集群(定时任务,默认间隔10min),如果分区信息发生变化请求重新进行任务配置requestTaskReconfiguration()
taskConfigs(maxTasks): connect框架方法,分配任务配置,按task数轮训分配topic partiton任务,每个任务消费被分配到的topic partiton进行数据同步
- 分配每一个任务的配置信息(按分区分配任务)
MirrorSourceTask 同步数据任务,产生offset映射关系
poll():
- 拉取源集群Topic数据
- 转换目标topic名称返回数据
- 数据由connect框架写入目标topic
commitRecord(SourceRecord record, Recordmetadata metadata):
- 记录Topic partition上下游offset映射关系,写入topicPartition -> upstreamOffset, downstreamOffset映射关系到OffsetSyncsTopic里面
同步Offset任务
多实例,按group分配任务,产生group + topic partition -> upstreamOffset, downstreamOffset的checkpoint数据,并提交下游group的offset
MirrorCheckpointConnector
start(config):
- 创建CheckpointsTopic,单分区、Compact(在目标集群创建) sourceClusterAlias() + ".checkpoints.internal"
- 加载所有需要同步的组,Config: groups、groups.exclude
- 刷新所有需要同步的组,(定时任务,默认间隔10min),如果组信息发生变化请求重新进行任务配置requestTaskReconfiguration()
taskConfigs(maxTasks): connect框架方法,分配任务配置,按task数进行group分配,每个任务同步被分配到的group进行offset同步
- 分配每一个任务的配置信息(按group分配任务)
MirrorCheckpointTask
start(config):
- 刷新需要同步的group在目标集群为空闲状态的group信息以及提交的offset信息(定时任务,默认间隔60s)
- 同步group offset到目标集群(定时任务,默认间隔60s),将最新的checkpoint数据提交到目标集群空闲的group中
poll():connect框架方法
- checkpoint默认间隔时间60s
- 不断消费OffsetSyncsTopic,直到发送checkpoint时间为止,这个topic保存的是topicPartition -> upstreamOffset, downstreamOffset映射关系
- 查询分配到的group的提交offset信息,生产checkpoint数据Checkpoint(group, targetTopicPartition, upstreamOffset, downstreamOffset, metadata)
- downstreamOffset计算方法(核心逻辑) long upstreamStep = upstreamOffset - offsetSync.upstreamOffset(); downstreamOffset = offsetSync.downstreamOffset() + upstreamStep;
- checkpoint数据由connect框架写入目标topic
心跳任务
单实例,产生心跳数据
MirrorHeartbeatConnector
- 创建HeartbeatsTopic,单分区、Compact(在目标集群创建) heartbeats
- 分配任务配置,只有一个任务
MirrorHeartbeatTask
poll():connect框架方法
- 默认每1秒发送一条心跳数据,Heartbeat(sourceClusterAlias, targetClusterAlias, timestamp)
国内最大最权威的 Kafka中文社区 ,在这里你可以结交各大互联网Kafka大佬以及近2000+Kafka爱好者,一起实现知识共享,实时掌控最新行业资讯,免费加入中~
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)