之前我们已经通过动态分流把数据分到了我们想要的位置,为了方便后续内容的讲解方便,所以接下来我们可以把配置表的信息进行导入了,然后通过动态分流的方法,把数据发往对应的kafka主题或者是hbase的维度表中:
//配置信息表: CREATE TABLE `table_process` ( `source_table` varchar(200) NOT NULL COMMENT '来源表', `operate_type` varchar(200) NOT NULL COMMENT ' *** 作类型 insert,update,delete', `sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka', `sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)', `sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段', `sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段', `sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展', PRIMARY KEY (`source_table`,`operate_type`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 //来源表就是指是事实表还是维度表; //对于kafka,输出表就是主题名;对于Hbase,输出表就是对应的表; //输出字段就是指给定需要输出的字段 //主键字段就是指输出的主键字段(比如Hbase表的rowKey) //建表扩展就是指ENGINE=InnoDB DEFAULT CHARSET=utf8语句;
先清空原来配置表table_process中的数据,建表成功之后,开启业务数据和对应的IDEA程序,生成对应的表:
分流 Sink 之保存业务数据到 Kafka 主题//配置表完成以后,对应的主题就在kafka中生成完毕; //类型有: dwd_cart_info dwd_comment_info dwd_coupon_use dwd_display_log dwd_favor_info dwd_order_detail dwd_order_detail_activity dwd_order_detail_coupon dwd_order_info dwd_order_info_update dwd_order_refund_info dwd_page_log dwd_payment_info dwd_refund_payment dwd_start_log分流Sink之保存维度数据到Hbase主题:
GMALL0709_REALTIME | DIM_ACTIVITY_INFO | TABLE | | | | | | false | null | | | GMALL0709_REALTIME | DIM_ACTIVITY_RULE | TABLE | | | | | | false | null | | | GMALL0709_REALTIME | DIM_ACTIVITY_SKU | TABLE | | | | | | false | null | | | GMALL0709_REALTIME | DIM_base_CATEGORY1 | TABLE | | | | | | false | null | | | GMALL0709_REALTIME | DIM_base_CATEGORY2 | TABLE | | | | | | false | null | | | GMALL0709_REALTIME | DIM_base_CATEGORY3 | TABLE | | | | | | false | null | | | GMALL0709_REALTIME | DIM_base_DIC | TABLE | | | | | | false | null | | | GMALL0709_REALTIME | DIM_base_PROVINCE | TABLE | | | | | | false | null | | | GMALL0709_REALTIME | DIM_base_REGION | TABLE | | | | | | false | null | | | GMALL0709_REALTIME | DIM_base_TRADEMARK | TABLE | | | | | | false | null | | | GMALL0709_REALTIME | DIM_COUPON_INFO | TABLE | | | | | | false | null | | | GMALL0709_REALTIME | DIM_COUPON_RANGE | TABLE | | | | | | false | null | | | GMALL0709_REALTIME | DIM_FINANCIAL_SKU_COST | TABLE | | | | | | false | null | | | GMALL0709_REALTIME | DIM_SKU_INFO | TABLE | | | | | | false | 4 | | | GMALL0709_REALTIME | DIM_SPU_INFO | TABLE | | | | | | false | 3 | | | GMALL0709_REALTIME | DIM_USER_INFO | TABLE | | | | | | false | 3 | +------------+---------------------+-------------------------+---------------+----------+------------+----------------------------+-----------
我们应该能在kafka和Phoenix中看到这些表信息,尽管这些表并不是现在使用,但生成之后能更好的看出来动态分流之后的效果,这些主题或者维度表的内容对应的就是Sql数据库中gmall0709中的各种业务表的数据内容;(其中各个表的数据已经根据配置表中的字段名和信息进行了过滤,也即这些表的信息和MySql数据库中的业务数据是保持一致的)
DWM层业务设计得到了日志分离数据、维度数据和事实数据后,基本上就完成了ODS、DWD、DIM层的设计,接下来就是DWM层的设计了,那么为什么需要DWM这么一个中间层呢?
以上是DWS层的主题宽表的一个设计思路,那么首先我们需要把所需要的维度表和事实表都制作完毕,然后再开始维度宽表的制作(这里涉及到维度建模理论,简单来讲这是OLAP不同于OLTP的一个明显特征,OLTP通过范式建模,而OLAP则通过维度建模来形成主题宽表,从而完成后续的数据分析工作)。DWM层的作用就是建立在DWD、DWS层之间的一个过渡层,所以这一层不是必要的,只是为了更进一步地区分业务数据,如果业务不是很复杂,也可以直接从dwd层把数据发往DWS层;结构如下所示:
需求分析与思路
//UV,全称是 Unique Visitor,即独立访客,对于实时计算中,也可以称为 DAU(Daily Active User),即每日活跃用户,因为实时计算中的 uv 通常是指当日的访客数。
那么如何从用户行为日志中识别出当日的访客,那么有两点:
➢ 其一,是识别出该访客打开的第一个页面,表示这个访客开始进入我们的应用
➢ 其二,由于访客可以在一天中多次进入应用,所以我们要在一天的范围内进行去重
//核心过滤思路: ➢ 首先用 keyby 按照 mid 进行分组,每组表示当前设备的访问情况 ➢ 分组后使用 keystate 状态,记录用户进入时间,实现 RichFilterFunction 完成过滤 ➢ 重写 open 方法用来初始化状态 ➢ 重写 filter 方法进行过滤 ◼ 可以直接筛掉 last_page_id 不为空的字段,因为只要有上一页,说明这条不是这个用户进入的首个页面。 ◼ 状态用来记录用户的进入时间,只要这个 lastVisitDate 是今天,就说明用户今天已经访问过了所以筛除掉。如果为空或者不是今天,说明今天还没访问过,则保留。 ◼ 因为状态值主要用于筛选是否今天来过,所以这个记录过了今天基本上没有用了,这里 enableTimeToLive 设定了 1 天的过期时间,避免状态过大。 //从dwd_page_log中获取数据; //Flink消费dwd_page_log主题中的数据并作uv计算; //处理后的数据发送到dwm_unique_visit主题中; //测试1:是否能读取到动态分流后的数据; //➢ 启动 log.sh、zk、kafka //➢ 运行 Idea 中的 baseLogApp //➢ 运行 Idea 中的 UniqueVisitApp //➢ 查看控制台输出 //➢ 执行流程 //jsonObjDS.print("---->"); //测试2: ➢ 启动 log.sh、zk、kafka ➢ 运行 Idea 中的 baseLogApp ➢ 运行 Idea 中的 UniqueVisitApp ➢ 查看控制台输出以及 kafka 的 dwm_unique_visit 主题 ➢ 执行流程 //filteredDS.print(">>>>>"); 以下是我在第一次运行时遇到的一个问题,如果大家看的不是我的代码,很可能会遇到这个问题,这里做一个提醒,这里实际上是脚本生成数据的一个问题,修改后的方案是:如果不是启动日志,那就直接输出到主流(页面日志),然后再在页面日志中去找曝光日志; //此时的结果应该是 baseLogApp控制台有数据输出,而UniqueVisitApp控制台没有数据输出; //从前面可以直到dwd_page_log中是有数据的,所以数据出错是在dwd_page_log --> UniqueVisitApp的过程中;debug测试; //发现是因为每条数据都有一个"last_page_id"属性,导致数据都被过滤掉了; //是之前baseLogApp的日志分流时的代码有问题,曝光日志中也包含了页面日志,不能区分开来; //处理完之后,也可以顺带测试最后的kafka接收情况;
最后,把过滤后的数据输出到DWM层中,在这里我们数据保存到了dwm_unique_visit主题中,等待后续的使用。
DWM层业务设计实现–跳出明细计算//什么是跳出: 跳出就是用户成功访问了网站的一个页面后就退出,不在继续访问网站的其它页面。而跳出率就是用跳出次数除以访问次数。 关注跳出率,可以看出引流过来的访客是否能很快的被吸引,渠道引流过来的用户之间的质量对比,对于应用优化前后跳出率的对比也能看出优化改进的成果 //判断跳出行为: 首先要识别哪些是跳出行为,要把这些跳出的访客最后一个访问的页面识别出来。那么要抓住几个特征: ➢ 该页面是用户近期访问的第一个页面,这个可以通过该页面是否有上一个页面(last_page_id)来判断,如果这个表示为空,就说明这是这个访客这次访问的第一个页面。 ➢ 首次访问之后很长一段时间(自己设定),用户没继续再有其他页面的访问。 //判断方法:这第一个特征的识别很简单,保留 last_page_id 为空的就可以了。但是第二个访问的判断,其实有点麻烦,首先这不是用一条数据就能得出结论 //的,需要组合判断,要用一条存在的数据和不存在的数据进行组合判断。而且要通过一个不存在的数据求得一条存在的数据。更麻烦的他并不是永远不存在,而是 //在一定时间范围内不存在。那么如何识别有一定失效的组合行为呢? //最简单的办法就是 Flink 自带的 CEP 技术。这个 CEP 非常适合通过多条数据组合来识别某个事件。用户跳出事件,本质上就是一个条件事件加一个超时事件的组合。 //从dwd_page_log中获取数据; //Flink消费dwd_page_log主题中的数据并通过CEP方式来选取跳出的页面数据流; //选取出来的数据发送到dwm_user_jump_detail主题中;
如果对Flink CEP编程不是非常了解,可以参考我的另一篇博客,这里有一个简短的介绍:
CEP编程
具体代码思路:首先设定好事件时间戳,按照mid(机器id)进行分组,然后设定好对应模式,即10秒内没有发生页面跳转则认为没有发生跳出行为,把超时部分的内容放入侧输出流中进行输出;(只满足第一个条件的很明显就是我们要的跳出数据,但是不满足后续条件)将这部分数据存到dwm_user_jump_detail主题中,这就是我们要的跳出数据内容
测试:
//采用代码中的测试数据来做;并行度改为1;
//开启zk、kk、dwm_user_jump_detail消费主题,应用程序,查看是否有输出;
//jumpDS.print(">>>>>");
订单是统计分析的重要的对象,围绕订单有很多的维度统计需求,比如用户、地区、商品、品类、品牌等等。
为了之后统计计算更加方便,减少大表之间的关联,所以在实时计算过程中将围绕订单的相关数据整合成为一张订单的宽表。
那究竟哪些数据需要和订单整合在一起?
具体实现过程如图,订单表和订单明细表关联起来后,与用户表、地区表、品牌表、品类表、SPU表、SKU表等关联起来;
维度表和事实表进行关联后,可以得到我们所需要的这张支付宽表;那么他们之间是如何进行关联的呢?首先我们可以来看这两张事实表–订单表和订单明细表;
//order_info表的字段如下: CREATE TABLE `order_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '订单ID', `consignee` varchar(100) DEFAULT NULL COMMENT '收货人', `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话', `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额', `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态', `user_id` bigint(20) DEFAULT NULL COMMENT '用户id', `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式', `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址', `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注', `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)', `trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `operate_time` datetime DEFAULT NULL COMMENT ' *** 作时间', `expire_time` datetime DEFAULT NULL COMMENT '失效时间', `process_status` varchar(20) DEFAULT NULL COMMENT '进度状态', `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号', `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号', `img_url` varchar(200) DEFAULT NULL COMMENT '图片路径', `province_id` int(20) DEFAULT NULL COMMENT '地区', `activity_reduce_amount` decimal(16,2) DEFAULT NULL COMMENT '促销金额', `coupon_reduce_amount` decimal(16,2) DEFAULT NULL COMMENT '优惠券', `original_total_amount` decimal(16,2) DEFAULT NULL COMMENT '原价金额', `feight_fee` decimal(16,2) DEFAULT NULL COMMENT '运费', `feight_fee_reduce` decimal(16,2) DEFAULT NULL COMMENT '运费减免', `refundable_time` datetime DEFAULT NULL COMMENT '可退款日期(签收后30天)', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=29549 DEFAULT CHARSET=utf8 COMMENT='订单表 订单表' //订单明细表的内容如下: CREATE TABLE `order_detail` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号', `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号', `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id', `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)', `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)', `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)', `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `source_type` varchar(20) DEFAULT NULL COMMENT '来源类型', `source_id` bigint(20) DEFAULT NULL COMMENT '来源编号', `split_total_amount` decimal(16,2) DEFAULT NULL, `split_activity_amount` decimal(16,2) DEFAULT NULL, `split_coupon_amount` decimal(16,2) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=88048 DEFAULT CHARSET=utf8 COMMENT='订单明细表' //订单表设为两个,一个是基本信息表,一个是订单明细表。 //订单明细表会存每个订单里面的商品信息,比如订单两个商品对应两条订单明细表,两张表公用一个orderNo字段。 以上内容可以从MySql的数据库中的对应表中获得;对应表中事实数据则是由动态分流后获得;
因此,我们可以从dwd_order_info和dwd_order_detail两个主题中去获取我们的事实数据,然后进行关联;所以这里还需要两个Bean Pojo类,来吧两个表中的数据对应封装,根据刚才提到的order_id进行分组,然后用inner Join的方法,把两个对象中的字段合并起来;
//订单和订单明细关联; public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail){ mergeOrderInfo(orderInfo); mergeOrderDetail(orderDetail); } //给订单相关属性赋值; public void mergeOrderInfo(OrderInfo orderInfo ) { if (orderInfo != null) { this.order_id = orderInfo.id; this.order_status = orderInfo.order_status; this.create_time = orderInfo.create_time; this.create_date = orderInfo.create_date; this.activity_reduce_amount = orderInfo.activity_reduce_amount; this.coupon_reduce_amount = orderInfo.coupon_reduce_amount; this.original_total_amount = orderInfo.original_total_amount; this.feight_fee = orderInfo.feight_fee; this.total_amount = orderInfo.total_amount; this.province_id = orderInfo.province_id; this.user_id = orderInfo.user_id; } } //给订单明细相关属性赋值; public void mergeOrderDetail(OrderDetail orderDetail ) { if (orderDetail != null) { this.detail_id = orderDetail.id; this.sku_id = orderDetail.sku_id; this.sku_name = orderDetail.sku_name; this.order_price = orderDetail.order_price; this.sku_num = orderDetail.sku_num; this.split_activity_amount=orderDetail.split_activity_amount; this.split_coupon_amount=orderDetail.split_coupon_amount; this.split_total_amount=orderDetail.split_total_amount; } } //以上实际上就是把两个对象合并为orderWide对象的过程,这样,我们就拿到了这个宽表数据的雏形,接下来,我们只需要去关联其他维度数据即可,关联方法采用 //AsyncDataStream.unorderedWait方法,实现AsyncFunction(一个异步方法),顺序就按照图里的顺序一个一个关联即可; 注意,这里的AsyncDataStream.unorderedWait方法其实是一种优化方案,因为phoenix查询数据比较慢,所以在做维度关联的时候,考虑做优化; 其中这里我们选用的是unorderedWait,因为数据的顺序在这里并不重要; ➢ 无序等待(unorderedWait) 后来的数据,如果异步查询速度快可以超过先来的数据,这样性能会更好一些,但是会有乱序出现。 ➢ 有序等待(orderedWait) 严格保留先来后到的顺序,所以后来的数据即使先完成也要等前面的数据。所以性能会差一些
接下来我的代码中的关联顺序为:
关联用户维度 --》 关联省市维度 --》关联SKU商品维度 --》关联SPU维度 --》关联品牌维度 --》关联品类维度,只要按照之前关联图中的顺序来关联即可;全部关联起来之后,将数据写入到dwm_order_wide主题中,这样就得到了我们所需的订单宽表的数据了;
现在一个整体的思路都有了,那么回过头来,我们应该要思考一个问题,如何去获取这些维度表的数据呢?两个事实表的数据我们已经从kafka主题表中获取,那么这些维度表数据从Hbase中如何获得呢?
按照我们的分析过程,这里一共需要关联6个维度表,我们可以根据每个维度表单独写一个AsyncFunction,但是这样比较繁琐,但是直接封装也面临一个问题,我们每个关联表的数据内容、关联的主键、分组主键都是不同的,怎么去封装这些内容呢?这里用到一个技巧:模板方法设计模式;
//简单理解这里的模板方法设计模式: 首先,我们需要知道哪些内容是变动的: 1.关联的主键; 2.关联方法;(不同维度表的属性赋值过程) 那么我们可以这么去思考这个问题,既然不能直接封装,那么我们在用的时候再去实现这个过程不就好了; 简单来说就是不对这两个动态内容进行封装,而只对其他内容进行封装,把这两个内容设定为抽象方法,具体怎么实现,到要实现的时候重写即可; 这么一想这个问题,我们就可以在实现对应的维度表关联时来实现具体的关联过程; String key = getKey(obj); //根据维度的主键到维度表中进行查询 JSONObject dimInfoJsonObj = DimUtil.getDimInfo(tableName, key); //System.out.println("维度数据Json格式:" + dimInfoJsonObj); if(dimInfoJsonObj != null){ //维度关联 流中的事实数据和查询出来的维度数据进行关联 //这里同样采用抽象方法来做; join(obj,dimInfoJsonObj); //在实现的DimAsyncFunction类的异步调用方法asyncInvoke时,我们可以把getKey和join两个方法设定为抽象方法, //当我们需要去做属性赋值的时候,再去对应实现这两个方法,其他部分的内容是相同的;Phoenix查询
因为我们每次从Phoenix中查询的内容是不同的,所以是不能把查询的返回内容写死的,在PhoenixUtil类中:
// 从Phoenix中查询数据
// select * from 表 where XXX=xxx
//返回类型为class;
public static List queryList(String sql,Class clazz){}的返回内容应该是不确定的,所以我们封装到List中的对象也应该是不确定的;写好了PhoenixUtil类之后,我们再来回过头看看
JSonObject dimInfoJsonObj = DimUtil.getDimInfo(tableName, key);到底是如何实现的:
进入到DimUtil类中,观察getDimInfoNoCache方法,实际上DimUtil底层的查询逻辑就是我们刚才写好的PhoenixUtil类的queryList方法,可以看出具体的一个实现就是通过先拼接好对应的查询Sql语句,然后让Phoenix去对应执行查询;查询的语句类型类似:
select * from dim_base_trademark where id=10 and name=zs;
但是我们要面临一个问题:
即Hbase的查询速度是比较慢的,这也是我们前面做异步调用的一个重要原因;所以这里可以考虑再做一个优化,即旁路缓存:
//我们在上面实现的功能中,直接查询的 Hbase。外部数据源的查询常常是流式计算的性能瓶颈,所以我们需要在上面实现的基础上进行一定的优化。我们这里使用旁路缓存。旁路缓存模式是一种非常常见的按需分配缓存的模式。任何请求优先访问缓存,缓存命中,直接获得数据返回请求。如果未命中则,查询数据库,同时把结果写入缓存以备后续请求使用。(redis缓存中查询,缓存数据来自于第一次访问数据库后的查询效果); //这种缓存策略有几个注意点: //缓存要设过期时间,不然冷数据会常驻缓存浪费资源。 //要考虑维度数据是否会发生变化,如果发生变化要主动清除缓存。 //这里不采用堆缓存来做,用redis做;旁路缓存实现
//在做维度关联的时候,大部分场景都是通过id进行关联,所以提供一个方法,只需要将id的值作为参数传进来即可 public static JSONObject getDimInfo(String tableName, String id) { return getDimInfo(tableName, Tuple2.of("id", id)); } //这里之所以做这么一步的原因是原因,是因为这些表之间基本都是根据ID这个字段来进行关联,每一个要关联的DIW表中都有id这个字段(提前在配置表中就定义好了),只不过这个id在各个DIM表中表示的含义不同(比如订单id、地区id等),我们根据不同的id来获取不同的数据;注意,这里的id是通过之前另一个抽象方法getKey()获取到的,所以这里的id对应的就是不同表中的不同id字段,所以数据是匹配得上的,不用担心数据匹配不上;
所以,相比未优化之前的getDimInfoNoCache方法,getDimInfo中多增加了一步判断过程,即Redis中是否有对应的缓存数据,如果有直接从redis中获取数据,如果没有,则从Hbase中获取数据并且把数据导入到redis中,保存的key就以dim+dim表名+查询字段名来命名;注意,由于这里我们只用了id来做关联,所以在后面做删除缓存 *** 作的时候,也只写了id的情况,即这里的查询字段名都是id(只不过是各个表自己的id);
在这里还要注意一个点,从redis或者Phoenix中查询出来的数据,不止一个,所以我们最后返回的结果是一个List集合,其中List中的每一个元素就是查询结果的一行封装好的JsonObject对象;
➢ 测试用户维度关联 ◼ 将 table_process 表中的数据删除掉,执行table_process 初始配置.sql(上一篇博客中也给出了) CREATE TABLE `table_process` ( `source_table` varchar(200) NOT NULL COMMENT '来源表', `operate_type` varchar(200) NOT NULL COMMENT ' *** 作类型 insert,update,delete', `sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka', `sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)', `sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段', `sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段', `sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展', PRIMARY KEY (`source_table`,`operate_type`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ◼ 启动 Maxwell、ZK、Kafka、HDFS、Hbase、Redis ◼ 运行运行 Idea 中的 baseDBApp ◼ 初始化用户维度数据到 Hbase(通过 Maxwell 的 Bootstrap) bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop202 --database gmall2021 --table user_info --client_id maxwell_1 ◼ 运行 Idea 中的 OrderWideApp ◼ 执行模拟生成业务数据的 jar 包 ◼ 查看控制台输出可以看到用户的年龄以及性别 //测试结果大致如下: //都关联好之后,最后输出数据如下: //>>>>>:1> OrderWide{detail_id=89087, order_id=30441, sku_id=11, order_price=8197.00, sku_num=1, // sku_name='Apple iPhone 12 (A2404) 64GB 白色 支持移动联通电信5G 双卡双待手机', province_id=31, order_status='1001', // user_id=1903, total_amount=8286.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, // original_total_amount=8266.00, feight_fee=20.00, split_feight_fee=null, split_activity_amount=null, // split_coupon_amount=null, split_total_amount=8197.00, expire_time='null', create_time='2021-07-16 20:06:47', // operate_time='null', create_date='null', create_hour='null', province_name='四川', province_area_code='510000', // province_iso_code='CN-51', province_3166_2_code='CN-SC', user_age=18, user_gender='F', spu_id=3, tm_id=2, // category3_id=61, spu_name='Apple iPhone 12', tm_name='苹果', category3_name='手机'} //可以看到,关联数据都关联好了; //kafka中接收到的数据为: //{"activity_reduce_amount":0.00,"category3_id":86,"category3_name":"平板电视","coupon_reduce_amount":0.00, // "create_time":"2021-07-16 10:53:42","detail_id":89306,"feight_fee":7.00,"order_id":30598, // "order_price":2899.00,"order_status":"1001","original_total_amount":5798.00,"province_3166_2_code":"CN-AH", // "province_area_code":"340000","province_id":9,"province_iso_code":"CN-34","province_name":"安徽","sku_id":20, // "sku_name":"小米电视E65X 65英寸 全面屏 4K超高清HDR 蓝牙遥控内置小爱 2+8GB AI人工智能液晶网络平板电视 L65M5-EA", // "sku_num":2,"split_total_amount":5798.00,"spu_id":6,"spu_name":"小米电视 内置小爱 智能网络液晶平板教育电视", // "tm_id":5,"tm_name":"小米","total_amount":5805.00,"user_age":49,"user_gender":"M","user_id":2113} //此时redis中保存的是维度数据的查询结果;一些缺失的内容在进入kafka时被删除了(比如operate_time='null', create_date='null'等);DWM层业务设计实现–支付宽表
支付宽表的目的,最主要的原因是支付表没有关联到订单明细,支付金额没有细分到商品上,没有办法统计商品级的支付状况。所以本次宽表的核心就是要把支付表的信息与订单明细(也即上面的订单宽表)关联上。(通过odder_id进行关联)
为了更方便理解支付宽表的结果,在最下方的测试过程中,我给出了支付表和刚才做出来的订单宽表的内容,相当于此处是对支付表的一个内容扩展。
解决方案有两个
➢ 一个是把订单宽表输出到Hbase上,在支付宽表计算时查询hbase,这相当于把订单宽表作为一种维度进行管理。(订单宽表的关联思想)
➢ 一个是用流的方式接收订单宽表数据,然后用双流join方式进行合并。因为订单与支付产生有一定的时差。所以必须用intervalJoin来管理流的状态时间,保证当支付到达时订单明细还保存在状态中。(这里用第二种方式,用流的方式更符合实时处理);
具体的代码流程这里就不展开讲解了,和订单宽表的intervalJoin方法基本一致,具体测试结果如下所示:
//测试://zk、kk、maxwell、hdfs、hbase、baseDBApp、redis; //运行baseDBApp;OrderWideApp;本应用; //运行rt_dblog下的业务数据生成脚本; //会输出pay和roderwidr两个数据流; //数据类型类似: //pay>>>>>:4> PaymentInfo{id=19914, order_id=30772, user_id=2312, total_amount=339.00, // subject='CAREMiLLE珂曼奶油小方口红 雾面滋润保湿持久丝缎唇膏 M02干玫瑰等6件商品', payment_type='1102', // create_time='2021-07-16 13:05:40', callback_time='2021-07-16 13:06:00'} //orderWide>>>>:4> OrderWide{detail_id=89344, order_id=30623, sku_id=17, order_price=6699.00, sku_num=1, // sku_name='TCL 65Q10 65英寸 QLED原色量子点电视 安桥音响 AI声控智慧屏 超薄全面屏 MEMC防抖 3+32GB 平板电视', // province_id=24, order_status='1001', user_id=178, total_amount=6717.00, activity_reduce_amount=0.00, // coupon_reduce_amount=0.00, original_total_amount=6699.00, feight_fee=18.00, split_feight_fee=null, // split_activity_amount=null, split_coupon_amount=null, split_total_amount=6699.00, expire_time='null', // create_time='2021-07-16 13:05:38', operate_time='null', create_date='null', create_hour='null', // province_name='湖北', province_area_code='420000', province_iso_code='CN-42', province_3166_2_code='CN-HB', // user_age=51, user_gender='F', spu_id=5, tm_id=4, category3_id=86, // spu_name='TCL巨幕私人影院电视 4K超高清 AI智慧屏 液晶平板电视机', tm_name='TCL', category3_name='平板电视'} //测试2:方法类似,查看一下dwm_payment_wide主题的消费情况; //数据结果为: //{"activity_reduce_amount":0.00,"callback_time":"2021-07-16 14:22:22","category3_id":61,"category3_name":"手机", // "coupon_reduce_amount":0.00,"detail_id":89977,"feight_fee":7.00,"order_create_time":"2021-07-16 14:22:01", // "order_id":31054,"order_price":1299.00,"order_status":"1001","original_total_amount":3726.00, // "payment_create_time":"2021-07-16 14:22:02","payment_id":20006,"payment_type":"1102", // "province_3166_2_code":"CN-GD","province_area_code":"440000","province_id":26,"province_iso_code":"CN-44", // "province_name":"广东","sku_id":6,"sku_name":"Redmi 10X 4G Helio G85游戏芯 4800万超清四摄 5020mAh大电量 小孔全面屏 128GB大存储 8GB+128GB 冰雾白 游戏智能手机 小米 红米", // "sku_num":1,"split_total_amount":1299.00,"spu_id":2,"spu_name":"Redmi 10X", // "subject":"索芙特i-Softto 口红不掉色唇膏保湿滋润 璀璨金钻哑光唇膏 Y01复古红 百搭气质 璀璨金钻哑光唇膏 等4件商品", // "tm_id":1,"tm_name":"Redmi","total_amount":3733.00,"user_age":24,"user_gender":"F","user_id":1431}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)