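Today I studied Flink's watermark propagation mechanism and worked out how watermarks are passed along when operators run with a parallelism greater than 1. I'm writing it down here for future reference.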
The example used here is adapted from the one in teacher Wu's Flink course at Atguigu (尚硅谷); many thanks.
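Before the example, a minimal plain-Java sketch of the propagation rule itself may help. In Flink, each upstream task broadcasts its watermark to every downstream parallel instance it is connected to. A downstream task remembers the latest watermark received on each input channel and advances its own watermark to the minimum across those channels, never moving it backwards. The class `WatermarkValveSketch` and its methods are hypothetical illustrations, not Flink's API. Flink does this internally in its runtime and also handles idle channels, which this sketch ignores.

```java
import java.util.Arrays;

// Hypothetical sketch (not Flink's API): how a task with several
// input channels derives its own watermark from the channels' watermarks.
public class WatermarkValveSketch {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    public WatermarkValveSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No channel has reported yet
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input channel and returns the task's watermark afterwards. */
    public long onWatermark(int channel, long watermark) {
        // Watermarks from a single channel only move forward
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        // The task can only advance as far as its slowest input channel
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // The task's own watermark never moves backwards
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkValveSketch valve = new WatermarkValveSketch(2);
        System.out.println(valve.onWatermark(0, 5000)); // channel 1 has not reported yet
        System.out.println(valve.onWatermark(1, 3000)); // min(5000, 3000)
        System.out.println(valve.onWatermark(1, 8000)); // min(5000, 8000)
    }
}
```

With two channels, the task's watermark stays at `Long.MIN_VALUE` until both channels have reported and then follows the slower one, so `main` prints `-9223372036854775808`, `3000`, `5000`.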
```java
package com.atguigu.apitest.window;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowTest3_EventTimeWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Scenario 1: a single parallel instance
        env.setParallelism(1);
        // Event-time semantics (needed before Flink 1.12; deprecated since 1.12, where event time is the default)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Lines such as "sensor_1,1547718199,35.8" typed into a socket
        DataStreamSource<String> dataStreamSource = env.socketTextStream("hadoop1", 9999);

        // Parse each line into a SensorReading
        SingleOutputStreamOperator<SensorReading> map = dataStreamSource.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
            }
        });

        // Timestamps are in seconds, Flink expects milliseconds.
        // Watermark = largest timestamp seen so far - 2 s (bounded out-of-orderness)
        SingleOutputStreamOperator<SensorReading> assignTimestampsAndWatermarks = map.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(SensorReading element) {
                        return element.getTimestamp() * 1000L;
                    }
                });

        // Event-time windowed aggregation: 10 s tumbling windows, lowest temperature per sensor id
        SingleOutputStreamOperator<SensorReading> cntStream = assignTimestampsAndWatermarks
                .keyBy("id")
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .minBy("temperature");

        cntStream.print("cntStream");
        env.execute();
    }
}
```
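To see which watermarks the job above should generate for this post's sample records, here is a plain-Java sketch with no Flink dependency of the `BoundedOutOfOrdernessTimestampExtractor` rule with `Time.seconds(2)`: the watermark is the largest timestamp seen so far minus 2000 ms. For simplicity it assumes a watermark is emitted after every record. In the real job watermarks are emitted periodically (every 200 ms by default once event time is enabled), which should give the same values when records are typed into the socket by hand. `BoundedWatermarkSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch of bounded out-of-orderness watermarks:
// watermark = largest timestamp seen so far - 2000 ms.
public class BoundedWatermarkSketch {
    static final long MAX_OUT_OF_ORDERNESS_MS = 2_000L;

    /** Returns the watermark after each line, assuming one emission per record. */
    public static List<Long> watermarks(String[] lines) {
        List<Long> result = new ArrayList<>();
        // Start low enough that subtracting the bound cannot underflow
        long maxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;
        for (String line : lines) {
            String[] fields = line.split(",");
            long ts = Long.parseLong(fields[1]) * 1000L; // seconds -> ms, as in extractTimestamp
            maxTimestamp = Math.max(maxTimestamp, ts);  // only the maximum is tracked
            result.add(maxTimestamp - MAX_OUT_OF_ORDERNESS_MS);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] data = {
            "sensor_1,1547718199,35.8", "sensor_6,1547718201,15.4", "sensor_7,1547718202,6.7",
            "sensor_10,1547718205,38.1", "sensor_1,1547718207,36.3", "sensor_1,1547718209,32.8",
            "sensor_1,1547718212,37.1"
        };
        List<Long> wms = watermarks(data);
        for (int i = 0; i < data.length; i++) {
            System.out.println(data[i] + " -> watermark " + wms.get(i));
        }
    }
}
```

Because only the maximum timestamp is tracked, a late record (for example 1547718201 arriving after 1547718202) does not move the watermark back.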
POJO class:
```java
package com.atguigu.apitest.beans;

// POJO: public no-arg constructor plus getters/setters, so Flink can key by field name ("id")
public class SensorReading {
    private String id;
    private Long timestamp;     // event time, in seconds
    private Double temperature;

    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    public Double getTemperature() { return temperature; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }

    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", temperature=" + temperature +
                '}';
    }
}
```
Test data:
```
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1
```

Scenario 1: parallelism set to 1
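With event-time windows, the start of the window a record belongs to is computed by the following static method of `TimeWindow` (package `org.apache.flink.streaming.api.windowing.windows`):

```java
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    return timestamp - (timestamp - offset + windowSize) % windowSize;
}
```

Here the timestamp is in milliseconds (1547718199 × 1000 = 1547718199000), the offset is 0 and the window size is 10000 ms, so (1547718199000 + 10000) % 10000 = 9000 and the window start is 1547718199000 - 9000 = 1547718190000. Therefore the record with timestamp 1547718199 falls into the window [1547718190, 1547718200), which starts at 1547718190.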
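The formula and the firing condition can be checked with a small standalone program. It copies the arithmetic of `getWindowStartWithOffset` and applies the rule Flink's `EventTimeTrigger` uses: a window fires once the watermark reaches the window's `maxTimestamp()`, i.e. its end minus 1 ms. Combined with the 2-second bound in the job, this suggests that with parallelism 1 the window [1547718190, 1547718200) should fire when `sensor_7,1547718202` arrives (watermark 1547718200000), not when `sensor_6,1547718201` arrives (watermark 1547718199000). `WindowStartSketch` and `fires` are hypothetical helper names, not Flink API.

```java
// Hypothetical standalone check of tumbling-window assignment and firing.
public class WindowStartSketch {
    // Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset
    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** True once a window [start, start + size) would fire at the given watermark. */
    public static boolean fires(long windowStart, long windowSize, long watermark) {
        long maxTimestamp = windowStart + windowSize - 1; // window end - 1 ms
        return watermark >= maxTimestamp;
    }

    public static void main(String[] args) {
        long size = 10_000L;            // Time.seconds(10) in milliseconds
        long ts = 1547718199L * 1000L;  // first record, converted to ms as in extractTimestamp
        long start = getWindowStartWithOffset(ts, 0L, size);
        // prints "window = [1547718190000, 1547718200000)"
        System.out.println("window = [" + start + ", " + (start + size) + ")");
        // Watermark = max timestamp - 2 s
        System.out.println(fires(start, size, 1547718201000L - 2000L)); // false: after sensor_6
        System.out.println(fires(start, size, 1547718202000L - 2000L)); // true: after sensor_7
    }
}
```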