覆盖了怎么写

覆盖了怎么写,第1张

flume写kafkatopic覆盖问题fix

结构:

nginx-flume->;卡夫卡->;水槽->;卡夫卡(因为涉及到跨主机房的问题,两个卡夫卡中间加了个水槽,睾丸疼。。)


状态:

第二层,卡夫卡加载的话题和卡夫卡加载的话题是一样的,手动设置的sink话题没有作用。


打开调试日志:

源实例化:

21 Apr 2015 19:24:03,146 INFO [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:41) - Creating instance of source kafka1, type org.apache.flume.source.kafka.KafkaSource 21 Apr 2015 19:24:03,146 DEBUG [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.getClass:61)  - Source type org.apache.flume.source.kafka.KafkaSource is a custom type 21 Apr 2015 19:24:03,152 INFO  [conf-file-poller-0] (org.apache.flume.source.kafka.KafkaSourceUtil.getKafkaProperties:37)  - context={ parameters:{topic=bigdata_api_ele_me_access, batchDurationMillis=5000, groupId=nginx, zookeeperConnect=xxx, channels=bigdata_api_ele_me_access-channel4, batchSize=2000, type=org.apache.flume.source.kafka.KafkaSource} }

接收器实例化:

21 Apr 2015 19:24:03,185 INFO  [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42)  - Creating instance of sink: web-sink2, type: org.apache.flume.sink.kafka.KafkaSink 21 Apr 2015 19:24:03,185 DEBUG [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.getClass:63)  - Sink type org.apache.flume.sink.kafka.KafkaSink is a custom type 21 Apr 2015 19:24:03,190 DEBUG [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:220)  - Using batch size: 2000 21 Apr 2015 19:24:03,190 INFO  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:229)  - Using the static topic: nginx-access this may be over-ridden by event headers 21 Apr 2015 19:24:03,191 INFO  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34)  - context={ parameters:{topic=nginx-access, brokerList=1xxx, requiredAcks=1, batchSize=2000, type=org.apache.flume.sink.kafka.KafkaSink, channel=bigdata_api_ele_me_access-channel4} } 21 Apr 2015 19:24:03,191 DEBUG [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSink.configure:236)  - Kafka producer properties: {metadata.broker.list=192.168.101.43:9092,192.168.101.44:9092,192.168.101.45:9092, request.required.acks=1, key.serializer.class=kafka.serializer.StringEncoder, serializer.class=kafka.serializer.DefaultEncoder}

当您可以看到建立接收器和源案例的案例时,上下文中的主题根据设置进行设置,但您可以在日志中看到以下段落:

Using the static topic: nginx-access this may be over-ridden by event headers

分析KafkaSink源代码:

org.Apache.flume.sink.Kafka.Kafkasink.process道:

  public static final String KEY_HDR = "key";   public static final String TOPIC_HDR = "topic";   ...         if ((eventTopic = headers.get(TOPIC_HDR)) == null) {           eventTopic = topic;         } //eventTopic的赋值,会从header中获得,假如header中沒有才会应用配备的topic         ...         eventKey = headers.get(KEY_HDR);         ...         KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>           (eventTopic, eventKey, eventBody);         messageList.add(data);

其中主题的分配在配置中:

    topic = context.getString(KafkaSinkConstants.TOPIC,       KafkaSinkConstants.DEFAULT_TOPIC); //根据flume的配备获得topic,要是没有设定topic按默认设置default-flume-topic解决     if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {       logger.warn("The Property 'topic' is not set. "          "Using the default topic name: "          KafkaSinkConstants.DEFAULT_TOPIC);     } else {       logger.info("Using the static topic: "  topic          " this may be over-ridden by event headers"); //这儿提醒很有可能会被header遮盖     }

标题来自:

1)1)Kafka中的数据信息没有header的定义。

2)flume中的信息由header/body定义。

在这种结构下,数据信息从kafkasource进入flume,添加头信息内容,然后注入kafkasink。

kafkasource中header的添加是通过org.Apache.flume.source.Kafka.Kafkasource.process的方式解决的:

        if (iterStatus) {           // get next message           MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();           kafkaMessage = messageAndMetadata.message();           kafkaKey = messageAndMetadata.key();           // Add headers to event (topic, timestamp, and key)           headers = new HashMap<String, String>();           headers.put(KafkaSourceConstants.TIMESTAMP,                   String.valueOf(System.currentTimeMillis()));           headers.put(KafkaSourceConstants.TOPIC, topic);

由于kafka中不使用header,只需在org.Apache.flume.sink.Kafka.Kafkasink.process中注释掉这些代码即可:

        /*         if ((eventTopic = headers.get(TOPIC_HDR)) == null) {           eventTopic = topic;         }         */         eventTopic = topic; //提升这一段,不然会出现npe不正确

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zz/783451.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-04
下一篇 2022-05-04

发表评论

登录后才能评论

评论列表(0条)

保存