继上一篇关于suervisor启动流程的文章,发现一了关于kafkaRecordSupplier的一些疑问。本篇就单独拎出这个类来看一下做的什么事情。
首先KafkaRecordSupplier是干什么用的?它主要是 *** 作kafka数据真正执行的地方,比如拉取kafka数据的poll函数在这个类里面实现的。
KafkaRecordSupplier继承了RecordSupplier, 而RecordSupplier只有kafka和Kinesis两个地方做了具体的实现,RecordSupplier是在正在运行的task中调用使用的。
详解看一下这个类中有哪些方法和属性。
下面是KafkaRecordSupplier类中主要的属性,都是和连接 *** 作kafka相关的:
KafkaConsumerkafka consumer客户端,在构造函数中创建consumer
Mapkafka consumer客户端的参数配置,在构造函数中创建consumerProperties
String dataSourcedatasource
下面是KafkaRecordSupplier类中的主要方法,都是和 *** 作kafka相关的:
public void assign(Set告诉kafka从指定的topic-partations进行消费,不受group-id的限制> streamPartitions)
public void seek(StreamPartition用于指定partation的offset(druid是自己管理offsetde )partition, Long sequenceNumber)
public Listkafka客户端获取数据> poll(long timeout)
从以上属性和方法可以看出KafkaRecordSupplier是干什么用的了。说白了就是kafka消费者的客户端。
疑问正常理解的话,一个task任务只对应一个kafkaRecordSupplier对象的时候才能正常摄入数据才对,但是发现kafka-index会有三个地方创建了kafkaRecordSupplier,这就让我很迷惑了。下面就看看这个三个创建kafkaRecordSupplier对象的地方以及用法。
(1) kafka supervisor在创建supervisor线程之前创建了KafkaRecordSupplier对象,通过对该对象的分析,该对象只调用的assign()方法和seek()方法,并没有实际的获取数据的 *** 作。该对象存在的作用是对topic进行初始化,和元数据库中的partations信息做一个对比并更新,比如对supervisor执行reset *** 作。
@Override protected RecordSuppliersetupRecordSupplier() { return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper, emitter, spec.getDataSchema().getDataSource()); }
(2) kafka index task在创建task任务的时候创建了kafkaRecordSupplier对象,该对象主要是用于拉取kafka中的数据用的,执行poll()函数。
@Override protected KafkaRecordSupplier newTaskRecordSupplier() { ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); final Mapprops = new HashMap<>(((KafkaIndexTaskIOConfig) super.ioConfig).getConsumerProperties()); props.put("auto.offset.reset", "none"); props.put("key.deserializer", ByteArrayDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); return new KafkaRecordSupplier(props, configMapper, emitter, dataSource); } finally { Thread.currentThread().setContextClassLoader(currCtxCl); } }
(3) 创建KafkaSamplerFirehose的时候创建了KafkaRecordSupplier对象,主要是用于通过firehose的方式获取数据的时候用到。不过在文档中没有找到关于type=firehose的时候关于配置kafa的描述。具体怎么使用还没有研究明白。猜测是在tranquility的时候使用的?
protected class KafkaSamplerFirehose extends SeekableStreamSamplerFirehose { private KafkaSamplerFirehose(InputRowParser parser) { super(parser); } @Override protected RecordSupplier getRecordSupplier() { ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); final Mapprops = new HashMap<>(((KafkaSupervisorIOConfig) ioConfig).getConsumerProperties()); props.put("enable.auto.commit", "false"); props.put("auto.offset.reset", "none"); props.put("key.deserializer", ByteArrayDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); props.put("request.timeout.ms", Integer.toString(samplerConfig.getTimeoutMs())); return new KafkaRecordSupplier(props, objectMapper, null, ""); } finally { Thread.currentThread().setContextClassLoader(currCtxCl); } } }
总之,三个kafkaRecordSupplier对象各自负责自己用途,互补影响
END以上,单独对KafkaRecordSupplier做了简单的描述,通过疑问其实可以对kafka的数据摄入有了更直观更深入的了解。如有描述不对的地方还请读者指教~
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)