第三章 基于dolphinscheduler的明细模型设计之一致性维表DIM建设

第三章 基于dolphinscheduler的明细模型设计之一致性维表DIM建设,第1张

第三章 基于dolphinscheduler的明细模型设计之一致性维表DIM建设 1、DIM层概述 1.1、DIM定义
  • DIM:维表的公共层,贯穿数据模型的各个层次

(1)贯穿数据模型的各个层次,保留业务过程中的实体信息

(2)用来关联事实表将数据宽表化

1.2、DIM数据组成
  • DIM:存放维度数据&基础数据

(1)维度数据:一般指一些业务状态,代码的解释表(即码表)

(2)基础数据:存储业务需要关联的基础数据

2、DIM层数据规范 2.1、DIM数据规范

(1)最基础要求:不同字段的含义必须不同,相同字段的含义必须相同;

(2)DIM较多的是和DWS生成ADS;

2.2、DIM表结构

(1)表结构:实体(主键)+维度

(2)命名:dim_表名__di

  • 前缀统一为dim,后缀统一(小时级别为h,天级别为1d,月级别为1m)

(3)生命周期:360天

3、DIM开发样例(大维表)
  • 大维表:业务侧要求每天一个分区装维度数据
绪论、需求说明
  • 将ODS维表数据全量抽取到DIM层,按天分区保留每天最新的维度数据
create table T_YYBZB_TGH_BANKINFO
(
  id        int(8),
  bank_id   int(8),
  bank_name varchar(200),
  source_date varchar(200)
);
insert into T_YYBZB_TGH_BANKINFO (ID, BANK_ID, BANK_NAME)values (11, 11, '工商银行(广州)','20210101');
3.1、创建hive目标表

(1)创建hive目标表

create table dim.dim_t_yybzb_tgh_bankinfo_di
(
  id        int,
  bank_id   int,
  bank_name string
)
partitioned by (`pt` string)
row format delimited fields terminated by ','

(2)配置SQL组件创建表

  • 配置Spark数据源

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YTKA16fy-1639123208275)(C:UsersPROTHAppDataRoamingTyporatypora-user-imagesimage-20211208164131511.png)]

(3)部署上线

(4)调试运行

  • 运行结果日志

3.2、创建表分区

(1)编写SQL脚本

alter table dim.dim_t_yybzb_tgh_bankinfo_di drop if exists partition(pt=${pt})

alter table dim.dim_t_yybzb_tgh_bankinfo_di add if not exists  partition (pt=${pt})

(2)配置SQL组件

  • 若分区存在则删除

  • 创建表分区

3.3、Pyspark脚本导入数据

(1)编写Python脚本

  • 编写主程序
#!/usr/bin/python
# encoding:utf-8

from pyspark import SparkContext
from pyspark.sql import HiveContext
import sys
import re
import time

import importlib
importlib.reload(sys)

#进行相关配置项定义
def dropframe(rows):
    x = ''
    for s in rows:
        x += str(s) + ','
    x = x[:-1]
    return x
    
input_pt = sys.argv[1]

output_feature_hdfs_path = '/user/hive/warehouse/dim.db/dim_t_yybzb_tgh_bankinfo_di/' + '20211213' + '/'

key_cal = 'ods_t_yybzb_tgh_bankinfo_di'

#spark-job编写
sc = SparkContext(appName=key_cal + "_sql_daily")

hsqlContext = HiveContext(sc)

midsqlDf = hsqlContext.sql("select "
                           "id,"
                           "bank_id,"
                           "bank_name "
                           "from ods.ods_t_yybzb_tgh_bankinfo_di "
                           "where pt = {pt}".format(pt=input_pt))


#/user/hive/warehouse/dim.db/dim_t_yybzb_tgh_bankinfo_di/20190724
save_path = output_feature_hdfs_path
keySeconds = midsqlDf.rdd.map(lambda row: dropframe(row))
keySeconds.repartition(5).saveAsTextFile(save_path)

sc.stop()

(2)配置Spark组件

/opt/module/spark-3.1.2/bin/spark-submit --master local --queue default --driver-memory 512m  --driver-cores 2 --num-executors 4 --executor-memory 1G --executor-cores 2 --conf spark.default.parallelism=4  test1.py
  • 将脚本上传到资源中心

  • 配置Spark组件
程序类型:python
程序参数:${pt}

(3)参数传递

  • 设置全局参数为T-1

(4)定时任务设置

  • 设置时间:固定每天6点跑批

  • 原因未明:日志报错spark提交路径出错,but命令行提交能成功,待解决
3.4、SQL脚本导入数据

(1)编写SQL脚本

insert overwrite table dim.dim_t_yybzb_tgh_bankinfo_di partition (pt=${pt})
select id,bank_id,bank_name
from ods.ods_t_yybzb_tgh_bankinfo_di

(2)配置SQL组件

  • 参数配置:${pt}

(3)调试运行

  • 上线部署后运行

  • 查看hive表分区文件

(4)定时调度

  • 每天6点定时调度

4、DIM开发样例(小维表)
  • 小维表规定:小维表(配置表)只能删除/新增,不能更新数据
绪论、需求说明
  • 将ODS维表数据全量抽取到DIM层,按天分区保留每天最新的维度数据
create table T_YYBZB_TGH_BANKINFO
(
  id        int(8),
  bank_id   int(8),
  bank_name varchar(200),
  source_date varchar(200)
);
insert into T_YYBZB_TGH_BANKINFO (ID, BANK_ID, BANK_NAME)values (11, 11, '工商银行(广州)','20210101');
4.1、创建hive目标表

(1)编写建表SQL脚本

  • 增加delete_flag标记,适用于源系统存在硬删除状况
create table dim.dim_t_yybzb_tgh_bankinfo3_di
(
  id        int,
  bank_id   int,
  bank_name string,
  delete_flag int
)
row format delimited fields terminated by ','

#创建中间临时表
create table tmp.tmp_t_yybzb_tgh_bankinfo3_di
(
  id        int,
  bank_id   int,
  bank_name string,
  delete_flag int
)
partitioned by (`pt` string)
row format delimited fields terminated by ','

(2)配置SQL组件

  • 数据源:spark
  • sql类型:非查询

(3)部署上线

(4)运行脚本

  • 进入画布

  • 运行建表脚本

(5)查看日志

4.2、初始化导入数据

(1)编写初始化脚本

insert overwrite table dim.dim_t_yybzb_tgh_bankinfo3_di 
select id,bank_id,bank_name,'1' as delete_flag
from ods.ods_t_yybzb_tgh_bankinfo_di

(2)配置SQL组件

  • 数据源:Spark
  • sql类型:非查询

(3)部署上线

(4)运行脚本

  • 进入画布

  • 运行脚本

4.3、增量更新数据

(1)创建临时表分区

alter table tmp.tmp_t_yybzb_tgh_bankinfo3_di drop if exists partition(pt=${pt})

alter table tmp.tmp_t_yybzb_tgh_bankinfo3_di add if not exists  partition (pt=${pt})

(2)临时表用来装载计算后的数据

insert overwrite table tmp.tmp_t_yybzb_tgh_bankinfo3_di partition(pt=${pt})
select 
if(t1.bank_id is not null,t1.id,t2.id) as id,
if(t1.bank_id is not null,t1.bank_id,t2.bank_id) as bank_id,
if(t1.bank_id is not null,t1.bank_name,t2.bank_name) as bank_name,
if((t1.bank_id is not null) and (t2.bank_id is null),0,1) as delete_flag
from dim.dim_t_yybzb_tgh_bankinfo3_di t1 full join ods.ods_t_yybzb_tgh_bankinfo_di t2
on t1.bank_id = t2.bank_id
where t2.pt=${pt}
  • 注意事项:中间结果数据一般建分区表保留时间一周,便于回滚

(3)将临时表数据导入目标表

insert overwrite table dim.dim_t_yybzb_tgh_bankinfo3_di 
select id,bank_id,bank_name,delete_flag
from tmp.tmp_t_yybzb_tgh_bankinfo_di
where pt=${pt}

(4)配置SQL组件

①删除表分区后创建表分区

②配置任务前置依赖(依赖于ETL作业)

③配置SQL组件一,将计算后的数据插入到临时表中

④配置SQL组件二,将临时表中更新数据插入到DIW表中

⑤配置全局参数

  • 在保存画布后,设置全局进行全局参数配置

  • 整体任务流程图如下

(5)调试运行

①将JOB部署上线

②进入画布

③调试运行

④查看运行结果

(6)定时调度

  • 调度周期:每天凌晨6点开始跑数

  • 定时管理查看


5、常见问题 5.1、Spark数据源配置

(1)启动Spark thriftserver

sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --master yarn --driver-cores 1 --driver-memory 1G --executor-cores 1 --executor-memory 1G -num-executors 2
  • 运行结果

(2)dolphinscheduler配置Spark数据源

  • 数据源中心 --> 配置数据源

5.2、Spark任务提交报错

(1)报错现状

  • 报错现状:Cannot run program “python3”: error=2, 没有那个文件或目录

(2)报错原因

  • 系统只有自带的python2,没有python3,需要安装

(3)解决方案

  • 在linux下安装python3
5.3、/bin/spark-submit: 没有那个文件或目录

(1)问题现状

(2)问题原因

  • dolphinscheduler_env.sh文件未正确配置spark路径

(3)解决方案

vim dolphinscheduler_env.sh

#添加Spark环境变量,根据个人电脑路径添加
#SPARK_HOME
export SPARK_HOME=/opt/module/spark-3.1.2
export PATH=$PATH:$SPARK_HOME/bin
export PATN=$PATH:$SPARK_HOME/sbin
5.4、hive NullPointerException null

(1)问题原因

  • 报错:空指针异常

(2)问题原因

  • hive的远程服务出错

(3)解决方案

  • 重启hiveserver2服务
5.5、shell调PySpark出错

(1)问题现状

Exception in thread "main" java.io.IOException: Cannot run program "python3": error=2, 没有那个文件或目录

(2)问题原因

  • pyspark脚本的解释器路径为 /usr/bin/python
  • 实际环境如下

(3)解决方案

  • 创建python3的软连接
sudo ln -s /opt/module/Python-3.6.5/ python3
  • 修改脚本解释器路径为python3
#!/usr/bin/python

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5656132.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存