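Contents
Overview
What is a zipper table?
What is a zipper table good for?
A zipper table built from multiple source tables
First-day load
Getting the latest data for selected target fields
The latest data set
Daily load
Comparing new and old data
Dynamic partitioning of the data
Summary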
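Overview
What is a zipper table?
A zipper table presents data as a sequence of states: it shows at a glance when a row changed and how long each state of that row lasted.
A row in a zipper table has the following form:
data field a    data field b    start date    end date
The start date is when the row takes effect and the end date is when it stops being valid; both are usually precise to the day.
Take a user zipper table: for any given user id, the rows interlock like the teeth of a zipper, with each row's start date matching the end date of that user's previous state.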
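To make the "zipper" shape concrete, here is a minimal Python sketch of a point-in-time lookup. The sample rows, the `status` column, and the choice of treating each interval as half-open (start inclusive, end exclusive) are assumptions for illustration only; the 9999-01-01 sentinel mirrors the end date used later in this post.

```python
from datetime import date

OPEN_END = date(9999, 1, 1)  # sentinel end date for the row that is still valid

# Hypothetical zipper rows for one user: (user_id, status, start_date, end_date)
rows = [
    (1, "new",    date(2023, 1, 1), date(2023, 1, 5)),
    (1, "active", date(2023, 1, 5), date(2023, 1, 8)),
    (1, "frozen", date(2023, 1, 8), OPEN_END),
]


def state_as_of(rows, user_id, day):
    """Return the row valid on `day`, treating each interval as [start, end)."""
    for row in rows:
        uid, _, start, end = row
        if uid == user_id and start <= day < end:
            return row
    return None


def is_chained(rows):
    """Check that every row's end date equals the next row's start date."""
    ordered = sorted(rows, key=lambda r: r[2])
    return all(a[3] == b[2] for a, b in zip(ordered, ordered[1:]))


print(state_as_of(rows, 1, date(2023, 1, 6)))
print(is_chained(rows))
```

If the business rule is that the end date is one day before the next start date, the comparison in `state_as_of` would use `<=` on both sides instead.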
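What is a zipper table good for?
Generally a dimension table holds no more than about 1 million rows, but some dimensions are different. The product dimension in e-commerce, or the user dimension when the user base is large, can easily exceed 1 million rows, while perhaps no more than 10% of the rows change on any given day. Storing a full snapshot every day wastes a lot of storage, and the unavoidable full-table scans add a heavy burden at compute time. Seen this way, a zipper table that stores only state changes saves considerable resources.
Some readers will ask: if the goal is just to store new and changed data, why use a zipper table rather than a plain new-and-changed (incremental) table? Let me answer that.
Dimension tables usually need to supply the latest data when they are joined. With a plain new-and-changed strategy, a row carries at most one change time, namely the partition value, so we cannot tell what the data looked like at an arbitrary point in time. Say a row changed on Jan 5: if you look in the Jan 8 partition, you only find the rows that changed on Jan 8. Another reader might say: if I know the row did not change on Jan 8, can't I just take its most recent version? True, but if you want to know how long each state of that user lasted, the calculation becomes noticeably more awkward.
Remember, about the only drawback of a zipper table is that it is troublesome to build!! (That is also why this table had me stuck for quite a while.) But that trouble buys simplicity for the upper-layer tables built on it later. Could you use a plain new-and-changed table instead? Of course you could!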
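The point about state durations can be illustrated with a small Python sketch: once every row carries both a start and an end date, the duration of each state is a simple subtraction. The sample rows and the `today` parameter are hypothetical, and the open row is assumed to be counted up to `today`.

```python
from datetime import date

OPEN_END = date(9999, 1, 1)  # sentinel end date for the still-valid row

# Hypothetical zipper rows for one user: (user_id, status, start_date, end_date)
rows = [
    (1, "new",    date(2023, 1, 1), date(2023, 1, 5)),
    (1, "active", date(2023, 1, 5), date(2023, 1, 8)),
    (1, "frozen", date(2023, 1, 8), OPEN_END),
]


def state_durations(rows, today):
    """Days spent in each state; the open row is counted up to `today`."""
    result = []
    for _, status, start, end in rows:
        effective_end = min(end, today)  # cap the sentinel at today
        result.append((status, (effective_end - start).days))
    return result


print(state_durations(rows, date(2023, 1, 10)))
```

With a plain new-and-changed table the same figure would require pairing each row with the next change of the same user first.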
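A zipper table built from multiple source tables
There are plenty of examples online that build a zipper table from a single ODS-layer source table, so I won't list them here.
If you don't yet know how to build a basic zipper table, I suggest reading up on other blogs first and then coming back.
Here is a more complicated case.
First, the data comes from several tables, all synchronized to the ODS layer with the new-and-changed strategy. Second, the ODS tables are already fixed and are used to build other tables, so they should not be altered. As a result each source table has many partitions, and the complete latest data has to be extracted with SQL.
The SQL uses 3 source tables: ods_hip_b2b_customer_webuser, ods_core_channel, and ods_hip_b2b_webuser_nymanager. They are the channel user table, the channel table, and the account manager table respectively. There are 5 subqueries in total, some of which contain nested subqueries.
Loading this table is split into a first-day load and a daily load.
First-day load
This works the same way as with a single source table: just extract the latest data set. The question is how to obtain all of the latest data.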
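Getting the latest data for selected target fields
Because all three tables use the new-and-changed sync strategy, getting each table's latest data only requires ranking its rows by the partition field in descending order and keeping the first one. Part of the code is shown below:
SELECT id, channel_name, sfid, channel_system, nymanager, parent_id
FROM (
    SELECT id, channel_name, sfid, channel_system, nymanager, parent_id,
           RANK() OVER (PARTITION BY id ORDER BY pt DESC) rk
    FROM nanyan_space.ods_core_channel
) tmp_c
WHERE rk = 1
The latest data set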
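When the target table's fields come from several subqueries, it helps to change perspective: what would we do with just one subquery? What should the data look like? Nothing more than the latest data set!
We can now get the latest values for each group of fields. What remains is to make every output row a single complete, up-to-date record, which is the real goal. So we simply take all users from the user table and left join each subquery to them, which gives the latest data for every user.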
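The two steps described so far (keep the newest-partition record per key, then left join the per-table results onto the user list) can be sketched in Python. The field names and sample records are hypothetical and far smaller than the real tables. One difference to be aware of: RANK() keeps every row that ties on pt, whereas this sketch keeps only the first one it sees.

```python
def latest_per_key(records, key):
    """Keep, for each key value, the record from the newest partition `pt`."""
    latest = {}
    for rec in records:
        k = rec[key]
        # pt is a yyyymmdd string, so string comparison orders it correctly
        if k not in latest or rec["pt"] > latest[k]["pt"]:
            latest[k] = rec
    return latest


# Hypothetical new-and-changed partitions of two source tables
users = [
    {"user_id": 1, "name": "ann",  "channel_id": 10,   "pt": "20230101"},
    {"user_id": 1, "name": "anna", "channel_id": 10,   "pt": "20230105"},
    {"user_id": 2, "name": "bob",  "channel_id": 20,   "pt": "20230102"},
    {"user_id": 3, "name": "cat",  "channel_id": None, "pt": "20230103"},
]
channels = [
    {"id": 10, "channel_name": "east",   "pt": "20230101"},
    {"id": 10, "channel_name": "east-2", "pt": "20230104"},
    {"id": 20, "channel_name": "west",   "pt": "20230101"},
]


def latest_snapshot(users, channels):
    """One complete, up-to-date row per user (users LEFT JOIN channels)."""
    latest_users = latest_per_key(users, "user_id")
    latest_channels = latest_per_key(channels, "id")
    snapshot = []
    for uid in sorted(latest_users):
        u = latest_users[uid]
        c = latest_channels.get(u["channel_id"])  # None means no join match
        snapshot.append({
            "user_id": uid,
            "user_name": u["name"],
            "channel_id": c["id"] if c else None,
            "channel_name": c["channel_name"] if c else None,
        })
    return snapshot


for row in latest_snapshot(users, channels):
    print(row)
```

Users without a matching channel still appear in the output with empty channel fields, which is the behaviour the left join is chosen for.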
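Let bizdate (yesterday, known as the business date) be start_date, let 99990101 be end_date, and set the partition field pt to 99990101. (Any date that can never actually occur would do, but in Alibaba Cloud DataWorks a datetime value must be a valid calendar date, and end_date is later reused as pt, so 99990101 is used here.) Left joining the user table with the results of all the multi-table subqueries gives the final latest data: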
INSERT OVERWRITE TABLE dim_channel_webusers_info PARTITION(pt)
SELECT  t1.cuswebuserid AS user_id,
        t1.name AS user_name,
        t1.login_name,
        t1.`status`,
        t1.sfid AS user_sfid,
        t1.created_date AS create_date,
        t2.id AS channel_id,
        t2.channel_name,
        t2.sfid AS channel_sfid,
        t2.channel_system,
        CASE WHEN t2.channel_system = 4 THEN t3.nymanager ELSE t2.nymanager END AS nymanager,
        t4.cuswebuserid AS referee_id,
        t4.name AS referee_name,
        t4.login_name AS referee_login_name,
        t5.cuswebuserid AS manager_id,
        t5.name AS manager_name,
        t5.login_name AS manager_login_name,
        TO_DATE('${bizdate}', 'yyyymmdd') AS start_date,
        TO_DATE('99990101', 'yyyymmdd') AS end_date,
        '99990101' AS pt
-- user account information
FROM (
    SELECT cuswebuserid, name, login_name, `status`, sfid, created_date, core_channel_id, parent_id
    FROM (
        SELECT cuswebuserid, name, login_name, `status`, sfid, created_date, core_channel_id, parent_id,
               RANK() OVER (PARTITION BY cuswebuserid ORDER BY pt DESC) rk
        FROM nanyan_space.ods_hip_b2b_customer_webuser
    ) tmp_wu
    WHERE rk = 1
) t1
-- user channel information and the join condition
LEFT JOIN (
    SELECT id, channel_name, sfid, channel_system, nymanager, parent_id
    FROM (
        SELECT id, channel_name, sfid, channel_system, nymanager, parent_id,
               RANK() OVER (PARTITION BY id ORDER BY pt DESC) rk
        FROM nanyan_space.ods_core_channel
    ) tmp_c
    WHERE rk = 1
) t2 ON t1.core_channel_id = t2.id
-- account manager for 蜂系 channels
LEFT JOIN (
    SELECT cuswebuserid, nymanager
    FROM (
        SELECT cuswebuserid, nymanager,
               RANK() OVER (PARTITION BY id ORDER BY pt DESC) rk
        FROM nanyan_space.ods_hip_b2b_webuser_nymanager
    ) tmp_m  -- alias added; many engines require one on a derived table
    WHERE rk = 1
) t3 ON t1.cuswebuserid = t3.cuswebuserid
-- referee information
LEFT JOIN (
    SELECT cuswebuserid, name, login_name
    FROM (
        SELECT cuswebuserid, name, login_name,
               RANK() OVER (PARTITION BY cuswebuserid ORDER BY pt DESC) rk
        FROM nanyan_space.ods_hip_b2b_customer_webuser
    ) tmp_wu
    WHERE rk = 1
) t4 ON t1.parent_id = t4.cuswebuserid
-- channel administrator information
LEFT JOIN (
    SELECT tmp_cw.cuswebuserid, tmp_cw.name, tmp_cw.login_name, tmp_cw.core_channel_id
    FROM (
        SELECT cuswebuserid, name, login_name, core_channel_id, login_pwd,
               RANK() OVER (PARTITION BY cuswebuserid ORDER BY pt DESC) rk
        FROM nanyan_space.ods_hip_b2b_customer_webuser
    ) tmp_cw
    JOIN (
        SELECT id, admin_username, admin_password,
               RANK() OVER (PARTITION BY id ORDER BY pt DESC) rk
        FROM nanyan_space.ods_core_channel
    ) tmp_cc
    ON  tmp_cw.login_name = tmp_cc.admin_username
    AND tmp_cw.login_pwd = tmp_cc.admin_password
    AND tmp_cw.core_channel_id = tmp_cc.id
    WHERE tmp_cc.rk = 1 AND tmp_cw.rk = 1
) t5 ON t1.core_channel_id = t5.core_channel_id
;
This extracts the latest data for all users and loads it into the 99990101 partition, which completes the first-day load of the table.
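Daily load
What data does the daily load need? Start from the structure of a zipper table: the end date of a user's previous row lines up with the start date of the next row (the two can be equal or one day apart, depending on the business requirements).
Comparing new and old data
By its nature a zipper table involves comparing new and old data. The day's changed data is full joined with the previous latest data, the nvl(new, old) function picks the appropriate value for each field, and the result is written back to the 99990101 partition as the latest data. At the same time, expired rows (the user already existed and was modified yesterday) are written to the bizdate partition (the business date, i.e. yesterday).
The key question is: with multiple source tables, how do we determine which rows have changed? A user may have changed while the user's channel has not, or a channel may have changed while the users belonging to it have not. Whatever join operation we perform on the latest partitions of the user table and the channel table, we lose some changed rows, because a user whose channel changed may not appear in the latest user partition.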
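A tiny Python sketch shows why joining only the latest partitions loses changes. All ids and the two "changed" sets are hypothetical; they stand in for the keys found in each source table's bizdate partition.

```python
# Latest state of every user: user_id -> channel_id (hypothetical)
user_channel = {1: 10, 2: 10, 3: 20}

# Keys that appear in each source table's bizdate partition
changed_users = {3}       # user 3 edited their own profile
changed_channels = {10}   # channel 10 was renamed


def changed_by_partition_join(user_channel, changed_users, changed_channels):
    """Inner join of the two bizdate partitions: the user AND its channel must both be there."""
    return {u for u in changed_users if user_channel[u] in changed_channels}


def actually_changed(user_channel, changed_users, changed_channels):
    """A user's dimension row changes if the user OR its channel changed."""
    return {u for u, c in user_channel.items()
            if u in changed_users or c in changed_channels}


print(changed_by_partition_join(user_channel, changed_users, changed_channels))
print(actually_changed(user_channel, changed_users, changed_channels))
```

Starting from the user partition with a left join would recover user 3 but still miss users 1 and 2, whose rows changed only through their channel.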
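How do we handle this situation?
1. First filter out all the changed data, then full join it with the 99990101 partition.
2. Reuse the first-day load script to get the latest data for all users, full join it with the 99990101 partition, and only then filter for the rows that changed.
At first glance option 1 should be cheaper, but it is not. Merely filtering out all the changed data already requires full scans of the source tables, because only after joining them do we know every changed user. In addition, the logic is not very clear. So we go with option 2 and stage all the new and old data together in a temporary table: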
WITH tmp AS (
    SELECT  old.user_id AS old_user_id,
            old.user_name AS old_user_name,
            old.login_name AS old_login_name,
            old.`status` AS old_status,
            old.user_sfid AS old_user_sfid,
            old.create_date AS old_create_date,
            old.channel_id AS old_channel_id,
            old.channel_name AS old_channel_name,
            old.channel_sfid AS old_channel_sfid,
            old.channel_system AS old_channel_system,
            old.nymanager AS old_nymanager,
            old.referee_id AS old_referee_id,
            old.referee_name AS old_referee_name,
            old.referee_login_name AS old_referee_login_name,
            old.manager_id AS old_manager_id,
            old.manager_name AS old_manager_name,
            old.manager_login_name AS old_manager_login_name,
            old.start_date AS old_start_date,
            old.end_date AS old_end_date,
            new.user_id AS new_user_id,
            new.user_name AS new_user_name,
            new.login_name AS new_login_name,
            new.`status` AS new_status,
            new.user_sfid AS new_user_sfid,
            new.create_date AS new_create_date,
            new.channel_id AS new_channel_id,
            new.channel_name AS new_channel_name,
            new.channel_sfid AS new_channel_sfid,
            new.channel_system AS new_channel_system,
            new.nymanager AS new_nymanager,
            new.referee_id AS new_referee_id,
            new.referee_name AS new_referee_name,
            new.referee_login_name AS new_referee_login_name,
            new.manager_id AS new_manager_id,
            new.manager_name AS new_manager_name,
            new.manager_login_name AS new_manager_login_name,
            new.start_date AS new_start_date,
            new.end_date AS new_end_date
    -- old side: the current open rows
    FROM (
        SELECT user_id, user_name, login_name, `status`, user_sfid, create_date,
               channel_id, channel_name, channel_sfid, channel_system, nymanager,
               referee_id, referee_name, referee_login_name,
               manager_id, manager_name, manager_login_name,
               start_date, end_date
        FROM nanyan_space.dim_channel_webusers_info
        WHERE pt = '99990101'
    ) old
    -- new side: the first-day load query, rebuilt from all source partitions
    FULL OUTER JOIN (
        SELECT  t1.cuswebuserid AS user_id,
                t1.name AS user_name,
                t1.login_name,
                t1.`status`,
                t1.sfid AS user_sfid,
                t1.created_date AS create_date,
                t2.id AS channel_id,
                t2.channel_name,
                t2.sfid AS channel_sfid,
                t2.channel_system,
                CASE WHEN t2.channel_system = 4 THEN t3.nymanager ELSE t2.nymanager END AS nymanager,
                t4.cuswebuserid AS referee_id,
                t4.name AS referee_name,
                t4.login_name AS referee_login_name,
                t5.cuswebuserid AS manager_id,
                t5.name AS manager_name,
                t5.login_name AS manager_login_name,
                TO_DATE('${bizdate}', 'yyyymmdd') AS start_date,
                TO_DATE('99990101', 'yyyymmdd') AS end_date
        -- user account information
        FROM (
            SELECT cuswebuserid, name, login_name, `status`, sfid, created_date, core_channel_id, parent_id
            FROM (
                SELECT cuswebuserid, name, login_name, `status`, sfid, created_date, core_channel_id, parent_id,
                       RANK() OVER (PARTITION BY cuswebuserid ORDER BY pt DESC) rk
                FROM nanyan_space.ods_hip_b2b_customer_webuser
            ) tmp_wu
            WHERE rk = 1
        ) t1
        -- user channel information and the join condition
        LEFT JOIN (
            SELECT id, channel_name, sfid, channel_system, nymanager, parent_id
            FROM (
                SELECT id, channel_name, sfid, channel_system, nymanager, parent_id,
                       RANK() OVER (PARTITION BY id ORDER BY pt DESC) rk
                FROM nanyan_space.ods_core_channel
            ) tmp_c
            WHERE rk = 1
        ) t2 ON t1.core_channel_id = t2.id
        -- account manager for 蜂系 channels
        LEFT JOIN (
            SELECT cuswebuserid, nymanager
            FROM (
                SELECT cuswebuserid, nymanager,
                       RANK() OVER (PARTITION BY id ORDER BY pt DESC) rk
                FROM nanyan_space.ods_hip_b2b_webuser_nymanager
            ) tmp_m  -- alias added; many engines require one on a derived table
            WHERE rk = 1
        ) t3 ON t1.cuswebuserid = t3.cuswebuserid
        -- referee information
        LEFT JOIN (
            SELECT cuswebuserid, name, login_name
            FROM (
                SELECT cuswebuserid, name, login_name,
                       RANK() OVER (PARTITION BY cuswebuserid ORDER BY pt DESC) rk
                FROM nanyan_space.ods_hip_b2b_customer_webuser
            ) tmp_wu
            WHERE rk = 1
        ) t4 ON t1.parent_id = t4.cuswebuserid
        -- channel administrator information
        LEFT JOIN (
            SELECT tmp_cw.cuswebuserid, tmp_cw.name, tmp_cw.login_name, tmp_cw.core_channel_id
            FROM (
                SELECT cuswebuserid, name, login_name, core_channel_id, login_pwd,
                       RANK() OVER (PARTITION BY cuswebuserid ORDER BY pt DESC) rk
                FROM nanyan_space.ods_hip_b2b_customer_webuser
            ) tmp_cw
            JOIN (
                SELECT id, admin_username, admin_password,
                       RANK() OVER (PARTITION BY id ORDER BY pt DESC) rk
                FROM nanyan_space.ods_core_channel
            ) tmp_cc
            ON  tmp_cw.login_name = tmp_cc.admin_username
            AND tmp_cw.login_pwd = tmp_cc.admin_password
            AND tmp_cw.core_channel_id = tmp_cc.id
            WHERE tmp_cc.rk = 1 AND tmp_cw.rk = 1
        ) t5 ON t1.core_channel_id = t5.core_channel_id
    ) new
    ON old.user_id = new.user_id
)
Dynamic partitioning of the data
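Next, the rows are filtered and routed to different partitions. The nvl() function yields the latest data, which goes to the 99990101 partition. A where xxx in (subquery of changed data) condition determines which users changed, and the old rows of those users go to yesterday's partition. The pt field decides which partition each row lands in.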
INSERT OVERWRITE TABLE dim_channel_webusers_info PARTITION(pt)
-- latest row for every user, written to the open partition
SELECT  nvl(new_user_id, old_user_id),
        nvl(new_user_name, old_user_name),
        nvl(new_login_name, old_login_name),
        nvl(new_status, old_status),
        nvl(new_user_sfid, old_user_sfid),
        nvl(new_create_date, old_create_date),
        nvl(new_channel_id, old_channel_id),
        nvl(new_channel_name, old_channel_name),
        nvl(new_channel_sfid, old_channel_sfid),
        nvl(new_channel_system, old_channel_system),
        nvl(new_nymanager, old_nymanager),
        nvl(new_referee_id, old_referee_id),
        nvl(new_referee_name, old_referee_name),
        nvl(new_referee_login_name, old_referee_login_name),
        nvl(new_manager_id, old_manager_id),
        nvl(new_manager_name, old_manager_name),
        nvl(new_manager_login_name, old_manager_login_name),
        new_start_date,
        new_end_date,
        '99990101' AS pt
FROM tmp
UNION ALL
-- expired rows, closed at bizdate and written to the bizdate partition
SELECT  old_user_id,
        old_user_name,
        old_login_name,
        old_status,
        old_user_sfid,
        old_create_date,
        old_channel_id,
        old_channel_name,
        old_channel_sfid,
        old_channel_system,
        old_nymanager,
        old_referee_id,
        old_referee_name,
        old_referee_login_name,
        old_manager_id,
        old_manager_name,
        old_manager_login_name,
        old_start_date,
        TO_DATE('${bizdate}', 'yyyymmdd') AS end_date,
        '${bizdate}' AS pt
FROM tmp
-- select the row as soon as any source table has a change for it
WHERE old_user_id IS NOT NULL
AND   new_user_id IS NOT NULL
AND (
        old_user_id IN (
            SELECT cuswebuserid
            FROM nanyan_space.ods_hip_b2b_customer_webuser
            WHERE pt = '${bizdate}'
        )
     OR old_channel_id IN (
            SELECT id
            FROM nanyan_space.ods_core_channel
            WHERE pt = '${bizdate}'
        )
     OR (old_channel_system = 4 AND old_user_id IN (
            SELECT cuswebuserid
            FROM nanyan_space.ods_hip_b2b_webuser_nymanager
            WHERE pt = '${bizdate}'
        ))
)
;
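The old side must be tested with "user primary key is not null"! A record may have been created only today, with no earlier record of it. It is itself the latest data, so naturally there is no old user primary key for it. In other words, only rows that exist on both the new side and the old side have truly changed!!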
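The routing logic of the daily load can be sketched in Python as follows. The dictionaries, the `attrs` field, and the precomputed `changed_ids` set (standing in for the three IN subqueries on the bizdate partitions) are hypothetical. One assumption in this sketch goes beyond the description above: a user with no change keeps the existing open row, including its start date.

```python
from datetime import date


def daily_load(open_rows, latest, changed_ids, bizdate):
    """Merge the current open rows with the rebuilt latest snapshot.

    open_rows:   {user_id: {"attrs": ..., "start_date": ...}} from the open partition
    latest:      {user_id: attrs} rebuilt from all source tables
    changed_ids: user ids touched by any source table's bizdate partition
    Returns (new_open_rows, closed_rows).
    """
    new_open, closed = {}, []
    for uid in sorted(open_rows.keys() | latest.keys()):
        old, new = open_rows.get(uid), latest.get(uid)
        if old is not None and new is not None and uid in changed_ids:
            # Existed before and changed: close the old row at bizdate
            closed.append({"user_id": uid, "attrs": old["attrs"],
                           "start_date": old["start_date"], "end_date": bizdate})
            new_open[uid] = {"attrs": new, "start_date": bizdate}
        elif old is None:
            # Brand-new user: nothing to close, just open a row
            new_open[uid] = {"attrs": new, "start_date": bizdate}
        else:
            # No change (assumption: keep the existing open row as it is)
            new_open[uid] = old
    return new_open, closed


# Hypothetical data for business date 2023-01-05
open_rows = {
    1: {"attrs": {"name": "ann"}, "start_date": date(2023, 1, 1)},
    2: {"attrs": {"name": "bob"}, "start_date": date(2023, 1, 1)},
}
latest = {1: {"name": "anna"}, 2: {"name": "bob"}, 3: {"name": "cat"}}
changed_ids = {1, 3}
bizdate = date(2023, 1, 5)

new_open, closed = daily_load(open_rows, latest, changed_ids, bizdate)
print(new_open)
print(closed)
```

User 3 is in `changed_ids` but has no old row, so it opens a row without closing anything, which is exactly the case the "is not null" check protects against.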
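That completes our daily load module.
Summary
Zipper tables are a hassle, but once you have grasped the approach and can generalize from it, any kind of zipper table can be handled with ease next time. The idea this time was to turn multiple tables into one.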