object createProgramma { def main(args: Array[String]): Unit = { //批处理的创建方式 val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //流处理的创建方式 val env1: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env1.setParallelism(1) env1.execute() //flink SQL&& table 创建方式 基于流式创建 StreamTableEnvironment.create(env1) //基于老式流式处理创建的方法 var setting= EnvironmentSettings.newInstance() .useOldPlanner() .inStreamingMode() //设置流式 .build() StreamTableEnvironment.create(env1,setting) //基于老式批出处理创建的方法 var setting1= EnvironmentSettings.newInstance() BatchTableEnvironment.create(env) //基于blink版本的流处理 var bsSetting = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() StreamTableEnvironment.create(env1,bsSetting) //基于blink版本的批处理 var bsSetting1 = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode() .build() TableEnvironment.create(bsSetting1) } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)