基于Flink实时数仓——DWM 层-跳出明细计算(3.2)

基于Flink实时数仓——DWM 层-跳出明细计算(3.2),第1张

基于Flink实时数仓——DWM 层-跳出明细计算(3.2) 什么是跳出?

跳出就是用户成功访问了网站的一个页面后就退出,不在继续访问网站的其它页面。而
跳出率就是用跳出次数除以访问次数。
关注跳出率,可以看出引流过来的访客是否能很快的被吸引,渠道引流过来的用户之间
的质量对比,对于应用优化前后跳出率的对比也能看出优化改进的成果

计算跳出行为的思路

首先要识别哪些是跳出行为,要把这些跳出的访客最后一个访问的页面识别出来。那么
要抓住几个特征:

    该页面是用户近期访问的第一个页面
    这个可以通过该页面是否有上一个页面(last_page_id)来判断,如果这个表示为空,就说明这是这个访客这次访问的第一个页面。首次访问之后很长一段时间(自己设定),用户没继续再有其他页面的访问。

第一个特征的识别很简单,保留 last_page_id 为空的就可以了。但是第二个访问的
判断,其实有点麻烦,首先这不是用一条数据就能得出结论的,需要组合判断,要用一条存
在的数据和不存在的数据进行组合判断。而且要通过一个不存在的数据求得一条存在的数据。
更麻烦的他并不是永远不存在,而是在一定时间范围内不存在。那么如何识别有一定失效的
组合行为呢?
最简单的办法就是 Flink 自带的 CEP 技术。这个 CEP 非常适合通过多条数据组合来识
别某个事件。
用户跳出事件,本质上就是一个组合。
代码实现:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONAware;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.List;
import java.util.Map;

//数据流:web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd) -> FlinkApp -> Kafka(dwm)
//程  序:mockLog -> Nginx -> Logger.sh  -> Kafka(ZK)  -> baseLogApp -> kafka -> UserJumpDetailApp -> Kafka
public class UserJumpDetailApp {

    public static void main(String[] args) throws Exception {

        //TODO 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); //生产环境,与Kafka分区数保持一致

        //1.1 设置CK&状态后端
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
        //env.enableCheckpointing(5000L);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);

        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart());

        //TODO 2.读取Kafka主题的数据创建流
        String sourceTopic = "dwd_page_log";
        String groupId = "userJumpDetailApp";
        String sinkTopic = "dwm_user_jump_detail";
        DataStreamSource kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

        //TODO 3.将每行数据转换为JSON对象并提取时间戳生成Watermark
        SingleOutputStreamOperator jsonObjDS = kafkaDS.map(JSON::parseObject)
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner(new SerializableTimestampAssigner() {
                            @Override
                            public long extractTimestamp(JSONObject element, long recordTimestamp) {
                                return element.getLong("ts");
                            }
                        }));

        //TODO 4.定义模式序列
        Pattern pattern = Pattern.begin("start").where(new SimpleCondition() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                return lastPageId == null || lastPageId.length() <= 0;
            }
        }).next("next").where(new SimpleCondition() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                return lastPageId == null || lastPageId.length() <= 0;
            }
        }).within(Time.seconds(10));

        //使用循环模式  定义模式序列
        Pattern.begin("start").where(new SimpleCondition() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                String lastPageId = value.getJSONObject("page").getString("last_page_id");
                return lastPageId == null || lastPageId.length() <= 0;
            }
        })
                .times(2)
                .consecutive() //指定严格近邻(next)
                .within(Time.seconds(10));

        //TODO 5.将模式序列作用到流上
        PatternStream patternStream = CEP
                .pattern(jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"))
                        , pattern);

        //TODO 6.提取匹配上的和超时事件
        OutputTag timeOutTag = new OutputTag("timeOut") {
        };
        SingleOutputStreamOperator selectDS = patternStream.select(timeOutTag,
                new PatternTimeoutFunction() {
                    @Override
                    public JSONObject timeout(Map> map, long ts) throws Exception {
                        return map.get("start").get(0);
                    }
                }, new PatternSelectFunction() {
                    @Override
                    public JSONObject select(Map> map) throws Exception {
                        return map.get("start").get(0);
                    }
                });
        DataStream timeOutDS = selectDS.getSideOutput(timeOutTag);

        //TODO 7.UNIOn两种事件
        DataStream unionDS = selectDS.union(timeOutDS);

        //TODO 8.将数据写入Kafka
        unionDS.print();
        unionDS.map(JSONAware::toJSONString)
                .addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        //TODO 9.启动任务
        env.execute("UserJumpDetailApp");

    }

}

流程图:

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5704797.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存