- 窗口 Top-N 示例（Flink 1.12）
package com.cn.stream; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.stream.Collectors; public class WindowTopN { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSourceInput = env.readTextFile("E:\大数据相关-学员参考\flinkdemo\src\main\resources\ok.txt"); SingleOutputStreamOperator > InputMap = Input.map(new MapFunction >() { @Override public Tuple3 map(String value) throws Exception { String[] v = value.split(","); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm"); long timestampe = LocalDateTime.parse(v[0], 
formatter).toInstant(ZoneOffset.of("+8")).toEpochMilli(); return new Tuple3 (timestampe, Float.valueOf(v[1]), v[2]); } }).uid("001-zlg-map01").name("切分数据源") .assignTimestampsAndWatermarks( WatermarkStrategy . >forBoundedOutOfOrderness( Duration.ofSeconds(5)) .withTimestampAssigner((event, timstamp) -> { return event.f0; } ).withIdleness(Duration.ofSeconds(1))); SingleOutputStreamOperator >> process = InputMap .rebalance() .keyBy(new KeySelector
, String>() { @Override public String getKey(Tuple3 value) throws Exception { return value.f2; } }) .window(TumblingEventTimeWindows.of(Time.minutes(10))) .aggregate(new aggregations(), new processWindowsA()).uid("aggregation").name("窗口聚合") .keyBy(new KeySelector , Long>() { @Override public Long getKey(Tuple3 value) throws Exception { return value.f2; } }) .process(new keywd()).uid("topc").name("求topn"); process.print(); env.execute("top n"); } } class aggregations implements AggregateFunction , Float, Float> { //创建一个 累加器 @Override public Float createAccumulator() { float num = 0l; return num; } // 创建累加规则 @Override public Float add(Tuple3 value, Float accumulator) { return accumulator + value.f1; } //返回的结果 @Override public Float getResult(Float accumulator) { return accumulator; } // 对不同的节点 计算结果的汇总 @Override public Float merge(Float a, Float b) { return a + b; } } class processWindowsA implements WindowFunction , String, TimeWindow> { // *** 作 里面有 状态的信息 返回 关闭的窗口的时间 @Override public void apply(String s, TimeWindow window, Iterable input, Collector > out) throws Exception { Float next = input.iterator().next(); // 获取 窗口结束的时候 目的是为了 触发 onTimer 去处排序和 取值问题 long end = window.getEnd(); out.collect(new Tuple3 (next, s, end)); } } class keywd extends KeyedProcessFunction , List >> { ListState > listState = null; @Override public void open(Configuration parameters) throws Exception { // 在开始前就 初始化 状态 和状态描述器 ListStateDescriptor > tuple3ListStateDescriptor = new ListStateDescriptor >("top-n", TypeInformation.of(new TypeHint >() { @Override public TypeInformation > getTypeInfo() { return super.getTypeInfo(); } })); listState = getRuntimeContext().getListState(tuple3ListStateDescriptor); } @Override public void onTimer(long timestamp, KeyedProcessFunction , List >>.onTimerContext ctx, Collector >> out) throws Exception { // 将状态 状装入 集合并且清除 状态集合 List
> list = new ArrayList<>(); Iterator > iterator = listState.get().iterator(); while (iterator.hasNext()) { Tuple3 next = iterator.next(); list.add(next); } listState.clear(); //排序 取前三 List > collect = list.stream().sorted(new Comparator >() { @Override public int compare(Tuple3 o1, Tuple3 o2) { return o1.f0 - o2.f0 < 0 ? 1 : -1; } }).limit(3).collect(Collectors.toList()); out.collect(collect); } @Override public void processElement(Tuple3 value, KeyedProcessFunction , List >>.Context ctx, Collector >> out) throws Exception { //将 数据 装入到集合中 listState.add(value); //注册ontimer 执行时间 就是在 窗口 关闭的 后1 纳秒 就 执行 ctx.timerService().registerEventTimeTimer(value.f2 + 1); } }
示例输入数据（ok.txt，每行一条记录，格式为：时间,金额,供应商）：
2020-04-15 08:05,4.00,supplier1
2020-04-15 08:06,4.00,supplier2
2020-04-15 08:07,2.00,supplier1
2020-04-15 08:08,2.00,supplier3
2020-04-15 08:09,5.00,supplier4
2020-04-15 08:11,2.00,supplier3
2020-04-15 08:13,1.00,supplier1
2020-04-15 08:15,3.00,supplier2
2020-04-15 08:17,6.00,supplier5
2020-04-15 08:25,6.00,supplier5
2020-04-15 08:30,6.00,supplier5
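注意事项如下。第一，引入包的问题：Flink 同时提供 Java 和 Scala 两套 API（例如 Java 版的 `org.apache.flink.streaming.api.environment.StreamExecutionEnvironment` 与 Scala 版的 `org.apache.flink.streaming.api.scala.StreamExecutionEnvironment`），引入时一定要看清楚，不要引错包，否则会导致聚合等算子报错。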
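第二，类型问题：使用 Tuple 时，由于 Java 的泛型擦除，很容易出现类型推断失败的问题。参考程序中的做法，即为匿名类、`new Tuple3<>(...)`、`WatermarkStrategy.<...>forBoundedOutOfOrderness` 以及状态描述器（`TypeHint`）显式写出泛型类型，基本可以避免此类问题；缺点就是写起来会繁琐一些。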
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)