方便和我一样的萌新,省去翻文档的时间
下面是官方文档的示例
KafkaStreamer对示例里函数用法的补充kafkaStreamer = new KafkaStreamer<>(); IgniteDataStreamer stmr = ignite.dataStreamer("myCache")); // allow overwriting cache data stmr.allowOverwrite(true); kafkaStreamer.setIgnite(ignite);//ignite客户端配置参考官方文档 kafkaStreamer.setStreamer(stmr); // set the topic kafkaStreamer.setTopic(someKafkaTopic);//见下 // set the number of threads to process Kafka streams kafkaStreamer.setThreads(4); // set Kafka consumer configurations kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);//参考kafka官方文档配置,和Consumer配置一样的 // set extractor kafkaStreamer.setSingleTupleExtractor(strExtractor);//见下 kafkaStreamer.start(); ... // stop on shutdown kafkaStreamer.stop(); strm.close();
- kafkaStreamer.setTopic(someKafkaTopic);
kafkaStreamer.setTopic(Arrays.asList("topic1","topic2"));
-
kafkaStreamer.setSingleTupleExtractor(strExtractor);
kafkaStreamer.setSingleTupleExtractor(new StreamSingleTupleExtractor
() { @Override public Map.Entry extract(ConsumerRecord record) { //写自己的数据转换 String key=..... Int val=... return new IgniteBiTuple<>(key, val); } }); -
如果一次返回多组数据,使用StreamMultipleTupleExtractor
-
参考:
ZeroMqStringSingleTupleExtractor.java
KafkaIgniteStreamerSelfTest.java
-
-
stmr.flush()
接收到的数据不会自动加入cache,此函数在不关闭stmr的情况下加入数据到cache。如果使用stmr.close(),会在关闭之前加入剩余的数据
相关参考数据流处理 | Apache Ignite技术服务 (ignite-service.cn)
Kafka流处理器 | Apache Ignite技术服务 (ignite-service.cn)
Interface IgniteDataStreamer
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)