Flink: real-time counting of newly registered users
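
This post wires up a small Flink job that counts newly registered users in real time. The pipeline: a FlinkKafkaConsumer reads registration events as JSON strings, RegisterMapFunction converts each message into a RegisterDto, RegisterFilterFunction drops everything that is not from today, and two keyed one-day processing-time tumbling windows (offset by -8 hours so they align with UTC+8 calendar days) aggregate the stream, one keyed by date/sourcePlatform/platform/pvType/siteId/eventType and one by site. A custom CountOrProcessingTimeTrigger fires every second or every 1000 elements, whichever comes first, and the aggregate functions de-duplicate users with a per-day Redisson bloom filter while incrementing counters stored in Redis hashes.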
package com.biaodian.flink.taskregister;

import com.biaodian.flink.constant.Constant;
import com.biaodian.flink.dto.RegisterDto;
import com.biaodian.flink.function.register.*;
import com.biaodian.flink.keyby.RegisterKeyBy;
import com.biaodian.flink.keyby.RegisterSiteKeyBy;
import com.biaodian.flink.tool.CountOrProcessingTimeTrigger;
import com.biaodian.flink.tool.PropertiesTool;
import com.biaodian.flink.tool.PropertiesUtil;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Date;
import java.util.Properties;
import java.util.TimeZone;


public class FlinkRegisterUser {
    public static void main(String[] args) throws Exception {
        // Time zone setup
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
        System.out.println("Current time = " + new Date());
        // Savepoint path
        String path = FlinkRegisterUser.class.getClassLoader().getResource("").getPath();
        // Checkpoint storage path
        path = Constant.CHECKPOINT_PREFIX + path + Constant.CHECKPOINT_REGISTER_SUFFIX;
        System.out.println("Checkpoint path = " + path);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(Constant.ENABLE_CHECKPOINTING);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setMinPauseBetweenCheckpoints(Constant.MIN_PAUSE_BETWEEN_CHECKPOINTS);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setCheckpointStorage(path);
        // Kafka configuration
        String topicName = PropertiesTool.getProperty("kafka.register.topic");
        Properties properties = PropertiesUtil.getProperties();
        properties.setProperty("group.id", PropertiesTool.getProperty("kafka.register.topic.group-id"));
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), properties);
        // Kafka source
        DataStream<String> source = env.addSource(consumer).uid("kafka");
        // Transform the JSON strings into RegisterDto records
        DataStream<RegisterDto> map = source.map(new RegisterMapFunction()).uid("map");
        // Keep only today's records
        DataStream<RegisterDto> sourceData = map.filter(new RegisterFilterFunction()).uid("filter");
        // Windowed counting keyed by platform / pvType / siteId
        DataStream<Integer> register = sourceData.keyBy(new RegisterKeyBy())
                .window(TumblingProcessingTimeWindows.of(Time.days(Constant.INTEGER_ONE), Time.hours(-8)))
                .trigger(CountOrProcessingTimeTrigger.of(Time.seconds(Constant.INTEGER_ONE), 1000L))
                .aggregate(new RegisterAggregateFunction(), new RegisterWindowFunction()).uid("register");
        DataStream<Integer> registerSite = sourceData.keyBy(new RegisterSiteKeyBy())
                .window(TumblingProcessingTimeWindows.of(Time.days(Constant.INTEGER_ONE), Time.hours(-8)))
                .trigger(CountOrProcessingTimeTrigger.of(Time.seconds(Constant.INTEGER_ONE), 1000L))
                .aggregate(new RegisterSiteAggregateFunction(), new RegisterSiteWindowFunction()).uid("registerSite");
        // Results are written to Redis (keyed by siteId) inside the aggregate functions
        
        env.execute("registerUserNum");
    }
}
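
One subtlety worth calling out: Time.days(1) windows are aligned to the epoch (UTC midnight), so the Time.hours(-8) offset shifts the boundaries to midnight in UTC+8, matching the Asia/Shanghai default set at the top of main. A minimal, self-contained sketch of the boundary arithmetic (the class and the sample timestamp are illustrative, not from the original post):

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class WindowOffsetDemo {
    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        long offset = Duration.ofHours(-8).toMillis();
        // 2022-12-17 10:00 in Asia/Shanghai, as epoch millis
        long ts = ZonedDateTime.of(2022, 12, 17, 10, 0, 0, 0,
                ZoneId.of("Asia/Shanghai")).toInstant().toEpochMilli();
        // Mirrors the formula Flink uses in TimeWindow.getWindowStartWithOffset
        long start = ts - (ts - offset + day) % day;
        // Prints 2022-12-17T00:00+08:00[Asia/Shanghai]: the window opens at local midnight
        System.out.println(Instant.ofEpochMilli(start).atZone(ZoneId.of("Asia/Shanghai")));
    }
}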
package com.biaodian.flink.function.register;

import cn.hutool.core.date.DateTime;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.biaodian.flink.constant.Constant;
import com.biaodian.flink.constant.PlatFormEnum;
import com.biaodian.flink.constant.PvTypeEnum;
import com.biaodian.flink.dto.RegisterDto;
import org.apache.flink.api.common.functions.MapFunction;

public class RegisterMapFunction implements MapFunction<String, RegisterDto> {

    @Override
    public RegisterDto map(String value) throws Exception {
        RegisterDto dto = new RegisterDto();
        JSONObject jsonObject = JSON.parseObject(value);
        JSONObject data = jsonObject.getJSONObject("data");
        String createAt = new DateTime(data.getDate("created_at").getTime()).toDateStr();
        dto.setSourcePlatform(jsonObject.getString("sourcePlatform"));
        dto.setPlatform(PlatFormEnum.namevalue(data.getString("platform")));
        dto.setPvType(PvTypeEnum.namevalue(data.getString("platform")));
        dto.setSiteId(data.getString("application_merchant_id"));
        dto.setEventType(data.getString("event_type"));
        dto.setUid(data.getString("user_id"));
        // toDateStr() yields "yyyy-MM-dd"; stripping the separators gives a yyyyMMdd integer
        dto.setStatisticsDateNum(Integer.parseInt(createAt.replaceAll(Constant.UNDER_LINE, Constant.EMPTY_STRING)));
        dto.setNum(Constant.INTEGER_ONE);
        return dto;
    }
}
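
To make the mapping concrete, here is a hypothetical driver for the function above. The JSON field names (sourcePlatform, data.platform, data.created_at, data.application_merchant_id, data.event_type, data.user_id) are exactly the ones the code reads; every value, plus the assumptions that Constant.UNDER_LINE is the "-" separator produced by toDateStr() and that the enum lookups accept the sample platform value, is illustrative:

import com.biaodian.flink.dto.RegisterDto;
import com.biaodian.flink.function.register.RegisterMapFunction;

public class RegisterMapFunctionDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical payload; only the field names are implied by the mapper
        String message = "{\"sourcePlatform\":\"app\",\"data\":{"
                + "\"platform\":\"ios\","
                + "\"created_at\":\"2022-12-17 10:00:00\","
                + "\"application_merchant_id\":\"site-1\","
                + "\"event_type\":\"register\","
                + "\"user_id\":\"u-42\"}}";
        RegisterDto dto = new RegisterMapFunction().map(message);
        // statisticsDateNum becomes 20221217 under the assumptions above; num becomes 1
        System.out.println(dto.getStatisticsDateNum());
    }
}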
package com.biaodian.flink.function.register;

import cn.hutool.core.date.DateUtil;
import com.biaodian.flink.constant.Constant;
import com.biaodian.flink.dto.RegisterDto;
import org.apache.flink.api.common.functions.FilterFunction;

public class RegisterFilterFunction implements FilterFunction<RegisterDto> {
    @Override
    public boolean filter(RegisterDto value) throws Exception {
        // Keep only records whose statistics date equals today's yyyyMMdd value
        Integer dateInt = Integer.parseInt(DateUtil.date().toDateStr().replaceAll(Constant.UNDER_LINE, Constant.EMPTY_STRING));
        return dateInt.equals(value.getStatisticsDateNum());
    }
}
package com.biaodian.flink.keyby;

import com.biaodian.flink.dto.RegisterDto;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple6;

public class RegisterKeyBy implements KeySelector<RegisterDto, Tuple6<Integer, String, String, String, String, String>> {
    @Override
    public Tuple6<Integer, String, String, String, String, String> getKey(RegisterDto value) throws Exception {
        // Key fields: statisticsDateNum, sourcePlatform, platform, pvType, siteId, eventType
        // (platform and pvType are assumed to be Strings; the original generics were lost in extraction)
        return new Tuple6<>(value.getStatisticsDateNum(), value.getSourcePlatform(), value.getPlatform(), value.getPvType(), value.getSiteId(), value.getEventType());
    }
}
package com.biaodian.flink.tool;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class CountOrProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long interval;
    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }

    private CountOrProcessingTimeTrigger(long interval, long maxCount) {
        this.interval = interval;
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
            throws Exception {
        // The window-end timer is registered automatically; no need to register it here
        // count state
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        // interval state
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        // Increment the counter for every element
        count.add(1L);
        if (count.get() >= maxCount) {
            // The count condition is met: first reset the count state
            count.clear();
            // Also delete the pending time timer, unless it is the window-end timer
            // (null-guard added: with maxCount == 1 the timer state may not be set yet)
            Long nextFire = fireTimestamp.get();
            if (nextFire != null && nextFire != window.maxTimestamp()) {
                ctx.deleteProcessingTimeTimer(nextFire);
            }
            fireTimestamp.clear();
            // Fire the window computation
            return TriggerResult.FIRE;
        }

        // After a firing, the next arriving element registers the timer for the next time-based firing
        timestamp = ctx.getCurrentProcessingTime();
        if (fireTimestamp.get() == null) {
            long nextFireTimestamp = timestamp + interval;
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception {
        // count state
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        // interval state
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        if (time == window.maxTimestamp()) {
            // Window end: reset both the count and the time state, then fire and purge
            count.clear();
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE_AND_PURGE;
        } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
            // The interval timer fired: reset both the count and the time state
            count.clear();
            fireTimestamp.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        // State could be merged into new window.
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        Long timestamp = fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteProcessingTimeTimer(timestamp);
            fireTimestamp.clear();
        }
        ctx.getPartitionedState(countDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        // States for old windows will lose after the call.
        ctx.mergePartitionedState(stateDesc);

        // Register timer for this new window.
        Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
        if (nextFireTimestamp != null) {
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
        }
        ctx.mergePartitionedState(countDesc);
    }

    @VisibleForTesting
    public long getInterval() {
        return interval;
    }

    @Override
    public String toString() {
        // The original returned "ContinuousProcessingTimeTrigger(...)", a leftover from the class it was adapted from
        return "CountOrProcessingTimeTrigger(interval=" + interval + ", maxCount=" + maxCount + ")";
    }

    
    public static <W extends Window> CountOrProcessingTimeTrigger<W> of(Time interval, Long count) {
        return new CountOrProcessingTimeTrigger<>(interval.toMilliseconds(), count);
    }
}
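
To summarize the trigger's contract: a keyed window fires when maxCount elements have arrived or interval milliseconds of processing time have passed since the last firing, whichever comes first; the automatically registered window-end timer additionally fires and purges. Early firings return FIRE rather than FIRE_AND_PURGE, so with an AggregateFunction the accumulator survives each firing and the per-window totals keep growing until the window closes.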
package com.biaodian.flink.function.register;

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.biaodian.flink.constant.Constant;
import com.biaodian.flink.dto.RegisterDto;
import com.biaodian.flink.tool.RedisUtil;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.redisson.api.RBloomFilter;

import java.util.concurrent.TimeUnit;


public class RegisterAggregateFunction implements AggregateFunction<RegisterDto, Integer, Integer> {
    private RBloomFilter<String> registerUserNum = null;
    private String dateStr = null;
    private DateTime dateTime = null;

    @Override
    public Integer createAccumulator() {
        dateTime = DateUtil.date();
        dateStr = dateTime.toDateStr().replaceAll(Constant.UNDER_LINE, Constant.EMPTY_STRING);
        registerUserNum = RedisUtil.redisson.getBloomFilter(dateStr + "registerUserNum");
        registerUserNum.tryInit(Constant.BLOOM_REGISTER_SIZE, 0.000001);
        registerUserNum.expire(Constant.BLOOM_ACTIVE_SECOND, TimeUnit.SECONDS);
        return 0;
    }

    @Override
    public Integer add(RegisterDto value, Integer accumulator) {
        String key = value.getPlatform() + value.getPvType() + value.getSiteId() + value.getEventType() + value.getUid();
        // Re-create the bloom filter when the field has not been initialized yet
        if (null == registerUserNum){
            dateTime = DateUtil.date();
            dateStr = dateTime.toDateStr().replaceAll(Constant.UNDER_LINE, Constant.EMPTY_STRING);
            registerUserNum = RedisUtil.redisson.getBloomFilter(dateStr + "registerUserNum");
            // The original passed BLOOM_ORDER_SIZE here, which looks like a copy-paste slip;
            // BLOOM_REGISTER_SIZE matches createAccumulator()
            registerUserNum.tryInit(Constant.BLOOM_REGISTER_SIZE, 0.000001);
            registerUserNum.expire(Constant.BLOOM_ACTIVE_SECOND, TimeUnit.SECONDS);
        }
        boolean contains = registerUserNum.contains(key);
        if (contains) {
            return accumulator;
        }
        RedisUtil.redisson.getMap(value.getStatisticsDateNum() + Constant.colon +
                value.getSourcePlatform() + Constant.colon +
                value.getSiteId() + Constant.colon +
                value.getPlatform() + Constant.colon +
                value.getPvType()).addAndGet(Constant.REGISTER_USER_NUM,Constant.INTEGER_ONE);
        registerUserNum.add(key);
        return 1 + accumulator;
    }

    @Override
    public Integer getResult(Integer accumulator) {
        return accumulator;
    }

    @Override
    public Integer merge(Integer a, Integer b) {
        return a + b;
    }
}
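
Both aggregate functions in this post follow the same pattern: a per-day Redisson bloom filter, keyed by the concatenated dimensions plus uid, decides whether a user has already been counted, and genuinely new users increment a field in a Redis hash whose name joins the dimensions with Constant.colon. With a false-positive probability of 0.000001, roughly one new user per million checks is wrongly treated as seen and therefore undercounted. A hedged sketch of reading a counter back, assuming Constant.colon is ":" and reusing the post's RedisUtil and Constant classes (all parameter values are illustrative):

import org.redisson.api.RMap;

import com.biaodian.flink.constant.Constant;
import com.biaodian.flink.tool.RedisUtil;

public class RegisterCounterReader {
    // Reads one day's registered-user counter for a dimension combination.
    // The hash name mirrors what RegisterAggregateFunction writes:
    //   <yyyyMMdd>:<sourcePlatform>:<siteId>:<platform>:<pvType>
    public static Integer readCounter(int dateNum, String sourcePlatform,
                                      String siteId, String platform, String pvType) {
        String hashName = dateNum + Constant.colon + sourcePlatform + Constant.colon
                + siteId + Constant.colon + platform + Constant.colon + pvType;
        RMap<String, Integer> map = RedisUtil.redisson.getMap(hashName);
        return map.get(Constant.REGISTER_USER_NUM);
    }
}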
 
package com.biaodian.flink.function.register;

import com.biaodian.flink.dto.RegisterDto;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;


public class RegisterWindowFunction implements WindowFunction<Integer, Integer, Tuple6<Integer, String, String, String, String, String>, TimeWindow> {

    @Override
    public void apply(Tuple6<Integer, String, String, String, String, String> tuple6, TimeWindow window, Iterable<Integer> input, Collector<Integer> out) throws Exception {
        // Intentionally left empty: the Redis counters are already updated inside
        // RegisterAggregateFunction, so nothing is emitted downstream.
    }
}
package com.biaodian.flink.function.register;

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.biaodian.flink.constant.Constant;
import com.biaodian.flink.dto.RegisterDto;
import com.biaodian.flink.tool.RedisUtil;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.redisson.api.RBloomFilter;

import java.util.concurrent.TimeUnit;


public class RegisterSiteAggregateFunction implements AggregateFunction<RegisterDto, Integer, Integer> {
    private RBloomFilter<String> registerSiteUserNum = null;
    private String dateStr = null;
    private DateTime dateTime = null;

    @Override
    public Integer createAccumulator() {
        dateTime = DateUtil.date();
        dateStr = dateTime.toDateStr().replaceAll(Constant.UNDER_LINE, Constant.EMPTY_STRING);
        registerSiteUserNum = RedisUtil.redisson.getBloomFilter(dateStr + "registerSiteUserNum");
        registerSiteUserNum.tryInit(Constant.BLOOM_REGISTER_SIZE, 0.000001);
        registerSiteUserNum.expire(Constant.BLOOM_ACTIVE_SECOND, TimeUnit.SECONDS);
        return 0;
    }

    @Override
    public Integer add(RegisterDto value, Integer accumulator) {
        String key = value.getSiteId() + value.getEventType() + value.getUid();
        // Re-create the bloom filter when the field has not been initialized yet
        if (null == registerSiteUserNum){
            dateTime = DateUtil.date();
            dateStr = dateTime.toDateStr().replaceAll(Constant.UNDER_LINE, Constant.EMPTY_STRING);
            registerSiteUserNum = RedisUtil.redisson.getBloomFilter(dateStr + "registerSiteUserNum");
            // BLOOM_REGISTER_SIZE matches createAccumulator(); the original's BLOOM_ORDER_SIZE looks like a copy-paste slip
            registerSiteUserNum.tryInit(Constant.BLOOM_REGISTER_SIZE, 0.000001);
            registerSiteUserNum.expire(Constant.BLOOM_ACTIVE_SECOND, TimeUnit.SECONDS);
        }
        boolean contains = registerSiteUserNum.contains(key);
        if (contains) {
            return accumulator;
        }
        RedisUtil.redisson.getMap(value.getStatisticsDateNum() + Constant.colon +
                value.getSourcePlatform() + Constant.colon +
                value.getSiteId())
                .addAndGet(Constant.REGISTER_USER_NUM,Constant.INTEGER_ONE);
        registerSiteUserNum.add(key);
        return 1 + accumulator;
    }

    @Override
    public Integer getResult(Integer accumulator) {
        return accumulator;
    }

    @Override
    public Integer merge(Integer a, Integer b) {
        return a + b;
    }
}
 
package com.biaodian.flink.function.register;

import com.biaodian.flink.dto.RegisterDto;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;


public class RegisterSiteWindowFunction implements WindowFunction<Integer, Integer, Tuple4<Integer, String, String, String>, TimeWindow> {
    @Override
    public void apply(Tuple4<Integer, String, String, String> tuple4, TimeWindow window, Iterable<Integer> input, Collector<Integer> out) throws Exception {
        // Intentionally left empty: the site-level Redis counters are updated inside
        // RegisterSiteAggregateFunction. (The Tuple4 type parameters are assumed;
        // RegisterSiteKeyBy is not shown in the post.)
    }
}
package com.biaodian.flink.tool;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesTool {
    private static final Properties properties = new Properties();
    static {
        // Load the base configuration, then overlay the profile-specific file on top
        InputStream inputStream = PropertiesTool.class.getClassLoader().getResourceAsStream("application.properties");
        try {
            properties.load(inputStream);
            if ("dev".equals(properties.getProperty("spring.profiles.active"))){
                inputStream = PropertiesTool.class.getClassLoader().getResourceAsStream("application-dev.properties");
            }else {
                inputStream = PropertiesTool.class.getClassLoader().getResourceAsStream("application-prod.properties");
            }
            properties.load(inputStream);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    private PropertiesTool(){}

    
    public static String getProperty(String key){
        return properties.getProperty(key);
    }
}
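
For completeness, these are the configuration keys the post reads through PropertiesTool. The values below are placeholders; only the keys appear in the code, and Kafka connection settings such as bootstrap.servers come from PropertiesUtil, which is not shown:

# application.properties (placeholder values)
spring.profiles.active=dev

# application-dev.properties / application-prod.properties
kafka.register.topic=register-events
kafka.register.topic.group-id=flink-register-consumer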

Original article: http://outofmemory.cn/zaji/5705765.html