SELECT 语句和 VALUES 语句需要使用 TableEnvironment 的 sqlQuery() 方法加以指定,会以 Table 的形式返回 SELECT (或 VALUE)的查询结果。Table 可被用于 SQL 或 Table API 查询、转换为 DataSet 或 DataStream、输出到 TableSink。SQL 与 Table API 的查询可以进行无缝融合、整体优化。
为了可以在 SQL 查询中访问到表,需要先在 TableEnvironment 中 注册表 (可以通过 TableSource、Table、CREATE TABLE 语句、DataStream 或 DataSet 注册)。为方便起见 Table.toString() 将会在其 TableEnvironment 中以唯一的名称自动注册表,并返回名称。
注意: 查询若包括了不支持的 SQL 特性,将会抛出 TableException 。
以下示例显示如何在已注册和内联表上指定 SQL 查询。
SELECT 语句或者 VALUES 语句可以通过 TableEnvironment.executeSql() 方法来执行,该方法返回 TableResult 对象用于包装查询的结果,一个 Table 对象可以通过 Table.execute() 方法执行获取查询结果。 TableResult.collect() 方法返回一个可以关闭的行迭代器(除非所有的数据都被收集到本地,否则一个查询作业永远不会结束。所以通过 CloseableIterator#close() 方法主动地关闭作业以防止资源泄露)。 还可以通过 TableResult.print() 方法将查询结果打印到控制台。TableResult 中的结果数据只能被访问一次,因此一个 TableResult 实例中, collect() 方法和 print() 方法不能被同时使用。
TableResult.collect() 与 TableResult.print() 的行为在不同的 checkpointing 模式下略有不同。
Flink 通过支持标准 ANSI SQL的 Apache Calcite 解析 SQL。以下“BNF-语法”描述了批处理和流处理查询中所支持的 SQL 特性的超集。
Flink SQL 对于标识符(表、属性、函数名)的命名策略类似于 Java 的词法约定:
字符串文本常量需要被单引号包起来(如 SELECT 'Hello World' )。两个单引号表示转义(如 SELECT 'It''s me.' )。字符串文本常量支持 Unicode 字符,如需明确使用 Unicode 编码,请使用以下语法:
WITH 提供了编写辅助语句的方法,以便在更大的查询中使用。这些语句通常被称为公共表表达式(Common Table Expression,CTE),可以认为它定义了只存在于一个查询中的临时视图。
WITH 语法:
下面的示例定义了一个 CTE: orders_with_total ,并在 GROUP BY 查询中使用它。
SELECT 语句的一般语法为:
table_expression 可以是任何数据源(表、视图、VALUES 子句、多个表的 Join 结果、子查询)。下面的事例读取 Orders 表的所有列:
select_list 指定 * 表示解析所有的列,但是不建议在生产环境中使用,会降低性能,建议只查询需要的列:
查询可以使用 VALUES 子句,每个元组(Tuple)对应一个 Row,并且可以设置别名:
WHERE 语句可以过滤 Row:
可以对每行数据的指定列调用函数(内置、自定义函数,自定义函数必须提前注册):
如果指定 SELECT DISTINCT,则将从结果集中删除重复行(每组重复中保留一行)。
对于流式查询,计算查询结果所需的状态(State)可能会无限增长。状态大小取决于不同行的数量。可<u>以为查询配置适当的状态生存时间(TTL),以防止状态大小过大。这可能会影响查询结果的正确性</u>。
Window 是流处理的核心。Windows 将流拆分为有限大小的片段应用计算。只有流处理支持。
Flink 1.13 提供了几个 Table-valued functions(TVF,区别于 Group Window Function),将表中的元素划分为 windows,包括:
- 滚动窗口(Tumbling windows)
- 滑动窗口(Hop, Sliding windows)
- 累加窗口(Cumulate windows)
- 会话窗口(Session windows,TVF 暂不支持)
每个元素在逻辑上可以属于多个窗口,具体取决于所使用的窗口函数。TVF 必须和聚合 *** 作一起使用:
假设存在一个 Bid 表
指定一个固定大小的窗口,并且不重叠,语法:
设定一个10分钟大小的滚动窗口,
指定一个固定大小的窗口,设定滑动间隔,元素会被指定给多个窗口,语法:
设定一个10分钟大小,每5分钟滑动的窗口,
指定一个窗口的最大规模,按照指定时间间隔增长累加,直到达到窗口的最大规模,每次窗口增长会进行一次计算,可以理解为多次计算的滚动窗口,语法:
设定一个10分钟大小,每2分钟累计一次的窗口,
Doris官网定义 mysql原始表结构 1.doris中关联mysql外表 结果如下: 2.doris中关联kafka导入数据 查看作业 State为RUNNING,表示已经成功。 停止作业 3.通过flink导入mysql数据到doris 方法1:通过mysql-cdc写入kafka,kafka关联doris表。 方法2:通过阿里云DTS->datahub,然后通过Flink写入kafka,再关联到doris外表 如何处理delete数据?对于方法1,需要手动的删除doris中的数据;对于方法2,可以通过dts_operation_flag字段来标示,dts_operation_flag可以为I/U/D,分别表示添加、更新和删除。那我们就只需要在doris表中添加一个dts_operation_flag字段来标示就可以了,查询数据的时候就不再查询等于D的值。 如何处理脏数据?delete doris中的数据,然后insert正确的值;还有个方法是将关联一个外表(这个是正确的值),然后再将doris中的表和外表中的值diff,将diff的值insert到doris中。flink on yarn cluster的模式, yarn上的应用经常发生异常, 如jobmanager的oom, zk心跳丢失, slot分配请求超时, hdfs文件已存在等等经过排查定位到了是flink sql的解析问题, 像count, where这类的语句在实际执行的时候变成了全量的查询
分析dump文件, 得知内存中存放了该表几乎全量的数据, 但sql加上where条件后, 实际上数据只有10来条, 是create table阶段的问题, 还是sql执行阶段的问题呢?
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)