Flink Tutorial 2 - Source



Table of Contents
      • 1. Source
        • 1.1 Collections
        • 1.2 Files
        • 1.3 Kafka
        • 1.4 Custom Source (UDF)

1. Source

A source is where a Flink job gets its input. Flink ships with built-in sources for collections, files, and Kafka, and a SourceFunction interface for writing your own.

All examples below use the following bean (Lombok generates the boilerplate):

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Lombok: @Data generates getters/setters/equals/hashCode/toString;
// the other two annotations generate an all-args and a no-args constructor.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SensorReading {
    private String id;          // sensor id, e.g. "sensor_1"
    private Long timestamp;     // timestamp in milliseconds
    private Double temperature; // current temperature
}
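
The three annotations come from Lombok. If Lombok is not already on the project's classpath, a dependency along these lines is needed (the version shown is an assumption; any recent release works):

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
    <scope>provided</scope>
</dependency>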
1.1 Collections

env.fromCollection builds a stream from an in-memory collection; env.fromElements does the same directly from varargs:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class SourceTest01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Bounded stream from an in-memory collection
        DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
                new SensorReading("sensor_1", 1111L, 11.1),
                new SensorReading("sensor_2", 1411L, 11.2),
                new SensorReading("sensor_3", 1211L, 11.3),
                new SensorReading("sensor_4", 1215L, 11.6)
        ));

        // Bounded stream directly from elements
        DataStream<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);

        dataStream.print("list");
        streamSource.print("int");

        env.execute();
    }
}
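
The string passed to print becomes a label prefixed to each record, so a run produces output along these lines (illustrative; the record format is Lombok's @Data toString):

list> SensorReading(id=sensor_1, timestamp=1111, temperature=11.1)
list> SensorReading(id=sensor_2, timestamp=1411, temperature=11.2)
list> SensorReading(id=sensor_3, timestamp=1211, temperature=11.3)
list> SensorReading(id=sensor_4, timestamp=1215, temperature=11.6)
int> 1
int> 2
int> 3
int> 4
int> 5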

1.2 Files

env.readTextFile reads a text file and emits it line by line:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest02_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Backslashes in a Java string literal must be escaped
        String path = "E:\\atguiguDemo03\\leet-code\\flink04_java\\src\\main\\resources\\1.txt";
        DataStream<String> dataStream = env.readTextFile(path);
        dataStream.print("txt");
        env.execute();
    }
}

Alternatively, locate the file on the classpath via Class#getResource instead of hard-coding an absolute path:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.net.URL;

public class SourceTest02_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Locate 1.txt on the classpath (src/main/resources)
        URL resource = SourceTest02_File.class.getResource("/1.txt");
        DataStream<String> dataStream = env.readTextFile(resource.getPath());
        dataStream.print("txt");
        env.execute();
    }
}

1.3 Kafka

Maven dependency for the Kafka connector:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

Consume a topic with env.addSource(new FlinkKafkaConsumer011<>(...)):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class SourceTest03_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        // The broker address is the one setting the consumer cannot do without
        properties.setProperty("bootstrap.servers", "localhost:9092");

        DataStream<String> dataStream = env.addSource(
                new FlinkKafkaConsumer011<>("sensor", new SimpleStringSchema(), properties));
        dataStream.print("kafka");
        env.execute();
    }
}

Further consumer parameters can be set on the Properties object; see the sketch below.
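
A minimal sketch of commonly used settings (the values are assumptions for a local setup; the group id is an example name):

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // Kafka broker(s)
properties.setProperty("group.id", "flink-sensor-group");      // consumer group id
properties.setProperty("auto.offset.reset", "latest");         // start position when no committed offset exists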

To test, push messages into the topic with the console producer:

kafka-console-producer.sh --broker-list localhost:9092 --topic sensor


1.4 Custom Source (UDF)

A custom source implements the SourceFunction interface. The example below simulates ten sensors and emits a randomly perturbed temperature reading for each of them every second:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.HashMap;
import java.util.Random;

public class SourceTest04_UDF {
    public static void main(String[] args) throws Exception {
        // Register the custom source
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<SensorReading> dataStream = env.addSource(new MySource());
        dataStream.print("udf");
        env.execute();
    }

    public static class MySource implements SourceFunction<SensorReading> {
        // volatile so the change made by cancel() is visible to the run() thread
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {
            Random random = new Random();
            // Initial temperature per sensor, drawn from a Gaussian around 60
            HashMap<String, Double> hashMap = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                hashMap.put("sensor_" + (i + 1), random.nextGaussian() * 20 + 60);
            }
            while (running) {
                for (String s : hashMap.keySet()) {
                    // Random-walk each temperature and emit a fresh reading
                    Double nextTemp = hashMap.get(s) + random.nextGaussian();
                    hashMap.put(s, nextTemp);
                    sourceContext.collect(new SensorReading(s, System.currentTimeMillis(), nextTemp));
                }
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
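
A plain SourceFunction always runs with parallelism 1; asking for more throws an exception. For a parallel source, the same code can implement the ParallelSourceFunction marker interface instead, so each parallel subtask runs its own copy of run(). A minimal sketch, not part of the original example:

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Random;

public static class MyParallelSource implements ParallelSourceFunction<SensorReading> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {
        Random random = new Random();
        while (running) {
            // Every subtask emits its own readings independently
            ctx.collect(new SensorReading("sensor_p", System.currentTimeMillis(),
                    60 + random.nextGaussian()));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Attach it with env.addSource(new MyParallelSource()).setParallelism(4).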
