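Earlier, splitting and other processing separated the data into independent Kafka topics. The next step is to work on the metrics that real-time computation actually uses. Timeliness is what a real-time data warehouse pursues, so in some scenarios there is no need for a large, all-encompassing middle layer as in an offline warehouse; the middle layer only needs to store certain computed metrics so that later computations can reuse them.
So we first consider the metric requirements of real-time computation. Outputting these metrics as subject-oriented wide tables gives the DWS layer.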
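Only some of the metrics are covered here, mainly those computed for the visualization dashboard. After all this, why has the DWM layer still not appeared? Don't worry: the DWM layer mainly serves the DWS layer. Some requirements involve fairly complex computation when going directly from DWD to DWS, and the results of that computation are likely to be reused by several DWS subjects, so an additional DWM layer is built on top of DWD. It mainly covers the following business:
- Visitor UV calculation
- Bounce detail calculation
- Order wide table
- Payment wide table

Visitor UV calculation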
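UV (Unique Visitor) means independent visitors; it is also referred to as DAU (Daily Active User), i.e. the users active on a given day.
Identifying the day's visitors from user behavior logs involves two points:
- Identify the first page a visitor opens, which indicates that the visitor has started entering our application.
- Because a visitor can enter the application several times in one day, deduplicate within the scope of one day.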
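The two points above can be sketched without Flink. The following is a minimal plain-Java illustration, not the article's implementation: a `HashMap` stands in for Flink keyed state, and the class name `UvDedupSketch`, the method `isFirstVisitOfDay`, and the explicit time zone parameter are assumptions made for this example.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the daily dedup rule; a HashMap stands in for Flink keyed state.
class UvDedupSketch {
    // Hypothetical stand-in for per-key ValueState: mid -> day of the last counted visit.
    private final Map<String, LocalDate> lastVisitDay = new HashMap<>();
    private final ZoneId zone;

    UvDedupSketch(ZoneId zone) {
        this.zone = zone;
    }

    /** Returns true only for the first entry page of a visitor on a calendar day. */
    boolean isFirstVisitOfDay(String mid, String lastPageId, long tsMillis) {
        // Point 1: only a page without a previous page marks the start of a visit.
        if (lastPageId != null && !lastPageId.isEmpty()) {
            return false;
        }
        // Point 2: deduplicate per visitor within one day.
        LocalDate day = Instant.ofEpochMilli(tsMillis).atZone(zone).toLocalDate();
        if (day.equals(lastVisitDay.get(mid))) {
            return false;
        }
        lastVisitDay.put(mid, day);
        return true;
    }

    public static void main(String[] args) {
        UvDedupSketch dedup = new UvDedupSketch(ZoneId.of("UTC"));
        long dayMs = 86_400_000L;
        long t0 = 18_711L * dayMs; // midnight UTC of some day
        System.out.println(dedup.isFirstVisitOfDay("mid_1", null, t0));          // first entry of the day
        System.out.println(dedup.isFirstVisitOfDay("mid_1", null, t0 + 60_000)); // same day, second entry
        System.out.println(dedup.isFirstVisitOfDay("mid_1", null, t0 + dayMs));  // next day
    }
}
```

Unlike real keyed state with a TTL, this map grows without bound, so it only illustrates the rule itself.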
Data flow: web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd) -> FlinkApp -> Kafka(dwm)
Programs: mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> baseLogApp -> kafka -> UniqueVisitApp -> Kafka
//TODO 1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

//TODO 2. Read the Kafka topic dwd_page_log
String groupId = "unique_visit_app_210325";
String sourceTopic = "dwd_page_log";
String sinkTopic = "dwm_unique_visit";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

//TODO 3. Convert each line into a JSON object
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

//TODO 4. Filter with keyed state: keep only each mid's first visit of the day
KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
SingleOutputStreamOperator<JSONObject> uvDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {

    private ValueState<String> dateState;
    private SimpleDateFormat simpleDateFormat;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("date-state", String.class);
        // Set the state TTL and when the TTL is refreshed (on create and on write)
        StateTtlConfig stateTtlConfig = new StateTtlConfig
                .Builder(Time.hours(24))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .build();
        valueStateDescriptor.enableTimeToLive(stateTtlConfig);
        dateState = getRuntimeContext().getState(valueStateDescriptor);
        simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
    }

    @Override
    public boolean filter(JSONObject value) throws Exception {
        // Get the id of the previous page
        String lastPageId = value.getJSONObject("page").getString("last_page_id");
        // No previous page means this is an entry page
        if (lastPageId == null || lastPageId.length() <= 0) {
            // Date of the last counted visit, from state
            String lastDate = dateState.value();
            // Date of the current event
            String curDate = simpleDateFormat.format(value.getLong("ts"));
            // Keep the record only if the dates differ
            if (!curDate.equals(lastDate)) {
                dateState.update(curDate);
                return true;
            }
        }
        return false;
    }
});

//TODO 5. Write the data to Kafka
uvDS.print();
uvDS.map(JSONAware::toJSONString)
        .addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

//TODO 6. Start the job
env.execute("UniqueVisitApp");
Bounce detail calculation
Data flow: web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd) -> FlinkApp -> Kafka(dwm)
Programs: mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> baseLogApp -> kafka -> UserJumpDetailApp -> Kafka
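Before reading the CEP job, it may help to see the bounce rule it encodes as ordinary code. The sketch below is a plain-Java approximation under stated assumptions: it works on the already sorted events of a single visitor, treats "no follow-up within 10 seconds" as a bounce, and uses the hypothetical names `BounceSketch` and `PageEvent`. It is not how Flink CEP evaluates patterns internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java approximation of the bounce rule for ONE visitor's time-sorted page events.
class BounceSketch {
    static final long WINDOW_MS = 10_000L;

    // Hypothetical minimal page event.
    static final class PageEvent {
        final String lastPageId;
        final long ts;

        PageEvent(String lastPageId, long ts) {
            this.lastPageId = lastPageId;
            this.ts = ts;
        }

        boolean isEntry() {
            return lastPageId == null || lastPageId.isEmpty();
        }
    }

    /** Returns the entry pages that count as bounces. */
    static List<PageEvent> bounces(List<PageEvent> sortedEvents) {
        List<PageEvent> result = new ArrayList<>();
        for (int i = 0; i < sortedEvents.size(); i++) {
            PageEvent cur = sortedEvents.get(i);
            if (!cur.isEntry()) {
                continue;
            }
            PageEvent next = i + 1 < sortedEvents.size() ? sortedEvents.get(i + 1) : null;
            boolean followedInTime = next != null && next.ts - cur.ts < WINDOW_MS;
            // Bounce: nothing follows within the window (timeout),
            // or the very next event is again an entry page (match).
            if (!followedInTime || next.isEntry()) {
                result.add(cur);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<PageEvent> events = Arrays.asList(
                new PageEvent(null, 0L),       // entry, followed by a detail page: not a bounce
                new PageEvent("home", 2_000L), // normal navigation
                new PageEvent(null, 20_000L),  // entry, followed by another entry: bounce
                new PageEvent(null, 23_000L)); // entry, nothing follows: bounce
        for (PageEvent e : bounces(events)) {
            System.out.println("bounce at ts=" + e.ts);
        }
    }
}
```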
//TODO 1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // In production, keep this equal to the number of Kafka partitions

//TODO 2. Read the Kafka topic and create the stream
String sourceTopic = "dwd_page_log";
String groupId = "userJumpDetailApp";
String sinkTopic = "dwm_user_jump_detail";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

//TODO 3. Convert each line into a JSON object, extract the timestamp and generate watermarks
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
        .assignTimestampsAndWatermarks(WatermarkStrategy
                .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                        return element.getLong("ts");
                    }
                }));

//TODO 4. Define the pattern sequence: two entry pages in a row within 10 seconds
Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
    @Override
    public boolean filter(JSONObject value) throws Exception {
        String lastPageId = value.getJSONObject("page").getString("last_page_id");
        return lastPageId == null || lastPageId.length() <= 0;
    }
}).next("next").where(new SimpleCondition<JSONObject>() {
    @Override
    public boolean filter(JSONObject value) throws Exception {
        String lastPageId = value.getJSONObject("page").getString("last_page_id");
        return lastPageId == null || lastPageId.length() <= 0;
    }
}).within(Time.seconds(10));

// The same pattern sequence written as a looping pattern (shown as an alternative; the result is not used)
Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
    @Override
    public boolean filter(JSONObject value) throws Exception {
        String lastPageId = value.getJSONObject("page").getString("last_page_id");
        return lastPageId == null || lastPageId.length() <= 0;
    }
})
        .times(2)
        .consecutive() // strict contiguity, same as next()
        .within(Time.seconds(10));

//TODO 5. Apply the pattern sequence to the keyed stream
PatternStream<JSONObject> patternStream = CEP
        .pattern(jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"))
                , pattern);

//TODO 6. Extract the matched events and the timed-out events
OutputTag<JSONObject> timeOutTag = new OutputTag<JSONObject>("timeOut") {
};
SingleOutputStreamOperator<JSONObject> selectDS = patternStream.select(timeOutTag,
        new PatternTimeoutFunction<JSONObject, JSONObject>() {
            @Override
            public JSONObject timeout(Map<String, List<JSONObject>> map, long ts) throws Exception {
                return map.get("start").get(0);
            }
        }, new PatternSelectFunction<JSONObject, JSONObject>() {
            @Override
            public JSONObject select(Map<String, List<JSONObject>> map) throws Exception {
                return map.get("start").get(0);
            }
        });
DataStream<JSONObject> timeOutDS = selectDS.getSideOutput(timeOutTag);

//TODO 7. Union the two kinds of events
DataStream<JSONObject> unionDS = selectDS.union(timeOutDS);

//TODO 8. Write the data to Kafka
unionDS.print();
unionDS.map(JSONAware::toJSONString)
        .addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

//TODO 9. Start the job
env.execute("UserJumpDetailApp");

Order wide table design
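The order wide table joins order records with order detail records using an interval join. As a rough illustration of what the `between(lower, upper)` bounds mean, here is a plain-Java sketch over in-memory lists. It assumes the documented Flink semantics that a right-side record matches when its timestamp lies in `[left.ts + lower, left.ts + upper]`, both ends inclusive; the names `IntervalJoinSketch` and `Event` are made up for this example, and a real job would keep buffered records in state instead of lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration of interval-join matching; not a streaming implementation.
class IntervalJoinSketch {

    // Hypothetical keyed, timestamped record.
    static final class Event {
        final String key;
        final long ts;
        final String payload;

        Event(String key, long ts, String payload) {
            this.key = key;
            this.ts = ts;
            this.payload = payload;
        }
    }

    /** Pairs left and right events with equal keys where left.ts + lower <= right.ts <= left.ts + upper. */
    static List<String> intervalJoin(List<Event> left, List<Event> right, long lowerMs, long upperMs) {
        Map<String, List<Event>> rightByKey = new HashMap<>();
        for (Event r : right) {
            rightByKey.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r);
        }
        List<String> joined = new ArrayList<>();
        for (Event l : left) {
            for (Event r : rightByKey.getOrDefault(l.key, Collections.emptyList())) {
                if (r.ts >= l.ts + lowerMs && r.ts <= l.ts + upperMs) {
                    joined.add(l.payload + "|" + r.payload);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Event> orders = Arrays.asList(new Event("1", 10_000L, "order-1"));
        List<Event> details = Arrays.asList(
                new Event("1", 12_000L, "detail-a"),  // inside the +/- 5 s window
                new Event("1", 16_000L, "detail-b")); // 6 s later, outside the window
        System.out.println(intervalJoin(orders, details, -5_000L, 5_000L));
    }
}
```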
//TODO 1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

//TODO 2. Read the Kafka topics, convert to JavaBeans, extract timestamps and generate watermarks
String orderInfoSourceTopic = "dwd_order_info";
String orderDetailSourceTopic = "dwd_order_detail";
String orderWideSinkTopic = "dwm_order_wide";
String groupId = "order_wide_group_0325";
SingleOutputStreamOperator<OrderInfo> orderInfoDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderInfoSourceTopic, groupId))
        .map(line -> {
            OrderInfo orderInfo = JSON.parseObject(line, OrderInfo.class);
            String create_time = orderInfo.getCreate_time();
            String[] dateTimeArr = create_time.split(" ");
            orderInfo.setCreate_date(dateTimeArr[0]);
            orderInfo.setCreate_hour(dateTimeArr[1].split(":")[0]);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            orderInfo.setCreate_ts(sdf.parse(create_time).getTime());
            return orderInfo;
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
                    @Override
                    public long extractTimestamp(OrderInfo element, long recordTimestamp) {
                        return element.getCreate_ts();
                    }
                }));
SingleOutputStreamOperator<OrderDetail> orderDetailDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderDetailSourceTopic, groupId))
        .map(line -> {
            OrderDetail orderDetail = JSON.parseObject(line, OrderDetail.class);
            String create_time = orderDetail.getCreate_time();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            orderDetail.setCreate_ts(sdf.parse(create_time).getTime());
            return orderDetail;
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderDetail>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
                    @Override
                    public long extractTimestamp(OrderDetail element, long recordTimestamp) {
                        return element.getCreate_ts();
                    }
                }));

//TODO 3. Join the two streams
SingleOutputStreamOperator<OrderWide> orderWideWithNoDimDS = orderInfoDS.keyBy(OrderInfo::getId)
        .intervalJoin(orderDetailDS.keyBy(OrderDetail::getOrder_id))
        .between(Time.seconds(-5), Time.seconds(5)) // In production, use the maximum expected delay
        .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
            @Override
            public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context ctx, Collector<OrderWide> out) throws Exception {
                out.collect(new OrderWide(orderInfo, orderDetail));
            }
        });

// Print for testing
orderWideWithNoDimDS.print("orderWideWithNoDimDS>>>>>>>>>");

//TODO 4. Join dimension data (HBase via Phoenix)
// Synchronous version, kept for reference:
// orderWideWithNoDimDS.map(orderWide -> {
//     // Join the user dimension
//     Long user_id = orderWide.getUser_id();
//     // Query Phoenix for the user by user_id
//     // Add the user information to orderWide
//     // Province
//     // SKU
//     // SPU
//     // ...
//     // Return the result
//     return orderWide;
// });

//4.1 Join the user dimension
SingleOutputStreamOperator<OrderWide> orderWideWithUserDS = AsyncDataStream.unorderedWait(
        orderWideWithNoDimDS,
        new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
            @Override
            public String getKey(OrderWide orderWide) {
                return orderWide.getUser_id().toString();
            }

            @Override
            public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException {
                orderWide.setUser_gender(dimInfo.getString("GENDER"));
                String birthday = dimInfo.getString("BIRTHDAY");
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                long currentTs = System.currentTimeMillis();
                long ts = sdf.parse(birthday).getTime();
                // Approximate age in whole years (365-day years)
                long age = (currentTs - ts) / (1000 * 60 * 60 * 24 * 365L);
                orderWide.setUser_age((int) age);
            }
        },
        60,
        TimeUnit.SECONDS);

// Print for testing
// orderWideWithUserDS.print("orderWideWithUserDS");

//4.2 Join the province dimension
SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDS = AsyncDataStream.unorderedWait(orderWideWithUserDS,
        new DimAsyncFunction<OrderWide>("DIM_base_PROVINCE") {
            @Override
            public String getKey(OrderWide orderWide) {
                return orderWide.getProvince_id().toString();
            }

            @Override
            public void join(OrderWide orderWide, JSONObject dimInfo) throws ParseException {
                orderWide.setProvince_name(dimInfo.getString("NAME"));
                orderWide.setProvince_area_code(dimInfo.getString("AREA_CODE"));
                orderWide.setProvince_iso_code(dimInfo.getString("ISO_CODE"));
                orderWide.setProvince_3166_2_code(dimInfo.getString("ISO_3166_2"));
            }
        }, 60, TimeUnit.SECONDS);

//4.3 Join the SKU dimension
SingleOutputStreamOperator<OrderWide> orderWideWithSkuDS = AsyncDataStream.unorderedWait(
        orderWideWithProvinceDS, new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                orderWide.setSku_name(jsonObject.getString("SKU_NAME"));
                orderWide.setCategory3_id(jsonObject.getLong("CATEGORY3_ID"));
                orderWide.setSpu_id(jsonObject.getLong("SPU_ID"));
                orderWide.setTm_id(jsonObject.getLong("TM_ID"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getSku_id());
            }
        }, 60, TimeUnit.SECONDS);

//4.4 Join the SPU dimension
SingleOutputStreamOperator<OrderWide> orderWideWithSpuDS = AsyncDataStream.unorderedWait(
        orderWideWithSkuDS, new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                orderWide.setSpu_name(jsonObject.getString("SPU_NAME"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getSpu_id());
            }
        }, 60, TimeUnit.SECONDS);

//4.5 Join the trademark (TM) dimension
SingleOutputStreamOperator<OrderWide> orderWideWithTmDS = AsyncDataStream.unorderedWait(
        orderWideWithSpuDS, new DimAsyncFunction<OrderWide>("DIM_base_TRADEMARK") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                orderWide.setTm_name(jsonObject.getString("TM_NAME"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getTm_id());
            }
        }, 60, TimeUnit.SECONDS);

//4.6 Join the category dimension
SingleOutputStreamOperator<OrderWide> orderWideWithCategory3DS = AsyncDataStream.unorderedWait(
        orderWideWithTmDS, new DimAsyncFunction<OrderWide>("DIM_base_CATEGORY3") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws ParseException {
                orderWide.setCategory3_name(jsonObject.getString("NAME"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getCategory3_id());
            }
        }, 60, TimeUnit.SECONDS);

orderWideWithCategory3DS.print("orderWideWithCategory3DS>>>>>>>>>>>");

//TODO 5. Write the data to Kafka
orderWideWithCategory3DS
        .map(JSONObject::toJSONString)
        .addSink(MyKafkaUtil.getKafkaProducer(orderWideSinkTopic));

//TODO 6. Start the job
env.execute("OrderWideApp");

Payment wide table design
//TODO 1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

//TODO 2. Read the Kafka topics, convert to JavaBeans, extract timestamps and generate watermarks
String groupId = "payment_wide_group";
String paymentInfoSourceTopic = "dwd_payment_info";
String orderWideSourceTopic = "dwm_order_wide";
String paymentWideSinkTopic = "dwm_payment_wide";
SingleOutputStreamOperator<OrderWide> orderWideDS = env.addSource(MyKafkaUtil.getKafkaConsumer(orderWideSourceTopic, groupId))
        .map(line -> JSON.parseObject(line, OrderWide.class))
        .assignTimestampsAndWatermarks(WatermarkStrategy.<OrderWide>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderWide>() {
                    @Override
                    public long extractTimestamp(OrderWide element, long recordTimestamp) {
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        try {
                            return sdf.parse(element.getCreate_time()).getTime();
                        } catch (ParseException e) {
                            e.printStackTrace();
                            // Fall back to the record timestamp if create_time cannot be parsed
                            return recordTimestamp;
                        }
                    }
                }));
SingleOutputStreamOperator<PaymentInfo> paymentInfoDS = env.addSource(MyKafkaUtil.getKafkaConsumer(paymentInfoSourceTopic, groupId))
        .map(line -> JSON.parseObject(line, PaymentInfo.class))
        .assignTimestampsAndWatermarks(WatermarkStrategy.<PaymentInfo>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<PaymentInfo>() {
                    @Override
                    public long extractTimestamp(PaymentInfo element, long recordTimestamp) {
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        try {
                            return sdf.parse(element.getCreate_time()).getTime();
                        } catch (ParseException e) {
                            e.printStackTrace();
                            // Fall back to the record timestamp if create_time cannot be parsed
                            return recordTimestamp;
                        }
                    }
                }));

//TODO 3. Join the two streams: the order may precede the payment by up to 15 minutes
SingleOutputStreamOperator<PaymentWide> paymentWideDS = paymentInfoDS.keyBy(PaymentInfo::getOrder_id)
        .intervalJoin(orderWideDS.keyBy(OrderWide::getOrder_id))
        .between(Time.minutes(-15), Time.seconds(5))
        .process(new ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>() {
            @Override
            public void processElement(PaymentInfo paymentInfo, OrderWide orderWide, Context ctx, Collector<PaymentWide> out) throws Exception {
                out.collect(new PaymentWide(paymentInfo, orderWide));
            }
        });

//TODO 4. Write the data to Kafka
paymentWideDS.print(">>>>>>>>>");
paymentWideDS
        .map(JSONObject::toJSONString)
        .addSink(MyKafkaUtil.getKafkaProducer(paymentWideSinkTopic));

//TODO 5. Start the job
env.execute("PaymentWideApp");

DWM layer summary
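The DWM layer mainly converts one kind of detail data into another through computation, in preparation for later statistics and calculation.
Use state to deduplicate.
Use CEP to filter and evaluate a group of events.
Learn to use intervalJoin to join streams.
Handle dimension joins, and optimize their performance through caching and asynchronous queries.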
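The last point, caching plus asynchronous lookup, can be illustrated independently of Flink and Phoenix. The sketch below is only an assumption-labeled illustration: `CachedAsyncLookupSketch` and its `backingQuery` function are hypothetical stand-ins for a dimension query, and it deliberately omits cache expiry and invalidation, which a real dimension cache would need when dimension data changes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch: asynchronous dimension lookup where each key hits the backing store at most once.
class CachedAsyncLookupSketch {
    // Caching the future (not the value) also deduplicates concurrent requests for the same key.
    private final Map<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();
    // Hypothetical slow query, e.g. a database lookup by primary key.
    private final Function<String, String> backingQuery;

    CachedAsyncLookupSketch(Function<String, String> backingQuery) {
        this.backingQuery = backingQuery;
    }

    /** Returns a future for the dimension row; the query runs on the common fork-join pool. */
    CompletableFuture<String> lookup(String key) {
        return cache.computeIfAbsent(key,
                k -> CompletableFuture.supplyAsync(() -> backingQuery.apply(k)));
    }

    public static void main(String[] args) {
        CachedAsyncLookupSketch userDim = new CachedAsyncLookupSketch(id -> "user-" + id);
        // Enrich a fact record once the dimension value is available, without blocking the caller.
        CompletableFuture<String> enriched =
                userDim.lookup("1001").thenApply(dim -> "order-1 joined with " + dim);
        System.out.println(enriched.join());
    }
}
```

Storing futures rather than plain values is a design choice for this sketch: a second request for a key that is still being fetched waits on the same future instead of issuing another query.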