批处理 是有界数据流处理的范例。在这种模式下,可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。
流处理 正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,因此程序必须持续不断地对到达的数据进行处理。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LnYlQ9Z7-1640774068983)(20200914090758635.png)]
Apache Flink 是一个在无界和有界数据流上进行状态计算的框架和分布式处理引擎。
flink运行组件 JobManagerJobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:
-
ResourceManager
ResourceManager 负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位(请参考TaskManagers)。Flink 为不同的环境和资源提供者(例如 YARN、Mesos、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。
-
Dispatcher
Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。
-
JobMaster
JobMaster 负责管理单个JobGraph的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。
始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby
TaskManagersTaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。
必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fDzknGDu-1640774068984)(processes.svg)]
其中slot个数等于最大并行数,一个slot中可以执行多个算子
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qsmSMpLE-1640774068984)(tasks_slots.svg)]
默认情况下,Flink 允许 subtask 共享 slot,即便它们是不同的 task 的 subtask,只要是来自于同一作业即可。结果就是一个 slot 可以持有整个作业管道。允许 slot 共享有两个主要优点:
- Flink 集群所需的 task slot 和作业中使用的最大并行度恰好一样。无需计算程序总共包含多少个 task(具有不同并行度)。
- 容易获得更好的资源利用。如果没有 slot 共享,非密集 subtask(source/map())将阻塞和密集型 subtask(window) 一样多的资源。通过 slot 共享,我们示例中的基本并行度从 2 增加到 6,可以充分利用分配的资源,同时确保繁重的 subtask 在 TaskManager 之间公平分配。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tWVHv2Cc-1640774068985)(https://ci.apache.org/projects/flink/flink-docs-release-1.12/fig/slot_sharing.svg)]
slot内存是隔离的互不影响 同一个taskmanager上共用cpu
前后发生的不同任务可以共享同一个slot
slot共享可以让资源充分利用
运行模式 Flink Session 集群- 集群生命周期:在 Flink Session 集群中,客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。
- 资源隔离:TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争 — 例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。
- 其他注意事项:拥有一个预先存在的集群可以节省大量时间申请资源和启动 TaskManager。有种场景很重要,作业执行时间短并且启动时间长会对端到端的用户体验产生负面的影响 — 就像对简短查询的交互式分析一样,希望作业可以使用现有资源快速执行计算。
- 集群生命周期:在 Flink Job 集群中,可用的集群管理器(例如 YARN 或 Kubernetes)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。在这里,客户端首先从集群管理器请求资源启动 JobManager,然后将作业提交给在这个进程中运行的 Dispatcher。然后根据作业的资源请求惰性的分配 TaskManager。一旦作业完成,Flink Job 集群将被拆除。
- 资源隔离:JobManager 中的致命错误仅影响在 Flink Job 集群中运行的一个作业。
- 其他注意事项:由于 ResourceManager 必须应用并等待外部资源管理组件来启动 TaskManager 进程和分配资源,因此 Flink Job 集群更适合长期运行、具有高稳定性要求且对较长的启动时间不敏感的大型作业。
- 集群生命周期:Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且 main()方法在集群上而不是客户端上运行。提交作业是一个单步骤过程:无需先启动 Flink 集群,然后将作业提交到现有的 session 集群;相反,将应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口(ApplicationClusterEntryPoint)负责调用 main()方法来提取 JobGraph。例如,这允许你像在 Kubernetes 上部署任何其他应用程序一样部署 Flink 应用程序。因此,Flink Application 集群的寿命与 Flink 应用程序的寿命有关。
- 资源隔离:在 Flink Application 集群中,ResourceManager 和 Dispatcher 作用于单个的 Flink 应用程序,相比于 Flink Session 集群,它提供了更好的隔离。
注意: Flink Job 集群可以看做是 Flink Application 集群”客户端运行“的替代方案。
API介绍[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-C0leiDOR-1640774068986)(levels_of_abstraction.svg)]
Flink API 最底层的抽象是Stateful Stream Processing, 抽象实现为process Function(获取状态、注册定时器、获取当前事件的信息)
Flink API 第二层抽象是 Core APIs(使用api中的算子进行相对简单的 *** 作)
Flink API 第三层抽象是 Table API(使用类sql语言进行更加简洁的 *** 作,Flink 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用)
Flink API 最顶层抽象是 SQL(直接使用sql进行 *** 作)
应用场景- 事件驱动型应用(规则报警、流程监控、异常检测)
- 数据分析应用
- 数据管道应用(实时数仓)
Event Time : 事件创建的时间 (一般为kafka中消息中的时间字段,为事件消息的创建事件)
Ingestion Time:数据进入Flink的时间 (如source读取到kafka流时的时间)
Processing Time:执行 *** 作算子的本地系统时间,与机器有关(算子执行当前时间时的时间)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SUWpr12S-1640774068986)(f461d93e248f0c51291055debd50ad00.png)]
Watermark实时计算的输入数据是持续不断的,当我们进行窗口 *** 作时需要一个有效的进度指标。Watermark就是一种衡量事件进展的有效机制。(窗口关闭前闭后开)
- 通常在读入数据源之后直接声明Watermark(取事件消息中的事件时间)
- Watermark的生成是不可逆的
- Flink 应用程序可以通过Watermark得知事件时间的进度,从而关闭窗口。
注意:在上有游数据源输入是多个分区(分片)输入时,Watermark取所有并行数据源中Watermark的最小值。(也就是说如果有一个分区无数据发送,Watermark不会更新)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U5lO3eKk-1640774068987)(parallel_kafka_watermarks.svg)]
作用:watermark处理乱序数据,在开窗口 *** 作时,设置watermark和延迟时间,等待乱序时间到来触发计算。
Watermark策略:
- 周期性Watermark
- 标记生成
会话窗口不重叠,也没有固定的开始和结束时间。当会话窗口在特定时间段内没有接收到元素时,会话窗口将关闭。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1MQZuw7b-1640774068987)(session-windows.svg)]
TimeWindow 滑动窗口可指定窗口大小和滑动步长,具有固定长度的窗口长度,可能重叠。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-egi2HuRN-1640774068988)(sliding-windows.svg)]
滚动窗口翻滚窗口具有固定大小并且不重叠
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-77xKNPPp-1640774068988)(tumbling-windows.svg)]
滑动窗口计算
@Override public CollectionassignWindows( Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { List windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; } else { throw new RuntimeException( "Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } } // 计算最后一个窗口的开始时间 public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp - offset + windowSize) % windowSize; }
这块来一个算窗口的图
processFunctionProcessFunction是一个低级流处理 *** 作,可以访问所有(非循环)流应用程序的基本构建块:
- 事件(流元素)
- 状态(容错性,一致性,仅在键控流上)
- 计时器(事件时间和处理时间,仅在键控流上)
Checkpoint 是 Flink 应用状态的一个一致性副本,定时记录了程序运行中的算子状态、Keyed State等,使得 Flink 能够恢复状态和在流中的位置,从而向应用提供和无故障执行时一样的语义。(可自动恢复)
精准一次程序从checkpoint恢复时读取的是最新一次的完整Checkpoint 。
设置Exactly Once,barrier对齐(多分区时)。
-
对于并行度是1的情况,每个算子收到barrier时,对barrier之前的计算结果进行checkpoint。
-
对于并行度是n的情况,每个算子收到上游所有的barrier时,对之前的的所有计算结果进行checkpoint。在这之前收到单个分区的barrier后,将barrier后的数据进行缓存,不向下游发送。
checkpoint调优
- 频率不宜过高,一般1-5分钟,精确度要求高的可设置20-30s
- 超时时间一般设置频率的一半
- 设置EXACTLY_ONCE
背压问题
- 执行效率慢,通过增大并行度解决。
- 数据倾斜导致,解决数据倾斜问题。
优化GC: 如果频繁发生full GC
- 检查代码是否有频繁创建新对象的地方(优化代码)
- 根据情况调整新生代、老年代比例
- 堆内存大小
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qx0vwEnr-1640774068989)(image-20210831192643652.png)]
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)