电商大数据实时数仓

电商大数据实时数仓,第1张

电商大数据实时数仓 实时数据仓库 1、数据采集

1、在mysql中创建数据库gma

2、创建所有的表

init_mysql_table.sql

3、开启canal动态topic功能

vim /usr/local/soft/canal/conf/example/instance.properties

# 监控gma数据库,不同的表发送到表名的topic上

canal.mq.dynamicTopic=gma…*

4、启动zk 和kafka

5、启动canal

cd /usr/local/soft/canal/bin

./startup.sh

6、导入数据

load_data.sql

2、在flink中构建ods层

1、需要先启动hadoop hive的元数据服务, flink on yarn-session

# 启动hadoop
start-all.sh

# 启动hive元数据服务
nohup hive --service metastore >> metastore.log 2>&1 &

# 启动yarn-session
yarn-session.sh -jm 1024m -tm 1096

2、进入sql-client创建ods层的表

sql-client.sh  embedded

# 创建库
create database gma_ods;
create database gma_dwd;
create database gma_dws;
create database gma_dim;
create database gma_ads;
3、在ods层中创建所有表
ods_mysql_kafka_base_category1.sql
ods_mysql_kafka_base_category2.sql
ods_mysql_kafka_base_category3.sql
ods_mysql_kafka_base_province.sql
ods_mysql_kafka_base_region.sql
ods_mysql_kafka_base_trademark.sql
ods_mysql_kafka_date_info.sql
ods_mysql_kafka_holiday_info.sql
ods_mysql_kafka_holiday_year.sql
ods_mysql_kafka_order_detailr.sql
ods_mysql_kafka_order_info.sql
ods_mysql_kafka_order_status_log.sql
ods_mysql_kafka_payment_info.sql
ods_mysql_kafka_sku_info.sql
ods_mysql_kafka_user_info.sql
4、构建dim层

1、在mysql 中创建gma_dim数据库用于存储维表数据

再mysql中创建gma_dim

4.1、地区维表

1、在flink sql-client中创建jdbc表

-- 在flink中创建sink表

drop table gma_dim.dim_kafka_mysql_region;

CREATE TABLE gma_dim.dim_kafka_mysql_region(
  id bigint,
  name STRING,
  area_code STRING,
  region_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://master:3306/gma_dim?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'dim_kafka_mysql_region',
  'username' = 'root',
  'password' = '123456'
);

2、在mysql中创建地区维表

-- mysql中创建接收表

CREATE TABLE `dim_kafka_mysql_region` (
  `id` bigint(20) NOT NULL COMMENT 'id',
  `name` varchar(20) DEFAULT NULL COMMENT '省名称',
  `area_code` varchar(20) DEFAULT NULL COMMENT '行政区位码',
  `region_name` varchar(20) DEFAULT NULL COMMENT '大区名',
   PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


3、在到flink中创建mysql cdc表

--- 使用mysqlcdc读取维表
--- mysql cdc 只支持读取,不支持写入

drop table gma_dim.dim_kafka_mysql_region_cdc;

CREATE TABLE gma_dim.dim_kafka_mysql_region_cdc(
  id bigint,
  name STRING,
  area_code STRING,
  region_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = 'master',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'gma_dim',
  'table-name' = 'dim_kafka_mysql_region'
);

4、将项目打包提交到集群中运行

flink run  -c com.shujia.dim.DImKafkaMysqlRegion gma9-1.0.jar
4.2 、商品维度表

1、在flink中创建sink表

-- 在flink中创建sink表

drop table gma_dim.dim_kafka_mysql_sku;

CREATE TABLE gma_dim.dim_kafka_mysql_sku(
    `id` bigint COMMENT 'skuid(itemID)',
    `spu_id` bigint COMMENT 'spuid',
    `price` decimal(10,0)  COMMENT '价格',
    `sku_name` STRING COMMENT 'sku名称',
    `sku_desc` STRING COMMENT '商品规格描述',
    `weight` decimal(10,2)  COMMENT '重量',
    `tm_name` STRING COMMENT '品牌名',
    `category3_name` STRING COMMENT '三级分类id(冗余)',
    `category2_name` STRING COMMENT '二级分类id(冗余)',
    `category1_name` STRING COMMENT '一级分类id(冗余)',
    `sku_default_img` STRING COMMENT '默认显示图片(冗余)',
    `create_time`TIMESTAMP(3)  COMMENT '创建时间',
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://master:3306/gma_dim?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'dim_kafka_mysql_sku',
  'username' = 'root',
  'password' = '123456'
);

2、在mysql中创建维表

-- mysql中创建接收表

CREATE TABLE `dim_kafka_mysql_sku` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',
    `spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',
    `price` decimal(10,0) DEFAULT NULL COMMENT '价格',
    `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称',
    `sku_desc` varchar(2000) DEFAULT NULL COMMENT '商品规格描述',
    `weight` decimal(10,2) DEFAULT NULL COMMENT '重量',
    `tm_name` varchar(200) DEFAULT NULL COMMENT '品牌(冗余)',
    `category3_name` varchar(200) DEFAULT NULL COMMENT '三级分类id(冗余)',
    `category2_name` varchar(200) DEFAULT NULL COMMENT '二级分类id(冗余)',
    `category1_name` varchar(200) DEFAULT NULL COMMENT '一级分类id(冗余)',
    `sku_default_img` varchar(200) DEFAULT NULL COMMENT '默认显示图片(冗余)',
    `create_time` datetime DEFAULT NULL COMMENT '创建时间',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3、在flink创建创建cdc表

drop table gma_dim.dim_kafka_mysql_sku_cdc;
CREATE TABLE gma_dim.dim_kafka_mysql_sku_cdc(
    `id` bigint COMMENT 'skuid(itemID)',
    `spu_id` bigint COMMENT 'spuid',
    `price` decimal(10,0)  COMMENT '价格',
    `sku_name` STRING COMMENT 'sku名称',
    `sku_desc` STRING COMMENT '商品规格描述',
    `weight` decimal(10,2)  COMMENT '重量',
    `tm_name` STRING COMMENT '品牌名',
    `category3_name` STRING COMMENT '三级分类id(冗余)',
    `category2_name` STRING COMMENT '二级分类id(冗余)',
    `category1_name` STRING COMMENT '一级分类id(冗余)',
    `sku_default_img` STRING COMMENT '默认显示图片(冗余)',
    `create_time`TIMESTAMP(3)  COMMENT '创建时间',
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = 'master',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'gma_dim',
  'table-name' = 'dim_kafka_mysql_sku'
);

4、将项目打包上传运行

flink run -c com.shujia.dim.DimKafkaMysqlSku  gma9-1.0.jar
4.3、用户维度表

1、在flink sql -client 中创建sink表

drop table gma_dim.dim_kafka_mysql_user_info;
CREATE TABLE gma_dim.dim_kafka_mysql_user_info(
  `id` bigint COMMENT '编号',
  `login_name` STRING COMMENT '用户名称',
  `nick_name` STRING COMMENT '用户昵称',
  `passwd` STRING COMMENT '用户密码',
  `name` STRING COMMENT '用户姓名',
  `phone_num` STRING COMMENT '手机号',
  `email` STRING COMMENT '邮箱',
  `head_img` STRING COMMENT '头像',
  `user_level` STRING COMMENT '用户级别',
  `birthday` DATE COMMENT '用户生日',
  `gender` STRING COMMENT '性别 M男,F女',
  `create_time` TIMESTAMP(3)  COMMENT '创建时间',
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://master:3306/gma_dim?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'dim_kafka_mysql_user_info',
  'username' = 'root',
  'password' = '123456'
);


2、在mysql中创建维度表

CREATE TABLE `dim_kafka_mysql_user_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  `login_name` varchar(200) DEFAULT NULL COMMENT '用户名称',
  `nick_name` varchar(200) DEFAULT NULL COMMENT '用户昵称',
  `passwd` varchar(200) DEFAULT NULL COMMENT '用户密码',
  `name` varchar(200) DEFAULT NULL COMMENT '用户姓名',
  `phone_num` varchar(200) DEFAULT NULL COMMENT '手机号',
  `email` varchar(200) DEFAULT NULL COMMENT '邮箱',
  `head_img` varchar(200) DEFAULT NULL COMMENT '头像',
  `user_level` varchar(200) DEFAULT NULL COMMENT '用户级别',
  `birthday` date DEFAULT NULL COMMENT '用户生日',
  `gender` varchar(1) DEFAULT NULL COMMENT '性别 M男,F女',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=153 DEFAULT CHARSET=utf8 COMMENT='用户维表';

3、在flink中创建cdc 表

-- 创建flink cdc 维表, 不能作为sink表,只能作为source


drop table gma_dim.dim_kafka_mysql_user_info_cdc;
CREATE TABLE gma_dim.dim_kafka_mysql_user_info_cdc(
  `id` bigint COMMENT '编号',
  `login_name` STRING COMMENT '用户名称',
  `nick_name` STRING COMMENT '用户昵称',
  `passwd` STRING COMMENT '用户密码',
  `name` STRING COMMENT '用户姓名',
  `phone_num` STRING COMMENT '手机号',
  `email` STRING COMMENT '邮箱',
  `head_img` STRING COMMENT '头像',
  `user_level` STRING COMMENT '用户级别',
  `birthday` DATE COMMENT '用户生日',
  `gender` STRING COMMENT '性别 M男,F女',
  `create_time` TIMESTAMP(3)  COMMENT '创建时间',
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = 'master',
  'port' = '3306',
  'username' = 'root',
  'password' = 'MyNewPass4!',
  'database-name' = 'gma_dim',
  'table-name' = 'dim_kafka_mysql_user_info'
);


4、提交代码运行

flink run -c com.shujia.dim.DimKafkaMysqlUserInfo  gma9-1.0.jar
5、dwd 层 5.1、支付订单明细表

1、在flink中创建支付订单明细表

-- ---------------------------------
-- DWD层,支付订单明细表dwd_paid_order_detail
-- ---------------------------------
CREATE TABLE gma_dwd.dwd_paid_order_detail
(
  detail_id BIGINT COMMENT '编号',
  order_id BIGINT COMMENT '订单编号',
  user_id BIGINT COMMENT '用户id',
  province_id INT COMMENT '地区',
  sku_id BIGINT COMMENT 'sku_id',
  sku_num INT COMMENT '购买个数',
  order_price DECIMAL(10,0) COMMENT '购买价格(下单时sku价格)',
  create_time TIMESTAMP(0) COMMENT '创建时间',
  payment_way STRING COMMENT '付款方式',
  pay_time TIMESTAMP(0) COMMENT '支付时间'
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'gma_dwd.dwd_paid_order_detail',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'format' = 'changelog-json'
)

2、启动程序

flink run -c com.shujia.dwd.DwdPaidOrderDetail gma9-1.0.jar
6、ads 层 6.1、省指标

1、在flink中创建表

-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个省份的订单数
--      2.每天每个省份的订单金额
-- ---------------------------------


CREATE TABLE gma_ads.ads_province_index(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'ads_province_index',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123456'
)

2、在mysql中创建表

CREATE TABLE `ads_province_index` (
  province_id INT,
   area_code varchar(200),
   province_name varchar(200),
   region_name varchar(200),
   order_amount DECIMAL(10,2),
   order_count bigint(20),
   dt varchar(200),
  PRIMARY KEY (`province_id`,`dt`)
) COMMENT='地区指标表';


3、启动指标计算程序

flink run -c com.shujia.ads.AdsProvinceIndex gma9-1.0.jar
6.2、商品指标

1、在flink中创建表

-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标 1.每天每个商品对应的订单个数
--      2.每天每个商品对应的订单金额
--      3.每天每个商品对应的数量
-- ---------------------------------



drop table gma_ads.ads_sku_index;
CREATE TABLE gma_ads.ads_sku_index
(
  sku_id BIGINT,
  weight decimal(10,2),
  tm_name STRING ,
  price decimal(10,2),
  spu_id BIGINT,
  category3_name STRING ,
  category2_name STRING,
  category1_name STRING,
  order_count BIGINT NOT NULL,
  order_amount decimal(10,2),
  sku_count int,
  dt STRING,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://node1:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'ads_sku_index',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123456'
)

2、在mysql中创建表

CREATE TABLE `ads_sku_index` (
  sku_id  bigint(20),
  weight decimal(10,2),
  tm_name varchar(200)  ,
  price decimal(10,2),
  spu_id varchar(200) ,
  category3_name varchar(200)  ,
  category2_name varchar(200) ,
  category1_name varchar(200) ,
  order_count  bigint(20),
  order_amount decimal(10,2),
  sku_count  bigint(20),
  dt varchar(200) ,
  PRIMARY KEY (`sku_id`,`dt`)
) COMMENT='商品指标表';

3、启动指标计算任务

flink run -c com.shujia.ads.AdsSkuIndex gma9-1.0.jar

7、提供数据接口

1、修改mysql地址以及用户名密码

com.shujia.gma.controller.ProvinceController

com.shujia.gma.controller.SkuController

2、启动接口服务

com.shujia.gma.GmaApiApplication

3、访问接口

localhost:8080/getSkuIndex?&id=1&day=2019-11-23

localhost:8080/getProvinceIndex?&id=1&day=2019-11-23

8、监控 8.1 部署prometheus (底层TSDB ,时序数据库)

1、上传解压修改配置文件

vim prometheus.yml

# 在最后增加配置

  - job_name: 'pushgateway'
    scrape_interval: 10s
    honor_labels: true
    static_configs:
     - targets: ['localhost:9091']
       labels:
        instance: pushgateway

2、启动prometheus

nohup ./prometheus  & 
8.2 部署pushgateway

1、上传解压启动

nohup ./pushgateway &
8.3 然后在 Flink 的配置⽂件中添加:
cd /usr/local/soft/flink-1.11.2/conf
vim flink-conf.yaml

# 增加配置
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: master # promgateway 地址
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: shujia
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false

8.4、重启flink yarn session 启动flink任务 8.5访问页面查看指标

http://master:9091/

8.6 部署Grafana

上传解压启动

./grafana-server

访问页面 用户名:admin 密码: admin

http://master:3000/dashboard

增加prometheus 数据源

增加数据表

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5678739.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存