- DIM:维表的公共层,贯穿数据模型的各个层次
(1)贯穿数据模型的各个层次,保留业务过程中的实体信息
(2)用来关联事实表将数据宽表化
1.2、DIM数据组成- DIM:存放维度数据&基础数据
(1)维度数据:一般指一些业务状态,代码的解释表(即码表)
(2)基础数据:存储业务需要关联的基础数据
2、DIM层数据规范 2.1、DIM数据规范(1)最基础要求:不同字段的含义必须不同,相同字段的含义必须相同;
(2)DIM较多的是和DWS生成ADS;
2.2、DIM表结构(1)表结构:实体(主键)+维度
(2)命名:dim_表名__di
- 前缀统一为dim,后缀统一(小时级别为h,天级别为1d,月级别为1m)
(3)生命周期:360天
3、DIM开发样例(大维表)- 大维表:业务侧要求每天一个分区装维度数据
- 将ODS维表数据全量抽取到DIM层,按天分区保留每天最新的维度数据
create table T_YYBZB_TGH_BANKINFO ( id int(8), bank_id int(8), bank_name varchar(200), source_date varchar(200) ); insert into T_YYBZB_TGH_BANKINFO (ID, BANK_ID, BANK_NAME)values (11, 11, '工商银行(广州)','20210101');3.1、创建hive目标表
(1)创建hive目标表
create table dim.dim_t_yybzb_tgh_bankinfo_di ( id int, bank_id int, bank_name string ) partitioned by (`pt` string) row format delimited fields terminated by ','
(2)配置SQL组件创建表
- 配置Spark数据源
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YTKA16fy-1639123208275)(C:UsersPROTHAppDataRoamingTyporatypora-user-imagesimage-20211208164131511.png)]
(3)部署上线
(4)调试运行
- 运行结果日志
(1)编写SQL脚本
alter table dim.dim_t_yybzb_tgh_bankinfo_di drop if exists partition(pt=${pt}) alter table dim.dim_t_yybzb_tgh_bankinfo_di add if not exists partition (pt=${pt})
(2)配置SQL组件
- 若分区存在则删除
- 创建表分区
(1)编写Python脚本
- 编写主程序
#!/usr/bin/python # encoding:utf-8 from pyspark import SparkContext from pyspark.sql import HiveContext import sys import re import time import importlib importlib.reload(sys) #进行相关配置项定义 def dropframe(rows): x = '' for s in rows: x += str(s) + ',' x = x[:-1] return x input_pt = sys.argv[1] output_feature_hdfs_path = '/user/hive/warehouse/dim.db/dim_t_yybzb_tgh_bankinfo_di/' + '20211213' + '/' key_cal = 'ods_t_yybzb_tgh_bankinfo_di' #spark-job编写 sc = SparkContext(appName=key_cal + "_sql_daily") hsqlContext = HiveContext(sc) midsqlDf = hsqlContext.sql("select " "id," "bank_id," "bank_name " "from ods.ods_t_yybzb_tgh_bankinfo_di " "where pt = {pt}".format(pt=input_pt)) #/user/hive/warehouse/dim.db/dim_t_yybzb_tgh_bankinfo_di/20190724 save_path = output_feature_hdfs_path keySeconds = midsqlDf.rdd.map(lambda row: dropframe(row)) keySeconds.repartition(5).saveAsTextFile(save_path) sc.stop()
(2)配置Spark组件
/opt/module/spark-3.1.2/bin/spark-submit --master local --queue default --driver-memory 512m --driver-cores 2 --num-executors 4 --executor-memory 1G --executor-cores 2 --conf spark.default.parallelism=4 test1.py
- 将脚本上传到资源中心
- 配置Spark组件
程序类型:python 程序参数:${pt}
(3)参数传递
- 设置全局参数为T-1
(4)定时任务设置
- 设置时间:固定每天6点跑批
- 原因未明:日志报错spark提交路径出错,but命令行提交能成功,待解决
(1)编写SQL脚本
insert overwrite table dim.dim_t_yybzb_tgh_bankinfo_di partition (pt=${pt}) select id,bank_id,bank_name from ods.ods_t_yybzb_tgh_bankinfo_di
(2)配置SQL组件
- 参数配置:${pt}
(3)调试运行
- 上线部署后运行
- 查看hive表分区文件
(4)定时调度
- 每天6点定时调度
- 小维表规定:小维表(配置表)只能删除/新增,不能更新数据
- 将ODS维表数据全量抽取到DIM层,按天分区保留每天最新的维度数据
create table T_YYBZB_TGH_BANKINFO ( id int(8), bank_id int(8), bank_name varchar(200), source_date varchar(200) ); insert into T_YYBZB_TGH_BANKINFO (ID, BANK_ID, BANK_NAME)values (11, 11, '工商银行(广州)','20210101');4.1、创建hive目标表
(1)编写建表SQL脚本
- 增加delete_flag标记,适用于源系统存在硬删除状况
create table dim.dim_t_yybzb_tgh_bankinfo3_di ( id int, bank_id int, bank_name string, delete_flag int ) row format delimited fields terminated by ',' #创建中间临时表 create table tmp.tmp_t_yybzb_tgh_bankinfo3_di ( id int, bank_id int, bank_name string, delete_flag int ) partitioned by (`pt` string) row format delimited fields terminated by ','
(2)配置SQL组件
- 数据源:spark
- sql类型:非查询
(3)部署上线
(4)运行脚本
- 进入画布
- 运行建表脚本
(5)查看日志
(1)编写初始化脚本
insert overwrite table dim.dim_t_yybzb_tgh_bankinfo3_di select id,bank_id,bank_name,'1' as delete_flag from ods.ods_t_yybzb_tgh_bankinfo_di
(2)配置SQL组件
- 数据源:Spark
- sql类型:非查询
(3)部署上线
(4)运行脚本
-
进入画布
-
运行脚本
(1)创建临时表分区
alter table tmp.tmp_t_yybzb_tgh_bankinfo3_di drop if exists partition(pt=${pt}) alter table tmp.tmp_t_yybzb_tgh_bankinfo3_di add if not exists partition (pt=${pt})
(2)临时表用来装载计算后的数据
insert overwrite table tmp.tmp_t_yybzb_tgh_bankinfo3_di partition(pt=${pt}) select if(t1.bank_id is not null,t1.id,t2.id) as id, if(t1.bank_id is not null,t1.bank_id,t2.bank_id) as bank_id, if(t1.bank_id is not null,t1.bank_name,t2.bank_name) as bank_name, if((t1.bank_id is not null) and (t2.bank_id is null),0,1) as delete_flag from dim.dim_t_yybzb_tgh_bankinfo3_di t1 full join ods.ods_t_yybzb_tgh_bankinfo_di t2 on t1.bank_id = t2.bank_id where t2.pt=${pt}
- 注意事项:中间结果数据一般建分区表保留时间一周,便于回滚
(3)将临时表数据导入目标表
insert overwrite table dim.dim_t_yybzb_tgh_bankinfo3_di select id,bank_id,bank_name,delete_flag from tmp.tmp_t_yybzb_tgh_bankinfo_di where pt=${pt}
(4)配置SQL组件
①删除表分区后创建表分区
②配置任务前置依赖(依赖于ETL作业)
③配置SQL组件一,将计算后的数据插入到临时表中
④配置SQL组件二,将临时表中更新数据插入到DIW表中
⑤配置全局参数
- 在保存画布后,设置全局进行全局参数配置
- 整体任务流程图如下
(5)调试运行
①将JOB部署上线
②进入画布
③调试运行
④查看运行结果
(6)定时调度
-
调度周期:每天凌晨6点开始跑数
-
定时管理查看
(1)启动Spark thriftserver
sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --master yarn --driver-cores 1 --driver-memory 1G --executor-cores 1 --executor-memory 1G -num-executors 2
- 运行结果
(2)dolphinscheduler配置Spark数据源
- 数据源中心 --> 配置数据源
(1)报错现状
- 报错现状:Cannot run program “python3”: error=2, 没有那个文件或目录
(2)报错原因
- 系统只有自带的python2,没有python3,需要安装
(3)解决方案
- 在linux下安装python3
(1)问题现状
(2)问题原因
- dolphinscheduler_env.sh文件未正确配置spark路径
(3)解决方案
vim dolphinscheduler_env.sh #添加Spark环境变量,根据个人电脑路径添加 #SPARK_HOME export SPARK_HOME=/opt/module/spark-3.1.2 export PATH=$PATH:$SPARK_HOME/bin export PATN=$PATH:$SPARK_HOME/sbin5.4、hive NullPointerException null
(1)问题原因
- 报错:空指针异常
(2)问题原因
- hive的远程服务出错
(3)解决方案
- 重启hiveserver2服务
(1)问题现状
Exception in thread "main" java.io.IOException: Cannot run program "python3": error=2, 没有那个文件或目录
(2)问题原因
- pyspark脚本的解释器路径为 /usr/bin/python
- 实际环境如下
(3)解决方案
- 创建python3的软连接
sudo ln -s /opt/module/Python-3.6.5/ python3
- 修改脚本解释器路径为python3
#!/usr/bin/python
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)