Event Time : 事件创建的时间 (一般为kafka中消息中的时间字段,为事件消息的创建事件)
Ingestion Time:数据进入Flink的时间 (如source读取到kafka流时的时间)
Processing Time:执行 *** 作算子的本地系统时间,与机器有关(算子执行当前时间时的时间)
Watermark实时计算的输入数据是持续不断的,当我们进行窗口 *** 作时需要一个有效的进度指标。Watermark就是一种衡量事件进展的有效机制。(窗口关闭前闭后开)
- 通常在读入数据源之后直接声明Watermark(取事件消息中的事件时间)
- Watermark的生成是不可逆的
- Flink 应用程序可以通过Watermark得知事件时间的进度,从而关闭窗口。
注意:在上有游数据源输入是多个分区(分片)输入时,Watermark取所有并行数据源中Watermark的最小值。(也就是说如果有一个分区无数据发送,Watermark不会更新)
作用:watermark处理乱序数据,在开窗口 *** 作时,设置watermark延迟时间,等待乱序事件到来触发计算。
Watermark策略:
- 周期性Watermark
- 标记生成
可指定窗口大小和滑动步长,具有固定长度的窗口长度,可能重叠。
滚动窗口翻滚窗口具有固定大小并且不重叠
窗口触发 触发条件a、water_mark时间 >= window_end_time只是第一个条件
b、在[window_start_time,window_end_time)区间中还需要有数据存在,如果没有数据同样是不会触发的。
窗口分配方式消息无顺序到达
乱序消息到达,设置watermark延迟为2。
这样就可以保证数据最大延迟在2s内,数据也可以被分配到对应的窗口正常计算。(根据实际情况估算数据延迟情况后合理设置watermark延迟)
窗口开始时间计算flink计算规则
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp - offset + windowSize) % windowSize; }
#5分钟滚动窗口为例 => 300000 2021-12-28 11:23:08 => 1640661788095 (timestamp - offset + windowSize) % windowSize => (1640661788095 - 0 + 300000)% 300000 = 188,095 timestamp - (timestamp - offset + windowSize) % windowSize => 1640661788095 - 188,095 = 1640661600000 => 2021-12-28 11:20:00Flink sql WATERMARK
- WATERMARK定义了表的事件时间属性,其形式为WATERMARK FOR rowtime_column_name AS watermark_strategy_expression。
- rowtime_column_name该列的类型必须为TIMESTAMP(3),并且是模式中的极列,它也可以是一个计算列。
- watermark_strategy_expression定义了水印的生成策略。它允许使用包括计算列时间的任意非查询表达式来计算水印;表达式的类型必须是TIMESTAMP(3),表示从Epoch以来的时间。事件中获取watermark的字段不为空且大于之前发出的水印时间会将当前的时间发出(以保证水印记录的创建)。每条水印生成表达式计算全部由框架完成。框架会随时生成的最大水印,如果当前水印仍然与前一个水印相同,或者为空,或返回的水印的值小于最后一个发出的水印,则新的水印不会被发出。水印根据pipeline.auto-watermark-interval中所配置的间隔发出。若水印的间隔是0ms,那么每条记录都会参与计算水印,且水印会在不为空并大于上一个发出的水印时发出。
使用事件时间时,表声明时必须包含事件时间属性和水印策略。
Flink 提供了几种常用的水印策略。
-
递增:WATERMARK FOR rowtime_column AS rowtime_column。
发出已经观察到的最大时间的水印。
-
设置延迟:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND。
发出已观察到的最大时间减去延迟时间的水印。
# 声明无延迟watermark CREATE TABLE Orders ( `user` BIGINT, product STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time ) WITH ( . . . ); # 声明watermark延迟5秒 CREATE TABLE Orders ( `user` BIGINT, product STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( . . . );Group Window
SQL 查询的时间窗口是通过 GROUP BY 子句定义的。类似于使用常规 GROUP BY 语句的查询,窗口分组语句的 GROUP BY 子句中带有一个窗口函数为每个分组计算出一个结果。以下是批处理表和流处理表支持的分组窗口函数
注意: 辅助函数必须使用与 GROUP BY 子句中的分组窗口函数完全相同的参数来调用.
可以使用以下辅助函数选择组窗口的开始和结束时间戳以及时间属性:
# 计算每日的 SUM(amount)(使用事件时间) SELECt user, TUMBLE_START(rowtime, INTERVAL '5' DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user; # 计算每日的 SUM(amount)(使用处理时间) SELECt user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user;实际样例说明 场景一 watermark生成
- watermark正常推进
{“id”:“11111111”,“name”:“张三”,“age”:22,“update_time”:“2021-12-30 15:20:01”} => 2021-12-30 23:20:01 (减8小时)
{“id”:“11111111”,“name”:“张三”,“age”:22,“update_time”:“2021-12-30 15:20:05”} => 2021-12-30 23:20:05 (减8小时)
2.watermark延迟生成(WATERMARK FOR row_time AS row_time - INTERVAL ‘5’ SECOND)
{“id”:“11111111”,“name”:“张三”,“age”:22,“update_time”:“2021-12-30 15:20:01”} => 2021-12-30 23:19:56 (减8小时)
{“id”:“11111111”,“name”:“张三”,“age”:22,“update_time”:“2021-12-30 15:20:05”} => 2021-12-30 23:20:00 (减8小时)
场景二 事件时间窗口触发- 不设置延迟触发
#SET table.local-time-zone=UTC; create table test1( id String, name String, age int, update_time String, row_time as TO_TIMESTAMP(update_time), WATERMARK FOR row_time AS row_time )with( 'connector' = 'kafka', 'properties.bootstrap.servers'='192.168.10.55:9092,192.168.10.81:9092', 'properties.group.id' = 'test1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'sink.parallelism' = '1', 'json.ignore-parse-errors' = 'true', 'topic'='test1'); # 窗口查询 SELECT name, TUMBLE_START(row_time, INTERVAL '5' SECOND) as wStart, TUMBLE_END(row_time, INTERVAL '5' SECOND) as wEnd, SUM(age) FROM test1 GROUP BY TUMBLE(row_time, INTERVAL '5' SECOND), name;
{“id”:“11111111”,“name”:“张三”,“age”:22,“update_time”:“2021-12-30 15:20:01”} => 根据窗口计算规则,应该开15:20:00 - 15:20:05窗口
{“id”:“11111111”,“name”:“张三”,“age”:22,“update_time”:“2021-12-30 15:20:05”}=> 时间到达15:20:05 ,窗口触发(左闭右开)
2.设置wateremark延迟。
#SET table.local-time-zone=UTC; create table test1( id String, name String, age int, update_time String, row_time as TO_TIMESTAMP(update_time), WATERMARK FOR row_time AS row_time - INTERVAL '2' SECOND )with( 'connector' = 'kafka', 'properties.bootstrap.servers'='192.168.10.55:9092,192.168.10.81:9092', 'properties.group.id' = 'test1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'sink.parallelism' = '1', 'json.ignore-parse-errors' = 'true', 'topic'='test1'); # 窗口查询 SELECT name, TUMBLE_START(row_time, INTERVAL '5' SECOND) as wStart, TUMBLE_END(row_time, INTERVAL '5' SECOND) as wEnd, SUM(age) FROM test1 GROUP BY TUMBLE(row_time, INTERVAL '5' SECOND), name;
{“id”:“11111111”,“name”:“张三”,“age”:22,“update_time”:“2021-12-30 15:20:01”} => 根据窗口计算规则,应该开15:20:00 - 15:20:05窗口。
{“id”:“11111111”,“name”:“张三”,“age”:22,“update_time”:“2021-12-30 15:20:05”}=> 时间到达15:20:03 。
{“id”:“11111111”,“name”:“张三”,“age”:22,“update_time”:“2021-12-30 15:20:07”}=> 时间到达15:20:05 ,窗口触发(左闭右开,不包括15:20:05时间的数据)
场景三 系统时间窗口触发#SET table.local-time-zone=UTC; create table test1( id String, name String, age int, update_time String, proctime as PROCTIME() )with( 'connector' = 'kafka', 'properties.bootstrap.servers'='192.168.10.55:9092,192.168.10.81:9092', 'properties.group.id' = 'test1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'sink.parallelism' = '1', 'json.ignore-parse-errors' = 'true', 'topic'='test1'); # 窗口查询 SELECT name, TUMBLE_START(proctime, INTERVAL '5' SECOND) as wStart, TUMBLE_END(proctime, INTERVAL '5' SECOND) as wEnd, SUM(age) FROM test1 GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND), name;
{“id”:“11111111”,“name”:“张三”,“age”:22,“update_time”:“2021-12-30 15:20:01”} => 窗口触发通过系统时间前进到达触发条件,触发窗口
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)