8.1.1 Flink快速应用【批量处理数据、实时流处理数据】

8.1.1 Flink快速应用【批量处理数据、实时流处理数据】,第1张

8.1.1 Flink快速应用【批量处理数据、实时流处理数据】 Flink快速应用

文章目录
  • Flink快速应用
  • 第 1 节 单词统计案例(批数据)
    • 1.1 需求
    • 1.2 代码实现
      • Java程序
      • scala程序
  • 第 2 节 单词统计案例(流数据)
    • 2.1 需求
    • 2.2 代码实现
      • scala程序
      • java程序


通过一个单词统计的案例,快速上手应用Flink,进行流处理(Streaming)和批处理(Batch)

第 1 节 单词统计案例(批数据) 1.1 需求

统计一个文件中各个单词出现的次数,把统计结果输出到文件
步骤:
1、读取数据源
2、处理数据源
a、将读到的数据源文件中的每一行根据空格切分
b、将切分好的每个单词拼接1 c、根据单词聚合(将相同的单词放在一起)
d、累加相同的单词(单词后面的1进行累加)
3、保存处理结果

1.2 代码实现

引入依赖pom.xml


        
        
            org.apache.flink
            flink-java
            1.11.1
        
        
        
            org.apache.flink
            flink-streaming-java_2.12
            1.11.1
        
        
        
            org.apache.flink
            flink-clients_2.12
            1.11.1
        

        
        
            org.apache.flink
            flink-scala_2.12
            1.11.1
        

        
        
            org.apache.flink
            flink-streaming-scala_2.12
            1.11.1
            
        

Java程序
package com.lagou;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class WordCountJavaBatch {
    public static void main(String[] args) throws Exception {
        String inputPath="D:\data\input\hello.txt";
        String outputPath="D:\data\output";

        //获取flink的运行环境
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSet text = executionEnvironment.readTextFile(inputPath);
        FlatMapOperator> wordondOnes = text.flatMap(new SplitClz());

        //(hello 1) (you 1) (hi 1) (him 1)
        UnsortedGrouping> groupedWordAndOne = wordOndOnes.groupBy(0);
        //(hello 1) (hello 1)
        AggregateOperator> out = groupedWordAndOne.sum(1);//1代表第1个元素

        out.writeAsCsv(outputPath, "n", " ").setParallelism(1);//设置并行度
        executionEnvironment.execute();//人为调用执行方法

    }

    static class SplitClz implements FlatMapFunction>{

        @Override
        public void flatMap(String s, Collector> collector) throws Exception {
            String[] s1 = s.split(" ");
            for (String word:s1) {
                collector.collect(new Tuple2(word,1));//发送到下游

            }

        }
    }
}

原文件
输出文件

scala程序
import org.apache.flink.api.scala._

object WordCountScalaBatch {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\data\input\hello.txt"
    val outputPath = "D:\data\output"
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val text: DataSet[String] = environment.readTextFile(inputPath)
    val out: AggregateDataSet[(String, Int)] = text.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    out.writeAsCsv(outputPath,"n", " ").setParallelism(1)
    environment.execute("scala batch process")
  }
}

第 2 节 单词统计案例(流数据) 2.1 需求

Socket模拟实时发送单词,使用Flink实时接收数据,对指定时间窗口内(如5s)的数据进行聚合统计,每隔1s汇总计算一次,并且把时间窗口内计算结果打印出来。

2.2 代码实现 scala程序
import org.apache.flink.streaming.api.scala._

object WordCountScalaStream {
  def main(args: Array[String]): Unit = {
    //处理流式数据
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment  //拿到运行环境
    val streamData: DataStream[String] = environment.socketTextStream("hadoop102", 7777)
    val out = streamData.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
    out.print()
    environment.execute()
  }

}

java程序
package com.lagou;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountJavaStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource dataStream = executionEnvironment.socketTextStream("linux121", 7777);
        SingleOutputStreamOperator> sum = dataStream.flatMap(new FlatMapFunction>() {
            public void flatMap(String s, Collector> collector) throws Exception {
                for (String word : s.split(" ")) {
                    collector.collect(new Tuple2(word, 1));
                }
            }
        }).keyBy(0).sum(1);
        sum.print();
        executionEnvironment.execute();
    }
}

运行之前需要勾选

#控制台输入,7777表示端口
nc -lp 7777
#然后输入单词,查看统计

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5661411.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存