[Flink SQL] Chapter 4: Time Attributes

This chapter corresponds to the following page of the official documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html

1. Introduction to Time Attributes

Time attributes can be part of every table schema. They are defined when creating a table from a CREATE TABLE DDL or a DataStream. Once a time attribute is defined, it can be referenced as a field and used in time-based operations. As long as a time attribute is not modified, and is simply forwarded from one part of a query to another, it remains a valid time attribute. Time attributes behave like regular timestamps, and are accessible for calculations. When used in calculations, time attributes are materialized and act as standard timestamps. However, ordinary timestamps cannot be used in place of, or be converted to, time attributes.

Key points:

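- A time attribute can be specified either when creating a Table with DDL or when converting a DataStream to a Table.
- When used, a time attribute is just an ordinary field, of the standard timestamp type.
- An ordinary timestamp and a time attribute are not the same thing.

2. Processing Time

1.1 Converting a DataStream to a Table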

1.2 Defining in the CREATE TABLE DDL

3. Event Time

1.1 Converting a DataStream to a Table

When converting a DataStream to a table, an event time attribute can be defined with the .rowtime property during schema definition.

Timestamps and watermarks must have already been assigned in the DataStream being converted.

There are two ways of defining the time attribute when converting a DataStream into a Table. Depending on whether the specified .rowtime field name exists in the schema of the DataStream, the timestamp is either (1) appended as a new column, or it (2) replaces an existing column.

In either case, the event time timestamp field will hold the value of the DataStream event time timestamp.

// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));

// Usage:

GroupWindowedTable windowedTable = table.window(Tumble
       .over(lit(10).minutes())
       .on($("user_action_time"))
       .as("userActionWindow"));
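
The `assignTimestampsAndWatermarks(...)` argument is elided in the snippets above, so the actual watermark strategy is unknown. As a rough mental model only, the plain-Java sketch below (no Flink dependency) shows one common idea: a watermark that trails the largest event time seen so far by a fixed delay. The class name `BoundedDelayWatermark`, its methods, and the 5-second delay are assumptions made for illustration, not Flink APIs.

```java
import java.time.Duration;

/**
 * Simplified model of a watermark that trails the largest event time seen so far
 * by a fixed delay. Illustrative only; this is not Flink's implementation.
 */
final class BoundedDelayWatermark {
    private final long delayMillis;
    private long maxEventTimeMillis = Long.MIN_VALUE;

    BoundedDelayWatermark(Duration delay) {
        this.delayMillis = delay.toMillis();
    }

    /** Records one event timestamp and returns the watermark after seeing it. */
    long observe(long eventTimeMillis) {
        maxEventTimeMillis = Math.max(maxEventTimeMillis, eventTimeMillis);
        return currentWatermark();
    }

    /** Largest event time seen minus the delay; Long.MIN_VALUE before any event. */
    long currentWatermark() {
        return maxEventTimeMillis == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxEventTimeMillis - delayMillis;
    }

    /** In this model an event counts as late if its timestamp is at or below the watermark. */
    boolean isLate(long eventTimeMillis) {
        return eventTimeMillis <= currentWatermark();
    }

    public static void main(String[] args) {
        // Hypothetical 5-second delay, chosen only for the example.
        BoundedDelayWatermark wm = new BoundedDelayWatermark(Duration.ofSeconds(5));
        System.out.println(wm.observe(10_000L)); // prints 5000
        System.out.println(wm.observe(7_000L));  // prints 5000 (out-of-order event, watermark unchanged)
        System.out.println(wm.isLate(7_000L));   // prints false
        System.out.println(wm.isLate(4_000L));   // prints true
    }
}
```

The watermark only moves forward: an out-of-order event never pulls it back, which is what lets a time-based operation decide that a window is complete.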

1.2 Defining in the CREATE TABLE DDL

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3), -- the time field, of type TIMESTAMP(3)
  -- declare a watermark on top of the time field
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
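
To make the semantics of the query above concrete, here is a minimal plain-Java sketch (no Flink dependency) of what a 10-minute tumbling window with `COUNT(DISTINCT user_name)` computes over a finite batch of rows. It assumes windows are aligned to the epoch and ignores watermarks and late data entirely. The names `TumbleCountDistinct`, `UserAction`, `windowStart`, and `distinctUsersPerWindow` are hypothetical helpers, not Flink APIs.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Batch-style illustration of TUMBLE(..., INTERVAL '10' MINUTE) with COUNT(DISTINCT user_name). */
final class TumbleCountDistinct {
    static final long WINDOW_MILLIS = 10 * 60 * 1000L;

    /** Hypothetical row type mirroring two columns of user_actions. */
    static final class UserAction {
        final String userName;
        final long actionTimeMillis;

        UserAction(String userName, long actionTimeMillis) {
            this.userName = userName;
            this.actionTimeMillis = actionTimeMillis;
        }
    }

    /** Start of the epoch-aligned tumbling window that contains the timestamp. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    /** Maps each window start to the number of distinct user names seen in that window. */
    static Map<Long, Integer> distinctUsersPerWindow(List<UserAction> rows) {
        Map<Long, Set<String>> usersByWindow = new TreeMap<>();
        for (UserAction row : rows) {
            long start = windowStart(row.actionTimeMillis, WINDOW_MILLIS);
            usersByWindow.computeIfAbsent(start, k -> new HashSet<>()).add(row.userName);
        }
        Map<Long, Integer> counts = new TreeMap<>();
        usersByWindow.forEach((start, users) -> counts.put(start, users.size()));
        return counts;
    }

    public static void main(String[] args) {
        List<UserAction> rows = Arrays.asList(
                new UserAction("alice", 60_000L),   // minute 1, first window
                new UserAction("bob", 540_000L),    // minute 9, first window
                new UserAction("alice", 300_000L),  // minute 5, first window, duplicate user
                new UserAction("alice", 720_000L)); // minute 12, second window
        System.out.println(distinctUsersPerWindow(rows)); // prints {0=2, 600000=1}
    }
}
```

Each key in the result plays the role of `TUMBLE_START`. In a streaming job the result for a window is only emitted once the watermark passes the end of that window, which this batch sketch does not model.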

Sharing is welcome. When reposting, please credit the source: 内存溢出

Original article: https://outofmemory.cn/zaji/5716775.html
