Using Flink Timers


The problem with window functions

At work we use windows to aggregate data over fixed periods. They cover most scenarios, but we still ran into a problem: a window can contain data yet never trigger its computation. Taking a tumbling time window whose event time is the processing time at ingestion as the example, a window fires only when both of the following hold:
① the watermark has reached the end of the window (watermark >= window_end_time);

② the window contains data.

Condition ② is clearly satisfied, since the window does contain data. Condition ① is the one to watch: the watermark only advances when later records arrive. If no further data comes in, the watermark never moves past the window end, the window is never triggered, and the data already buffered in it is never computed.
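A minimal sketch of this failure mode, assuming a local socket source on port 9999 (the source, host, port, and class name are illustrative assumptions, not part of the original job): words are counted in 5-second tumbling event-time windows, with the processing time at ingestion used as the event time. Windows that close while data keeps flowing print their counts, but once input stops nothing advances the watermark, so the counts buffered in the last open window are never emitted.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowStallExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.socketTextStream("localhost", 9999)
                // (word, 1L) pairs; the event time is simply the processing time at ingestion
                .map(line -> Tuple2.of(line.trim(), 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((element, ts) -> System.currentTimeMillis()))
                .keyBy(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum(1)
                // Nothing is printed for the last window once the input goes quiet:
                // the watermark never reaches window_end_time, so the window never fires.
                .print();

        env.execute("WindowStallExample");
    }
}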

The advantage of timers

As long as the function registers a timer correctly, say for the current time + 5 s, the timer will fire 5 s later whether or not any more data arrives. You then put your own logic in the corresponding callback (onTimer), which guarantees that the buffered data gets computed and avoids the problem of part of a window's data never being processed.

Timer test code
package com.ywwl.ywdc.live.main;

import com.google.common.base.Strings;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

public class MyTimerTest {

    public static void main(String[] args) throws Exception {

        System.setProperty("HADOOP_USER_NAME", "hdfs");

        Config config = ConfigFactory.load(MyTimerTest.class.getClassLoader());

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Disable operator chaining globally
        env.disableOperatorChaining();
        env.setParallelism(1);

        String brokers = config.getString("kafka.brokers");
        String topic = "simple_test";
        String groupId = config.getString("kafka.groupId");
        String checkPointPath = config.getString("check.point.path.prefix") + topic;

        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);

        CheckpointConfig conf = env.getCheckpointConfig();
        // Retain externalized checkpoints when the job is cancelled or fails
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        conf.setCheckpointInterval(10000);//milliseconds
        conf.setCheckpointTimeout(10 * 60 * 1000);//milliseconds
        conf.setMinPauseBetweenCheckpoints(10 * 1000);
        conf.setCheckpointStorage(checkPointPath);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", 1000);
        props.put("session.timeout.ms", 90000);
        props.put("request.timeout.ms", 120000);
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 100);

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        consumer.setCommitOffsetsOnCheckpoints(true);
        DataStream<String> source = env.addSource(consumer);

        DataStream<Tuple2<String, Long>> word2One = source.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    if (!Strings.isNullOrEmpty(word)) {
                        out.collect(new Tuple2<>(word, 1L));
                    }
                }
            }
        });

        KeyedStream<Tuple2<String, Long>, String> keyedWordDataStream = word2One.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> value) throws Exception {
                return value.f0;
            }
        });

        DataStream<String> resultDataStream = keyedWordDataStream.process(new MyProcess());

        resultDataStream.print();

        env.execute("Test");
    }

    public static class MyProcess extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

        ValueState<Long> timerState;

        MapState<String, Long> wordCountState;

        MapState<String, String> testMapState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ValueStateDescriptor<Long> timerStateDesc = new ValueStateDescriptor<>("timerStateDesc", Long.class);
            timerState = getRuntimeContext().getState(timerStateDesc);

            MapStateDescriptor<String, Long> wordCountStateDesc = new MapStateDescriptor<>("wordCountStateDesc", String.class, Long.class);
            wordCountState = getRuntimeContext().getMapState(wordCountStateDesc);

            MapStateDescriptor<String, String> testMapStateDesc = new MapStateDescriptor<>("testMapStateDesc", String.class, String.class);
            testMapState = getRuntimeContext().getMapState(testMapStateDesc);
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {

            // On first use, none of these states have been initialized, so all three values are null
            Long timerStateValue = timerState.value();
            Long aLong = wordCountState.get(value.f0);
            String tttt = testMapState.get("tttt");

            // If no timer has been registered yet for this key, register one for the current processing time + 5 s
            if (timerState.value() == null) {
                timerState.update(ctx.timerService().currentProcessingTime() + 5 * 1000L);
                ctx.timerService().registerProcessingTimeTimer(timerState.value());
            }
            String word = value.f0;
            Long count = wordCountState.contains(word) ? wordCountState.get(word) : 0L;
            count++;
            wordCountState.put(word, count);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // The timer has fired: emit the word counts accumulated for this key
            Iterator<Map.Entry<String, Long>> iterator = wordCountState.iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Long> next = iterator.next();
                String key = next.getKey();
                Long value = next.getValue();
                out.collect(key + " : " + value);
            }
            // Clear the state so the next element starts a fresh 5-second round
            wordCountState.clear();
            timerState.clear();
        }
    }
}

Logic: read strings from Kafka, split them on spaces, and count word frequencies within each 5 s interval.
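The main method reads its connection settings through Typesafe Config (the ConfigFactory.load call), so an application.conf must be on the classpath. A minimal sketch with placeholder values, where the broker list, group id, and checkpoint path are assumptions to be replaced with your own:

kafka {
  brokers = "localhost:9092"      # placeholder broker list
  groupId = "timer-test-group"    # placeholder consumer group
}

# placeholder checkpoint directory; the topic name is appended to it in the code
check.point.path.prefix = "hdfs:///flink/checkpoints/"

With the config in place, producing a few space-separated words to the simple_test topic (for example with kafka-console-producer) should print each word's count roughly 5 s after the first occurrence of that word arrives, even if nothing else is sent afterwards.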

The above is for reference only; my personal understanding may be slightly off, and corrections are welcome!

Original (Chinese) post: https://outofmemory.cn/zaji/5717155.html
