利用dataworks实现多表联合的拉链表

利用dataworks实现多表联合的拉链表,第1张

利用dataworks实现多表联合的拉链表

目录

概述

什么是拉链表?

拉链表有何作用?

多表联合的拉链表

首日装载

得到某些目标字段的最新数据

最新数据集

每日装载

新旧对比

数据的动态分区

总结


概述 什么是拉链表?

拉链表就是对数据的一种状态化的展现,数据何时发生了变化,该行数据该状态的持续时间段一目了然。

拉链表的一行数据具体形式如下:
数据内容a        数据内容b        起始时间        结束时间
其中起始时间是该行数据从何时开始生效,结束时间为该行数据何时失效,一般来说该时间精确到天。

假如是用户拉链表,对于某一个特定id的用户来说,数据就像是拉链一般,每一个起始时间对应该用户上一个状态的结束时间。

拉链表有何作用?

一般来说,一个维度表的数据通常不大于100w行,但是某些维度可能不一样,比如电商中的商品维度,用户基数较大的用户维度,这些表的数据都有可能大于100w行,但是它们数据变化量可能每日不超过10%,如果都进行每日全量存储,会造成大量的资源浪费;同样,在进行计算时,不可避免的全表扫描也会大大增加计算的负担。这么看来,做一个拉链表,只针对变化的状态进行存储,可以节省不少资源。

也有小伙伴会问,同样是存储新增及变化的数据,为什么一定要拉链表的形式而非普通的新增及变化呢?就由我来回答这个问题。

我们要知道,维度表用于关联时往往需要取得最新的数据,如果采取普通的新增及变化的策略,那么一行数据里顶多只有一个数据变化的时间,也就是用于分区的条件。但是我们并不清楚在某个时间点该数据的样貌,好比1.5号数据变化了,你1.8号去查只能查到1.8号变化的数据。又有小伙伴会说:我只要知道它1.8没变化,那么查最新的数据不就行了吗?确实如此,但是如果要知道这条数据的用户每个状态的持续时间,这样计算起来就相对来说要麻烦一点了。

要记住,拉链表没有任何缺点,除了建表比较麻烦!!(这也就是我这个表卡了挺久的原因)但是拉链表建表的麻烦是换取以后上层建表的简单,要问用普通的新增及变化可以吗?当然可以!

多表联合的拉链表

网上众多用一张ods层源表来实现拉链表的例子,我就不在这列举了。

不了解如何实现基础的拉链表的话,建议先去其它博客了解了解再来。
这里给出一种更加复杂的情况,

首先,数据源为多张表,且多张表均采用新增及变化的同步策略至ods层。;其次,由于ods层的表已经确定好,并用于了其他表的建设,所以不适合再次改动。因此表中含有多个分区,且最新的所有数据需要通过SQL提出。

SQL总共用到3个源表:ods_hip_b2b_customer_webuser,ods_core_channel,ods_hip_b2b_webuser_nymanager。分别是渠道用户表,渠道表,客户经理表。共有5个子查询,其中某些子查询中嵌套子查询。

该表分为首日与每日装载。

首日装载

和一张源表的,只要取出最新的数据集即可。问题是怎样得到所有的最新数据呢?

得到某些目标字段的最新数据

由于三张表都是新增及变化的同步策略,如果要得到各自表所对应的最新的数据,只需要对其进行一个分区字段的倒叙排序取第一个就行,部分代码如下:

SELECt 
    id,
    channel_name,
    sfid,
    channel_system,
    nymanager,
    parent_id
FROM (
    SELECt 
        id,
        channel_name,
        sfid,
        channel_system,
        nymanager,
        parent_id,
        RANK() OVER(PARTITION by id ORDER by pt DESC ) rk 
    FROM nanyan_space.ods_core_channel
    ) tmp_c
where rk=1
最新数据集

目标表数据字段来自于多个子查询的话,可以转换一下思想,如果只有一个子查询该怎么做?我们要的数据是什么样子?无非就是最新的数据集罢了!

那么现在能够得到多个字段各自的最新数据,该怎样每行数据都是一行最新的、完整的数据,这才是我们的目的。因此只需要根据用户表的所有用户,对各个子查询进行left join,就能得到每个用户的最新数据了。

记bizdate(就是昨天,称为业务日期)为start_date,99990101为end_date,分区字段pt为99990101(任意一个不可能取到的日期即可,但由于阿里云的dataworks中datetime类型需要是正常年月日,后期又要用end_date来作为pt,故这里用的99990101),将用户表与所有多表查询的字段结果进行left join得到最终的最新数据:

INSERT OVERWRITE TABLE dim_channel_webusers_info PARTITION(pt)
SELECt 
    t1.cuswebuserid as user_id,
    t1.name as user_name,
    t1.login_name,
    t1.`status`,
    t1.sfid as user_sfid,
    t1.created_date as create_date,
    t2.id as channel_id,
    t2.channel_name,
    t2.sfid as channel_sfid,
    t2.channel_system,
    CASE WHEN t2.channel_system=4 THEN t3.nymanager else t2.nymanager END as nymanager,
    t4.cuswebuserid as referee_id,
    t4.name as referee_name,
    t4.login_name as referee_login_name,
    t5.cuswebuserid as manager_id,
    t5.name as manager_name,
    t5.login_name as manager_login_name,
    TO_DATE('${bizdate}','yyyymmdd') as start_date,
    TO_DATE('99990101','yyyymmdd') as end_date,
    '99990101' as pt
-- 用户账号信息
FROM (
    SELECt 
        cuswebuserid,
        name,
        login_name,
        `status`,
        sfid,
        created_date,
        core_channel_id,
        parent_id
    FROM (
        SELECt 
            cuswebuserid,
            name,
            login_name,
            `status`,
            sfid,
            created_date,
            core_channel_id,
            parent_id,
            RANK() OVER(PARTITION by cuswebuserid ORDER by pt DESC ) rk
        FROM nanyan_space.ods_hip_b2b_customer_webuser
    ) tmp_wu
    WHERe rk=1
)t1
-- 用户渠道信息以及join条件
LEFT JOIN (    
    SELECt 
        id,
        channel_name,
        sfid,
        channel_system,
        nymanager,
        parent_id
    FROM (
        SELECt 
            id,
            channel_name,
            sfid,
            channel_system,
            nymanager,
            parent_id,
            RANK() OVER(PARTITION by id ORDER by pt DESC ) rk 
        FROM nanyan_space.ods_core_channel
    ) tmp_c
    where rk=1
)t2
on t1.core_channel_id = t2.id
-- 蜂系的客户经理
LEFT JOIN (
    SELECt 
        cuswebuserid,
        nymanager
    from (
        SELECt 
            cuswebuserid,
            nymanager,
            RANK() OVER(PARTITION by id ORDER by pt DESC ) rk
        FROM nanyan_space.ods_hip_b2b_webuser_nymanager 
    )
    where rk=1
)t3
on t1.cuswebuserid=t3.cuswebuserid
-- 推荐人信息
LEFT JOIN (
    SELECt 
        cuswebuserid,
        name,
        login_name
    FROM (
        SELECt 
            cuswebuserid,
            name,
            login_name,
            RANK() OVER(PARTITION by cuswebuserid ORDER by pt DESC ) rk
        FROM nanyan_space.ods_hip_b2b_customer_webuser
    ) tmp_wu
    WHERe rk=1
)t4
on t1.parent_id = t4.cuswebuserid
-- 管理员信息
LEFT JOIN (
    SELECt 
        tmp_cw.cuswebuserid,
        tmp_cw.name,
        tmp_cw.login_name,
        tmp_cw.core_channel_id
    FROM (
        SELECt 
            cuswebuserid,
            name,
            login_name,
            core_channel_id,
            login_pwd,
            RANK() OVER(PARTITION by cuswebuserid ORDER by pt DESC ) rk
        FROM nanyan_space.ods_hip_b2b_customer_webuser
    ) tmp_cw
    JOIN (
        SELECt 
            id,
            admin_username,
            admin_password,
            RANK() OVER(PARTITION by id ORDER by pt DESC ) rk
        FROM nanyan_space.ods_core_channel 
    ) tmp_cc
    ON tmp_cw.login_name=tmp_cc.admin_username 
    AND tmp_cw.login_pwd=tmp_cc.admin_password
    and core_channel_id=id
    WHERe tmp_cc.rk=1 and tmp_cw.rk=1
)t5
on t1.core_channel_id = t5.core_channel_id
;

将所有用户的最新数据提出并装载至99990101分区中,至此完成该表的首日装载。

每日装载

每日装载需要的数据是什么样的?首先想到拉链表的结构——该用户上一条数据的结束时间对应着下一条数据的起始时间(两者可以相同,也可以相差一天,具体根据业务来定)。

新旧对比

拉链表的性质涉及到新旧对比,也就是当日变化的的数据与之前最新的数据进行full join,通过nvl(新,旧)函数进行判断,选择合适的字段作为最新的数据继续存入99990101分区,同时将过期(之前有该用户,并在昨日进行修改)的数据写入bizdate(业务日期,也就是昨天)分区。

重点就是数据源是多表该如何判断哪些数据是变化了的数据呢?因为可能用户发生了变化,该用户对应的渠道没有变化;或者说渠道变化了,但属于该渠道的用户没有变化。我们不论对用户表、渠道表最新分区的数据进行怎样的join *** 作,都会导致缺失多条变化的数据,因为可能渠道变化了的用户不在最新的分区当中。

面对上面这种情况我们该如何处理?
1.先行筛选出所有变化的数据,再对99990101分区进行full join。
2.直接采用首日导入脚本,获取全部用户最新的数据,与99990101分区进行full join后,再筛选是否为变化的数据。

乍一看,方案1应该更节省资源,其实不然,方案一首先光筛选所有变化的数据,是必然进行多表的全表扫描的,因为只有多表join后才能知道所有变化的用户,其次逻辑不是非常的清晰。因此我们采用方案2,通过组建临时表的形式将新旧数据全部暂存。

WITH 
tmp as 
(
    SELECt 
        old.user_id as old_user_id,
        old.user_name as old_user_name,
        old.login_name as old_login_name,
        old.`status` as old_status,
        old.user_sfid as old_user_sfid,
        old.create_date as old_create_date,
        old.channel_id as old_channel_id,
        old.channel_name as old_channel_name,
        old.channel_sfid as old_channel_sfid,
        old.channel_system as old_channel_system,
        old.nymanager as old_nymanager,
        old.referee_id as old_referee_id,
        old.referee_name as old_referee_name,
        old.referee_login_name as old_referee_login_name,
        old.manager_id as old_manager_id,
        old.manager_name as old_manager_name,
        old.manager_login_name as old_manager_login_name,
        old.start_date as old_start_date,
        old.end_date as old_end_date,
        new.user_id as new_user_id,
        new.user_name as new_user_name,
        new.login_name as new_login_name,
        new.`status` as new_status,
        new.user_sfid as new_user_sfid,
        new.create_date as new_create_date,
        new.channel_id as new_channel_id,
        new.channel_name as new_channel_name,
        new.channel_sfid as new_channel_sfid,
        new.channel_system as new_channel_system,
        new.nymanager as new_nymanager,
        new.referee_id as new_referee_id,
        new.referee_name as new_referee_name,
        new.referee_login_name as new_referee_login_name,
        new.manager_id as new_manager_id,
        new.manager_name as new_manager_name,
        new.manager_login_name as new_manager_login_name,
        new.start_date as new_start_date,
        new.end_date as new_end_date
    FROM (
        SELECt 
            user_id,
            user_name,
            login_name,
            `status`,
            user_sfid,
            create_date,
            channel_id,
            channel_name,
            channel_sfid,
            channel_system,
            nymanager,
            referee_id,
            referee_name,
            referee_login_name,
            manager_id,
            manager_name,
            manager_login_name,
            start_date,
            end_date
        FROM nanyan_space.dim_channel_webusers_info
        where pt='99990101'
    )old 
    FULL outer JOIN (
        SELECt  
        t1.cuswebuserid as user_id,
        t1.name as user_name,
        t1.login_name,
        t1.`status`,
        t1.sfid as user_sfid,
        t1.created_date as create_date,
        t2.id as channel_id,
        t2.channel_name,
        t2.sfid as channel_sfid,
        t2.channel_system,
        CASE WHEN t2.channel_system=4 THEN t3.nymanager else t2.nymanager END as nymanager,
        t4.cuswebuserid as referee_id,
        t4.name as referee_name,
        t4.login_name as referee_login_name,
        t5.cuswebuserid as manager_id,
        t5.name as manager_name,
        t5.login_name as manager_login_name,
        TO_DATE('${bizdate}','yyyymmdd') as start_date,
        TO_DATE('99990101','yyyymmdd') as end_date
        -- 用户账号信息
        FROM (
            SELECt 
                cuswebuserid,
                name,
                login_name,
                `status`,
                sfid,
                created_date,
                core_channel_id,
                parent_id
            FROM (
                SELECt 
                    cuswebuserid,
                    name,
                    login_name,
                    `status`,
                    sfid,
                    created_date,
                    core_channel_id,
                    parent_id,
                    RANK() OVER(PARTITION by cuswebuserid ORDER by pt DESC ) rk
                FROM nanyan_space.ods_hip_b2b_customer_webuser
            ) tmp_wu
            WHERe rk=1
        )t1
        -- 用户渠道信息以及join条件
        LEFT JOIN (    
            SELECt 
                id,
                channel_name,
                sfid,
                channel_system,
                nymanager,
                parent_id
            FROM (
                SELECt 
                    id,
                    channel_name,
                    sfid,
                    channel_system,
                    nymanager,
                    parent_id,
                    RANK() OVER(PARTITION by id ORDER by pt DESC ) rk 
                FROM nanyan_space.ods_core_channel
            ) tmp_c
            where rk=1
        )t2
        on t1.core_channel_id = t2.id
        -- 蜂系的客户经理
        LEFT JOIN (
            SELECt 
                cuswebuserid,
                nymanager
            from (
                SELECt 
                    cuswebuserid,
                    nymanager,
                    RANK() OVER(PARTITION by id ORDER by pt DESC ) rk
                FROM nanyan_space.ods_hip_b2b_webuser_nymanager 
            )
            where rk=1
        )t3
        on t1.cuswebuserid=t3.cuswebuserid
        -- 推荐人信息
        LEFT JOIN (
            SELECt 
                cuswebuserid,
                name,
                login_name
            FROM (
                SELECt 
                    cuswebuserid,
                    name,
                    login_name,
                    RANK() OVER(PARTITION by cuswebuserid ORDER by pt DESC ) rk
                FROM nanyan_space.ods_hip_b2b_customer_webuser
            ) tmp_wu
            WHERe rk=1
        )t4
        on t1.parent_id = t4.cuswebuserid
        -- 管理员信息
        LEFT JOIN (
            SELECt 
                tmp_cw.cuswebuserid,
                tmp_cw.name,
                tmp_cw.login_name,
                tmp_cw.core_channel_id
            FROM (
                SELECt 
                    cuswebuserid,
                    name,
                    login_name,
                    core_channel_id,
                    login_pwd,
                    RANK() OVER(PARTITION by cuswebuserid ORDER by pt DESC ) rk
                FROM nanyan_space.ods_hip_b2b_customer_webuser
            ) tmp_cw
            JOIN (
                SELECt 
                    id,
                    admin_username,
                    admin_password,
                    RANK() OVER(PARTITION by id ORDER by pt DESC ) rk
                FROM nanyan_space.ods_core_channel 
            ) tmp_cc
            ON tmp_cw.login_name=tmp_cc.admin_username 
            AND tmp_cw.login_pwd=tmp_cc.admin_password
            and core_channel_id=id
            WHERe tmp_cc.rk=1 and tmp_cw.rk=1
        )t5
        on t1.core_channel_id = t5.core_channel_id
    )new
    on old.user_id=new.user_id
)
数据的动态分区

然后再进行筛选数据发往不同的分区,通过nvl()函数,得到最新的数据,发往99990101分区;通过where xxx in (子查询变化的数据)来确定是否为发生变化的用户,变化了的用户发往昨天的分区。根据pt字段进行分区的划分。

INSERT OVERWRITE TABLE dim_channel_webusers_info PARTITION(pt)
SELECt 
    nvl(new_user_id,old_user_id),
    nvl(new_user_name,old_user_name),
    nvl(new_login_name,old_login_name),
    nvl(new_status,old_status),
    nvl(new_user_sfid,old_user_sfid),
    nvl(new_create_date,old_create_date),
    nvl(new_channel_id,old_channel_id),
    nvl(new_channel_name,old_channel_name),
    nvl(new_channel_sfid,old_channel_sfid),
    nvl(new_channel_system,old_channel_system),
    nvl(new_nymanager,old_nymanager),
    nvl(new_referee_id,old_referee_id),
    nvl(new_referee_name,old_referee_name),
    nvl(new_referee_login_name,old_referee_login_name),
    nvl(new_manager_id,old_manager_id),
    nvl(new_manager_name,old_manager_name),
    nvl(new_manager_login_name,old_manager_login_name),
    new_start_date,
    new_end_date,
    '99990101' pt
from tmp
union ALL 
SELECt 
    old_user_id,
    old_user_name,
    old_login_name,
    old_status,
    old_user_sfid,
    old_create_date,
    old_channel_id,
    old_channel_name,
    old_channel_sfid,
    old_channel_system,
    old_nymanager,
    old_referee_id,
    old_referee_name,
    old_referee_login_name,
    old_manager_id,
    old_manager_name,
    old_manager_login_name,
    old_start_date,
    TO_DATE('${bizdate}','yyyymmdd') as end_date,
    '${bizdate}' as pt
FROM tmp 
--只有有表有变化就进行筛选
where old_user_id is not NULL and new_user_id is not NULL 
and (
    old_user_id IN (
        SELECt cuswebuserid
        FROM nanyan_space.ods_hip_b2b_customer_webuser
        where pt = '${bizdate}'
    )
    or old_channel_id IN (
        SELECt id
        FROM nanyan_space.ods_core_channel 
        where pt = '${bizdate}'
    )
    or (old_channel_system=4 and old_user_id in (
        SELECt cuswebuserid
        FROM nanyan_space.ods_hip_b2b_webuser_nymanager
        where pt = '${bizdate}'
        )
    )
)
;

旧表数据必须用用户主键 is not null 来进行判断!因为有可能是今天刚刚创建的一条信息,但之前没有这条数据的记录,它本身就是最新的数据,那么自然就没有之前的用户主键;换句话说只有新表有数据,且旧表有数据才是真正发生改变的数据!!

这样就完成了我们的每日装载模块。

总结

拉链表,麻烦归麻烦,但是我们只要掌握了思路,举一反三,那么不管是什么类型的拉链表,下次也能轻松解决。这次的思路就是将多表化为一表来解决的。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5716968.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存