- 快速上手
- 一个简单示例
如果我们对关系型数据库和 SQL 非常熟悉,那么 Table API 和 SQL 的使用其实非常简单:只要得到一个“表”(Table),然后对它调用 Table API,或者直接写 SQL 就可以了。接下来我们就以一个非常简单的例子上手,初步了解一下这种高层级 API 的使用方法。
Flink1.13版本不算稳定,依然在不停的调整和更新,关注 原理和基本用法。
需要引入的依赖
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}artifactId>
<version>${flink.version}version>
dependency>
这里的依赖是一个 Java 的“桥接器”(bridge),主要就是负责 Table API 和下层 DataStream API 的连接支持,按照不同的语言分为 Java 版和 Scala 版。
如果我们希望在本地的集成开发环境(IDE)里运行 Table API 和 SQL,还需要引入以下依赖:
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}artifactId>
<version>${flink.version}version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-scala_${scala.binary.version}artifactId>
<version>${flink.version}version>
dependency>
这里主要添加的依赖是一个“计划器”(planner),它是 Table API 的核心组件,负责提供运行时环境,并生成程序的执行计划。这里我们用到的是新版的 blink planner。由于 Flink 安装包的 lib 目录下会自带 planner,所以在生产集群环境中提交的作业不需要打包这个依赖。
而在 Table API 的内部实现上,部分相关的代码是用 Scala 实现的,所以还需要额外添加一个 Scala 版流处理的相关依赖。另外,如果想实现自定义的数据格式来做序列化,可以引入下面的依赖:
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-commonartifactId>
<version>${flink.version}version>
dependency>
一个简单示例
public class SimpleTableExample {
public static void main(String[] args) throws Exception {
//1.创建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//读取数据源
SingleOutputStreamOperator<Event> dataStream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
//2.创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//3.将DataStream 转换成 Table
Table eventTable = tableEnv.fromDataStream(dataStream);
//TableAPI方式编写
Table result1 = eventTable.select($("user"), $("url"), $("timestamp"))
.where($("user")
.isEqual("Bob"));
//SQL方式输出
String sql = "select user,url,`timestamp` from " + eventTable;
Table result2 = tableEnv.sqlQuery(sql);
//输出
tableEnv.toDataStream(result1).print("TableAPI");
tableEnv.toDataStream(result2).print("SQL");
env.execute();
}
}
这里我们需要创建一个“表环境”(TableEnvironment),然后将数据流(DataStream)转换成一个表(Table);之后就可以执行 SQL 在这个表中查询数据了。查询得到的结果依然是一个表,把它重新转换成流就可以打印输出了。
这里的$符号是 Table API 中定义的“表达式”类 Expressions 中的一个方法,传入一个字段名称,就可以指代数据中对应字段。将得到的表转换成流打印输出,会发现结果与直接执行SQL 完全一样。
注意:写sql语句中,如果遇到字段与sql中的关键字冲突时,一定要在字段上加入`` 号,例如 timestamp
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)