第四章 基于dolphinscheduler的公共汇总模型(DWS)建设

第四章 基于dolphinscheduler的公共汇总模型(DWS)建设,第1张

第四章 基于dolphinscheduler的公共汇总模型(DWS)建设 1、DWS层概述 1.1、DWS层定义
  • 数据汇总层:进行轻度聚合,不跨主题域,提升公共指标的复用性。
1.2、DWS数据组合

(1)DWS层数据对于DWD层进行轻度汇总,比如sum或count *** 作。

(2)DWS层数据业务场景不会跨主题域,且表内指标都是相同统计范围,比如最近1天,或者累加至今,原则上一般不做维度冗余。

1.3、DWS层表结构

(1)表结构:实体+度量

(2)命名规则:dws_业务表名__di

(3)生命周期:182层

2、DWS层数据处理 2.1、进行轻度聚合形成宽表模型
  • 根据不同维度组成多个宽表。

(1)最细粒度:维度1,维度2,维度3,维度4…

(2)宽表力度:维度1,维度2,维度3

2.2、DWS层数据应用

(1)主要对外开放的数据,也是使用频率最高的部分,该层的核心在于计算和分许

(2)将上册数据整合成通用数据,提供指标汇总层

3、DWS实例(用户全量表) 绪论
  • 需求:统计小说用户全量表,日更新
create table ods.ods_ios_di(
p_date string 
,etl_time string 
,server_ip      string  
,log_time       string
,guid           string
,search_words    string
,age            String 
,city           String 
,phone         string
,appid          string
,msgid          string
,httpcode       string
,httpcontent    string
,msg            string
)
partitioned by (`pt` string)
row format delimited fields terminated by ',';
3.1、创建Hive目标表

(1)编写建表脚本

create table dws.dws_novel_guid(
guid string
)
partitioned by (`pt` string)
row format delimited fields terminated by ',';

(2)配置SQL组件创建表

  • 数据源:hive::dws
  • sql类型:非查询

(3)上线部署

  • 上线后进入画布

(4)运行脚本

3.2、编写业务逻辑进行数据测试

(1)编写业务逻辑脚本

insert overwrite table dws.dws_novel_guid partition (pt=${pt})
select if(t2.guid is not null,t2.guid,t1.guid) as guid 
from (
select guid
from ods.ods_ios_di
where pt = ${pt}
group by guid
) t1
full join
(select guid 
 from dws_novel_guid
 where pt = ${pt}-1
group by guid
)
on t1.guid = t2.guid

(2)配置SQL组件创建表

  • 数据源:hive::dws
  • sql类型:非查询
  • 自定义参数: p t 、 {pt} 、 pt、{pt_one}
  • 前置SQL
alter table dws.dws_novel_guid drop if exists partition(pt=${pt})
alter table dws.dws_novel_guid add if not exists  partition (pt=${pt})

(4)部署上线

  • 上线后进入画布

(5)运行测试

  • 进入画布,运行数据链路的补充节点(前置任务已跑多日)

  • 补数,跑一周数据后进行对数
    • 串行执行:资源不足时使用
    • 并行执行:资源充足时使用

3.3、定时调度

(1)调度周期

  • 每天凌晨6点调度前一天数据

(2)失败策略

  • 失败策略:结束

(3)通知策略

  • 通知策略:失败发

4、DWS实例(指标统计表) 绪论
  • 需求:统计每个人的小说阅读pv,是否新用户
create table dwd.dwd_ios_di(
p_date string 
,etl_time string 
,server_ip      string  
,log_time       string
,guid           string
,search_words    string
,age            String 
,city           String 
,phone         string
,appid          string
,msgid          string
,httpcode       string
,httpcontent    string
,msg            string
)
partitioned by (`pt` string)
row format delimited fields terminated by ',';
4.1、创建Hive表

(1)创建Hive目标表

CREATE TABLE dws.dws_npvel_guid_pv_di(
 p_date    string  COMMENT '分区日期',
 platform  string  COMMENT '手机平台',
 version   string  COMMENT '手机版本',
 read_pv   string  COMMENT '小说pv',
 channel_id string COMMENT '小说渠道',
 top_channel string COMMENT '一级渠道名称',
 second_channel string COMMENT '二级渠道名称',
 thrid_channel string COMMENT '三级渠道名称',
 is_new    string  COMMENT '是否新用户',
)
partitioned by (`pt` string)
row format delimited fields terminated by ',';

(2)配置SQL组件创建表

  • 数据源:hive::dws
  • sql类型:非查询

(3)上线部署

  • 上线后进入画布

(4)运行脚本

4.2、编写业务逻辑进行数据测试

(1)编写业务逻辑

insert overwrite table dws.dws_npvel_guid_pv_di partition (pt=${pt})
select 
${pt} as p_date
,t1.guid as guid
,if(split(t1.phone,'-')[0] in ('小米','华为'),'android','ios') as platform
,nvl(split(t1.phone,'-')[0],'未知') as version
,sum(guid_novel_pv) as guid_novel_pv
,channel_id as channel_id
,top_channel_name
,second_channel_name
,thrid_channel_name
,case when t2.guid is not null then 1 else 0 end as is_new
case when guid is not null then 1 else 0 end as is is_new
from 
(
guid
,sum(guid_pv) as guid_novel_pv
,phone 
,appid as channel_id
,split(appid,'_')[0] as top_channel
,split(appid,'_')[1] as second_channel
,split(appid,'_')[2] as thrid_channel
from dwd.dwd_ios_di
where pt = ${pt}
 and appid in ('1_2_3_4','2_3_4_5','3_4_5_6','4_5_6_7')
group by 
 guid,phone,appid,split(appid,'_')[0],split(appid,'_')[1],split(appid,'_')[2]
) t1
full join 
(
select guid 
--小用户全量表
from dws.dwd_novel_guid
where pt=${pt_one}
  and guid RLIKE '^[4-9A-Z]{24}$' 
) t2
on  t1.guid = t2.guid
left join 
(
--  渠道维表
select top_channel,top_channel_name,second_channel,second_channel_name, thrid_channel,thrid_channel_name
from dim.dim_novel_channel_di
where pt = ${pt} 
) t3
on t1.top_channel = t3.top_channel and t1.second_channel = t3.second_channel and t1.thrid_channel = t3.thrid_channel

(2)配置SQL组件

  • 数据源:hive::dws
  • sql类型:非查询
  • 自定义参数: p t 、 {pt} 、 pt、{pt_one}
  • 前置SQL
alter table dws.dws_npvel_guid_pv_di drop if exists partition(pt=${pt})
alter table dws.dws_npvel_guid_pv_di add if not exists  partition (pt=${pt})

(3)配置依赖组件

  • 添加维表依赖,依赖于T-1的数据

  • 添加全量表依赖,注意全量表依赖于T-2的数据

  • 配置起始任务组件,方便运行调试任务流

(4)部署上线

  • 上线后进入画布

(5)运行测试

  • 进入画布,运行数据链路的起始节点

  • 补数,跑一周数据后进行对数
    • 串行执行:资源不足时使用
    • 并行执行:资源充足时使用

4.3、定时调度

(1)调度周期

  • 每天凌晨6点调度前一天数据

(2)失败策略

  • 失败策略:结束

(3)通知策略

  • 通知策略:失败发

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5679081.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存