跳出就是用户成功访问了网站的一个页面后就退出,不在继续访问网站的其它页面。而
跳出率就是用跳出次数除以访问次数。
关注跳出率,可以看出引流过来的访客是否能很快的被吸引,渠道引流过来的用户之间
的质量对比,对于应用优化前后跳出率的对比也能看出优化改进的成果
首先要识别哪些是跳出行为,要把这些跳出的访客最后一个访问的页面识别出来。那么
要抓住几个特征:
- 该页面是用户近期访问的第一个页面
这个可以通过该页面是否有上一个页面(last_page_id)来判断,如果这个表示为空,就说明这是这个访客这次访问的第一个页面。首次访问之后很长一段时间(自己设定),用户没继续再有其他页面的访问。
第一个特征的识别很简单,保留 last_page_id 为空的就可以了。但是第二个访问的
判断,其实有点麻烦,首先这不是用一条数据就能得出结论的,需要组合判断,要用一条存
在的数据和不存在的数据进行组合判断。而且要通过一个不存在的数据求得一条存在的数据。
更麻烦的他并不是永远不存在,而是在一定时间范围内不存在。那么如何识别有一定失效的
组合行为呢?
最简单的办法就是 Flink 自带的 CEP 技术。这个 CEP 非常适合通过多条数据组合来识
别某个事件。
用户跳出事件,本质上就是一个组合。
代码实现:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONAware; import com.alibaba.fastjson.JSONObject; import com.atguigu.utils.MyKafkaUtil; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternSelectFunction; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.PatternTimeoutFunction; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.OutputTag; import java.time.Duration; import java.util.List; import java.util.Map; //数据流:web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd) -> FlinkApp -> Kafka(dwm) //程 序:mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> baseLogApp -> kafka -> UserJumpDetailApp -> Kafka public class UserJumpDetailApp { public static void main(String[] args) throws Exception { //TODO 1.获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //生产环境,与Kafka分区数保持一致 //1.1 设置CK&状态后端 //env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck")); //env.enableCheckpointing(5000L); //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //env.getCheckpointConfig().setCheckpointTimeout(10000L); //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000); //env.setRestartStrategy(RestartStrategies.fixedDelayRestart()); //TODO 2.读取Kafka主题的数据创建流 String sourceTopic = "dwd_page_log"; String groupId = "userJumpDetailApp"; String sinkTopic = "dwm_user_jump_detail"; DataStreamSourcekafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId)); //TODO 3.将每行数据转换为JSON对象并提取时间戳生成Watermark SingleOutputStreamOperator jsonObjDS = kafkaDS.map(JSON::parseObject) .assignTimestampsAndWatermarks(WatermarkStrategy . forBoundedOutOfOrderness(Duration.ofSeconds(1)) .withTimestampAssigner(new SerializableTimestampAssigner () { @Override public long extractTimestamp(JSONObject element, long recordTimestamp) { return element.getLong("ts"); } })); //TODO 4.定义模式序列 Pattern pattern = Pattern. begin("start").where(new SimpleCondition () { @Override public boolean filter(JSONObject value) throws Exception { String lastPageId = value.getJSONObject("page").getString("last_page_id"); return lastPageId == null || lastPageId.length() <= 0; } }).next("next").where(new SimpleCondition () { @Override public boolean filter(JSONObject value) throws Exception { String lastPageId = value.getJSONObject("page").getString("last_page_id"); return lastPageId == null || lastPageId.length() <= 0; } }).within(Time.seconds(10)); //使用循环模式 定义模式序列 Pattern. begin("start").where(new SimpleCondition () { @Override public boolean filter(JSONObject value) throws Exception { String lastPageId = value.getJSONObject("page").getString("last_page_id"); return lastPageId == null || lastPageId.length() <= 0; } }) .times(2) .consecutive() //指定严格近邻(next) .within(Time.seconds(10)); //TODO 5.将模式序列作用到流上 PatternStream patternStream = CEP .pattern(jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid")) , pattern); //TODO 6.提取匹配上的和超时事件 OutputTag timeOutTag = new OutputTag ("timeOut") { }; SingleOutputStreamOperator selectDS = patternStream.select(timeOutTag, new PatternTimeoutFunction () { @Override public JSONObject timeout(Map > map, long ts) throws Exception { return map.get("start").get(0); } }, new PatternSelectFunction () { @Override public JSONObject select(Map > map) throws Exception { return map.get("start").get(0); } }); DataStream timeOutDS = selectDS.getSideOutput(timeOutTag); //TODO 7.UNIOn两种事件 DataStream unionDS = selectDS.union(timeOutDS); //TODO 8.将数据写入Kafka unionDS.print(); unionDS.map(JSONAware::toJSONString) .addSink(MyKafkaUtil.getKafkaProducer(sinkTopic)); //TODO 9.启动任务 env.execute("UserJumpDetailApp"); } }
流程图:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)