Flink中实现自定义ProcessFunction实现定时器、侧输出

Flink中实现自定义ProcessFunction实现定时器、侧输出,第1张

Flink中实现自定义ProcessFunction实现定时器、侧输出

在Flink中,当我们需要获取到算子的Processing Time或者Water Mark以及定时器时,可以实现ProcessFunction函数。
目前该函数主要有:K恶业的ProcessFuntion,ProcessFunction,CoPropcessFunction等,核心功能主要如下:

可以使用状态计算,能够在算子中访问Keyed State可以设置定时器侧输出,可以将一部分数据发送到另外一个数据流中,而且输出的两个数据流数据类型可以不一样。

如下自定义实现一个KeyedProcessFunction:

public class MyKeyedProcessFunctionJava extends KeyedProcessFunction {

    private ValueState currentTime;
    private ValueState lastPrice ;
    private static long  intervalMs = 500 ;

    OutputTag highLevel ;
    OutputTag middleLevel;
    OutputTag lowLevel;

    public MyKeyedProcessFunctionJava(OutputTag highLevel,
                                      OutputTag middleLevel,
                                      OutputTag lowLevel){
        this.lowLevel = lowLevel;
        this.middleLevel= middleLevel;
        this.highLevel = highLevel;

    }
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        currentTime = getRuntimeContext().getState(new ValueStateDescriptor("currentTime",Long.class));
        lastPrice = getRuntimeContext().getState(new ValueStateDescriptor("lastPrice",Double.class));
    }

    @Override
    public void processElement(StockPrice stockPrice, Context context, Collector collector) throws Exception {
        double price = 0;
        if(lastPrice.value() != null) {
            price = lastPrice.value() ;
        }
        long currentTimeStamp = 0;
        if(currentTime.value() != null){
            currentTimeStamp = currentTime.value();
        }
        if(price < stockPrice.getPrice()){
            context.timerService().deleteEventTimeTimer(currentTimeStamp);
        }else{
            long time = context.timestamp()+intervalMs;
            context.timerService().registerEventTimeTimer(time);
            currentTime.update(time);

        }
        lastPrice.update(stockPrice.getPrice());
        if(price>10000){
            context.output(highLevel,stockPrice);
        }else if(price<= 10000 && price > 3000){
            context.output(middleLevel,stockPrice);
        }else{
            context.output(lowLevel,stockPrice);
        }


    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyy-MM-dd HH:mm:ss");
        String timeStr = format.format(timestamp);
        String warnings = String.format("warnings: time=%s ,key=%s increased",timeStr,ctx.getCurrentKey());
        out.collect(warnings);
    }

    public static void main(String[] args) {
        String topic = "test001";
        Properties kafkaProps = new Properties();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(topic, new MyKafkaDeserializationSchema(), kafkaProps);

        OutputTag highLevel = new OutputTag("highLevel");
        OutputTag middleLevel = new OutputTag("middleLevel");
        OutputTag lowLevel = new OutputTag("lowLevel");

        SingleOutputStreamOperator warning = env.addSource(consumer)
                .keyBy(stockPrice -> stockPrice.getId())
        .process(new MyKeyedProcessFunctionJava(highLevel,middleLevel,lowLevel));

        DataStream highLevelStream = warning.getSideOutput(highLevel);
        DataStream middleLevelStream = warning.getSideOutput(middleLevel);
        DataStream lowLevelStream = warning.getSideOutput(lowLevel);





    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5700645.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存