Flink Learning Notes, Core Stream Processing -- Reading from Data Sources


This post covers how to read data from different sources in Flink: from a Java collection, from a local file, from a socket, from Kafka, and from a custom Source.


Table of Contents
  • 1. Reading Data from a Java Collection
  • 2. Reading Data from a Local File
  • 3. Reading Data from a Socket
    • Variant 1
    • Variant 2
  • 4. Reading Data from Kafka
    • New-style API
    • Old-style API
  • 5. Reading Data from a Custom Source


1. Reading Data from a Java Collection

In many cases you can stage data in memory in a suitable data structure and use it as a source. A collection type is the most common choice for this.

package com.lqs.five.part1_source;

import com.lqs.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

/**
 * @Author lqs
 * @Date 2022-03-28 16:42:20
 * @Version 1.0.0
 * @ClassName Test01_SourceList
 * @Describe Reads data from a collection
 */
public class Test01_SourceList {

    public static void main(String[] args) throws Exception {
        
//TODO 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

//TODO 2. Read data from the collection

        List<WaterSensor> waterSensors = Arrays.asList(
                new WaterSensor("1", 134L, 34),
                new WaterSensor("2", 1434L, 55),
                new WaterSensor("4", 1367L, 354)
        );

        env.fromCollection(waterSensors).print();

        /*//Create a collection
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

        //fromCollection creates a non-parallel source, so setParallelism(2) here would fail
        DataStreamSource<Integer> streamSource = env.fromCollection(list).setParallelism(2);

        //For a multi-parallelism source, use env.fromParallelCollection(...) (see the sketch below)*/

        //Read data from individual elements
        //The parallelism of non parallel operator must be 1
        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5).setParallelism(1);

        streamSource.print();

        env.execute("SourceList");

    }
    
}
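The commented-out code above hints at a multi-parallelism variant of the collection source. Here is a minimal sketch of it, assuming Flink's built-in NumberSequenceIterator and Types utilities (the class name is made up for illustration): fromParallelCollection takes a SplittableIterator, so unlike fromCollection and fromElements the source may run with parallelism greater than 1.

package com.lqs.five.part1_source;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;

public class Test01b_SourceParallelCollection {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //fromParallelCollection splits the iterator across subtasks,
        //so parallelism > 1 is allowed here
        DataStreamSource<Long> streamSource = env
                .fromParallelCollection(new NumberSequenceIterator(1L, 10L), Types.LONG)
                .setParallelism(2);

        streamSource.print();

        env.execute("SourceParallelCollection");

    }

}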

2. Reading Data from a Local File

Note:

  1. The argument can be a directory or a single file.
  2. The path can be relative or absolute.
  3. A relative path is resolved against the system property user.dir: in IDEA that is the project root; in standalone mode it is the root directory of the cluster node.
  4. You can also read from an HDFS directory with a path such as hdfs://nwh120:8020/…; since Flink does not ship the Hadoop dependencies, add the following to your pom (an HDFS read sketch follows the dependency):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
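A minimal sketch of the HDFS variant, assuming the dependency above is on the classpath; the file path below is hypothetical:

        //Read a text file from HDFS; the path is a made-up example
        env.readTextFile("hdfs://nwh120:8020/input/words.txt").print();
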
package com.lqs.five.part1_source;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Author lqs
 * @Date 2022-03-28 16:46:27
 * @Version 1.0.0
 * @ClassName Test02_SourceFile
 * @Describe Reads data from a file
 */
public class Test02_SourceFile {

    public static void main(String[] args) throws Exception {

//TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//TODO 2. Read the data in the file
        env.readTextFile("input/hcc").print();

        env.execute("SourceFile");


    }

}
3. Reading Data from a Socket

Variant 1
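Both variants read lines from port 8888 on host nwh120; start a netcat server there first so the socket has data, e.g. nc -lk 8888.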
package com.lqs.five.part1_source;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Author lqs
 * @Date 2022-03-28 17:02:10
 * @Version 1.0.0
 * @ClassName Test03_SourceUnboundStream
 * @Describe Reads an unbounded data stream
 */
public class Test03_SourceUnboundStream {

    public static void main(String[] args) throws Exception {

//TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        //TODO 2. Read the unbounded data stream
        //netcat is used to simulate the input data
        DataStreamSource<String> streamSource = env.socketTextStream("nwh120", 8888);

        //TODO 3. Split the data on spaces into individual words
        SingleOutputStreamOperator<String> wordDataStream = streamSource.flatMap(
                new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String s, Collector<String> out) throws Exception {
                        for (String s1 : s.split(" ")) {
                            out.collect(s1);
                        }
                    }
                }
        )
                //Set the slot sharing group
//                .slotSharingGroup("group1")
                //Break the operator chain on both sides
//                .disableChaining()
                //Start a new chain (break from the previous operator)
//                .startNewChain()
                //Set the parallelism for this operator individually
//                .setParallelism(5)
                ;

        //TODO 4. Wrap each word in a Tuple2
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordToOneDataStream = wordDataStream.map(
                new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                }
        );

        //TODO 5. Group identical words together
        KeyedStream<Tuple2<String, Integer>, String> keyedDataStream = wordToOneDataStream.keyBy(
                new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }
        );

        //TODO 6. Sum up the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDataStream.sum(1);

        result.print();

        env.execute("SourceUnboundStream");

    }

}
Variant 2
package com.lqs.five.part1_source;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Author lqs
 * @Date 2022-05-08 21:08:48
 * @Version 1.0.0
 * @ClassName Test03_SourceSocket
 * @Describe Reads data from a socket; another unbounded-stream source
 */
public class Test03_SourceSocket {

    public static void main(String[] args) throws Exception {

//TODO 1. Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

//TODO 2. Read the unbounded data stream
        DataStreamSource<String> streamSource = env.socketTextStream("nwh120", 8888);

        //TODO 3. Transform the data structure
        //TODO Approach 1
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = streamSource.flatMap(
                new RichFlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                        //Split on spaces
                        String[] split = value.split(" ");
                        //Loop over the words and emit each one downstream (to the next operator)
                        for (String word : split) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }
        );

        //TODO Approach 2
        /*SingleOutputStreamOperator<String> lineToWord = streamSource.flatMap(
                new RichFlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            out.collect(word);
                        }
                    }
                }
        );
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineToWord.map(
                new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return Tuple2.of(value, 1L);
                    }
                }
        );*/

        //TODO Approach 3
        /*SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = streamSource.flatMap(
                        (String line, Collector<String> out) -> {
                            Stream<String> words = Arrays.stream(line.split(" "));
//                    words.forEach(word -> out.collect(word));
                            words.forEach(out::collect);
                        }
                ).returns(Types.STRING)
                .map(value -> Tuple2.of(value, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));*/

        //TODO 4. Key by word
        KeyedStream<Tuple2<String, Long>, String> keyedStream = wordAndOne.keyBy(
                new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                }
        );
        //Shorthand
//        KeyedStream<Tuple2<String, Long>, String> keyedStream = wordAndOne.keyBy(value -> value.f0);

        //TODO 5. Sum
        SingleOutputStreamOperator<Tuple2<String, Long>> result = keyedStream.sum(1);

        //TODO 6. Print
        result.print();

        //TODO 7. Execute
        env.execute("WordCount");

    }

}
4. Reading Data from Kafka

Add the dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
New-style API
package com.lqs.five.part1_source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Author lqs
 * @Date 2022-03-28 17:30:05
 * @Version 1.0.0
 * @ClassName Test04_SourceKafka
 * @Describe Reads a data stream from Kafka
 */
public class Test04_SourceKafka {

    public static void main(String[] args) throws Exception {

//TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

//TODO 2. Read the data stream from Kafka
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("nwh120:9092")
                .setTopics("test_kafka_source")
                .setGroupId("lqs_test")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        streamSource.print();

        env.execute("SourceKafka");

    }

}
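Besides OffsetsInitializer.latest(), the builder accepts other starting-offset strategies, e.g. OffsetsInitializer.earliest() to read from the beginning of the topic, or OffsetsInitializer.committedOffsets() to resume from the consumer group's committed offsets.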
Old-style API
package com.lqs.five.part1_source;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * @Author lqs
 * @Date 2022-03-28 17:30:05
 * @Version 1.0.0
 * @ClassName Test04_SourceKafkaOld
 * @Describe Reads a data stream from Kafka (legacy consumer API)
 */
public class Test04_SourceKafkaOld {

    public static void main(String[] args) throws Exception {

//TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

//TODO 2. Read the data stream from Kafka
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "nwh120:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "lqs_test");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        env.addSource(new FlinkKafkaConsumer<String>("test_kafka_source", new SimpleStringSchema(), properties)).print();

        env.execute("SourceKafkaOld");

    }

}
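Note that starting with Flink 1.14, FlinkKafkaConsumer is deprecated in favor of the KafkaSource shown in the new-style example above, so prefer the new API for new code.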
5. Reading Data from a Custom Source
package com.lqs.five.part1_source;

import com.lqs.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Random;

/**
 * @Author lqs
 * @Date 2022-03-28 09:38:19
 * @Version 1.0.0
 * @ClassName Test05_CustomizeSource
 * @Describe A custom source
 */
public class Test05_CustomizeSource {

    public static void main(String[] args) throws Exception {

//TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

//TODO 2. Read data from the custom source
        env.addSource(new MySource()).print();

        env.execute("CustomizeSource");

    }

    public static class MySource implements ParallelSourceFunction<WaterSensor> {

        //volatile, because cancel() is called from a different thread than run()
        private volatile boolean isRunning = true;

        /**
         * Custom data generation
         * @param ctx the source context used to emit records
         * @throws Exception
         */
        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            while (isRunning) {
                ctx.collect(
                        new WaterSensor(
                                "test_kafka_source"+new Random().nextInt(100),
                                System.currentTimeMillis(),
                                new Random().nextInt(1000)
                        )
                );
                Thread.sleep(1000);
            }
        }

        /**
         * Cancels data generation.
         * run() typically contains a while loop; this method terminates that loop.
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }

}
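Because MySource implements ParallelSourceFunction, the source itself may run with parallelism greater than 1; each subtask then runs its own MySource instance. A minimal usage sketch (the parallelism value is just an example):

        //Two source subtasks, each running an independent MySource generator
        env.addSource(new MySource()).setParallelism(2).print();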
