执行环境(Execution Environment)表示当前执行程序的上下文,它决定了Flink应用程序在什么执行环境(本地或集群)中执行。
不同的执行环境也决定了应用程序的不同类型。批处理和流处理作业分别使用不同的执行环境。
创建执行环境的类:
StreamExecutionEnvironment:用来创建流处理执行环境ExecutionEnvironment:用来创建批处理执行环境
获取执行环境的方法:
getExecutionEnvironment():该方法自动获取当前执行环境,是常用的创建执行环境的方式。如果没有设置并行度,则以flink-conf.yaml文件中配置的并行度为准,默认值是1
createLocalEnvironment():该方法返回本地执行环境,需要在调用时指定并行度。如果将编译后的应用程序发布到集群中,则需要把源码改成远程执行环境
createRemoteEnvironment():该方法返回集群的执行环境,但需要在调用时指定作业管理器的IP地址、端口号和集群中运行的JAR包位置等。
StreamExecutionEnvironment.createRemoteEnvironment("JobManagerHost",6021,5,"/flink_application.jar");二、配置Table API、SQL的执行环境
BatchTableEnvironment类。该类用来创建Table API、SQL的批数据处理执行环境
// 获取批数据处理执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 创建Table API、SQL程序的执行环境 BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
StreamTableEnvironment类。该类用来创建Table API、SQL的流数据处理执行环境
// 获取流数据处理的执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 创建Table API、SQL程序的执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);三、配置执行环境参数
setParallelism():用来设置并行度。在此设置的并行度将使所有算子与并行实例一起运行。此方法将覆盖执行环境中的默认并行度
LocalStraemEnvironment类默认使用等于硬件上下文(CPU内核/线程)数量的值。在通过命令行客户端执行程序时,才使用在这里设置的并行度
setBufferTimeout():用来设置刷新输出缓冲区的最大时间频率(ms)。在默认情况下,输出缓冲区会频繁刷新,以提供低延迟
正整数:以该数值定期触发,优势是提高了吞吐量,劣势是增加了延迟0:在每条记录后触发,从而最大限度的减少等待时间,这会产生性能的损耗-1:缓存中的数据一满就会被发送,这回移除超时机制
setMaxParallelism():用来设置最大并行度,以指定动态缩放的上限。
最大并行度的范围:0
MemoryStateBackend:用内存存储状态(此为默认值),用于小状态,本地调试FsStateBackend:用文件系统存储状态,用于大状态、长窗口、高可用场景RocksDBStateBackend:用Rocks数据库存储状态,用于超大状态、长窗口、可增量检查点和高可用场景
setStreamTimeCharacteristic():用来设置应用程序处理数据流的时间特性。Flink定义了以下3类时间特性的值:
ProcessingTime:处理时间Ingestion Time:摄入时间EventTime:事件时间
setRestartStrategy():用于设置故障重启策略。当任务的失败率上升到一定的程度时,Flink认为本次任务最终是失败的
// 最大失败次数为2,衡量失败次数的时间间隔是1min,也可以将单位设置为s或ms env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.of(1, TimeUnit.MINUTES)));
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)