Apache Flink快速入门-基本架构、核心概念和运行流程

Apache Flink快速入门-基本架构、核心概念和运行流程,第1张

Flink是一个基于流计算的分布式引擎,以前的名字叫stratosphere,从2010年开始在德国一所大学里发起,也是有好几年的 历史 了,2014年来借鉴了社区其它一些项目的理念,快速发展并且进入了Apache顶级孵化器,后来更名为Flink。

Flink在德语中是快速和灵敏的意思 ,用来体现流式数据处理速度快和灵活性强等特点。

Flink提供了同时支持高吞吐、低延迟和exactly-once 语义的实时计算能力,另外Flink 还提供了基于流式计算引擎处理批量数据的计算能力,真正意义上实现了流批统一。

Flink 独立于Apache Hadoop,且能在没有任何 Hadoop 依赖的情况下运行。

但是,Flink 可以很好的集成很多 Hadoop 组件,例如 HDFS、YARN 或 HBase。 当与这些组件一起运行时,Flink 可以从 HDFS 读取数据,或写入结果和检查点(checkpoint)/快照(snapshot)数据到 HDFS 。 Flink 还可以通过 YARN 轻松部署,并与 YARN 和 HDFS Kerberos 安全模块集成。

Flink具有先进的架构理念、诸多的优秀特性,以及完善的编程接口。

Flink的具体优势有如下几点:

(1)同时支持高吞吐、低延迟、高性能;

(2)支持事件时间(Event Time)概念;

事件时间的语义使流计算的结果更加精确,尤其在事件到达无序或者延迟的情况下,保持了事件原本产生时的时序性,尽可能避免网络传输或硬件系统的影响。

(3)支持有状态计算;

所谓状态就是在流计算过程中,将算子的中间结果数据保存在内存或者文件系统中,等下一个事件进入算子后,可以从之前的状态中获取中间结果,计算当前的结果,从而无需每次都基于全部的原始数据来统计结果。

(4)支持高度灵活的窗口(Window) *** 作;

(5)基于轻量级分布式快照(Snapshot)实现的容错;

(6)基于JVM实现独立的内存管理;

(7)Save Points(保存点);

保存点是手动触发的,触发时会将它写入状态后端(State Backends)。Savepoints的实现也是依赖Checkpoint的机制。Flink 程序在执行中会周期性的在worker 节点上进行快照并生成Checkpoint。因为任务恢复的时候只需要最后一个完成的Checkpoint的,所以旧有的Checkpoint会在新的Checkpoint完成时被丢弃。Savepoints和周期性的Checkpoint非常的类似,只是有两个重要的不同。一个是由用户触发,而且不会随着新的Checkpoint生成而被丢弃。

在Flink整个软件架构体系中,统一遵循了分层的架构设计理念,在降低系统耦合度的同时,为上层用户构建Flink应用提供了丰富且友好的接口。

整个Flink的架构体系可以分为三层:

Deployment层: 该层主要涉及了Flink的部署模式,Flink支持多种部署模式:本地、集群(Standalone/YARN),云(GCE/EC2),Kubernetes等。

Runtime层:Runtime层提供了支持Flink计算的全部核心实现,比如:支持分布式Stream处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务。

API层: 主要实现了面向无界Stream的流处理和面向Batch的批处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API。

Libraries层:该层也可以称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的计算框架,也分别对应于面向流处理和面向批处理两类。

核心概念:Job Managers,Task Managers,Clients

Flink也是典型的master-slave分布式架构。Flink的运行时,由两种类型的进程组成:

Client: Client不是运行时和程序执行的一部分,它是用来准备和提交数据流到JobManagers。之后,可以断开连接或者保持连接以获取任务的状态信息。

当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager, JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。 TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。

每个Worker(Task Manager)是一个JVM进程,通常会在单独的线程里执行一个或者多个子任务。为了控制一个Worker能够接受多少个任务,会在Worker上抽象多个Task Slot (至少一个)。

只有一个slot的TaskManager意味着每个任务组运行在一个单独JVM中。 在拥有多个slot的TaskManager上,subtask共用JVM,可以共用TCP连接和心跳消息,同时可以共用一些数据集和数据结构,从而减小任务的开销。

Flink的任务运行其实是多线程的方式,这和MapReduce多JVM进程的方式有很大的区别,Flink能够极大提高CPU使用效率,在多个任务之间通过TaskSlot方式共享系统资源,每个TaskManager中通过管理多个TaskSlot资源池对资源进行有效管理。

思考问题:

1怎么样实现并行计算?

答:设置并行度。多线程,不同任务放到不同线程上。

2并行的任务,需要占用多少slot?

3一个流处理程序,到底包含多少个任务?

一、TaskManager和Slot的关系介绍

process:进程

Treads:线程

二、并行度(parallelism)

每一个线程占用一个slot,上图一中任务合并为上图二所示(任务链,后续讲解),图中算子并行度最大的(算子后面的中括号数字代表并行度)为2,所以整个flink程序的并行度为2,所以只需要2个slot就可以跑起来。

 One-to-one:

stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着flatmap 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。类似于spark中的窄依赖

 Redistributing:

stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy()基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。类似于spark中的宽依赖

图中:A4 代表 A任务有4个,C2表示C任务2个,以此类推

taskmanagernumberOfTaskSlots:3 每个taskmanager设置了并行度为3

设一共有3个TaskManager,每一个TaskManager中的分配3个TaskSlot,也就是每个TaskManager可以接收3个task,一共9个TaskSlot,如果我们设置parallelismdefault=1,即运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲,因此,设置合适的并行度才能提高效率。

三、思考

假设当前可用的slot只有1个,任务有4个,slot不够用的时候,则会一直等待分配资源,直到超时报错。

slot推荐设置为当前机器的核心数,假设cpu核心数为4核,则设置4。

slot占用数量与并行度最大的算子一致。

Flink从 1120 上对 流式API 新增一项特性:可以根据你的使用情况和Job的特点, 可以选择不同的运行时执行模式(runtime execution modes)

流式API的传统执行模式我们称之为 STREAMING 执行模式, 这种模式一般用于无界流, 需要持续的在线处理

1120新增了一个 BATCH 执行模式, 这种执行模式在执行方式上类似于MapReduce框架 这种执行模式一般用于有界数据

默认是使用的 STREAMING 执行模式

BATCH 执行模式 仅仅用于有界数据 , 而 STREAMING 执行模式可以用在 有界数据和无界数据

一个公用的规则就是: 当你处理的数据是 有界 的就应该 使用BATCH执行模式 , 因为它更加高效 当你的数据是 无界 的, 则必须 使用STREAMING 执行模式 , 因为只有这种模式才能处理持续的数据流

执行模式有3个选择可配:

批处理与流处理的区别:

有界与无界的理解:

程序比较简单,就没加注释

结果

结果

执行模式所支持的模式:

转换成批处理

结果

注意:

Flink 是一个流处理框架,支持流处理和批处理,特点是流处理有限,可容错,可扩展,高吞吐,低延迟。

流处理是处理一条,立马下一个节点会从缓存中取出,在下一个节点进行计算

批处理是只有处理一批完成后,才会经过网络传输到下一个节点

流处理的优点是低延迟 批处理的优点是高吞吐

flink同时支持两种,flink的网络传输是设计固定的缓存块为单位,用户可以设置缓存块的超时值来决定换存块什么时候进行传输。 数据大于0 进行处理就是流式处理。

如果设置为无限大就是批处理模型。

Flink 集群包括 JobManager 和 TaskManager

JobManager 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包 等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行。

TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要 部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。

flink on yarn 是由client 提交 app到 RM 上, 然后RM 分配一个 AppMaster负责运行 Flink JobManager 和 Yarn AppMaster, 然后 AppMaster 分配 容器去运行 Flink TaskManger

SparkStreaming 是将流处理分成微批处理的作业, 最后的处理引擎是spark job

Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块,Spark Streaming会把每块数据作为一个RDD,并使用RDD *** 作处理每一小块数据。每个块都会生成一个Spark Job处理,然后分批次提交job到集群中去运行,运行每个 job的过程和真正的spark 任务没有任何区别。

JobScheduler, 负责 Job的调度通过定时器每隔一段时间根据Dstream的依赖关系生一个一个DAG图

ReceiverTracker负责数据的接收,管理和分配

ReceiverTracker在启动Receiver的时候他有ReceiverSupervisor,其实现是ReceiverSupervisorImpl, ReceiverSupervisor本身启 动的时候会启动Receiver,Receiver不断的接收数据,通过BlockGenerator将数据转换成Block。定时器会不断的把Block数据通会不断的把Block数据通过BlockManager或者WAL进行存储,数据存储之后ReceiverSupervisorlmpl会把存储后的数据的元数据Metadate汇报给ReceiverTracker,其实是汇报给ReceiverTracker中的RPC实体ReceiverTrackerEndpoin

spark on yarn 的cluster模式, Spark client 向RM提交job请求, RM会分配一个 AppMaster, driver 和 运行在AppMAster节点里, AM然后把Receiver作为一个Task提交给Spark Executor 节点, Receive启动接受数据,生成数据块,并通知Spark Appmaster, AM会根据数据块生成相应的Job, 并把Job 提交给空闲的 Executor 去执行。

1:需要关注流数据是否需要进行状态管理

2:At-least-once或者Exectly-once消息投递模式是否有特殊要求

3:对于小型独立的项目,并且需要低延迟的场景,建议使用storm

4:如果你的项目已经使用了spark,并且秒级别的实时处理可以满足需求的话,建议使用sparkStreaming

5:要求消息投递语义为 Exactly Once 的场景;数据量较大,要求高吞吐低延迟的场景;需要进行状态管理或窗口统计的场景,建议使用flink

Flink 提供的Api右 DataStream 和 DataSet ,他们都是不可变的数据集合,不可以增加删除中的元素, 通过 Source 创建 DataStream 和 DataSet

在创建运行时有:

Flink的每一个Operator称为一个任务, Operator 的每一个实例称为子任务,每一个任务在JVM线程中执行。可以将多个子任务链接成一个任务,减少上下文切换的开销,降低延迟。

source 和 算子map 如果是 one by one 的关系,他们的数据交换可以通过缓存而不是网络通信

TaskManager 为控制执行任务的数量,将计算资源划分多个slot,每个slot独享计算资源,这种静态分配利于任务资源隔离。

同一个任务可以共享一个slot, 不同作业不可以。

这里因为 Source 和 Map 并行度都是4 采用直连方式,他们的数据通信采用缓存形式

所以一共需要两个TaskManager source,Map 一个,reduce一个, 每个TaskManager 要3个slot

JobManager 将 JobGraph 部署 ExecutionGraph

设置的并行度,可以让一个ExecJobVertex 对应 多个并行的ExecVertex 实例。

Flink通过状态机管理 ExecGraph的作业执行进度。

Flink 将对象序列化为固定数量的预先分配的内存段,而不是直接把对象放在堆内存上。

Flink TaskManager 是由几个内部组件组成的:actor 系统(负责与 Flink master 协调)、IOManager(负责将数据溢出到磁盘并将其读取回来)、MemoryManager(负责协调内存使用。

数据源:

Sink:

时间:

处理时间:取自Operator的机器系统时间

事件时间: 由数据源产生

进入时间: 被Source节点观察时的系统时间

如果数据源没有自己正确创建水印,程序必须自己生成水印来确保基于事件的时间窗口可以正常工作。。

DataStream 提供了 周期性水印,间歇式水印,和递增式水印

Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时,提供支持流处理和批处理两种类型应用的功能。

现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为它们所提供的SLA(Service-Level-Aggreement)是完全不相同的:流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理。

Flink从另一个视角看待流处理和批处理,将二者统一起来:Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的; 批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。

Flink流处理特性:

Flink以层级式系统形式组件其软件栈,不同层的栈建立在其下层基础上,并且各层接受程序不同层的抽象形式。

1 流、转换、 *** 作符

Flink程序是由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个 *** 作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。

Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。

2 并行数据流

一个Stream可以被分成多个Stream分区(Stream Partitions),一个Operator可以被分成多个Operator Subtask,每一个Operator Subtask是在不同的线程中独立执行的。一个Operator的并行度,等于Operator Subtask的个数,一个Stream的并行度总是等于生成它的Operator的并行度。

One-to-one模式

比如从Source[1]到map()[1],它保持了Source的分区特性(Partitioning)和分区内元素处理的有序性,也就是说map()[1]的Subtask看到数据流中记录的顺序,与Source[1]中看到的记录顺序是一致的。

Redistribution模式

这种模式改变了输入数据流的分区,比如从map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下游的多个不同的Subtask发送数据,改变了数据流的分区,这与实际应用所选择的Operator有关系。

3任务、 *** 作符链

Flink分布式执行环境中,会将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行。

4 时间

处理Stream中的记录时,记录中通常会包含各种典型的时间字段:

Event Time:表示事件创建时间

Ingestion Time:表示事件进入到Flink Dataflow的时间

Processing Time:表示某个Operator对事件进行处理的本地系统时间

Flink使用WaterMark衡量时间的时间,WaterMark携带时间戳t,并被插入到stream中。

5 窗口

Flink支持基于时间窗口 *** 作,也支持基于数据的窗口 *** 作:

窗口分类:

Tumbling/Sliding Time Window

// Stream of (sensorId, carCnt)

val vehicleCnts: DataStream[(Int, Int)] =

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts

// key stream by sensorId

keyBy(0)

// tumbling time window of 1 minute length

timeWindow(Timeminutes(1))

// compute sum over carCnt

sum(1)

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts

keyBy(0)

// sliding time window of 1 minute length and 30 secs trigger interval

timeWindow(Timeminutes(1), Timeseconds(30))

sum(1)

Tumbling/Sliding Count Window

// Stream of (sensorId, carCnt)

val vehicleCnts: DataStream[(Int, Int)] =

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts

// key stream by sensorId

keyBy(0)

// tumbling count window of 100 elements size

countWindow(100)

// compute the carCnt sum

sum(1)

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts

keyBy(0)

// sliding count window of 100 elements size and 10 elements trigger interval

countWindow(100, 10)

sum(1)

自定义窗口

基本 *** 作:

6 容错

Barrier机制:

对齐:

当Operator接收到多个输入的数据流时,需要在Snapshot Barrier中对数据流进行排列对齐:

基于Stream Aligning *** 作能够实现Exactly Once语义,但是也会给流处理应用带来延迟,因为为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐Barrier的一个Stream为处理Buffer中缓存记录的时刻点。在Flink中,提供了一个开关,选择是否使用Stream Aligning,如果关掉则Exactly Once会变成At least once。

CheckPoint:

Snapshot并不仅仅是对数据流做了一个状态的Checkpoint,它也包含了一个Operator内部所持有的状态,这样才能够在保证在流处理系统失败时能够正确地恢复数据流处理。状态包含两种:

7 调度

在JobManager端,会接收到Client提交的JobGraph形式的Flink Job,JobManager会将一个JobGraph转换映射为一个ExecutionGraph,ExecutionGraph是JobGraph的并行表示,也就是实际JobManager调度一个Job在TaskManager上运行的逻辑视图。

物理上进行调度,基于资源的分配与使用的一个例子:

8 迭代

机器学习和图计算应用,都会使用到迭代计算,Flink通过在迭代Operator中定义Step函数来实现迭代算法,这种迭代算法包括Iterate和Delta Iterate两种类型。

Iterate

Iterate Operator是一种简单的迭代形式:每一轮迭代,Step函数的输入或者是输入的整个数据集,或者是上一轮迭代的结果,通过该轮迭代计算出下一轮计算所需要的输入(也称为Next Partial Solution),满足迭代的终止条件后,会输出最终迭代结果。

流程伪代码:

IterationState state = getInitialState();

while (!terminationCriterion()) {

state = step(state);

}

setFinalState(state);

Delta Iterate

Delta Iterate Operator实现了增量迭代。

流程伪代码:

IterationState workset = getInitialState();

IterationState solution = getInitialSolution();

while (!terminationCriterion()) {

(delta, workset) = step(workset, solution);

solutionupdate(delta)

}

setFinalState(solution);

最小值传播:

9 Back Pressure监控

流处理系统中,当下游Operator处理速度跟不上的情况,如果下游Operator能够将自己处理状态传播给上游Operator,使得上游Operator处理速度慢下来就会缓解上述问题,比如通过告警的方式通知现有流处理系统存在的问题。

Flink Web界面上提供了对运行Job的Backpressure行为的监控,它通过使用Sampling线程对正在运行的Task进行堆栈跟踪采样来实现。

默认情况下,JobManager会每间隔50ms触发对一个Job的每个Task依次进行100次堆栈跟踪调用,过计算得到一个比值,例如,radio=001,表示100次中仅有1次方法调用阻塞。Flink目前定义了如下Backpressure状态:

OK: 0 <= Ratio <= 010

LOW: 010 < Ratio <= 05

HIGH: 05 < Ratio <= 1

1 Table

Flink的Table API实现了使用类SQL进行流和批处理。

详情参考:>

以上就是关于Apache Flink快速入门-基本架构、核心概念和运行流程全部的内容,包括:Apache Flink快速入门-基本架构、核心概念和运行流程、10-flink TaskManager 和 Slots、Flink(1.13) 执行模式(Execution Mode)等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zz/10119615.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-05
下一篇 2023-05-05

发表评论

登录后才能评论

评论列表(0条)

保存