【Kafka Connect】

【Kafka Connect】,第1张

【Kafka Connect】

Kafka Connect
  • 概念
  • 特点
  • 组件
    • Connectors
    • Tasks
    • Workers
    • Converters
    • Transforms
  • Dead Letter Queue
  • rebalance触发场景
  • 参考文章

概念

Kafka Connect是一个用于将数据流输入和输出Kafka的框架。Confluent平台附带了几个内置connector,可以使用这些connector进行关系数据库或HDFS等常用系统到kafka的数据传输

特点
  1. 数据从数据源读出或写入时延低;
  2. 从从不同数据源获取数据或将数据写入到不同数据源(如:客户端、数据库、HDFS、静态文件等等)
组件 Connectors

通过管理task来协调数据流的高级抽象。
可分为两种connectors:

  • Source connector
    源连接器可以从多种渠道(如:数据库、静态文件、HDFS客户端等)拉取数据到kafka topic中
  • Sink connector
    宿连接器将topic中的数据push到多种目的端消费。将Kafka主题中的数据传递到Elasticsearch等二级索引中,或Hadoop等批处理系统中,用于离线分析。
Tasks

如何将数据复制到Kafka或从Kafka复制数据的实现。实际进行数据传输的单元,和连接器一样同样分为 Source和Sink
Task是Connect数据模型中的主要处理数据的角色。每个connector实例协调一组实际复制数据的task。通过允许connector将单个作业分解为多个task,Kafka Connect提供了内置的对并行性和可伸缩数据复制的支持,只需很少的配置。这些任务没有存储任何状态。任务状态存储在Kafka中的特殊主题config.storage.topic和status.storage.topic中。因此,可以在任何时候启动、停止或重新启动任务,以提供d性的、可伸缩的数据管道。

Workers

执行Connector和Task的运行进程。Connectors 和Task 属于逻辑单元,而Worker 是实际运行逻辑单元的进程

Standalone Workers
Standalone模式是最简单的模式,用单一进程负责执行所有connector和task。适用于特定场景,如收集主机日志

Distributed Workers
分布式模式为Kafka Connect提供了可扩展性和自动容错能力,使用更广。在分布式模式下,相同group.id的Worker,会自动组成集群。当新增Worker,或者有Worker挂掉时,其余的worker将检测到这一点,集群会自动协调分配所有的Connector 和 Task(这个过程称为Rebalance)

Converters

用于在Connect和外部系统发送或接收数据之间转换数据的代码。Kafka Connect 通过 Converter 将数据在Kafka(字节数组)与Task(Object)之间进行转换。
在向Kafka写入或从Kafka读取数据时,Converter是使Kafka Connect支持特定数据格式所必需的。task使用转换器将数据格式从字节更改为连接内部数据格式,反之亦然。

默认支持以下Converter

  • AvroConverter(建议)

io.confluent.connect.avro.AvroConverter 与Schema Registry一起使用

  • ProtobufConverter

io.confluent.connect.protobuf.ProtobufConverter: 需要使用 Schema Registry

  • JsonConverter
    org.apache.kafka.connect.json.JsonConverter (无需 Schema Registry): 转换为json结构
  • JsonSchemaConverter

io.confluent.connect.json.JsonSchemaConverter: 需要使用 Schema Registry。适合结构数据

  • StringConverter

org.apache.kafka.connect.storage.StringConverter: 简单的字符串格式

  • ByteArrayConverter

org.apache.kafka.connect.converters.ByteArrayConverter 提供不进行转换的“传递”选项

转换器与连接器本身解耦,以便在连接器之间自然地重用转换器。下图展示了在Kafka Connect中,Converter 在何时进行数据转换

Transforms

更改由连接器生成或发送到连接器的每个消息的简单逻辑

Connector可以配置转换,以便对单个消息(对应代码中的Record)进行简单且轻量的修改。可以配置多个Transform 组成一个链。例如让所有消息的topic加一个前缀、sink无法消费source 写入的数据格式,这些场景都可以使用Transform 解决。
然而,应用于多个消息的更复杂的转换最好使用KSQL和Kafka Stream实现。

转换是一个简单的函数,输入一条记录,并输出一条修改过的记录。Kafka Connect提供许多转换,它们都执行简单但有用的修改。可以使用自己的逻辑定制实现转换接口,将它们打包为Kafka Connect插件,将它们与connector一起使用。

Transform 如果配置在Source 则在Task之后执行,如果配置在Sink 则在Task之前执行。当转换与source connector一起使用时,Kafka Connect通过第一个转换传递connector生成的每条源记录,第一个转换对其进行修改并输出一个新的源记录。将更新后的源记录传递到链中的下一个转换,该转换再生成一个新的修改后的源记录。最后更新的源记录会被转换为二进制格式写入到kafka。转换也可以与sink connector一起使用。

转换算子功能Cast将字段或整个键或值强制转换为特定类型(例如,将整数字段强制转换为较小的宽度)Drop从记录中删除键或值,并将其设置为null。ExtractField在存在架构时,从结构中提取指定的字段,在无架构数据的情况下,从map中提取特定字段。null不做修改直接传递。ExtractTopic用新topic替换旧topicFilter (Apache Kafka)Drop all records. Designed to be used in conjunction with a Predicate.Filter (Confluent)Include or drop records that match a configurable filter.condition.FlattenFlatten a nested data structure. This generates names for each field by concatenating the field names at each level with a configurable delimiter character.HoistFieldWrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data.InsertFieldInsert field using attributes from the record metadata or a configured static value.MaskFieldMask specified fields with a valid null value for the field type.MessageTimeStampRouterUpdate the record’s topic field as a function of the original topic value and the record’s timestamp field.RegexRouter当前不适用于managed connector。使用配置的正则表达式和替换字符串更新记录主题。ReplaceField过滤或重命名字段。SetSchemametadataSet the schema name, version, or both on the record’s key or value schema.TimestampConverterConvert timestamps between different formats such as Unix epoch, strings, and Connect Date and Timestamp types.TimestampRouterTimestampRouterTombstoneHandlerManage tombstone records. A tombstone record is defined as a record with the entire value field being null, whether or not it has ValueSchema.ValueToKey将记录键替换为由记录值中的字段子集形成的新键。 Dead Letter Queue

与其他MQ不同,Kafka 并没有死信队列这个功能。但是Kafka Connect提供了这一功能。

当Sink Task遇到无法处理的消息,会根据errors.tolerance配置项决定如何处理,默认情况下(errors.tolerance=none) Sink 遇到无法处理的记录会直接抛出异常,Task进入Fail 状态。开发人员需要根据Worker的错误日志解决问题,然后重启Task,才能继续消费数据

设置 errors.tolerance=all,Sink Task 会忽略所有的错误,继续处理。Worker中不会有任何错误日志。可以通过配置errors.deadletterqueue.topic.name = 让无法处理的消息路由到 Dead Letter Topic

rebalance触发场景
  1. 有新的connector加入或退出集群
  2. 当connector增加或减少它们所需的task数量,或者更改connector的配置时
  3. 当一个worker失败时,task在active的worker之间重新平衡

PS: 当一个task失败时,不会触发再平衡,因为task失败被认为是一个例外情况。因此,失败的task不会被框架自动重新启动,应该通过REST API重新启动

参考文章

https://docs.confluent.io/platform/current/connect/index.html
https://www.jianshu.com/p/fae25cc63997
https://segmentfault.com/a/1190000039395164

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5654399.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存