Flink支持三种与流数据处理相关的时间概念:Processing Time、Event Time和Ingestion Time。
Blink SQL仅支持其中的两种时间类型Event Time和Processing Time:
- Event Time:事件时间(通常是数据的最原始的创建时间)。Event Time必须是提供在数据储存里的数据。
- Processing Time:系统对事件进行处理的本地系统时间,单位为毫秒。
Event Time也称为Row Time。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成Event Time。目前只支持将TIMESTAMP类型(将来会支持LONG类型)声明成Row Time字段。如果源表中需要声明为Event Time的列不是TIMESTAMP类型,需要借助计算列,基于现有列构造出一个TIMESTAMP类型的列。
由于数据本身的乱序、网络的抖动(网络堵塞导致的数据传输延迟的变化)或者其它原因,导致了数据到达的顺序和被处理的顺序,可能是不一致的(乱序)。因此需要首先明文定义一个Watermark计算方法,才能定义一个Row Time字段。
窗口函数基于Event Time聚合的示例如下。
CREATE TABLE tt_stream ( a VARCHAR, b VARCHAR, ts TIMESTAMP, WATERMARK wk1 FOR ts as withOffset (ts, 1000) --Watermark计算方法。 ) WITH ( type = 'sls', topic = 'Processing Time', accessId = ' ', accessKey = ' ' ); CREATE TABLE rds_output ( id VARCHAR, win_start TIMESTAMP, win_end TIMESTAMP, cnt BIGINT ) WITH ( type = 'rds', url = 'jdbc:mysql://****3306/test', tableName = ' ', userName = ' ', password = ' ' ); INSERT INTO rds_output SELECt a AS id, SESSION_START (ts, INTERVAL '1' SECOND) AS win_start, SESSION_END (ts, INTERVAL '1' SECOND) AS win_end, COUNT (a) AS cnt FROM tt_stream GROUP BY SESSION (ts, INTERVAL '1' SECOND), a
Processing Time是系统产生的,不在您的原始数据中,您需要在数据源表的声明中明文定义一个Processing Time列。
filedName as PROCTIME()
窗口函数基于Processing Time聚合的示例如下。
CREATE TABLE mq_stream ( a VARCHAR, b VARCHAR, c BIGINT, ts AS PROCTIME () --在数据源表的声明中明文定义一个Processing Time列。 ) WITH ( type = 'mq', topic = '时间属性字段传递', accessId = ' ', accessKey = ' ' ); CREATE TABLE rds_output ( id VARCHAR, win_start TIMESTAMP, win_end TIMESTAMP, cnt BIGINT ) with ( type = 'rds', url = ' ', tableName = ' ', userName = ' ', password = ' ' ); INSERT INTO rds_output SELECt a AS id, SESSION_START (ts, INTERVAL '1' SECOND) AS win_start, SESSION_END (ts, INTERVAL '1' SECOND) AS win_end, COUNT (a) AS cnt FROM mq_stream GROUP BY SESSION (ts, INTERVAL '1' SECOND), a
时间属性字段经过如下 *** 作后会失去时间属性特性:
- 对时间属性字段以外的字段进行GROUP BY(滚动窗口、滑动窗口或会话窗口中的GROUP BY除外) *** 作。
- 双流JOIN *** 作。
- 复杂事件处理(CEP)语句中的MATCH_RECOGNIZE *** 作。
- OVER窗口中的PARTITION BY *** 作。
- UNIOn *** 作。UNIOn = RETRACT+UNIOn ALL。
如果经过以上 *** 作后,继续使用该时间属性字段进行窗口函数运算,会出现类似org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.的报错。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)