1.从一个文件中导入表结构(Structure)(常用于批计算)(静态)
2.Table API 中已经提供了 TableSource 从外部系统获取数据,例如常见的数据库、文件 系统和 Kafka 消息队列等外部系统。
3.从文件中创建 Table(静态表) Flink 允许用户从本地或者分布式文件系统中读取和写入数据,在 Table API 中可以通 过 CsvTableSource 类来创建,只需指定相应的参数即可。但是文件格式必须是 CSV 格式的。 其 他 文 件 格 式 也 支 持 ( 在 Flink 还 有 Connector 的 来 支 持 其 他 格 式 或 者 自 定 义 TableSource)。
package tablesql import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.{EnvironmentSettings, Table} import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.table.sources.CsvTableSource object TestCreateTableByFile { def main(args: Array[String]): Unit = { val environment = StreamExecutionEnvironment.getExecutionEnvironment val settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build() val table = StreamTableEnvironment.create(environment, settings) //读取数据 val source: CsvTableSource = new CsvTableSource("data/statefile.log", Array[String]("f1", "f2", "f3", "f4", "f5", "f6"), Array(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG) ) //注册一张表 table.registerTableSource("t_station_log",source) //打印表结构,或者使用Table API, 需要得到Table对象 API val t: Table = table.scan("t_station_log") t.printSchema() } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)