TableAPI和SQL之创建Table(一)

TableAPI和SQL之创建Table(一),第1张

TableAPI和SQL之创建Table(一)

1.从一个文件中导入表结构(Structure)(常用于批计算)(静态)
2.Table API 中已经提供了 TableSource 从外部系统获取数据,例如常见的数据库、文件 系统和 Kafka 消息队列等外部系统。
3.从文件中创建 Table(静态表) Flink 允许用户从本地或者分布式文件系统中读取和写入数据,在 Table API 中可以通 过 CsvTableSource 类来创建,只需指定相应的参数即可。但是文件格式必须是 CSV 格式的。 其 他 文 件 格 式 也 支 持 ( 在 Flink 还 有 Connector 的 来 支 持 其 他 格 式 或 者 自 定 义 TableSource)。

package tablesql

import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.sources.CsvTableSource


object TestCreateTableByFile {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
    val table = StreamTableEnvironment.create(environment, settings)

    //读取数据
    val source: CsvTableSource = new CsvTableSource("data/statefile.log",
      Array[String]("f1", "f2", "f3", "f4", "f5", "f6"),
      Array(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG)
    )

    //注册一张表
    table.registerTableSource("t_station_log",source)
    //打印表结构,或者使用Table API, 需要得到Table对象 API
    val t: Table = table.scan("t_station_log")
    t.printSchema()

  }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5683677.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存