Flink 【一】

Flink 【一】,第1张

flink.apache.org
Flink是有状态的(sateful):Stateful Computations over Data Streams

起源欧洲,后被阿里收购,才在中国普及。在此之前都是用spark。

Flink也是做客户端,Flink on k8s、Yarn、Mesos,目前还是 Flink on Yarn,以后 on k8s.
on k8s.可以实现资源隔离,各个任务不用存在资源抢占。

可以接实时的数据,做流处理;也可以接DB,dfs的数据 ,做批处理。

Flink 更强于做流处理 【DataSet API (Legacy)】,Spark更强于做批处理

Flink 的特点

批流一体 DataSet-批 、DataStream-流。
高吞吐、低延迟、高性能。
真正的流处理,支持基于Event-time的 *** 作。也支持window *** 作。
支持带状态的sateful的Exactly-Once(*****)。

DataStream

位置: documentation/Application Development/DataStream API
DataStream 也是不可变的。

简单上手

可以通过maven快速创建一个fink工程demo

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.14.4 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false

https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/datastream_api.html

Example
  1. 引入依赖
 <dependency>
     <groupId>org.apache.flinkgroupId>
     <artifactId>flink-streaming-scala_2.12artifactId>
     <version>${flink.version}version>
 dependency>
  
 <dependency>
    <groupId>org.apache.flinkgroupId>
    <artifactId>flink-clients_2.12artifactId> 
    <version>${flink.version}version>
 dependency>
  1. 批处理
// 隐式转换的包,不然会报错 could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object BatchWCApp {

  def main(args: Array[String]): Unit = {

    // 1. 获取批处理上下文对象  --> SparkContext
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 2. 批处理 获取数据 readTextFile
    val text: DataSet[String] = env.readTextFile("data/wc.txt") // 返回的就是 DataSet

    // 需要导入隐式转换的包  import org.apache.flink.api.scala._
    val result = text.flatMap(_.split(","))
      .map((_, 1))
      .groupBy(0)  // 索引下标
      .sum(1)  // 索引下标
      
    result.print()
  }
}
  1. 流处理
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object StreamingWCApp {

  def main(args: Array[String]): Unit = {
    // 1. 获取流处理上下文对象
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. 流处理 DataStream
    val text: DataStream[String] = env.socketTextStream("gargantua", 9527) // 返回的就是 DataStream
    
    text.flatMap(_.split(","))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print().setParallelism(1)

    // 3. 流处理需要 手动 execute
    env.execute(this.getClass.getSimpleName)

    // 现在终端启动 [liqiang@Gargantua ~]$ nc -lk 9527
    // 再启动main方法
    // 在终端输入需要统计的单词(要回车)就能实时统计9527的数据
	
	4>(pig,1)
    4>(dog,2)  // 4代表并行度
    4>(dog,3) 
  }
}
  1. java 版本的批处理
public static void main(String[] args) throws Exception {
    // java 版本对应的 gargantua.basic.BatchWCApp.scala
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSource<String> source = env.readTextFile("data/wc.txt");
    // 匿名内部类
    // test01(source);
    // lambda + stream流
    test02(source);
}
private static void test02(DataSource<String> source) throws Exception {
    System.out.println("------------");
    // 生成的 lambda 写法. 注意每一步都要先返回,再做下一步
    FlatMapOperator<String, String> flatMapOperator = source.flatMap((FlatMapFunction<String, String>) (s, collector) -> {
        String[] words = s.split(",");
        for (String word : words) {
            collector.collect(word);
        }
    }).returns(Types.STRING);
    
    //  stream流 写法:注意每一步都要先返回,再做下一步
   /* FlatMapOperator flatMapOperator = source.flatMap((String line, Collector collector) -> {
        Arrays.stream(line.split(",")).forEach(collector::collect);
    }).returns(Types.STRING);*/
    
    MapOperator<String, Tuple2<String, Integer>> mapOperator = flatMapOperator.map(x -> Tuple2.of(x, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
    mapOperator.groupBy(0).sum(1).print();
    // mapOperator.groupBy(x -> x.f0).sum(1).print();

  1. java 版本的流处理
Flink 的编程模型

Anatomy of a Flink Program

获取执行环境 [val env = getExecutionEnvironment()、createLocalEnvironment()]
加载/创建初始数据  [val input = env.readTextFile("data/wc.txt")、env.socketTextStream("localhost", 9999)]
作用此数据的transformations算子 [input.map { x => x.toInt }...]
指定计算结果的存放位置 [writeAsText(path: String))、print()、writeToSocket()、addSink()]
触发程序执行(流处理)  [env.execute(this.getClass.getSimpleName)]
获取执行环境
  1. 获取批处理执行环境 ,如同SparkConf、SparkContext

    val env = ExecutionEnvironment.getExecutionEnvironment
    
  2. 获取流处理执行环境

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
  3. 获取环境时可以指定让这个任务展示到Web UI。
    引入依赖

     
         org.apache.flink
         flink-runtime-web_${scala.binary.version}
     
    

    获取带Web UI 的 Environment

    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    

    默认启动在8081端口。如果有打开的UI页面提前打开,或前一次没有关闭,但后台重启,会报错。需要把UI关掉再打开。
    也可以自己设置端口号:

    val configuration = new Configuration()
    configuration.setInteger("rest.port",7777);
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
    

    其实就用val env = StreamExecutionEnvironment.getExecutionEnvironment也会有Web UI界面的,不过端口不固定,需要通过启动日志查看。

数据源

基于文件(批处理):

readTextFile()
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter) 

基于socket(流处理):

socketTextStream 

基于scala/java集合

// 内置
fromCollection(Seq)  // 单并行,
fromCollection(Iterator)
fromElements(elements: _*)
fromParallelCollection(SplittableIterator)  // 并行,多个task
generateSequence(from, to)
// 自定义
addSource
Transformations
map、 filter 、keyby 、window...
数据输出
writeAsText() / TextOutputFormat
writeAsCsv(...) / CsvOutputFormat
print() / printToErr() 
writeUsingOutputFormat() / FileOutputFormat
writeToSocket 
addSink
并行度 获取当前并行度:stream.parallelism

env.socketTextStream 、env.fromCollection(List()) 、fromElements 的时候,并行度是1, 意味着不能并行接收。
且不能自己强制设置并行度,因为源码设置为1,或只要实现SourceFuncation的也都是单并行。

env.readTextFile() 、env.fromParallelCollection()、env.generateSequence()是多并行度读取。

对带并行度的数据源ParallSourceFunction如果不指定并行度,就会使用当前机器的CPU线程数。会把资源占尽。
transformation : fliter、flatmap 等,如果不指定并行度,也是取决于CPU线程。
print():如果不指定并行度,也是取决于CPU线程。

设置并行度

(仅针对能设置并行度时)

env阶段: env.setParallelism(2)  
source阶段 :  env.addSource(...).setParallelism(2)
transformation阶段: 默认是用完CPU,在生产一般都要自己重新设置。
sink阶段:
自定义数据源 addSource

内置的数据源如fromCollection(),底层也是用的addSource()。

定义数据源 addSource(new xxxSourceFunction())

对于xxxSourceFunction,有很多,都是常用Function的子接口和子类,如可以并行的ParalleSourceFunction、RichSourceFunction、RichParalleSourceFunction

继承自单并行度的自定义sourceFunction只能串行,继承自多并行度的自定义sourceFunction默认并行度为CPU线程数,也可以自己setParallelism。

自定义sourceFunction,需要实现run() 和cancel() 方法。源码有demo参考。

继承自RichFunction(增强)的自定义sourceFunction,除了run() 和cancel() ,还可以实现open()和close()方法。open是每一个并行度执行前都会调用的,如数据需要和mysql关联,可以在open中加载连接。

jdbc 读取数据源到source

继承RichSourceFunction,重写以下方法:在open方法中加载驱动、获取jdbc连接、准备查询语句,在run 方法执行SQL语句,在close中关闭连接释放资源。

kafka 读取数据源到source

导入依赖

 
     org.apache.flink
     flink-connector-kafka_2.12
     ${flink.version}
 

新API可以直接使用KafkaSource.builder() 出一个kafkaSource。是多并行度的。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = KafkaSource.builder()
	.setBootstrapServers("hadoop000:9092,hadoop000:9093,hadoop000:9094")
	 .setTopics("flinktopic")
	 .setGroupId("ruozedata_flink_topic_group")       // pk-group
	 .setStartingOffsets(OffsetsInitializer.latest()) // earliest
	 .setValueOnlyDeserializer(new SimpleStringSchema())
	 .build()
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print()
Transformations

Map、FlatMap、Filter、KeyBy、Reduce、Window、WindowAll 、Window Apply、WindowReduce、Union、Window Join、Interval Join 、Window CoGroup、Connect、CoMap, CoFlatMap、Iterate

map 是将一个 DataStream转另一个 DataStream
.map(_*2)的底层 *** 作是

.transform("my map",new SteamMap(new MapFunction(
	override def map(value:Int) = value * 2
))

union: 两个或多个数据源 合并成一个数据源。 union时是必须要相同的数据类型
自己合并自己会得到两倍…

connect:两个数据源,关联成一个,但是内部其实两个流还是独立,知识可以共用State状态信息,且两个流数据类型还可以不一样

connect vs union
1) 合并后:一个流 / 多个流
2) 数据类型:是否一定要相同的数据类型
3) 个数问题:connect的map *** 作,也是会有两个参数,分开 *** 作

需求:两个流分别做,stream1转大写,stream转Int

stream1.connect(stream2).map(new CoMapFunction[String,Int, String] {
    override def map1(value: String): String = value.toUpperCase()
    override def map2(value: Int): String = value * 10 + ""
})
分区器 Partitioner

自定义分区器MyPartitioner,继承Partitioner,重写partition方法,在其中指定分区规则。

通过partitionCustom指定使用分区

.partitionCustom(new MyPartitioner, x => x._1)
Sink

Flink03:https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/

自定义 Sink。继承RichSinkFunction,重写open方法和invoke方法,在invoke方法中指定具体逻辑输出。

输出到文件

输出到文件系统,由于多并行,会产生很多小文件夹。流处理就不适合输出文件系统。
但是setParallelism(1)时可以输出到一个文件

输出到 kafka

需要引入依赖(和source是同一个)


    org.apache.flink
    flink-connector-kafka_${scala.binary.version}

官网提供API可以直接使用KafkaSource.builder() 出一个sink,使用sinkTo输出到kafka

val sink: KafkaSink[String] = KafkaSink.builder()
  .setBootstrapServers("hadoop000:9092,hadoop000:9093,hadoop000:9094")
  .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
    .setTopic("flinktopic")
    .setValueSerializationSchema(new SimpleStringSchema())
    .build()
  ).build()
输出到 redis

需要引入依赖

 
     org.apache.bahir
     flink-connector-redis_2.11
     1.0
 

API只提供了RedisSink,还需要自己实现RedisMapper。

数据到 jdbc

参考官网。

输出到 socket
 accessStream.writeToSocket("gargantua", 9526, new SimpleStringSchema())

输出到 nc - lk 9526 的窗口了

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/788207.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-05
下一篇 2022-05-05

发表评论

登录后才能评论

评论列表(0条)

保存