Flink配置执行环境和参数

Flink配置执行环境和参数,第1张

Flink配置执行环境和参数 一、配置DataStream API、DataSet API的执行环境

执行环境(Execution Environment)表示当前执行程序的上下文,它决定了Flink应用程序在什么执行环境(本地或集群)中执行。

不同的执行环境也决定了应用程序的不同类型。批处理和流处理作业分别使用不同的执行环境。

创建执行环境的类:

StreamExecutionEnvironment:用来创建流处理执行环境ExecutionEnvironment:用来创建批处理执行环境

获取执行环境的方法:

getExecutionEnvironment():该方法自动获取当前执行环境,是常用的创建执行环境的方式。如果没有设置并行度,则以flink-conf.yaml文件中配置的并行度为准,默认值是1

createLocalEnvironment():该方法返回本地执行环境,需要在调用时指定并行度。如果将编译后的应用程序发布到集群中,则需要把源码改成远程执行环境

createRemoteEnvironment():该方法返回集群的执行环境,但需要在调用时指定作业管理器的IP地址、端口号和集群中运行的JAR包位置等。

StreamExecutionEnvironment.createRemoteEnvironment("JobManagerHost",6021,5,"/flink_application.jar");
二、配置Table API、SQL的执行环境

BatchTableEnvironment类。该类用来创建Table API、SQL的批数据处理执行环境

// 获取批数据处理执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 创建Table API、SQL程序的执行环境
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

StreamTableEnvironment类。该类用来创建Table API、SQL的流数据处理执行环境

// 获取流数据处理的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建Table API、SQL程序的执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
三、配置执行环境参数

setParallelism():用来设置并行度。在此设置的并行度将使所有算子与并行实例一起运行。此方法将覆盖执行环境中的默认并行度

LocalStraemEnvironment类默认使用等于硬件上下文(CPU内核/线程)数量的值。在通过命令行客户端执行程序时,才使用在这里设置的并行度

setBufferTimeout():用来设置刷新输出缓冲区的最大时间频率(ms)。在默认情况下,输出缓冲区会频繁刷新,以提供低延迟

正整数:以该数值定期触发,优势是提高了吞吐量,劣势是增加了延迟0:在每条记录后触发,从而最大限度的减少等待时间,这会产生性能的损耗-1:缓存中的数据一满就会被发送,这回移除超时机制

setMaxParallelism():用来设置最大并行度,以指定动态缩放的上限。

最大并行度的范围:0setStateBackend():用于设置状态后端。Flink内置了以下3种状态后端

MemoryStateBackend:用内存存储状态(此为默认值),用于小状态,本地调试FsStateBackend:用文件系统存储状态,用于大状态、长窗口、高可用场景RocksDBStateBackend:用Rocks数据库存储状态,用于超大状态、长窗口、可增量检查点和高可用场景

setStreamTimeCharacteristic():用来设置应用程序处理数据流的时间特性。Flink定义了以下3类时间特性的值:

ProcessingTime:处理时间Ingestion Time:摄入时间EventTime:事件时间

setRestartStrategy():用于设置故障重启策略。当任务的失败率上升到一定的程度时,Flink认为本次任务最终是失败的

// 最大失败次数为2,衡量失败次数的时间间隔是1min,也可以将单位设置为s或ms
env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.of(1, TimeUnit.MINUTES)));

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5703620.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存