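How do you control where a tumbling window starts? An example:
- the watermark delay is set to 3 s
- the tumbling window length is set to 5 s
- the start offset is set to 3 s, so [3,8) is one window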
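The window boundaries implied by these settings can be checked by hand before running anything. The snippet below is a minimal sketch of the arithmetic in plain Scala, with no Flink dependency. `WindowStartSketch` and `windowStart` are names made up for illustration, and the formula is assumed to match how a tumbling window with an offset aligns its windows; it is not Flink's own code.

```scala
object WindowStartSketch {
  // Start of the tumbling window that contains `timestamp` (all values in ms).
  // floorMod keeps the result correct when the timestamp is smaller than the offset.
  def windowStart(timestamp: Long, offset: Long, size: Long): Long =
    timestamp - java.lang.Math.floorMod(timestamp - offset, size)

  def main(args: Array[String]): Unit = {
    val size = 5000L   // 5 s window length
    val offset = 3000L // 3 s offset
    for (sec <- Seq(0L, 2L, 3L, 7L, 8L)) {
      val start = windowStart(sec * 1000L, offset, size)
      println(s"t=${sec}s -> [${start / 1000}s, ${(start + size) / 1000}s)")
    }
  }
}
```

With these values, 0 s and 2 s fall into [-2s, 3s), 3 s and 7 s into [3s, 8s), and 8 s into [8s, 13s). The window containing 0 s therefore starts two seconds earlier, not at 0.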
The Flink job itself needs little explanation; it just uses the offset argument of TumblingEventTimeWindows.of:
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import com.flink.sourceread.SensorReading
    import com.util.TimeUtils.convertTimeStamp2DateStr

    import scala.collection.mutable.ArrayBuffer

    object waterMarkTest {
      def main(args: Array[String]): Unit = {
        // Sample record showing the input format (unused; input comes from the socket below)
        val dataList = List(
          SensorReading("sensor_1", 1609473600, 35.8)
        )

        // Create the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        // Use event-time semantics
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // Emit a watermark every 50 ms, i.e. check every 50 ms whether windows can be closed
        env.getConfig.setAutoWatermarkInterval(50)

        // On Windows, feed the input with: nc -lp 7777
        val inputStream = env.socketTextStream("localhost", 7777)

        // Parse each line into the case class (a simple conversion)
        val dataStream = inputStream
          .map(data => {
            val arr = data.trim.split(",")
            SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
          })
          // The watermark lags the largest timestamp seen so far by 3 seconds
          .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
            override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L // seconds to milliseconds
          })

        // Side-output tag for late data (declared but not used in this example)
        val latetag = new OutputTag[(String, Double, Long, String)]("late")

        // Per sensor and per 5 s window, collect the timestamps seen and keep the latest one
        val resultStream = dataStream
          .map(data => (data.id, data.timestamp, ArrayBuffer(data.timestamp), convertTimeStamp2DateStr(data.timestamp)))
          .keyBy(_._1) // key by the first tuple element (id)
          // Tumbling event-time window: 5 s length, 3 s offset, so [3,8) is one window
          .window(TumblingEventTimeWindows.of(Time.seconds(5), Time.seconds(3)))
          .reduce((curRes, newData) => (curRes._1, newData._2, curRes._3 ++ newData._3, convertTimeStamp2DateStr(newData._2)))

        resultStream.print("result")
        env.execute("window test")
      }
    }

Test results
input:
sensor_1", 1609473600, 35.8
sensor_1", 1609473601, 35.8
sensor_1", 1609473602, 35.8
sensor_1", 1609473603, 35.8
sensor_1", 1609473604, 35.8
sensor_1", 1609473605, 35.8
sensor_1", 1609473606, 35.8
sensor_1", 1609473607, 35.8
sensor_1", 1609473608, 35.8
sensor_1", 1609473609, 35.8
sensor_1", 1609473610, 35.8
sensor_1", 1609473620, 35.8
output:
result> (sensor_1",1609473602,ArrayBuffer(1609473600, 1609473601, 1609473602),2021-01-01 12:00:02)
result> (sensor_1",1609473607,ArrayBuffer(1609473603, 1609473604, 1609473605, 1609473606, 1609473607),2021-01-01 12:00:07)
result> (sensor_1",1609473610,ArrayBuffer(1609473608, 1609473609, 1609473610),2021-01-01 12:00:10)
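Test logic:
- feed in records for seconds 0 to 10 (1609473600 is a multiple of 5, so it counts as second 0)
- the records are split into three groups: [0,1,2], [3,4,5,6,7], [8,9,10]
- the last record, at second 20, only pushes the watermark far enough for the last two windows to fire; its own window does not appear in the output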
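The grouping can be reproduced without a Flink cluster. The following is a rough sketch in plain Scala, under two assumptions: the watermark equals the largest timestamp seen minus the 3 s delay, and a window fires once the watermark reaches its last millisecond. `WindowFiringSketch`, `windowStart` and `firedWindows` are hypothetical names, and the sketch ignores the 50 ms watermark interval and any late data.

```scala
object WindowFiringSketch {
  val sizeMs = 5000L              // tumbling window length
  val offsetMs = 3000L            // window offset
  val maxOutOfOrdernessMs = 3000L // assumed watermark lag

  // Start (ms) of the window that contains the given timestamp (ms).
  def windowStart(tsMs: Long): Long =
    tsMs - java.lang.Math.floorMod(tsMs - offsetMs, sizeMs)

  // Groups event timestamps (in seconds) by window and keeps only the windows
  // whose last millisecond is covered by the final watermark.
  def firedWindows(timestampsSec: Seq[Long]): Seq[(Long, Seq[Long])] = {
    val watermarkMs = timestampsSec.max * 1000L - maxOutOfOrdernessMs
    timestampsSec
      .groupBy(t => windowStart(t * 1000L))
      .toSeq
      .sortBy(_._1)
      .filter { case (startMs, _) => startMs + sizeMs - 1 <= watermarkMs }
  }

  def main(args: Array[String]): Unit = {
    val base = 1609473600L
    // Seconds 0 to 10, plus the extra record at second 20
    val input = (0L to 10L).map(base + _) :+ (base + 20L)
    for ((startMs, ts) <- firedWindows(input)) {
      val s = startMs / 1000
      println(s"[$s, ${s + 5}) -> ${ts.mkString(", ")}")
    }
  }
}
```

Under these assumptions, the full input yields three fired windows, starting at 1609473598, 1609473603 and 1609473608. Without the record at second 20, only the first window would fire, because the watermark would stop at 1609473607.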
2021-12-03, Jiulonghu, Jiangning District, Nanjing