Flink TableAPI和SQL(一)快速上手

Flink TableAPI和SQL(一)快速上手,第1张

文章目录
  • 快速上手
  • 一个简单示例

快速上手

如果我们对关系型数据库和 SQL 非常熟悉,那么 Table API 和 SQL 的使用其实非常简单:只要得到一个“表”(Table),然后对它调用 Table API,或者直接写 SQL 就可以了。接下来我们就以一个非常简单的例子上手,初步了解一下这种高层级 API 的使用方法。

Flink1.13版本不算稳定,依然在不停的调整和更新,关注 原理和基本用法。

需要引入的依赖

<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-table-api-java-bridge_${scala.binary.version}artifactId>
  <version>${flink.version}version>
dependency>

这里的依赖是一个 Java 的“桥接器”(bridge),主要就是负责 Table API 和下层 DataStream API 的连接支持,按照不同的语言分为 Java 版和 Scala 版。
如果我们希望在本地的集成开发环境(IDE)里运行 Table API 和 SQL,还需要引入以下依赖:

<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-table-planner-blink_${scala.binary.version}artifactId>
  <version>${flink.version}version>
dependency>
<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}artifactId>
  <version>${flink.version}version>
dependency>

这里主要添加的依赖是一个“计划器”(planner),它是 Table API 的核心组件,负责提供运行时环境,并生成程序的执行计划。这里我们用到的是新版的 blink planner。由于 Flink 安装包的 lib 目录下会自带 planner,所以在生产集群环境中提交的作业不需要打包这个依赖。

而在 Table API 的内部实现上,部分相关的代码是用 Scala 实现的,所以还需要额外添加一个 Scala 版流处理的相关依赖。另外,如果想实现自定义的数据格式来做序列化,可以引入下面的依赖:

<dependency>
  <groupId>org.apache.flinkgroupId>
  <artifactId>flink-table-commonartifactId>
  <version>${flink.version}version>
dependency>
一个简单示例
public class SimpleTableExample {
    public static void main(String[] args) throws Exception {
        //1.创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //读取数据源
        SingleOutputStreamOperator<Event> dataStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        //2.创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //3.将DataStream 转换成 Table
        Table eventTable = tableEnv.fromDataStream(dataStream);

        //TableAPI方式编写
        Table result1 = eventTable.select($("user"), $("url"), $("timestamp"))
                .where($("user")
                        .isEqual("Bob"));

        //SQL方式输出
        String sql = "select user,url,`timestamp` from " + eventTable;
        Table result2 = tableEnv.sqlQuery(sql);

        //输出
        tableEnv.toDataStream(result1).print("TableAPI");
        tableEnv.toDataStream(result2).print("SQL");

        env.execute();
    }
}

这里我们需要创建一个“表环境”(TableEnvironment),然后将数据流(DataStream)转换成一个表(Table);之后就可以执行 SQL 在这个表中查询数据了。查询得到的结果依然是一个表,把它重新转换成流就可以打印输出了。

这里的$符号是 Table API 中定义的“表达式”类 Expressions 中的一个方法,传入一个字段名称,就可以指代数据中对应字段。将得到的表转换成流打印输出,会发现结果与直接执行SQL 完全一样。

注意:写sql语句中,如果遇到字段与sql中的关键字冲突时,一定要在字段上加入`` 号,例如 timestamp

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/871758.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存