flink.apache.org
Flink是有状态的(sateful):Stateful Computations over Data Streams
起源欧洲,后被阿里收购,才在中国普及。在此之前都是用spark。
Flink也是做客户端,Flink on k8s、Yarn、Mesos,目前还是 Flink on Yarn,以后 on k8s.
on k8s.可以实现资源隔离,各个任务不用存在资源抢占。
可以接实时的数据,做流处理;也可以接DB,dfs的数据 ,做批处理。
Flink 更强于做流处理 【DataSet API (Legacy)】,Spark更强于做批处理
Flink 的特点批流一体 DataSet-批 、DataStream-流。
高吞吐、低延迟、高性能。
真正的流处理,支持基于Event-time的 *** 作。也支持window *** 作。
支持带状态的sateful的Exactly-Once(*****)。
位置: documentation/Application Development/DataStream API
DataStream 也是不可变的。
可以通过maven快速创建一个fink工程demo
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-datastream-java \
-DarchetypeVersion=1.14.4 \
-DgroupId=frauddetection \
-DartifactId=frauddetection \
-Dversion=0.1 \
-Dpackage=spendreport \
-DinteractiveMode=false
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/datastream_api.html
Example- 引入依赖
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-scala_2.12artifactId>
<version>${flink.version}version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-clients_2.12artifactId>
<version>${flink.version}version>
dependency>
- 批处理
// 隐式转换的包,不然会报错 could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
object BatchWCApp {
def main(args: Array[String]): Unit = {
// 1. 获取批处理上下文对象 --> SparkContext
val env = ExecutionEnvironment.getExecutionEnvironment
// 2. 批处理 获取数据 readTextFile
val text: DataSet[String] = env.readTextFile("data/wc.txt") // 返回的就是 DataSet
// 需要导入隐式转换的包 import org.apache.flink.api.scala._
val result = text.flatMap(_.split(","))
.map((_, 1))
.groupBy(0) // 索引下标
.sum(1) // 索引下标
result.print()
}
}
- 流处理
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object StreamingWCApp {
def main(args: Array[String]): Unit = {
// 1. 获取流处理上下文对象
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 流处理 DataStream
val text: DataStream[String] = env.socketTextStream("gargantua", 9527) // 返回的就是 DataStream
text.flatMap(_.split(","))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(_._1)
.sum(1)
.print().setParallelism(1)
// 3. 流处理需要 手动 execute
env.execute(this.getClass.getSimpleName)
// 现在终端启动 [liqiang@Gargantua ~]$ nc -lk 9527
// 再启动main方法
// 在终端输入需要统计的单词(要回车)就能实时统计9527的数据
4>(pig,1)
4>(dog,2) // 4代表并行度
4>(dog,3)
}
}
- java 版本的批处理
public static void main(String[] args) throws Exception {
// java 版本对应的 gargantua.basic.BatchWCApp.scala
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> source = env.readTextFile("data/wc.txt");
// 匿名内部类
// test01(source);
// lambda + stream流
test02(source);
}
private static void test02(DataSource<String> source) throws Exception {
System.out.println("------------");
// 生成的 lambda 写法. 注意每一步都要先返回,再做下一步
FlatMapOperator<String, String> flatMapOperator = source.flatMap((FlatMapFunction<String, String>) (s, collector) -> {
String[] words = s.split(",");
for (String word : words) {
collector.collect(word);
}
}).returns(Types.STRING);
// stream流 写法:注意每一步都要先返回,再做下一步
/* FlatMapOperator flatMapOperator = source.flatMap((String line, Collector collector) -> {
Arrays.stream(line.split(",")).forEach(collector::collect);
}).returns(Types.STRING);*/
MapOperator<String, Tuple2<String, Integer>> mapOperator = flatMapOperator.map(x -> Tuple2.of(x, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
mapOperator.groupBy(0).sum(1).print();
// mapOperator.groupBy(x -> x.f0).sum(1).print();
- java 版本的流处理
…
Anatomy of a Flink Program
获取执行环境 [val env = getExecutionEnvironment()、createLocalEnvironment()]
加载/创建初始数据 [val input = env.readTextFile("data/wc.txt")、env.socketTextStream("localhost", 9999)]
作用此数据的transformations算子 [input.map { x => x.toInt }...]
指定计算结果的存放位置 [writeAsText(path: String))、print()、writeToSocket()、addSink()]
触发程序执行(流处理) [env.execute(this.getClass.getSimpleName)]
获取执行环境
-
获取批处理执行环境 ,如同SparkConf、SparkContext
val env = ExecutionEnvironment.getExecutionEnvironment
-
获取流处理执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
-
获取环境时可以指定让这个任务展示到Web UI。
引入依赖org.apache.flink flink-runtime-web_${scala.binary.version}获取带Web UI 的 Environment
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
默认启动在8081端口。如果有打开的UI页面提前打开,或前一次没有关闭,但后台重启,会报错。需要把UI关掉再打开。
也可以自己设置端口号:val configuration = new Configuration() configuration.setInteger("rest.port",7777); val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
其实就用val env = StreamExecutionEnvironment.getExecutionEnvironment也会有Web UI界面的,不过端口不固定,需要通过启动日志查看。
基于文件(批处理):
readTextFile()
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter)
基于socket(流处理):
socketTextStream
基于scala/java集合
// 内置
fromCollection(Seq) // 单并行,
fromCollection(Iterator)
fromElements(elements: _*)
fromParallelCollection(SplittableIterator) // 并行,多个task
generateSequence(from, to)
// 自定义
addSource
Transformations
map、 filter 、keyby 、window...
数据输出
writeAsText() / TextOutputFormat
writeAsCsv(...) / CsvOutputFormat
print() / printToErr()
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
addSink
并行度
获取当前并行度:stream.parallelism
env.socketTextStream 、env.fromCollection(List()) 、fromElements 的时候,并行度是1, 意味着不能并行接收。
且不能自己强制设置并行度,因为源码设置为1,或只要实现SourceFuncation的也都是单并行。
env.readTextFile() 、env.fromParallelCollection()、env.generateSequence()是多并行度读取。
对带并行度的数据源ParallSourceFunction如果不指定并行度,就会使用当前机器的CPU线程数。会把资源占尽。
transformation : fliter、flatmap 等,如果不指定并行度,也是取决于CPU线程。
print():如果不指定并行度,也是取决于CPU线程。
(仅针对能设置并行度时)
env阶段: env.setParallelism(2)
source阶段 : env.addSource(...).setParallelism(2)
transformation阶段: 默认是用完CPU,在生产一般都要自己重新设置。
sink阶段:
自定义数据源 addSource
内置的数据源如fromCollection(),底层也是用的addSource()。
定义数据源 addSource(new xxxSourceFunction())对于xxxSourceFunction,有很多,都是常用Function的子接口和子类,如可以并行的ParalleSourceFunction、RichSourceFunction、RichParalleSourceFunction
继承自单并行度的自定义sourceFunction只能串行,继承自多并行度的自定义sourceFunction默认并行度为CPU线程数,也可以自己setParallelism。
自定义sourceFunction,需要实现run() 和cancel() 方法。源码有demo参考。
继承自RichFunction(增强)的自定义sourceFunction,除了run() 和cancel() ,还可以实现open()和close()方法。open是每一个并行度执行前都会调用的,如数据需要和mysql关联,可以在open中加载连接。
jdbc 读取数据源到source继承RichSourceFunction,重写以下方法:在open方法中加载驱动、获取jdbc连接、准备查询语句,在run 方法执行SQL语句,在close中关闭连接释放资源。
kafka 读取数据源到source导入依赖
org.apache.flink
flink-connector-kafka_2.12
${flink.version}
新API可以直接使用KafkaSource.builder() 出一个kafkaSource。是多并行度的。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = KafkaSource.builder()
.setBootstrapServers("hadoop000:9092,hadoop000:9093,hadoop000:9094")
.setTopics("flinktopic")
.setGroupId("ruozedata_flink_topic_group") // pk-group
.setStartingOffsets(OffsetsInitializer.latest()) // earliest
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print()
Transformations
Map、FlatMap、Filter、KeyBy、Reduce、Window、WindowAll 、Window Apply、WindowReduce、Union、Window Join、Interval Join 、Window CoGroup、Connect、CoMap, CoFlatMap、Iterate
map 是将一个 DataStream转另一个 DataStream
.map(_*2)的底层 *** 作是
.transform("my map",new SteamMap(new MapFunction(
override def map(value:Int) = value * 2
))
union: 两个或多个数据源 合并成一个数据源。 union时是必须要相同的数据类型
自己合并自己会得到两倍…
connect:两个数据源,关联成一个,但是内部其实两个流还是独立,知识可以共用State状态信息,且两个流数据类型还可以不一样
connect vs union
1) 合并后:一个流 / 多个流
2) 数据类型:是否一定要相同的数据类型
3) 个数问题:connect的map *** 作,也是会有两个参数,分开 *** 作
需求:两个流分别做,stream1转大写,stream转Int
stream1.connect(stream2).map(new CoMapFunction[String,Int, String] {
override def map1(value: String): String = value.toUpperCase()
override def map2(value: Int): String = value * 10 + ""
})
分区器 Partitioner
自定义分区器MyPartitioner,继承Partitioner,重写partition方法,在其中指定分区规则。
通过partitionCustom指定使用分区
.partitionCustom(new MyPartitioner, x => x._1)
Sink
Flink03:https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/
自定义 Sink。继承RichSinkFunction,重写open方法和invoke方法,在invoke方法中指定具体逻辑输出。
输出到文件输出到文件系统,由于多并行,会产生很多小文件夹。流处理就不适合输出文件系统。
但是setParallelism(1)时可以输出到一个文件
需要引入依赖(和source是同一个)
org.apache.flink
flink-connector-kafka_${scala.binary.version}
官网提供API可以直接使用KafkaSource.builder() 出一个sink,使用sinkTo输出到kafka
val sink: KafkaSink[String] = KafkaSink.builder()
.setBootstrapServers("hadoop000:9092,hadoop000:9093,hadoop000:9094")
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("flinktopic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
).build()
输出到 redis
需要引入依赖
org.apache.bahir
flink-connector-redis_2.11
1.0
API只提供了RedisSink,还需要自己实现RedisMapper。
数据到 jdbc参考官网。
输出到 socket
accessStream.writeToSocket("gargantua", 9526, new SimpleStringSchema())
输出到 nc - lk 9526 的窗口了
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)