flink1.12 stream windows-top-n

flink1.12 stream windows-top-n,第1张

flink1.12 stream windows-top-n
  1. 窗口 top-N flink 1.12
package com.cn.stream;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import org.apache.flink.util.Collector;


import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;


public class WindowTopN {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
     
       
        DataStreamSource Input = env.readTextFile("E:\大数据相关-学员参考\flinkdemo\src\main\resources\ok.txt");
        SingleOutputStreamOperator> InputMap = Input.map(new MapFunction>() {

                    @Override
                    public Tuple3 map(String value) throws Exception {
                        String[] v = value.split(",");
                        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
                        long timestampe = LocalDateTime.parse(v[0], formatter).toInstant(ZoneOffset.of("+8")).toEpochMilli();
                        return new Tuple3(timestampe, Float.valueOf(v[1]), v[2]);
                    }
                }).uid("001-zlg-map01").name("切分数据源")
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .>forBoundedOutOfOrderness(
                                        Duration.ofSeconds(5))
                                .withTimestampAssigner((event, timstamp) -> {
                                            return event.f0;
                                        }
                                ).withIdleness(Duration.ofSeconds(1)));


        SingleOutputStreamOperator>> process = InputMap
                .rebalance()
                .keyBy(new KeySelector, String>() {
                    @Override
                    public String getKey(Tuple3 value) throws Exception {
                        return value.f2;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.minutes(10)))
                .aggregate(new aggregations(), new processWindowsA()).uid("aggregation").name("窗口聚合")
                .keyBy(new KeySelector, Long>() {
                    @Override
                    public Long getKey(Tuple3 value) throws Exception {
                        return value.f2;
                    }
                })
                .process(new keywd()).uid("topc").name("求topn");

        process.print();
        env.execute("top n");

    }
}

class aggregations implements AggregateFunction, Float, Float> {
    //创建一个 累加器
    @Override
    public Float createAccumulator() {
        float num = 0l;
        return num;
    }

    // 创建累加规则
    @Override
    public Float add(Tuple3 value, Float accumulator) {
        return accumulator + value.f1;
    }

    //返回的结果
    @Override
    public Float getResult(Float accumulator) {
        return accumulator;
    }

    // 对不同的节点 计算结果的汇总
    @Override
    public Float merge(Float a, Float b) {
        return a + b;
    }
}

class processWindowsA implements WindowFunction, String, TimeWindow> {

    // *** 作 里面有 状态的信息 返回 关闭的窗口的时间
    @Override
    public void apply(String s, TimeWindow window, Iterable input, Collector> out) throws Exception {
        Float next = input.iterator().next();
        // 获取 窗口结束的时候 目的是为了 触发 onTimer 去处排序和 取值问题
        long end = window.getEnd();
        out.collect(new Tuple3(next, s, end));
    }
}


class keywd extends KeyedProcessFunction, List>> {
    ListState> listState = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 在开始前就 初始化 状态 和状态描述器
        ListStateDescriptor> tuple3ListStateDescriptor = new ListStateDescriptor>("top-n", TypeInformation.of(new TypeHint>() {
            @Override
            public TypeInformation> getTypeInfo() {
                return super.getTypeInfo();
            }
        }));
        listState = getRuntimeContext().getListState(tuple3ListStateDescriptor);
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction, List>>.onTimerContext ctx, Collector>> out) throws Exception {
        // 将状态 状装入 集合并且清除 状态集合
        List> list = new ArrayList<>();
        Iterator> iterator = listState.get().iterator();
        while (iterator.hasNext()) {
            Tuple3 next = iterator.next();
            list.add(next);
        }
        listState.clear();
        //排序 取前三
        List> collect = list.stream().sorted(new Comparator>() {
            @Override
            public int compare(Tuple3 o1, Tuple3 o2) {
                return o1.f0 - o2.f0 < 0 ? 1 : -1;
            }
        }).limit(3).collect(Collectors.toList());

        out.collect(collect);

    }


    @Override
    public void processElement(Tuple3 value, KeyedProcessFunction, List>>.Context ctx, Collector>> out) throws Exception {
        //将 数据 装入到集合中
        listState.add(value);
        //注册ontimer 执行时间 就是在 窗口 关闭的 后1 纳秒 就 执行
        ctx.timerService().registerEventTimeTimer(value.f2 + 1);
    }
}

示例输入文本(ok.txt),每行格式为:时间,数值,供应商

2020-04-15 08:05,4.00,supplier1
2020-04-15 08:06,4.00,supplier2
2020-04-15 08:07,2.00,supplier1
2020-04-15 08:08,2.00,supplier3
2020-04-15 08:09,5.00,supplier4
2020-04-15 08:11,2.00,supplier3
2020-04-15 08:13,1.00,supplier1
2020-04-15 08:15,3.00,supplier2
2020-04-15 08:17,6.00,supplier5
2020-04-15 08:25,6.00,supplier5
2020-04-15 08:30,6.00,supplier5

注意,需要留意以下几点。第一,引入包的问题:因为 Flink 有 Java 和 Scala 两套 API,所以在引入时一定要仔细确认,不要引错,否则会造成聚合报错。
第二,类型问题:使用 Tuple 时很容易出现类型推断问题,参考程序里我所做的处理,基本可以避免此类问题;缺点是写的时候可能繁琐一些。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5160901.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-18
下一篇 2022-11-18

发表评论

登录后才能评论

评论列表(0条)

保存