KafkaStreamer简单使用

KafkaStreamer简单使用,第1张

KafkaStreamer简单使用

方便和我一样的萌新,省去翻文档的时间
下面是官方文档的示例

KafkaStreamer kafkaStreamer = new KafkaStreamer<>();
IgniteDataStreamer stmr = ignite.dataStreamer("myCache"));

// allow overwriting cache data
stmr.allowOverwrite(true);

kafkaStreamer.setIgnite(ignite);//ignite客户端配置参考官方文档
kafkaStreamer.setStreamer(stmr);

// set the topic
kafkaStreamer.setTopic(someKafkaTopic);//见下

// set the number of threads to process Kafka streams
kafkaStreamer.setThreads(4);

// set Kafka consumer configurations
kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);//参考kafka官方文档配置,和Consumer配置一样的

// set extractor
kafkaStreamer.setSingleTupleExtractor(strExtractor);//见下

kafkaStreamer.start();

...

// stop on shutdown
kafkaStreamer.stop();

strm.close();

对示例里函数用法的补充
  • kafkaStreamer.setTopic(someKafkaTopic);

    kafkaStreamer.setTopic(Arrays.asList("topic1","topic2"));

  • kafkaStreamer.setSingleTupleExtractor(strExtractor);

    kafkaStreamer.setSingleTupleExtractor(new StreamSingleTupleExtractor() {
    
        @Override
        public Map.Entry extract(ConsumerRecord record) {
            
            //写自己的数据转换        
            String key=.....
            Int val=...
    
            return new IgniteBiTuple<>(key, val);
        }
    });
    
    
    • 如果一次返回多组数据,使用StreamMultipleTupleExtractor

    • 参考:
      ZeroMqStringSingleTupleExtractor.java
      KafkaIgniteStreamerSelfTest.java

  • stmr.flush()

    接收到的数据不会自动加入cache,此函数在不关闭stmr的情况下加入数据到cache。如果使用stmr.close(),会在关闭之前加入剩余的数据

相关参考

数据流处理 | Apache Ignite技术服务 (ignite-service.cn)

Kafka流处理器 | Apache Ignite技术服务 (ignite-service.cn)

Interface IgniteDataStreamer

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5576424.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存