Druid kafka-index KafkaRecordSupplier详解

Druid kafka-index KafkaRecordSupplier详解,第1张

Druid kafka-index KafkaRecordSupplier详解 前言

    继上一篇关于suervisor启动流程的文章,发现一了关于kafkaRecordSupplier的一些疑问。本篇就单独拎出这个类来看一下做的什么事情。

    首先KafkaRecordSupplier是干什么用的?它主要是 *** 作kafka数据真正执行的地方,比如拉取kafka数据的poll函数在这个类里面实现的。

    KafkaRecordSupplier继承了RecordSupplier, 而RecordSupplier只有kafka和Kinesis两个地方做了具体的实现,RecordSupplier是在正在运行的task中调用使用的。

 详解

    看一下这个类中有哪些方法和属性。

    下面是KafkaRecordSupplier类中主要的属性,都是和连接 *** 作kafka相关的:

KafkaConsumer consumer
kafka consumer客户端,在构造函数中创建
Map consumerProperties
kafka consumer客户端的参数配置,在构造函数中创建
String dataSource
datasource

    下面是KafkaRecordSupplier类中的主要方法,都是和 *** 作kafka相关的:

public void assign(Set> streamPartitions)
告诉kafka从指定的topic-partations进行消费,不受group-id的限制
public void seek(StreamPartition partition, Long sequenceNumber)
用于指定partation的offset(druid是自己管理offsetde )
public List> poll(long timeout)
kafka客户端获取数据

    从以上属性和方法可以看出KafkaRecordSupplier是干什么用的了。说白了就是kafka消费者的客户端。

疑问

    正常理解的话,一个task任务只对应一个kafkaRecordSupplier对象的时候才能正常摄入数据才对,但是发现kafka-index会有三个地方创建了kafkaRecordSupplier,这就让我很迷惑了。下面就看看这个三个创建kafkaRecordSupplier对象的地方以及用法。

   (1) kafka supervisor在创建supervisor线程之前创建了KafkaRecordSupplier对象,通过对该对象的分析,该对象只调用的assign()方法和seek()方法,并没有实际的获取数据的 *** 作。该对象存在的作用是对topic进行初始化,和元数据库中的partations信息做一个对比并更新,比如对supervisor执行reset *** 作。

  @Override
  protected RecordSupplier setupRecordSupplier()
  {
    return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper, emitter, spec.getDataSchema().getDataSource());
  }

    (2) kafka index task在创建task任务的时候创建了kafkaRecordSupplier对象,该对象主要是用于拉取kafka中的数据用的,执行poll()函数。

  @Override
  protected KafkaRecordSupplier newTaskRecordSupplier()
  {
    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
    try {
      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());

      final Map props = new HashMap<>(((KafkaIndexTaskIOConfig) super.ioConfig).getConsumerProperties());

      props.put("auto.offset.reset", "none");
      props.put("key.deserializer", ByteArrayDeserializer.class.getName());
      props.put("value.deserializer", ByteArrayDeserializer.class.getName());

      return new KafkaRecordSupplier(props, configMapper, emitter, dataSource);
    }
    finally {
      Thread.currentThread().setContextClassLoader(currCtxCl);
    }
  }

    (3) 创建KafkaSamplerFirehose的时候创建了KafkaRecordSupplier对象,主要是用于通过firehose的方式获取数据的时候用到。不过在文档中没有找到关于type=firehose的时候关于配置kafa的描述。具体怎么使用还没有研究明白。猜测是在tranquility的时候使用的?

protected class KafkaSamplerFirehose extends SeekableStreamSamplerFirehose
  {
    private KafkaSamplerFirehose(InputRowParser parser)
    {
      super(parser);
    }

    
    @Override
    protected RecordSupplier getRecordSupplier()
    {
      ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
      try {
        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());

        final Map props = new HashMap<>(((KafkaSupervisorIOConfig) ioConfig).getConsumerProperties());

        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "none");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        props.put("request.timeout.ms", Integer.toString(samplerConfig.getTimeoutMs()));

        return new KafkaRecordSupplier(props, objectMapper, null, "");
      }
      finally {
        Thread.currentThread().setContextClassLoader(currCtxCl);
      }
    }
  }

总之,三个kafkaRecordSupplier对象各自负责自己用途,互补影响

END

    以上,单独对KafkaRecordSupplier做了简单的描述,通过疑问其实可以对kafka的数据摄入有了更直观更深入的了解。如有描述不对的地方还请读者指教~

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5665261.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存