延迟:表示处理一个事件所需要的的时间
吞吐:用来衡量系统处理能力(处理速率)的指标
处理速率取决于数据到来速率,因此吞吐低不意味着性能差
通过并行处理多条数据流,可以在处理更多事件的同时降低延迟
无状态:处理事件时无需依赖已经处理过的事件
有状态:维持内部状态
数据接入与输出
转换 *** 作
滚动聚合(例如求和 最小值 最大值)
窗口 *** 作(“桶”的有限事件集合):滚动窗口 滑动窗口 会话窗口
处理时间:当前流处理算子所在机子的本地时钟时间
事件时间:数据流实际发生时间(将处理速度和内容结果彻底解耦)
问题:如何处理延迟事件
水位线:一个全局进度指标,表示我们确信不会再有延迟事件到来的某个时间点
虽然处理时间提供了很低的延迟,但是结果依赖于处理速度,具有不确定性
事件时间能保证结果的准确性并且允许处理延迟甚至无序的事件
传统的处理无限数据的通常方法:将到来的事件分成小批次,不停地在批处理系统上调度并运行作业,其结果都会写入持久化储存中,同时所有算子的状态都将不复存在
状态管理
状态划分
状态恢复
任务的执行步骤
接收事件并将它们缓存在本地缓冲区
选择性地更新内部状态
产生输出记录
Flink EventTime和Watermark
>
反压(backpressure)是流式计算中十分常见的问题。 反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速 。由于实时计算应用通常使用消息队列来进行生产端和消费端的解耦,消费端数据源是 pull-based 的,所以 反压通常是从某个节点传导至数据源并降低数据源(比如 Kafka consumer)的摄入速率。
反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致更大的数据处理延迟 。通常来说,对于一些对延迟要求不高或者数据量较少的应用,反压的影响可能并不明显。然而对于规模比较大的 Flink 作业,反压可能会导致严重的问题。
网络流控的实现: 动态反馈/自动反压
Flink 的数据交换有3种:① 同一个 Task 的数据交换 ,② 不同 Task 同 JVM 下的数据交换 ,③ 不同 Task 且不同 TaskManager 之间的交换 。
通过 算子链 operator chain 串联多个算子 ,主要作用是避免了 序列化 和 网络通信 的开销。
在 TaskA 中,算子输出的数据首先通过 record Writer 进行序列化,然后传递给 result Partition 。接着,数据通过 local channel 传递给 TaskB 的 Input Gate,然后传递给 record reader 进行反序列。
与上述(2)的不同点是数据先传递给 netty ,通过 netty 把数据推送到远程端的 Task 。
15 版本之前是采用 TCP 流控机制,而没有采用feedback机制 。
发送端 Flink 有一层Network Buffer,底层用Netty通信即有一层Channel Buffer,最后Socket通信也有Buffer,同理接收端也有对应的3级 Buffer。Flink (before V15)实质是利用 TCP 的流控机制来实现 feedback 。
TCP报文段首部有16位窗口字段,当接收方收到发送方的数据后,ACK响应报文中就将自身缓冲区的剩余大小设置到放入16位窗口字段 。该窗口字段值是随网络传输的情况变化的,窗口越大,网络吞吐量越高。
参考:1 计算机网络31 运输层 - TCP/UDP协议
2 Apache Flink 进阶教程(七):网络流控及反压剖析
例子:TCP 利用滑动窗口限制流量
步骤1 :发送端将 4,5,6 发送,接收端也能接收全部数据。
步骤2 :consumer 消费了 2 ,接收端的窗口会向前滑动一格,即窗口空余1格。接着向发送端发送 ACK = 7、window = 1 。
步骤3:发送端将 7 发送后,接收端接收到 7 ,但是接收端的 consumer 故障不能消费数据。这时候接收端向发送端发送 ACK = 8、window = 0 ,由于这个时候 window = 0,发送端是不能发送任何数据,也就会使发送端的发送速度降为 0。
在 Flink 层面实现反压机制,通过 ResultPartition 和 InputGate 传输 feedback 。
Storm 在每一个 Bolt 都会有一个监测反压的线程(Backpressure Thread),这个线程一但检测到 Bolt 里的接收队列(recv queue)出现了严重阻塞就会把这个情况写到 ZooKeeper 里,ZooKeeper 会一直被 Spout 监听,监听到有反压的情况就会停止发送 。因此,通过这样的方式匹配上下游的发送接收速率。
组件 RateController 监听负责监听“OnBatchCompleted”事件,然后从中抽取processingDelay 及schedulingDelay信息。RateEstimator 依据这些信息估算出最大处理速度(rate),最后由基于Receiver的Input Stream 将 rate 转发给 Executor 的 BlockGenerator,并更新RateLimiter 。
Flink、Storm、Spark Streaming 的反压机制都采用动态反馈/自动反压原理,可以动态反映节点限流情况,进而实现自动的动态反压。
Flink Web UI 的反压监控提供了 Subtask 级别 的反压监控。监控的原理是 通过ThreadgetStackTrace() 采集在 TaskManager 上正在运行的所有线程,收集在缓冲区请求中阻塞的线程数(意味着下游阻塞),并计算缓冲区阻塞线程数与总线程数的比值 rate 。其中,rate < 01 为 OK,01 <= rate <= 05 为 LOW,rate > 05 为 HIGH。
Network 和 task I/O metrics 是轻量级反压监视器,用于正在持续运行的作业,其中一下几个 metrics 是最有用的反压指标。
采用 Metrics 分析反压的思路: 如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游 。
下表把 inPoolUsage 分为 floatingBuffersUsage 和 exclusiveBuffersUsage ,并且总结上游 Task outPoolUsage 与 floatingBuffersUsage 、 exclusiveBuffersUsage 的关系,进一步的分析一个 Subtask 和其上游 Subtask 的反压情况。
上述主要通过 TaskThread 定位反压,而分析反压原因 类似一个普通程序的性能瓶颈 。
通过 Web UI 各个 SubTask 的 Records Sent 和 Record Received 来确认 ,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。解决方式把数据分组的 key 进行本地/预聚合来消除/减少数据倾斜。
对 TaskManager 进行 CPU profile ,分析 TaskThread 是否跑满一个 CPU 核:如果没有跑满,需要分析 CPU 主要花费在哪些函数里面,比如生产环境中偶尔会卡在 Regex 的用户函数(ReDoS);如果没有跑满,需要看 Task Thread 阻塞在哪里,可能是 用户函数本身有些同步的调用 ,可能是 checkpoint 或者 GC 等系统活动 。
TaskManager JVM 各区内存不合理导致的频繁 Full GC 甚至失联。可以加上 -XX:+PrintGCDetails 来打印 GC 日志的方式来观察 GC 的问题。推荐TaskManager 启用 G1 垃圾回收器来优化 GC。
1、fromCollection(Collection) - 从 Java 的 JavautilCollection 创建数据流。集合中的所有元素类型必须相同。
2、fromCollection(Iterator, Class) - 从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。
3、fromElements(T …) - 从给定的对象序列中创建数据流。所有对象类型必须相同。
4、fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建并行数据流。Class 指定了该迭代器返回元素的类型。
5、generateSequence(from, to) - 创建一个生成指定区间范围内的数字序列的并行数据流。
1、readTextFile(path) - 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。
2、readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。
3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingModePROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingModePROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉需要处理的文件。
实现:
重要注意:
socketTextStream(String hostname, int port) - 从 socket 读取。元素可以用分隔符切分。
addSource - 添加一个新的 source function。例如,你可以 addSource(new FlinkKafkaConsumer011<>(…)) 以从 Apache Kafka 读取数据。
1、基于集合:有界数据集,更偏向于本地测试用
2、基于文件:适合监听文件修改并读取其内容
3、基于 Socket:监听主机的 host port,从 Socket 中获取数据
4、自定义 addSource:大多数的场景数据都是无界的,会源源不断的过来。比如去消费 Kafka 某个 topic 上的数据,这时候就需要用到这个 addSource,可能因为用的比较多的原因吧,Flink 直接提供了 FlinkKafkaConsumer011 等类可供你直接使用。你可以去看看 FlinkKafkaConsumerBase 这个基础类,它是 Flink Kafka 消费的最根本的类。
5、flink目前支持的source详细可以阅读官网connects部分;
flink窗口的种类及详述:
滚动窗口(tumblingwindow)将事件分配到长度固定且互不重叠的桶中。
实际案例:简单且常见的分维度分钟级别同时在线用户数、总销售额
Java设置语句:window(TumblingProcessingTimeWindowsof(Timeseconds(5)))
该语句为设置滚动窗口的窗口时长为5秒钟
sql设置语句:FROM TABLE(TUMBLE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND))
Windowing TVF 滚动窗口的写法就是把 tumble window 的声明写在了数据源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND)),包含三部分参数。
第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL '60' SECOND 声明滚动窗口大小为 1 min
滑动窗口:分配器将每个元素分配给固定窗口大小的窗口。与滚动窗口分配器类似,窗口的大小由 window size 参数配置。还有一个window slide参数用来控制滑动窗口的滑动大小。因此,如果滑动大小小于窗口大小,则滑动窗口会重叠。在这种情况下,一个元素会被分配到多个窗口中。
实际案例:简单且常见的分维度分钟级别同时在线用户数,1 分钟输出一次,计算最近 5 分钟的数据
java设置语句:window(SlidingProcessingTimeWindowsof(Timeseconds(10), Timeseconds(5)))
window size :窗口大小为 10秒钟
window slide:窗口间隔为5秒钟
sql设置语句: hop(row_time, interval '1' minute, interval '5' minute)
第一个参数为事件时间的时间戳;第二个参数为滑动窗口的滑动步长;第三个参数为滑动窗口大小。
会话窗口:分配器通过活动会话对元素进行分组。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。当会话窗口在一段时间内没有接收到元素时会关闭。会话窗口分配器需要配置一个会话间隙,定义了所需的不活动时长。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。
实际案例:计算每个用户在活跃期间(一个 Session)总共购买的商品数量,如果用户 5 分钟没有活动则视为 Session 断开
设置语句:基于事件时间的会话窗口window(EventTimeSessionWindowswithGap(Timeminutes(10)))
基于处理时间的会话窗口
Java设置:window(ProcessingTimeSessionWindowswithGap(Timeminutes(10)))
会话间隙,不活动时长为10秒钟
sql设置:session(row_time, interval '5' minute)
Group Window Aggregation 中 Session 窗口的写法就是把 session window 的声明写在了 group by 子句中
Session 窗口即支持 处理时间 也支持 事件时间。但是处理时间只支持在 Streaming 任务中运行,Batch 任务不支持。
渐进式窗口:在其实就是 固定窗口间隔内提前触发的的滚动窗口,其实就是 Tumble Window + early-fire 的一个事件时间的版本。例如,从每日零点到当前这一分钟绘制累积 UV,其中 10:00 时的 UV 表示从 00:00 到 10:00 的 UV 总数。渐进式窗口可以认为是首先开一个最大窗口大小的滚动窗口,然后根据用户设置的触发的时间间隔将这个滚动窗口拆分为多个窗口,这些窗口具有相同的窗口起点和不同的窗口终点。
应用场景:周期内累计 PV,UV 指标(如每天累计到当前这一分钟的 PV,UV)。这类指标是一段周期内的累计状态,对分析师来说更具统计分析价值,而且几乎所有的复合指标都是基于此类指标的统计(不然离线为啥都要累计一天的数据,而不要一分钟累计的数据呢)。
实际案例:每天的截止当前分钟的累计 money(sum(money)),去重 id 数(count(distinct id))。每天代表渐进式窗口大小为 1 天,分钟代表渐进式窗口移动步长为分钟级别
sql设置:FROM TABLE(CUMULATE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND
, INTERVAL '1' DAY))
Windowing TVF 滚动窗口的写法就是把 cumulate window 的声明写在了数据源的 Table 子句中,即 TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND, INTERVAL '1' DAY)),其中包含四部分参数:
第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL '60' SECOND 声明渐进式窗口触发的渐进步长为 1 min。第四个参数 INTERVAL '1' DAY 声明整个渐进式窗口的大小为 1 天,到了第二天新开一个窗口重新累计
全局窗口:分配器将具有相同 key 的所有元素分配给同一个全局窗口。仅当我们指定自定义触发器时,窗口才起作用。否则,不会执行任何计算,因为全局窗口没有我们可以处理聚合元素的自然结束的点(译者注:即本身自己不知道窗口的大小,计算多长时间的元素)
window(GlobalWindowscreate())
平时滑动窗口用得比较多,其次是滚动窗口
需要在主机下安装jq,jq是Linux下面把文本字符串格式化成json格式的工具
#! /bin/bash
current=`date "+%Y-%m-%d %H:%M:%S"`
timeStamp=`date -d "$current" +%s`
currentTimeStamp=$((timeStamp1000+10#`date "+%N"`/1000000))
echo $currentTimeStamp
flink run -c detectZcDataAudit detect_flink-10-SNAPSHOTjar -dbconnect "/mdata1/hadoop/resources/dbconnectxml" -FlowID 9900001 -taskId 3 -Parallel 36 -billingCycleId 12009 -tms $currentTimeStamp
RESULT=`curl >
以上就是关于2020-10-31-Flink-7(流处理基础)全部的内容,包括:2020-10-31-Flink-7(流处理基础)、Flink的处理机制以及侧输出应用、【Flink 精选】如何分析及处理反压等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)