Flink入门第十七课:PageView(pv)和UniqueVisitor(uv)的统计

Flink入门第十七课:PageView(pv)和UniqueVisitor(uv)的统计,第1张

数据文件:
用到的数据文件
链接:https://pan.baidu.com/s/1uCk-IF4wWVfUkuuTAKaD0w 
提取码:2hmu
输入bean
package com.atguigu.networkflow_analysis.beans;

/**
 * One user-behavior record parsed from a line of the input CSV.
 *
 * <p>The five columns are, in order: user id, item id, category id, behavior name
 * (for example {@code "pv"}) and an event timestamp. The jobs in this file multiply the
 * timestamp by 1000 before using it as event time, i.e. they treat it as seconds.
 *
 * <p>Fields stay public and a no-argument constructor is kept because other code in this
 * file reads the fields directly; getters and setters are provided as well.
 */
public class UserBehavior {
    public Long userId;
    public Long itemId;
    public Integer categoryId;
    public String behavior;
    public Long timestamp;

    /** Creates an empty record; every field is {@code null} until set. */
    public UserBehavior() {
    }

    /**
     * Creates a fully populated record.
     *
     * @param userId     id of the acting user
     * @param itemId     id of the item acted upon
     * @param categoryId id of the item's category
     * @param behavior   behavior name, e.g. {@code "pv"}
     * @param timestamp  event timestamp as found in the input file
     */
    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public Long getItemId() {
        return itemId;
    }

    public void setItemId(Long itemId) {
        this.itemId = itemId;
    }

    public Integer getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(Integer categoryId) {
        this.categoryId = categoryId;
    }

    public String getBehavior() {
        return behavior;
    }

    public void setBehavior(String behavior) {
        this.behavior = behavior;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    /** Renders all fields; the behavior string is wrapped in single quotes. */
    @Override
    public String toString() {
        // The closing quote must be the escaped char literal '\''; a bare ''' does not compile.
        return "UserBehavior{" +
                "userId=" + userId +
                ", itemId=" + itemId +
                ", categoryId=" + categoryId +
                ", behavior='" + behavior + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
输出bean
package com.atguigu.networkflow_analysis.beans;

/**
 * Output record of the PV/UV jobs: a label, the end timestamp of the window the count
 * belongs to, and the count itself.
 *
 * <p>Despite its name, {@code url} is used by the jobs in this file as a free-form label
 * (a partition key, {@code "pv_count"} or {@code "uv"}).
 */
public class PageViewCount {
    private String url;
    private Long windowEnd;
    private Long count;

    /** Creates an empty record; every field is {@code null} until set. */
    public PageViewCount() {
    }

    /**
     * Creates a fully populated record.
     *
     * @param url       label of the counted item
     * @param windowEnd end timestamp of the window the count belongs to
     * @param count     the counted value
     */
    public PageViewCount(String url, Long windowEnd, Long count) {
        this.url = url;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    public String getUrl() {
        return url;
    }

    public Long getWindowEnd() {
        return windowEnd;
    }

    public Long getCount() {
        return count;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setWindowEnd(Long windowEnd) {
        this.windowEnd = windowEnd;
    }

    public void setCount(Long count) {
        this.count = count;
    }

    /** Renders all fields; the label is wrapped in single quotes. */
    @Override
    public String toString() {
        // The closing quote must be the escaped char literal '\''; a bare ''' does not compile.
        return "PageViewCount{" +
                "url='" + url + '\'' +
                ", windowEnd=" + windowEnd +
                ", count=" + count +
                '}';
    }
}
PageView(pv)统计
package com.atguigu.networkflow_analysis.Ahotpages;

import com.atguigu.networkflow_analysis.beans.PageViewCount;
import com.atguigu.networkflow_analysis.beans.UserBehavior;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.net.URL;
import java.util.Random;


public class PageView {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        URL resource = PageView.class.getResource("/UserBehavior.csv");//读取文件
        DataStreamSource inputStream = env.readTextFile(resource.getPath());

        DataStream dataStream=inputStream.map(
                line ->{
                    String [] words=line.split(",");
                    return new UserBehavior(new Long(words[0]),new Long(words[1]),new Integer(words[2]),new String(words[3]),new Long(words[4]));
                })
                .assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor() { //升序
                            @Override
                            public long extractAscendingTimestamp(UserBehavior userBehavior) {//获取事件时间戳,秒级转毫秒级
                                return userBehavior.getTimestamp()*1000L;
                            }
                        });

        //分组聚合得到结果数据
        SingleOutputStreamOperator pvCountStream = dataStream
                .filter(data -> "pv".equals(data.getBehavior())) //过滤“pv”行为
                .map(new MapFunction>() {//创建[1,4]随机key,让数据均匀遍布4个分区

                    @Override
                    public Tuple2 map(UserBehavior value) throws Exception {
                        Random random = new Random();
                        return new Tuple2<>(random.nextInt(4)+1,1L);
                    }
                })
                .keyBy(data -> data.f0)
                .timeWindow(Time.minutes(60)) //每1小时更新一次窗口数据
                .aggregate(new PvCountAgg(),new PvCountResult());

        SingleOutputStreamOperator countPvResult = pvCountStream
                .keyBy(PageViewCount::getWindowEnd)
                .process(new TotalPvCount());

        //输出并执行
        countPvResult.print();
        env.execute("hot items analysis");

    }

    //泛型1:输入类型   泛型2:聚合状态类型   泛型3:输出类型
    public static class PvCountAgg implements AggregateFunction,Long,Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Tuple2 value, Long accumulator) {
            return accumulator+1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a+b;
        }
    }

    //参数1:输入类型  参数2:输出类型  参数3:keyBy的返回值键值对中value的类型  参数4: 窗口类型
    public static class PvCountResult implements WindowFunction {

        @Override
        public void apply(Integer integer, TimeWindow window, Iterable iterable, Collector collector) throws Exception {
            collector.collect(new PageViewCount(integer.toString(),window.getEnd(),iterable.iterator().next()));
        }
    }

    //参数1:keyBy返回值类型  参数2:输入类型  参数3:输出类型
    public static class TotalPvCount extends KeyedProcessFunction {
        ValueState  totalCountState; //保存当前总的count值

        @Override
        public void open(Configuration parameters) throws Exception {
            totalCountState=getRuntimeContext().getState(new ValueStateDescriptor("total_count",Long.class,0L));
        }

        @Override
        public void processElement(PageViewCount pageViewCount, Context context, Collector collector) throws Exception {
            totalCountState.update(pageViewCount.getCount()+totalCountState.value()); //count致叠加
            context.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd());//注册定时器
        }

        @Override
        public void onTimer(long timestamp, onTimerContext ctx, Collector out) throws Exception {
            Long totalCount = totalCountState.value();
            out.collect(new PageViewCount("pv_count",ctx.getCurrentKey(),totalCount));

            //清空状态
            totalCountState.clear();
        }

        @Override
        public void close() throws Exception {
            totalCountState.clear();
        }
    }
}
UniqueVisitor(uv)统计
package com.atguigu.networkflow_analysis.Ahotpages;

import com.atguigu.networkflow_analysis.beans.PageViewCount;
import com.atguigu.networkflow_analysis.beans.UserBehavior;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.net.URL;


public class UniqueVisitorVersion2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        URL resource = UniqueVisitorVersion1.class.getResource("/UserBehavior.csv");//读取文件
        DataStreamSource inputStream = env.readTextFile(resource.getPath());

        DataStream dataStream=inputStream.map(
                line ->{
                    String [] words=line.split(",");
                    return new UserBehavior(new Long(words[0]),new Long(words[1]),new Integer(words[2]),new String(words[3]),new Long(words[4]));
                })
                .assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor() { //升序
                            @Override
                            public long extractAscendingTimestamp(UserBehavior userBehavior) {//获取事件时间戳,秒级转毫秒级
                                return userBehavior.getTimestamp()*1000L;
                            }
                        });

        //处理
        SingleOutputStreamOperator uvCountStream = dataStream
                .filter(data -> "pv".equals(data.getBehavior()))
                .timeWindowAll(Time.minutes(60))
                .trigger(new MyTrigger())
                .process(new UvCountResultWithBloomFilter());
        //执行
        uvCountStream.print();
        env.execute("统计uv");
    }

    
    public static class MyTrigger extends Trigger{
        //每来一个元素触发一次
        @Override
        public TriggerResult onElement(UserBehavior userBehavior, long l, TimeWindow window, TriggerContext triggerContext) throws Exception {
            return TriggerResult.FIRE_AND_PURGE; //即每来一条数据触发窗口计算又清空窗口数据
        }
        //在处理时间上自定义触发
        @Override
        public TriggerResult onProcessingTime(long l, TimeWindow window, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;//什么都不干
        }
        //在事件时间上触发
        @Override
        public TriggerResult onEventTime(long l, TimeWindow window, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;//什么都不干
        }
        //主要用来清除一些自定义状态
        @Override
        public void clear(TimeWindow window, TriggerContext triggerContext) throws Exception {
        }
    }

    
    public static class MyBloomFilter{
        private Integer cap;//位图的大小,一般需要时2的倍数

        public MyBloomFilter(Integer cap){
            this.cap=cap;
        }

        
        public Long hashCode(String value,Integer seed){
            Long result=1L;
            for(int i=0;i {
        private Jedis jedis;
        private MyBloomFilter mybloomFilter;

        @Override
        public void open(Configuration parameters) throws Exception {
            jedis=new Jedis("localhost",6379);
            mybloomFilter=new MyBloomFilter(1<<29);//约为1亿位
        }

        @Override
        public void process(Context context, Iterable iterable, Collector collector) throws Exception {
            Long windowEnd = context.window().getEnd();
            String bitMapKey = windowEnd.toString();

            String countHashName="uv_count";
            String countKey=windowEnd.toString();

            //取出当前useId
            Long userId = iterable.iterator().next().userId;
            //计算出当前userId在位图中的offset
            Long offset = mybloomFilter.hashCode(userId.toString(), 61);
            //用redis的getbit命令,判断对应位置上的值
            Boolean isExist = jedis.getbit(bitMapKey, offset);

            if(!isExist){
                //如果对应offset上没有值,则对应位置设为1
                jedis.setbit(bitMapKey,offset,true);

                //更新redis保存的count值
                Long uvCount=0L;
                String uvCountString = jedis.hget(countHashName, countKey);//获取count值
                if(uvCountString !=null && !"".equals(uvCountString)){
                    uvCount = Long.valueOf(uvCountString);
                }
                jedis.hset(countHashName,countKey,String.valueOf(uvCount+1L));//设置新的count值

                //输出
                collector.collect(new PageViewCount("uv",windowEnd,uvCount+1L));
            }
        }

        @Override
        public void close() throws Exception {
            jedis.close();
        }
    }

}

依赖


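<?xml version="1.0" encoding="UTF-8"?>
<!-- NOTE: the XML tags were lost when this page was extracted; the element structure
     below is rebuilt from the indentation levels and the ${...} property references. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>UserBehaviorAnalysis</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>HotItemsAnalysis</module>
        <module>NetworkFlowAnalysis</module>
        <module>KafkaDemo</module>
        <module>JavaReview</module>
    </modules>

    <properties>
        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_${scala.binary.version}</artifactId>
                <version>${kafka.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>${flink.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
                <version>1.1.5</version>
            </dependency>

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.44</version>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.11.0.0</version>
            </dependency>

            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.8.1</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>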

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4008667.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-10-22
下一篇 2022-10-22

发表评论

登录后才能评论

评论列表(0条)

保存