This chapter corresponds to the following page of the official documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
Time attributes can be part of every table schema. They are defined when creating a table from a CREATE TABLE DDL or a DataStream. Once a time attribute is defined, it can be referenced as a field and used in time-based operations. As long as a time attribute is not modified, and is simply forwarded from one part of a query to another, it remains a valid time attribute. Time attributes behave like regular timestamps, and are accessible for calculations. When used in calculations, time attributes are materialized and act as standard timestamps. However, ordinary timestamps cannot be used in place of, or be converted to, time attributes.
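Key points:
Time attributes can be specified either when creating a Table with DDL or when converting a DataStream to a Table.
When used in a query, a time attribute is an ordinary field of the standard timestamp type.
An ordinary timestamp is not the same thing as a time attribute.
2. Processing time
1.1 Converting a DataStream to a Table
1.2 Defining in the CREATE TABLE DDL
3. Event time
1.1 Converting a DataStream to a Table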
When converting a DataStream to a table, an event time attribute can be defined with the .rowtime property during schema definition.
Timestamps and watermarks must have already been assigned in the DataStream being converted.
There are two ways of defining the time attribute when converting a DataStream into a Table. Depending on whether the specified .rowtime field name exists in the schema of the DataStream, the timestamp is either (1) appended as a new column, or it (2) replaces an existing column.
In either case, the event time timestamp field will hold the value of the DataStream event time timestamp.
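The append-or-replace rule described above can be illustrated with a small plain-Java sketch. It is not Flink code and does not reproduce Flink's actual schema resolution; the class name `RowtimeSchemaSketch` and the method `resolve` are hypothetical names chosen for this illustration, and the rule is assumed to be purely name-based, as stated in the text.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: a name-based model of the rule described in the text.
// Hypothetical helper, not part of the Flink API.
final class RowtimeSchemaSketch {

    // Returns the table columns that result from declaring `rowtimeField` with .rowtime().
    static List<String> resolve(List<String> streamFields, String rowtimeField) {
        List<String> columns = new ArrayList<>(streamFields);
        if (!columns.contains(rowtimeField)) {
            // (1) The name is not in the stream schema: append a new logical column.
            columns.add(rowtimeField);
        }
        // (2) The name already exists: the column keeps its position, and its value
        //     is replaced by the event time timestamp of the record.
        return columns;
    }

    public static void main(String[] args) {
        // Case (1): the timestamp column is appended.
        System.out.println(resolve(Arrays.asList("user_name", "data"), "user_action_time"));
        // Case (2): the existing column is reused as the event time attribute.
        System.out.println(resolve(
                Arrays.asList("user_action_time", "user_name", "data"), "user_action_time"));
    }
}
```

In both cases the resulting column holds the event time timestamp assigned in the DataStream, which is why timestamps and watermarks have to be assigned before the conversion.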
// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());

// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));

// Usage:

GroupWindowedTable windowedTable = table.window(Tumble
    .over(lit(10).minutes())
    .on($("user_action_time"))
    .as("userActionWindow"));

1.2 Defining in the CREATE TABLE DDL
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3), -- the time field, of type TIMESTAMP(3)
  -- define the watermark on top of the time field
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
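To make the arithmetic behind the watermark expression and the 10-minute tumbling window concrete, here is a minimal plain-Java sketch. It is not Flink code: the class `TumbleWatermarkSketch` and its methods are hypothetical, timestamps are assumed to be non-negative epoch milliseconds, and windows are assumed to be aligned to the epoch with no offset.

```java
import java.time.Duration;

// Illustration only: models the arithmetic of the DDL and query above.
// Hypothetical helper, not part of the Flink API.
final class TumbleWatermarkSketch {

    static final long WINDOW_MS = Duration.ofMinutes(10).toMillis();
    static final long DELAY_MS = Duration.ofSeconds(5).toMillis();

    // Start of the 10-minute window containing the event (what TUMBLE_START reports).
    static long tumbleStart(long eventTimeMs) {
        return eventTimeMs - (eventTimeMs % WINDOW_MS);
    }

    // Value of the watermark expression for one row:
    // user_action_time - INTERVAL '5' SECOND
    static long watermarkFor(long eventTimeMs) {
        return eventTimeMs - DELAY_MS;
    }

    // Watermarks never move backwards, so an out-of-order row cannot lower them.
    static long advance(long currentWatermarkMs, long eventTimeMs) {
        return Math.max(currentWatermarkMs, watermarkFor(eventTimeMs));
    }

    public static void main(String[] args) {
        long eventTime = 1_000_000L; // 16 min 40 s after the epoch
        long watermark = advance(Long.MIN_VALUE, eventTime);
        System.out.println("window start = " + tumbleStart(eventTime));
        System.out.println("watermark    = " + watermark);
        // A row arriving 10 s out of order leaves the watermark unchanged.
        System.out.println("after late row = " + advance(watermark, eventTime - 10_000L));
    }
}
```

The 5-second interval is the amount of out-of-orderness the table tolerates: a window's result is only emitted once the watermark has passed the end of that window.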