Flink运行模式

Flink运行模式,第1张

    在idea中运行Flink程序的方式就是开发模式。

    Flink中的Local-cluster(本地集群)模式,单节点运行,主要用于测试, 学习。

        独立集群模式,由Flink自身提供计算资源。

把Flink应用提交给Yarn的ResourceManager

Flink会根据运行在JobManger上的job的需要的slot的数量动态的分配TaskManager资源

Yarn又分3种模式
Session-Cluster模式需要先启动Flink集群,向Yarn申请资源。以后提交任务都向这里提交。

这个Flink集群会常驻在yarn集群中,除非手工停止。

在向Flink集群提交Job的时候, 如果资源被用完了,则新的Job不能正常提交

缺点: 如果提交的作业中有长时间执行的大作业, 占用了该Flink集群的所有资源, 则后续无法提交新的job

所以, Session-Cluster适合那些需要频繁提交的多个小Job, 并且执行时间都不长的Job
一个Job会对应一个Flink集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。

每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。

Per-job模式执行结果,一个job对应一个Application
Application Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行 只要应用程序执行结束, Flink集群会马上被关闭 也可以手动停止集群

与Per-Job-Cluster的区别:就是Application Mode下, 用户的main函数式在集群中执行的,并且当一个application中有多个job的话,per-job模式则是一个job对应一个yarn中的application,而Application Mode则这个application中对应多个job。

Application Mode模式执行结果,多个job对应一个Application
官方建议:

出于生产的需求, 我们建议使用Per-job or Application Mode,因为他们给应用提供了更好的隔离!

>反压(backpressure)是流式计算中十分常见的问题。 反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速 。由于实时计算应用通常使用消息队列来进行生产端和消费端的解耦,消费端数据源是 pull-based 的,所以 反压通常是从某个节点传导至数据源并降低数据源(比如 Kafka consumer)的摄入速率。

反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致更大的数据处理延迟 。通常来说,对于一些对延迟要求不高或者数据量较少的应用,反压的影响可能并不明显。然而对于规模比较大的 Flink 作业,反压可能会导致严重的问题。

网络流控的实现: 动态反馈/自动反压

Flink 的数据交换有3种:① 同一个 Task 的数据交换 ,② 不同 Task 同 JVM 下的数据交换 ,③ 不同 Task 且不同 TaskManager 之间的交换 。

通过 算子链 operator chain 串联多个算子 ,主要作用是避免了 序列化 和 网络通信 的开销。

在 TaskA 中,算子输出的数据首先通过 record Writer 进行序列化,然后传递给 result Partition 。接着,数据通过 local channel 传递给 TaskB 的 Input Gate,然后传递给 record reader 进行反序列。

与上述(2)的不同点是数据先传递给 netty ,通过 netty 把数据推送到远程端的 Task 。

15 版本之前是采用 TCP 流控机制,而没有采用feedback机制 。

发送端 Flink 有一层Network Buffer,底层用Netty通信即有一层Channel Buffer,最后Socket通信也有Buffer,同理接收端也有对应的3级 Buffer。Flink (before V15)实质是利用 TCP 的流控机制来实现 feedback  。

TCP报文段首部有16位窗口字段,当接收方收到发送方的数据后,ACK响应报文中就将自身缓冲区的剩余大小设置到放入16位窗口字段 。该窗口字段值是随网络传输的情况变化的,窗口越大,网络吞吐量越高。

参考:1 计算机网络31 运输层 - TCP/UDP协议

           2 Apache Flink 进阶教程(七):网络流控及反压剖析

例子:TCP 利用滑动窗口限制流量

步骤1 :发送端将 4,5,6 发送,接收端也能接收全部数据。

步骤2 :consumer 消费了 2 ,接收端的窗口会向前滑动一格,即窗口空余1格。接着向发送端发送 ACK = 7、window = 1 。

步骤3:发送端将 7 发送后,接收端接收到 7 ,但是接收端的 consumer 故障不能消费数据。这时候接收端向发送端发送 ACK = 8、window = 0 ,由于这个时候 window = 0,发送端是不能发送任何数据,也就会使发送端的发送速度降为 0。

在 Flink 层面实现反压机制,通过 ResultPartition 和 InputGate 传输 feedback 。

  Storm 在每一个 Bolt 都会有一个监测反压的线程(Backpressure Thread),这个线程一但检测到 Bolt 里的接收队列(recv queue)出现了严重阻塞就会把这个情况写到 ZooKeeper 里,ZooKeeper 会一直被 Spout 监听,监听到有反压的情况就会停止发送 。因此,通过这样的方式匹配上下游的发送接收速率。

组件 RateController 监听负责监听“OnBatchCompleted”事件,然后从中抽取processingDelay 及schedulingDelay信息。RateEstimator 依据这些信息估算出最大处理速度(rate),最后由基于Receiver的Input Stream 将 rate 转发给 Executor 的 BlockGenerator,并更新RateLimiter 。

Flink、Storm、Spark Streaming 的反压机制都采用动态反馈/自动反压原理,可以动态反映节点限流情况,进而实现自动的动态反压。

Flink Web UI 的反压监控提供了 Subtask 级别 的反压监控。监控的原理是 通过ThreadgetStackTrace() 采集在 TaskManager 上正在运行的所有线程,收集在缓冲区请求中阻塞的线程数(意味着下游阻塞),并计算缓冲区阻塞线程数与总线程数的比值 rate 。其中,rate < 01 为 OK,01 <= rate <= 05 为 LOW,rate > 05 为 HIGH。

Network  和  task I/O  metrics 是轻量级反压监视器,用于正在持续运行的作业,其中一下几个 metrics 是最有用的反压指标。

采用 Metrics 分析反压的思路: 如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游 。

下表把 inPoolUsage 分为 floatingBuffersUsage 和 exclusiveBuffersUsage ,并且总结上游 Task outPoolUsage 与 floatingBuffersUsage 、 exclusiveBuffersUsage 的关系,进一步的分析一个 Subtask 和其上游 Subtask 的反压情况。

上述主要通过 TaskThread 定位反压,而分析反压原因 类似一个普通程序的性能瓶颈 。

通过 Web UI 各个 SubTask 的 Records Sent 和 Record Received 来确认 ,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。解决方式把数据分组的 key 进行本地/预聚合来消除/减少数据倾斜。

对 TaskManager 进行 CPU profile ,分析 TaskThread 是否跑满一个 CPU 核:如果没有跑满,需要分析 CPU 主要花费在哪些函数里面,比如生产环境中偶尔会卡在 Regex 的用户函数(ReDoS);如果没有跑满,需要看 Task Thread 阻塞在哪里,可能是 用户函数本身有些同步的调用 ,可能是 checkpoint 或者 GC 等系统活动 。

TaskManager JVM 各区内存不合理导致的频繁 Full GC 甚至失联。可以加上 -XX:+PrintGCDetails 来打印 GC 日志的方式来观察 GC 的问题。推荐TaskManager 启用 G1 垃圾回收器来优化 GC。

Apache Fink是一种大规模的数据处理工具,它以大数据量的低数据延迟和高容错性快速处理大数据。它的定义特征是它能够实时处理流数据。

Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎,是一种与 Hadoop 相似的开源集群计算环境。

相同点:

都是apache 软件基金会(ASF)旗下顶级项目,都是通用数据处理平台。它们可以应用在很多的大数据应用和处理环境。两者均可在不依赖于其他环境的情况下运行于standalone模式,或是运行在基于hadoop(YARN,HDFS)之上,由于它们均是运行于内存,所以他们表现的都比hadoop要好很多。

二者的不同:

Flink在进行集合的迭代转换时可以是循环或是迭代计算处理。flink的流式处理的是真正的流处理。流式数据一旦进入就实时进行处理,这就允许流数据灵活地在 *** 作窗口。

Spark 在另一方面是基于d性分布式数据集(RDD),这(主要的)给于spark基于内存内数据结构的函数式编程。它可以通过固定的内存给于大批量的计算。

以上为Flink的运行模型,Flink的程序主要由三部分构成,分别为Source、Transformation、Sink。DataSource主要负责数据的读取,Transformation主要负责对属于的转换 *** 作,Sink负责最终数据的输出。

每个Flink程序都包含以下的若干流程:

执行环境StreamExecutionEnvironment是所有Flink程序的基础。
创建执行环境有三种方式,分别为:

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

返回本地执行环境,需要在调用时指定默认的并行度。

返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

Data Sink 消费DataStream中的数据,并将它们转发到文件、套接字、外部系统或者打印出。
Flink有许多封装在DataStream *** 作里的内置输出格式。

将元素以字符串形式逐行写入(TextOutputFormat),这些字符串通过调用每个元素的toString()方法来获取。

将元组以逗号分隔写入文件中(CsvOutputFormat),行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法。

打印每个元素的toString()方法的值到标准输出或者标准错误输出流中。或者也可以在输出流中添加一个前缀,这个可以帮助区分不同的打印调用,如果并行度大于1,那么输出也会有一个标识由哪个任务产生的标志。

自定义文件输出的方法和基类(FileOutputFormat),支持自定义对象到字节的转换。

根据SerializationSchema 将元素写入到socket中。

DataStream → DataStream:输入一个参数产生一个参数。

DataStream → DataStream:输入一个参数,产生0个、1个或者多个输出。

DataStream → DataStream:结算每个元素的布尔值,并返回布尔值为true的元素。下面这个例子是过滤出非0的元素:

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

ConnectedStreams → DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。

DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。

SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。

DataStream → DataStream:对两个或者两个以上的DataStream进行union *** 作,产生一个包含所有DataStream元素的新DataStream。注意:如果你将一个DataStream跟它自己做union *** 作,在新的DataStream中,你将看到每一个元素都出现两次。

DataStream → KeyedStream:输入必须是Tuple类型,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。

KeyedStream → DataStream:一个分组数据流的聚合 *** 作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

KeyedStream → DataStream:一个有初始值的分组数据流的滚动折叠 *** 作,合并当前元素和前一次折叠 *** 作的结果,并产生一个新的值,返回的流中包含每一次折叠的结果,而不是只返回最后一次折叠的最终结果。

KeyedStream → DataStream:分组数据流上的滚动聚合 *** 作。min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含最小值的元素(同样原理适用于max和maxBy),返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

在2310之前的算子都是可以直接作用在Stream上的,因为他们不是聚合类型的 *** 作,但是到2310后你会发现,我们虽然可以对一个无边界的流数据直接应用聚合算子,但是它会记录下每一次的聚合结果,这往往不是我们想要的,其实,reduce、fold、aggregation这些聚合算子都是和Window配合使用的,只有配合Window,才能得到想要的结果。


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/dianzi/13119057.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-06-03
下一篇 2023-06-03

发表评论

登录后才能评论

评论列表(0条)

保存