Running Flink Locally in IDEA (Java Edition)


1. Background

Flink is currently one of the most popular real-time big-data frameworks. To read its source code and try it hands-on, this article sets up a minimal Flink Java project that runs directly inside IDEA.

2. Steps

2.1 Environment preparation

JDK 1.8 or later is all that is required, since most of Flink is written in Java.

2.2 Create the IDEA project

Create an ordinary Maven Java project; nothing Flink-specific is needed at this stage.

2.3 Configure pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FirstFlink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Flink Java (batch) API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.11.1</version>
        </dependency>
        <!-- Flink streaming Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
        <!-- Needed to actually submit/run jobs, including from the IDE -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>

        <!-- Optional: Scala APIs, only needed for Scala programs -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
2.4 Batch program example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @Author CaiWencheng
 * @Date 2022-05-04 22:52
 */
public class WordCountBatch {
    public static void main(String[] args) throws Exception {
        // Input and output paths (backslashes must be escaped in Java string literals)
        String inPath = "E:\\IdeaProjects\\FirstFlink\\data\\input\\hello.txt";
        String outPath = "E:\\IdeaProjects\\FirstFlink\\data\\output\\output.txt";
        // Obtain the Flink batch execution environment
        ExecutionEnvironment executionEnvironment =
                ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file
        DataSet<String> text = executionEnvironment.readTextFile(inPath);
        // Split lines into words, group by word, and sum the counts
        DataSet<Tuple2<String, Integer>> dataSet =
                text.flatMap(new LineSplitter()).groupBy(0).sum(1);
        dataSet.writeAsCsv(outPath, "\n", " ").setParallelism(1);
        // Trigger execution
        executionEnvironment.execute("wordcount batch process");
    }

    static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) {
            for (String word : line.split(" ")) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

Input file hello.txt:

hello flink
hello zk
hello spark

Output (output.txt):

zk 1
flink 1
hello 3
spark 1
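To check what `flatMap` plus `groupBy(0).sum(1)` compute without spinning up Flink, the same logic can be sketched in plain Java. This is a hypothetical stand-in for illustration only, not part of the Flink API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    // Emulates LineSplitter + groupBy(0).sum(1): tokenize each line,
    // then accumulate a per-word total.
    static Map<String, Integer> wordCount(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
                wordCount(new String[]{"hello flink", "hello zk", "hello spark"});
        System.out.println(counts); // {hello=3, flink=1, zk=1, spark=1}
    }
}
```

The totals match the output.txt above; only the ordering differs, since Flink makes no ordering guarantee for grouped results.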

2.5 Streaming program example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Author CaiWencheng
 * @Date 2022-05-04 23:11
 */
public class WordCountStream {
    public static void main(String[] args) throws Exception {
        // Obtain the Flink streaming execution environment
        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Read lines from a socket
        DataStreamSource<String> textStream =
                streamExecutionEnvironment.socketTextStream("hadoop2", 7777, "\n");
        SingleOutputStreamOperator<Tuple2<String, Long>> sum =
                textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Long>> collector) {
                        // "\\s" is the whitespace regex; a bare "\s" is not a valid Java escape
                        String[] splits = s.split("\\s");
                        for (String word : splits) {
                            collector.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(0).sum(1);
        // Print the running counts
        sum.print();
        // Trigger job execution
        streamExecutionEnvironment.execute("wordcount stream process");
    }
}
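The streaming job keeps per-key running state and emits an updated count for every incoming word, which is why the console below shows (hello,1), (hello,2), (hello,3) rather than one final total. That stateful step can be sketched in plain Java; this is a hypothetical stand-in for what `keyBy(0).sum(1)` emits downstream, not Flink code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningCountSketch {
    // Per-key state, as a keyed sum operator would hold it.
    private final Map<String, Long> state = new HashMap<>();

    // Process one word and return the updated (word, count) record.
    String process(String word) {
        long count = state.merge(word, 1L, Long::sum);
        return "(" + word + "," + count + ")";
    }

    public static void main(String[] args) {
        RunningCountSketch op = new RunningCountSketch();
        List<String> out = new ArrayList<>();
        for (String w : "hello flink hello flink hello hello".split("\\s")) {
            out.add(op.process(w));
        }
        System.out.println(out);
        // [(hello,1), (flink,1), (hello,2), (flink,2), (hello,3), (hello,4)]
    }
}
```

In real Flink the records for different keys may interleave in a different order, because each key is handled by one of several parallel subtasks.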

1) On the hadoop2 machine, first start a listener:

# listen on port 7777
nc -lp 7777

2) Run the main method of WordCountStream.

3) In the nc session on hadoop2, type:

hello flink hello flink hello hello
hello spark stream flink hello

4) IDEA console output (the `n>` prefix is the index of the parallel subtask that produced the record):

5> (hello,1)
13> (flink,1)
5> (hello,2)
5> (hello,3)
13> (flink,2)
5> (hello,4)
13> (flink,3)
5> (hello,5)
1> (spark,1)
5> (hello,6)
16> (stream,1)

Note: the first run of the streaming program will fail with a class-not-found error, because flink-streaming-java_2.12 is declared with provided scope and is therefore absent from the default runtime classpath. In the IDEA Run/Debug Configuration, tick "Include dependencies with 'Provided' scope" and run again.
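An alternative, if you prefer not to touch the run configuration, is to drop the provided scope while developing locally, then restore it before packaging for a cluster (where the Flink runtime supplies these classes):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.11.1</version>
    <!-- no <scope>provided</scope>: the jar stays on the IDE's runtime classpath -->
</dependency>
```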

3. Summary
  1. To build Flink Java batch/stream programs in IDEA, you only need the corresponding dependencies: flink-java for batch and flink-streaming-java_2.12 for streaming, plus flink-clients_2.12 to actually run the jobs.
  2. Creating the Scala version of these programs is very similar to the Java version.

Feel free to share; when reposting, please credit the source: 内存溢出.

Original article: https://outofmemory.cn/langs/869874.html
