Kafka跨集群同步工具-MirrorMaker2.0详解

Kafka跨集群同步工具-MirrorMaker2.0详解,第1张

Kafka跨集群同步工具-MirrorMaker2.0详解

简介

MM2.0主要是基于Kafka Connect实现的跨集群同步工具

 

主要的功能有:

  • 同步Topic数据到目标集群,并保持分区信息
  • 同步Topic配置到目标集群,与源集群保持一致
  • 同步Topic ACL到目标集群,与源集群保持一致(没有WRITE权限)
  • 自动感知新的Topic和分区并同步到目标集群
  • 同步组Offset到目标集群
  • 基于Connect实现,特性:高可用、水平扩展
  • 指定Topic、Group同步规则
  • 自定义目标集群Topic名称规则

支持的模式:

双活:A->B, B->A

主备:A->B

聚合(多对一):A->K, B->K, C->K

分发(一对多):K->A, K->B, K->C

转发:A->B, B->C, C→D

限制:

为保证Offset同步,源集群Topic的分区数必须和目标集群Topic的分区数保持一致(目标Topic分区数>=源Topic分区数),不能出现源120分区目标80分区情况,这个分区数MM任务会同步

为保证Offset同步,Topic数据同步必须是一对一同步

对于双活、聚合这种模式,每个集群存在同一个Topic的多个镜像Topic

依赖:源端最低要求2.0.0版本,目标端最低要求2.5.0版本

KIP-545: support automated consumer offset sync across clusters in MM 2.0 2.7.0

KIP-396: Add Commit/List Offsets Operations to AdminClient 2.5.0

KIP-382: MirrorMaker 2.0 2.4.0

KIP-222 - Add Consumer Group operations to Admin API 2.0.0

实现

MM2.0主要有三个子任务,同步数据任务、同步Offset任务、心跳任务。

同步数据任务

多实例,按topic partition分配任务,同步topic partition数据,并产生上下游offset对应关系

MirrorSourceConnector 初始化工作,同步创建Topic以及配置、ACL,分配任务信息

start(config):

  1. 创建OffsetSyncsTopic,单分区、Compact(在源集群创建) "mm2-offset-syncs." + targetClusterAlias() + ".internal"
  2. 加载源集群、目标集群Topic Partiton信息,Config: topics、topics.exclude
  3. 创建目标集群Topic,如果不存在就创建,存在保持分区数一致(目标集群Topic分区数不少于源集群)
  4. 刷新目标集群Topic Partiton信息
  5. 同步Topic ACL到目标集群(定时任务,默认间隔10min),这里不同步对Topic的WRITE权限,也就是目标集群Topic一般用户不具备写权限只有MM可以写入(这里考虑切换的时候需要关注)
  6. 同步Topic配置到目标集群(定时任务,默认间隔10min),配置过滤,Config: config.properties.exclude
  7. 刷新源集群Topic Partiton信息并同步到目标集群(定时任务,默认间隔10min),如果分区信息发生变化请求重新进行任务配置requestTaskReconfiguration()

taskConfigs(maxTasks): connect框架方法,分配任务配置,按task数轮训分配topic partiton任务,每个任务消费被分配到的topic partiton进行数据同步

  1. 分配每一个任务的配置信息(按分区分配任务)

MirrorSourceTask 同步数据任务,产生offset映射关系

poll():

  1. 拉取源集群Topic数据
  2. 转换目标topic名称返回数据
  3. 数据由connect框架写入目标topic

commitRecord(SourceRecord record, Recordmetadata metadata):

  1. 记录Topic partition上下游offset映射关系,写入topicPartition -> upstreamOffset, downstreamOffset映射关系到OffsetSyncsTopic里面

同步Offset任务

多实例,按group分配任务,产生group + topic partition -> upstreamOffset, downstreamOffset的checkpoint数据,并提交下游group的offset

MirrorCheckpointConnector

start(config):

  1. 创建CheckpointsTopic,单分区、Compact(在目标集群创建) sourceClusterAlias() + ".checkpoints.internal"
  2. 加载所有需要同步的组,Config: groups、groups.exclude
  3. 刷新所有需要同步的组,(定时任务,默认间隔10min),如果组信息发生变化请求重新进行任务配置requestTaskReconfiguration()

taskConfigs(maxTasks): connect框架方法,分配任务配置,按task数进行group分配,每个任务同步被分配到的group进行offset同步

  1. 分配每一个任务的配置信息(按group分配任务)

MirrorCheckpointTask

start(config):

  1. 刷新需要同步的group在目标集群为空闲状态的group信息以及提交的offset信息(定时任务,默认间隔60s)
  2. 同步group offset到目标集群(定时任务,默认间隔60s),将最新的checkpoint数据提交到目标集群空闲的group中

poll():connect框架方法

  1. checkpoint默认间隔时间60s
  2. 不断消费OffsetSyncsTopic,直到发送checkpoint时间为止,这个topic保存的是topicPartition -> upstreamOffset, downstreamOffset映射关系
  3. 查询分配到的group的提交offset信息,生产checkpoint数据Checkpoint(group, targetTopicPartition, upstreamOffset, downstreamOffset, metadata)
  4. downstreamOffset计算方法(核心逻辑) long upstreamStep = upstreamOffset - offsetSync.upstreamOffset(); downstreamOffset = offsetSync.downstreamOffset() + upstreamStep;
  5. checkpoint数据由connect框架写入目标topic

心跳任务

单实例,产生心跳数据

MirrorHeartbeatConnector

  1. 创建HeartbeatsTopic,单分区、Compact(在目标集群创建) heartbeats
  2. 分配任务配置,只有一个任务

MirrorHeartbeatTask

poll():connect框架方法

  1. 默认每1秒发送一条心跳数据,Heartbeat(sourceClusterAlias, targetClusterAlias, timestamp)

 

国内最大最权威的 Kafka中文社区 ,在这里你可以结交各大互联网Kafka大佬以及近2000+Kafka爱好者,一起实现知识共享,实时掌控最新行业资讯,免费加入中~

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5676222.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存