Flink使用KafkaSource从Kafka消息队列中读取数据

Flink使用KafkaSource从Kafka消息队列中读取数据,第1张

Flink使用KafkaSource从Kafka消息队列读取数据

Flink使用KafkaSource从Kafka消息队列中读取数据

使用KafkaSource从Kafka消息队列中读取数据
1.KafkaSource创建的DataStream是一个并行的DataStream
2.KafkaSource创建的DataStream是一个无限的数据流

使用步骤:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

1.导入依赖:

org.apache.flink
flink-connector-kafka_2.11
1.13.2

2.new FlinkKafkaConsumer
3.调用env的addSource传入FlinkKafkaConsumer实例

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;


public class KafkaSource {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        //设置Evn的并行度
        env.setParallelism(2);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092");
        properties.setProperty("group.id", "test777");
        properties.setProperty("auto.offset.reset", "earliest"); //如果没有记录历史偏移量就从头读
        FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>(
                "worldcount",
                new SimpleStringSchema(),
                properties
        );


        //调用env的addSource方法创建DataStream
        DataStreamSource lines = env.addSource(flinkKafkaConsumer);
        System.out.println("kafkaSource的并行度:" + lines.getParallelism());
        lines.print();
        env.execute();

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5691324.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存