缓慢变化分三种类型:
type1:当数据发生变化时,直接覆盖旧值
type2:当数据发生变化时,新增一行
这里的DWID 也就是我们日常项目使用的代理键CustomerKey
type3:当数据发生变化时,新增一列
增量拉链表:
1.适用范围: 1).有增量字段可以识别数据更新,且业务数据变动时,增量时间会更新;2).数据不存在物理删除
2.实现方式: 2).基于mysql表的增量数据实现,此处我们假设增量数据存放于stg_a001_table1,全量表存放于ods_a001_table1,uuid为构造字段,具体逻辑为,md5(concat(nvl(a),nvl(b)...)),也就是拉链比对字段空处理后拼接,再MD5的值
则具体实现可体现为:
select
oat.ods_start_dt,
if(sat.id is not null and oat.uuid<>sat.uuid,'${ymd}',oat.ods_end_dt) as ods_end_dt -- 对更新数据进行关链
oat.uuid,
oat.其它字段
from ods_a001_table1 oat
left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表
where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'
union all
select
'${ymd}' as ods_start_dt,
'99991231' as ods_end_dt, -- 新增和更新数据进行开链
sat.uuid,
sat.其它字段
from stg_a001_table1 sat
where sat.ymd='${ymd}'
全量拉链表:
1.适用范围: 1).无增量字段可以识别数据更新;2).数据存在物理删除
2.实现方式: 2).基于mysql表的增量数据实现,此处我们假设同步全量数据存放于stg_a001_table1,全量表存放于ods_a001_table1
select
oat.ods_start_dt,
case when sat.id is null or (sat.id is not null and oat.uuid<>sat.uuid),'${ymd}',oat.ods_end_dt) as ods_end_dt, -- 对更新和删除进行关链
oat.uuid,
oat.其它字段
from ods_a001_table1 oat
left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表
where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'
union all
select
'${ymd}' as oat.ods_start_dt,
'99991231' as ods_end_dt, -- 对插入和更新的开链
sat.uuid,
sat.其它字段
from stg_a001_table1 sat
left join ods_a001_table1 oat on oat.id=sat.id and oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'-- stg_a001_table1最好设置为分区表
where sat.ymd='${ymd}'
and (oat.id is null or (oat.id is not null and oat.uuid<>sat.uuid))
binlog的数据我们通过canel已经写入kafka,所以此处只需要通过flume去实时消费对应的topic并写入hdfs即可。
1).创建外表tmp_stg_a001_table1映射到hdfs上的增量数据存储路径
2).通过row_number函数,按照主键,对数据进行去重,只保留最后一条数据,将数据写入表stg_a001_table1
select
oat.ods_start_dt,
if((sat.id is not null and oat.uuid<>sat.uuid) or sat.binlog_type='D','${ymd}',oat.ods_end_dt) as ods_end_dt -- 对更新和删除数据进行关链
oat.uuid,
oat.其它字段
from ods_a001_table1 oat
left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好设置为分区表
where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'
union all
select
'${ymd}' as ods_start_dt,
'99991231' as ods_end_dt, -- 新增和更新数据进行开链
sat.uuid,
sat.其它字段
from stg_a001_table1 sat
where sat.ymd='${ymd}' and sat.binlog_type<>'D' -- 当日新增且当日删除的数据无需处理
拉链表取一个月的数据的 *** 作如下:1、可以使用时间戳来过滤出在一个月内的数据。
2、可以利用特定的SQL命令来查询指定时间段内的数据。
3、还可以使用代码来编写算法,将一个月内的数据提取出来。
Q1
什么拉链表?
拉链表,记录每条信息的生命周期,一旦一条记录的生命周期结束,就要重新开始一条新的记录,并把当前日期放入生效的开始日期;
如果当前信息至今有效,在生效结束日期中填入一个极大值(一般为9999-99-99)
Q2
为什么要建立拉链表?
拉链表适合于:数据会发生变化,但是大部分是不变的;
比如:订单信息从未支付、已支付、未发货、已完成等经历了一周,大部分时间是不变化的,只有状态和状态发生时间会有更改。如果数据量有一定的规模,无法按照每日全量的方式保存。
Q3
如何使用拉链表?
通过,生效开始日期<=某个日期 且 生效结束日期>=某个日期,能够得到某个时间点的数据全量切片
1.拉链表数据
2.例如获取2019-01-01的历史切片:SELECT * FROM order_info where start_date<='2019-01-01' AND end_date>='2019-01-01'
3.例如获取2019-01-02的历史切片:SELECT * FROM order_info where start_date<='2019-01-02' AND end_date>='2019-01-02'
Q4
拉链表的制作过程?
订单当日全部数据和MySQL中每天变化的数据拼接在一起,形成一个新的临时拉链表数据。用临时拉链表覆盖旧的拉链表数据。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)