原文链接:Flink Window那些事——AggregateFunction窗口函数 - Ruthless - 博客园
AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。
输入类型是输入流中的元素类型,AggregateFunction有一个add方
法可以将一个输入元素添加到一个累加器中。该接口还具有创建初始累加器(createAccumulator方法)、将两个累加器合并到一个累加器(merge方法)以及从累加器中提取输出(类型为OUT)的方法。
package com.lynch.stream.window; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class TestAggFunctionOnWindow { public static void main(String[] args) throws Exception { // 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 读取数据 DataStream> input = env.fromElements(ENGLISH); // 求各个班级英语成绩平均分 DataStream avgScore = input.keyBy(0).countWindow(3).aggregate(new AverageAggrate()); avgScore.print(); env.execute("TestAggFunctionOnWindow"); } public static final Tuple3[] ENGLISH = new Tuple3[] { Tuple3.of("class1", "张三", 100L), Tuple3.of("class1", "李四", 40L), Tuple3.of("class1", "王五", 60L), Tuple3.of("class2", "赵六", 20L), Tuple3.of("class2", "小七", 30L), Tuple3.of("class2", "小八", 50L), }; //Tuple3 输入类型 //Tuple2 累加器ACC类型,保存中间状态 //Double 输出类型 public static class AverageAggrate implements AggregateFunction , Tuple2 , Double> { @Override public Tuple2 createAccumulator() { return new Tuple2<>(0L, 0L); } @Override public Tuple2 add(Tuple3 value, Tuple2 acc) { //acc.f0 总成绩 //value.f2 表示成绩 //acc.f1 人数 return new Tuple2<>(acc.f0 + value.f2, acc.f1 + 1L); } @Override public Double getResult(Tuple2 acc) { return ((double) acc.f0) / acc.f1; } @Override public Tuple2 merge(Tuple2 acc1, Tuple2 acc2) { return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1); } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)