import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Build a streaming table environment on top of the DataStream environment.
// NOTE(review): useBlinkPlanner() is reportedly deprecated in newer Flink
// releases where Blink is the default planner — confirm against the Flink
// version in use.
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

2、代码实现读取流数据转成表
将结果表转成流
—toRetractStream:用于会更新的表(如分组聚合的结果),结果流中每条记录带有一个 Boolean 标记
—toAppendStream:用于只追加的表
/**
 * Word-count demo: reads lines from a socket, exposes them as a table with a
 * single `word` column, counts occurrences per word with SQL, and prints the
 * continuously updated result as a retract stream.
 */
object Demo01API {

  // Local imports so the example is self-contained:
  //  - streaming.api.scala._  : DataStream and the implicit TypeInformation
  //  - table.api._            : Table, EnvironmentSettings and the $"..." column syntax
  //  - table.api.bridge.scala._ : StreamTableEnvironment and Table -> DataStream conversions
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.api._
  import org.apache.flink.table.api.bridge.scala._
  import org.apache.flink.types.Row

  /**
   * Entry point of the demo.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment

    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner() // use the Blink planner
      .inStreamingMode() // use streaming mode
      .build()

    // Create the table environment.
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    // Build a stream: one record per text line received from the socket.
    val lineDS: DataStream[String] = bsEnv.socketTextStream("master", 8888)

    // Convert the stream into a table whose only column is named `word`.
    val table: Table = bsTableEnv.fromDataStream(lineDS, $"word")

    // Register the table as a temporary view named `words`.
    bsTableEnv.createTemporaryView("words", table)

    // Grouped aggregation: the count for a word changes as new lines arrive.
    val countTable: Table = bsTableEnv.sqlQuery(
      """
        |select word, count(1) as cnt
        |from words
        |group by word
      """.stripMargin)

    // The result is an updating table, so it is converted with toRetractStream;
    // each record is paired with a Boolean flag.
    val result: DataStream[(Boolean, Row)] = countTable.toRetractStream[Row]

    result.print()

    // Nothing runs until the job is submitted.
    bsEnv.execute()
  }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)