Counting UV with Flink

The job below reads JSON events from Kafka, keys the stream by the event date, and uses keyed state to compute the daily unique-visitor (UV) count: a MapState remembers which date + uid combinations have already been seen, and a ValueState holds the running count that is emitted for every incoming element.


import com.alibaba.fastjson.JSONObject;
import com.poizon.bigdata.flink.job.baseFlinkJob;
import com.poizon.bigdata.flink.utils.TimeUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;


@Slf4j
public class UnifyStateJobUV4 extends baseFlinkJob {

    private static Logger logger = LoggerFactory.getLogger(UnifyStateJobUV4.class);

    
    public static void main(String[] args) throws Exception {

        ParameterTool parameters = ParameterTool.fromArgs(args);
        Map<String, String> kvMap = parameters.toMap();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.enableCheckpointing(30000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);

        env.setStateBackend(new FsStateBackend("file:///Users/wangpengpeng/dewu/checkpoint"));

        // Read raw JSON strings from the "test" topic with consumer group "wppp"
        DataStreamSource<String> stringDataStreamSource = env.addSource(MyKafkaUtil.getKafkaConsumer("test", "wppp"));

        // TODO 3. Convert each line into a JSON object; dirty records could be routed to this side output
        OutputTag<String> outputTag = new OutputTag<String>("Dirty") {};


        SingleOutputStreamOperator<EventBean> mapStream = stringDataStreamSource.map(new RichMapFunction<String, EventBean>() {

            @Override
            public EventBean map(String s) throws Exception {
                // Parse the JSON payload into an EventBean (uid + event time)
                return JSONObject.parseObject(s, EventBean.class);
            }

        }).setParallelism(1);

        // Pass-through map kept from the original post; its result is never used downstream,
        // so this operator has no effect on the UV computation.
        mapStream.map(new RichMapFunction<EventBean, EventBean>() {
            @Override
            public void open(Configuration parameters) throws Exception {
            }

            @Override
            public EventBean map(EventBean eventBean) throws Exception {
                return eventBean;
            }
        }).setParallelism(1);

        SingleOutputStreamOperator<String> process = mapStream
        // Key by the event date so that all events of the same day share one piece of keyed state;
        // keying by uid would turn the count into a per-user figure instead of the daily UV.
        .keyBy(row -> TimeUtil.getShortDateFormat(row.getTime()))
        .process(new KeyedProcessFunction<String, EventBean, String>() {
            // Remembers which date + uid combinations have already been counted
            MapState<String, Integer> mapState;
            // Running UV count for the current day
            ValueState<Long> valueState;

            @Override
            public void open(Configuration configuration) {
                MapStateDescriptor<String, Integer> mapStateDescriptor =
                        new MapStateDescriptor<>("uv", String.class, Integer.class);
                mapState = getRuntimeContext().getMapState(mapStateDescriptor);
                valueState = getRuntimeContext().getState(new ValueStateDescriptor<>("uv_cnt", Long.class));
            }

            @Override
            public void processElement(EventBean eventBean, Context context, Collector<String> collector) throws Exception {

                try {
                    String dateTime = TimeUtil.getShortDateFormat(eventBean.getTime());
                    String key = dateTime + eventBean.getUid();
                    Integer value = mapState.get(key);
                    logger.info(dateTime + ", value: " + value + ", " + key);

                    if (value == null) {
                        // First occurrence of this uid on this date: increment the UV count
                        Long currentCnt = valueState.value() == null ? 0L : valueState.value();
                        logger.info("currentCnt " + currentCnt + ", valueState.value() " + valueState.value());
                        valueState.update(currentCnt + 1L);
                    }
                    // Repeated uids leave the count unchanged
                    mapState.put(key, 1);

                    collector.collect(dateTime + " uv: " + valueState.value());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

        }).setParallelism(1);

        process.print("normal data");
        
        env.execute("ddd");

    }


}
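
MyKafkaUtil is referenced above but not included in the post. A minimal sketch, assuming a plain string source built on the legacy FlinkKafkaConsumer connector (which matches the addSource/FsStateBackend API level used here); the bootstrap address and class layout are placeholders, not the author's actual utility:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class MyKafkaUtil {

    // Placeholder broker address; replace with the real Kafka cluster (assumption)
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    // Build a consumer that emits the raw JSON string of each record;
    // the job parses it into an EventBean downstream.
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.setProperty("group.id", groupId);
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }
}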

import lombok.Data;

// Event parsed from the Kafka JSON payload: the user id and the event time in epoch milliseconds
@Data
public class EventBean {

    public String uid;
    public Long time;
}
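
TimeUtil.getShortDateFormat is also not shown in the post. A minimal sketch, assuming the time field is an epoch-millisecond timestamp and the method returns a yyyy-MM-dd string that serves as the daily key:

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TimeUtil {

    // Assumed formatter: calendar date in the system default time zone
    private static final DateTimeFormatter SHORT_DATE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());

    // Convert an epoch-millisecond timestamp to a yyyy-MM-dd string,
    // used both as the stream key and as the prefix of the dedup key.
    public static String getShortDateFormat(Long epochMillis) {
        return SHORT_DATE.format(Instant.ofEpochMilli(epochMillis));
    }
}

With these helpers in place, sending events with two distinct uids on the same day should print lines like "2022-11-10 uv: 1" and "2022-11-10 uv: 2", while a repeated uid on that day re-emits the same count.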
