基于binlog构建拉链表

基于binlog构建拉链表,第1张

前言:缓慢变化知识点回顾

缓慢变化分三种类型:

type1:当数据发生变化时,直接覆盖旧值

type2:当数据发生变化时,新增一行

这里的DWID 也就是我们日常项目使用的代理键CustomerKey

type3:当数据发生变化时,新增一列

增量拉链表

        1.适用范围:         1).有增量字段可以识别数据更新,且业务数据变动时,增量时间会更新;2).数据不存在物理删除

        2.实现方式:         2).基于mysql表的增量数据实现,此处我们假设增量数据存放于stg_a001_table1,全量表存放于ods_a001_table1,uuid为构造字段,具体逻辑为,md5(concat(nvl(a),nvl(b)...)),也就是拉链比对字段空处理后拼接,再MD5的值

则具体实现可体现为:

select

    oat.ods_start_dt,

    if(sat.id is not null and oat.uuid<>sat.uuid,'${ymd}',oat.ods_end_dt) as ods_end_dt -- 对更新数据进行关链

    oat.uuid,

    oat.其它字段

from ods_a001_table1 oat

left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表

where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'

union all

select

    '${ymd}' as ods_start_dt,

    '99991231' as ods_end_dt, -- 新增和更新数据进行开链

    sat.uuid,

    sat.其它字段

from stg_a001_table1 sat

where sat.ymd='${ymd}'

全量拉链表:

 1.适用范围:         1).无增量字段可以识别数据更新;2).数据存在物理删除

 2.实现方式:         2).基于mysql表的增量数据实现,此处我们假设同步全量数据存放于stg_a001_table1,全量表存放于ods_a001_table1

select

    oat.ods_start_dt,

    case when sat.id is null or (sat.id is not null and oat.uuid<>sat.uuid),'${ymd}',oat.ods_end_dt) as ods_end_dt, -- 对更新和删除进行关链

    oat.uuid,

    oat.其它字段

from ods_a001_table1 oat

left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表

where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'

union all

select

    '${ymd}' as oat.ods_start_dt,

    '99991231' as ods_end_dt, -- 对插入和更新的开链

    sat.uuid,

    sat.其它字段

from stg_a001_table1 sat

left join ods_a001_table1 oat on oat.id=sat.id and oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'-- stg_a001_table1最好设置为分区表

where sat.ymd='${ymd}'

and (oat.id is null or (oat.id is not null and oat.uuid<>sat.uuid))

binlog的数据我们通过canel已经写入kafka,所以此处只需要通过flume去实时消费对应的topic并写入hdfs即可。

1).创建外表tmp_stg_a001_table1映射到hdfs上的增量数据存储路径

2).通过row_number函数,按照主键,对数据进行去重,只保留最后一条数据,将数据写入表stg_a001_table1

select

    oat.ods_start_dt,

    if((sat.id is not null and oat.uuid<>sat.uuid) or sat.binlog_type='D','${ymd}',oat.ods_end_dt) as ods_end_dt -- 对更新和删除数据进行关链

    oat.uuid,

    oat.其它字段

from ods_a001_table1 oat

left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表

where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'

union all

select

    '${ymd}' as ods_start_dt,

    '99991231' as ods_end_dt, -- 新增和更新数据进行开链

    sat.uuid,

    sat.其它字段

from stg_a001_table1 sat

where sat.ymd='${ymd}' and sat.binlog_type<>'D' -- 当日新增且当日删除的数据无需处理

拉链表取一个月的数据的 *** 作如下:

1、可以使用时间戳来过滤出在一个月内的数据。

2、可以利用特定的SQL命令来查询指定时间段内的数据。

3、还可以使用代码来编写算法,将一个月内的数据提取出来。

Q1

什么拉链表?

拉链表,记录每条信息的生命周期,一旦一条记录的生命周期结束,就要重新开始一条新的记录,并把当前日期放入生效的开始日期;

如果当前信息至今有效,在生效结束日期中填入一个极大值(一般为9999-99-99)

Q2

为什么要建立拉链表?

拉链表适合于:数据会发生变化,但是大部分是不变的;

比如:订单信息从未支付、已支付、未发货、已完成等经历了一周,大部分时间是不变化的,只有状态和状态发生时间会有更改。如果数据量有一定的规模,无法按照每日全量的方式保存。

Q3

如何使用拉链表?

通过,生效开始日期<=某个日期 且 生效结束日期>=某个日期,能够得到某个时间点的数据全量切片

1.拉链表数据

2.例如获取2019-01-01的历史切片:SELECT * FROM order_info where start_date<='2019-01-01' AND end_date>='2019-01-01'

3.例如获取2019-01-02的历史切片:SELECT * FROM order_info where start_date<='2019-01-02' AND end_date>='2019-01-02'

Q4

拉链表的制作过程?

订单当日全部数据和MySQL中每天变化的数据拼接在一起,形成一个新的临时拉链表数据。用临时拉链表覆盖旧的拉链表数据。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/7362117.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-04
下一篇 2023-04-04

发表评论

登录后才能评论

评论列表(0条)

保存