11.2.11、flink核心

11.2.11、flink核心,第1张

11.2.11、flink核心 1、创建环境blink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
2、代码实现读取流数据转成表

将结果流转成流

—toRetractStream:更新的表
—toAppendStream:追加的表

object Demo01API {
  def main(args: Array[String]): Unit = {

    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings
      .newInstance()
      .useBlinkPlanner() //使用blink的计划器
      .inStreamingMode() //使用流模型
      .build()
    //创建table的环境
    val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)

    //构建一个流
    val lineDS: DataStream[String] = bsEnv.socketTextStream("master", 8888)

    
    val table: Table = bsTableEnv.fromDataStream(lineDS, $"word")
    //将数据集注册成表,表名为words
    bsTableEnv.createTemporaryView("words", table)

    
    val countTable: Table = bsTableEnv.sqlQuery(
      """
        |select word,count(1) from words group by word
      """.stripMargin)

    
    val result: DataStream[(Boolean, Row)] = countTable.toRetractStream[Row]
    result.print()
    bsEnv.execute()
  }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5701130.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存