kafkaRedisElasticsearchJDBC自定义sink
Flink没有类似于spark中foreach方法,让用户进行迭代的 *** 作。虽有对外的输出 *** 作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出 *** 作。
stream.addSink(new MySink(xxxx))
官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink。
kafka// 从Kafka中读取数据 DataStreamRedisinputStream = env.addSource( new FlinkKafkaConsumer ("sensor", new SimpleStringSchema(), properties)); // 将数据写入Kafka dataStream.addSink( new FlinkKafkaProducer ("localhost:9092", "sinktest", new SimpleStringSchema()));
FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder() .setHost("localhost") .setPort(6379) .setPassword("123456") .setDatabase(0) .build(); dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));Elasticsearch
ListJDBC自定义sinkhttpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("localhost", 9200)); dataStream.addSink( new ElasticsearchSink.Builder (httpHosts, new MyEsSinkFunction()).build());
Flink之Mysql数据CDC
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)