1、在mysql中创建数据库gma
2、创建所有的表
init_mysql_table.sql
3、开启canal动态topic功能
vim /usr/local/soft/canal/conf/example/instance.properties
# 监控gma数据库,不同的表发送到表名的topic上
canal.mq.dynamicTopic=gma\\..*
4、启动zk 和kafka
5、启动canal
cd /usr/local/soft/canal/bin
./startup.sh
6、导入数据
load_data.sql
2、在flink中构建ods层
1、需要先启动hadoop hive的元数据服务, flink on yarn-session
# 启动hadoop start-all.sh # 启动hive元数据服务 nohup hive --service metastore >> metastore.log 2>&1 & # 启动yarn-session yarn-session.sh -jm 1024m -tm 1096
2、进入sql-client创建ods层的表
sql-client.sh embedded # 创建库 create database gma_ods; create database gma_dwd; create database gma_dws; create database gma_dim; create database gma_ads;
3、在ods层中创建所有表
ods_mysql_kafka_base_category1.sql ods_mysql_kafka_base_category2.sql ods_mysql_kafka_base_category3.sql ods_mysql_kafka_base_province.sql ods_mysql_kafka_base_region.sql ods_mysql_kafka_base_trademark.sql ods_mysql_kafka_date_info.sql ods_mysql_kafka_holiday_info.sql ods_mysql_kafka_holiday_year.sql ods_mysql_kafka_order_detailr.sql ods_mysql_kafka_order_info.sql ods_mysql_kafka_order_status_log.sql ods_mysql_kafka_payment_info.sql ods_mysql_kafka_sku_info.sql ods_mysql_kafka_user_info.sql
4、构建dim层
4.1、地区维表1、在mysql 中创建gma_dim数据库用于存储维表数据
再mysql中创建gma_dim
1、在flink sql-client中创建jdbc表
-- 在flink中创建sink表 drop table gma_dim.dim_kafka_mysql_region; CREATE TABLE gma_dim.dim_kafka_mysql_region( id bigint, name STRING, area_code STRING, region_name STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://master:3306/gma_dim?useUnicode=true&characterEncoding=UTF-8', 'table-name' = 'dim_kafka_mysql_region', 'username' = 'root', 'password' = '123456' );
2、在mysql中创建地区维表
-- mysql中创建接收表 CREATE TABLE `dim_kafka_mysql_region` ( `id` bigint(20) NOT NULL COMMENT 'id', `name` varchar(20) DEFAULT NULL COMMENT '省名称', `area_code` varchar(20) DEFAULT NULL COMMENT '行政区位码', `region_name` varchar(20) DEFAULT NULL COMMENT '大区名', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3、在到flink中创建mysql cdc表
--- 使用mysqlcdc读取维表 --- mysql cdc 只支持读取,不支持写入 drop table gma_dim.dim_kafka_mysql_region_cdc; CREATE TABLE gma_dim.dim_kafka_mysql_region_cdc( id bigint, name STRING, area_code STRING, region_name STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'master', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'gma_dim', 'table-name' = 'dim_kafka_mysql_region' );
4、将项目打包提交到集群中运行
flink run -c com.shujia.dim.DImKafkaMysqlRegion gma9-1.0.jar
4.2、商品维度表
1、在flink中创建sink表
-- 在flink中创建sink表 drop table gma_dim.dim_kafka_mysql_sku; CREATE TABLE gma_dim.dim_kafka_mysql_sku( `id` bigint COMMENT 'skuid(itemID)', `spu_id` bigint COMMENT 'spuid', `price` decimal(10,0) COMMENT '价格', `sku_name` STRING COMMENT 'sku名称', `sku_desc` STRING COMMENT '商品规格描述', `weight` decimal(10,2) COMMENT '重量', `tm_name` STRING COMMENT '品牌名', `category3_name` STRING COMMENT '三级分类id(冗余)', `category2_name` STRING COMMENT '二级分类id(冗余)', `category1_name` STRING COMMENT '一级分类id(冗余)', `sku_default_img` STRING COMMENT '默认显示图片(冗余)', `create_time`TIMESTAMP(3) COMMENT '创建时间', PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://master:3306/gma_dim?useUnicode=true&characterEncoding=UTF-8', 'table-name' = 'dim_kafka_mysql_sku', 'username' = 'root', 'password' = '123456' );
2、在mysql中创建维表
-- mysql中创建接收表 CREATE TABLE `dim_kafka_mysql_sku` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)', `spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid', `price` decimal(10,0) DEFAULT NULL COMMENT '价格', `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称', `sku_desc` varchar(2000) DEFAULT NULL COMMENT '商品规格描述', `weight` decimal(10,2) DEFAULT NULL COMMENT '重量', `tm_name` varchar(200) DEFAULT NULL COMMENT '品牌(冗余)', `category3_name` varchar(200) DEFAULT NULL COMMENT '三级分类id(冗余)', `category2_name` varchar(200) DEFAULT NULL COMMENT '二级分类id(冗余)', `category1_name` varchar(200) DEFAULT NULL COMMENT '一级分类id(冗余)', `sku_default_img` varchar(200) DEFAULT NULL COMMENT '默认显示图片(冗余)', `create_time` datetime DEFAULT NULL COMMENT '创建时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3、在flink创建创建cdc表
drop table gma_dim.dim_kafka_mysql_sku_cdc; CREATE TABLE gma_dim.dim_kafka_mysql_sku_cdc( `id` bigint COMMENT 'skuid(itemID)', `spu_id` bigint COMMENT 'spuid', `price` decimal(10,0) COMMENT '价格', `sku_name` STRING COMMENT 'sku名称', `sku_desc` STRING COMMENT '商品规格描述', `weight` decimal(10,2) COMMENT '重量', `tm_name` STRING COMMENT '品牌名', `category3_name` STRING COMMENT '三级分类id(冗余)', `category2_name` STRING COMMENT '二级分类id(冗余)', `category1_name` STRING COMMENT '一级分类id(冗余)', `sku_default_img` STRING COMMENT '默认显示图片(冗余)', `create_time`TIMESTAMP(3) COMMENT '创建时间', PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'master', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'gma_dim', 'table-name' = 'dim_kafka_mysql_sku' );
4、将项目打包上传运行
flink run -c com.shujia.dim.DimKafkaMysqlSku gma9-1.0.jar
4.3、用户维度表
1、在flink sql -client 中创建sink表
drop table gma_dim.dim_kafka_mysql_user_info; CREATE TABLE gma_dim.dim_kafka_mysql_user_info( `id` bigint COMMENT '编号', `login_name` STRING COMMENT '用户名称', `nick_name` STRING COMMENT '用户昵称', `passwd` STRING COMMENT '用户密码', `name` STRING COMMENT '用户姓名', `phone_num` STRING COMMENT '手机号', `email` STRING COMMENT '邮箱', `head_img` STRING COMMENT '头像', `user_level` STRING COMMENT '用户级别', `birthday` DATE COMMENT '用户生日', `gender` STRING COMMENT '性别 M男,F女', `create_time` TIMESTAMP(3) COMMENT '创建时间', PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://master:3306/gma_dim?useUnicode=true&characterEncoding=UTF-8', 'table-name' = 'dim_kafka_mysql_user_info', 'username' = 'root', 'password' = '123456' );
2、在mysql中创建维度表
CREATE TABLE `dim_kafka_mysql_user_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号', `login_name` varchar(200) DEFAULT NULL COMMENT '用户名称', `nick_name` varchar(200) DEFAULT NULL COMMENT '用户昵称', `passwd` varchar(200) DEFAULT NULL COMMENT '用户密码', `name` varchar(200) DEFAULT NULL COMMENT '用户姓名', `phone_num` varchar(200) DEFAULT NULL COMMENT '手机号', `email` varchar(200) DEFAULT NULL COMMENT '邮箱', `head_img` varchar(200) DEFAULT NULL COMMENT '头像', `user_level` varchar(200) DEFAULT NULL COMMENT '用户级别', `birthday` date DEFAULT NULL COMMENT '用户生日', `gender` varchar(1) DEFAULT NULL COMMENT '性别 M男,F女', `create_time` datetime DEFAULT NULL COMMENT '创建时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=153 DEFAULT CHARSET=utf8 COMMENT='用户维表';
3、在flink中创建cdc 表
-- 创建flink cdc 维表, 不能作为sink表,只能作为source drop table gma_dim.dim_kafka_mysql_user_info_cdc; CREATE TABLE gma_dim.dim_kafka_mysql_user_info_cdc( `id` bigint COMMENT '编号', `login_name` STRING COMMENT '用户名称', `nick_name` STRING COMMENT '用户昵称', `passwd` STRING COMMENT '用户密码', `name` STRING COMMENT '用户姓名', `phone_num` STRING COMMENT '手机号', `email` STRING COMMENT '邮箱', `head_img` STRING COMMENT '头像', `user_level` STRING COMMENT '用户级别', `birthday` DATE COMMENT '用户生日', `gender` STRING COMMENT '性别 M男,F女', `create_time` TIMESTAMP(3) COMMENT '创建时间', PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'master', 'port' = '3306', 'username' = 'root', 'password' = 'MyNewPass4!', 'database-name' = 'gma_dim', 'table-name' = 'dim_kafka_mysql_user_info' );
4、提交代码运行
flink run -c com.shujia.dim.DimKafkaMysqlUserInfo gma9-1.0.jar
5、dwd 层
5.1、支付订单明细表
1、在flink中创建支付订单明细表
-- --------------------------------- -- DWD层,支付订单明细表dwd_paid_order_detail -- --------------------------------- CREATE TABLE gma_dwd.dwd_paid_order_detail ( detail_id BIGINT COMMENT '编号', order_id BIGINT COMMENT '订单编号', user_id BIGINT COMMENT '用户id', province_id INT COMMENT '地区', sku_id BIGINT COMMENT 'sku_id', sku_num INT COMMENT '购买个数', order_price DECIMAL(10,0) COMMENT '购买价格(下单时sku价格)', create_time TIMESTAMP(0) COMMENT '创建时间', payment_way STRING COMMENT '付款方式', pay_time TIMESTAMP(0) COMMENT '支付时间' ) WITH ( 'connector' = 'kafka', 'topic' = 'gma_dwd.dwd_paid_order_detail', 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', 'format' = 'changelog-json' )
2、启动程序
flink run -c com.shujia.dwd.DwdPaidOrderDetail gma9-1.0.jar
6、ads 层
6.1、省指标
1、在flink中创建表
-- --------------------------------- -- 使用 DDL创建MySQL中的ADS层表 -- 指标:1.每天每个省份的订单数 -- 2.每天每个省份的订单金额 -- --------------------------------- CREATE TABLE gma_ads.ads_province_index( province_id INT, area_code STRING, province_name STRING, region_name STRING, order_amount DECIMAL(10,2), order_count BIGINT, dt STRING, PRIMARY KEY (province_id, dt) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://master:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8', 'table-name' = 'ads_province_index', 'driver' = 'com.mysql.jdbc.Driver', 'username' = 'root', 'password' = '123456' )
2、在mysql中创建表
CREATE TABLE `ads_province_index` ( province_id INT, area_code varchar(200), province_name varchar(200), region_name varchar(200), order_amount DECIMAL(10,2), order_count bigint(20), dt varchar(200), PRIMARY KEY (`province_id`,`dt`) ) COMMENT='地区指标表';
3、启动指标计算程序
flink run -c com.shujia.ads.AdsProvinceIndex gma9-1.0.jar
6.2、商品指标
1、在flink中创建表
-- --------------------------------- -- 使用 DDL创建MySQL中的ADS层表 -- 指标 1.每天每个商品对应的订单个数 -- 2.每天每个商品对应的订单金额 -- 3.每天每个商品对应的数量 -- --------------------------------- drop table gma_ads.ads_sku_index; CREATE TABLE gma_ads.ads_sku_index ( sku_id BIGINT, weight decimal(10,2), tm_name STRING , price decimal(10,2), spu_id BIGINT, category3_name STRING , category2_name STRING, category1_name STRING, order_count BIGINT NOT NULL, order_amount decimal(10,2), sku_count int, dt STRING, PRIMARY KEY (sku_id,dt) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://node1:3306/gma_ads?useUnicode=true&characterEncoding=UTF-8', 'table-name' = 'ads_sku_index', 'driver' = 'com.mysql.jdbc.Driver', 'username' = 'root', 'password' = '123456' )
2、在mysql中创建表
CREATE TABLE `ads_sku_index` ( sku_id bigint(20), weight decimal(10,2), tm_name varchar(200) , price decimal(10,2), spu_id varchar(200) , category3_name varchar(200) , category2_name varchar(200) , category1_name varchar(200) , order_count bigint(20), order_amount decimal(10,2), sku_count bigint(20), dt varchar(200) , PRIMARY KEY (`sku_id`,`dt`) ) COMMENT='商品指标表';
3、启动指标计算任务
flink run -c com.shujia.ads.AdsSkuIndex gma9-1.0.jar
7、提供数据接口
1、修改mysql地址以及用户名密码
com.shujia.gma.controller.ProvinceController
com.shujia.gma.controller.SkuController
2、启动接口服务
com.shujia.gma.GmaApiApplication
3、访问接口
localhost:8080/getSkuIndex?&id=1&day=2019-11-23
localhost:8080/getProvinceIndex?&id=1&day=2019-11-23
8、监控
8.1 部署prometheus (底层TSDB ,时序数据库)
1、上传解压修改配置文件
vim prometheus.yml # 在最后增加配置 - job_name: 'pushgateway' scrape_interval: 10s honor_labels: true static_configs: - targets: ['localhost:9091'] labels: instance: pushgateway
2、启动prometheus
nohup ./prometheus &
8.2 部署pushgateway
1、上传解压启动
nohup ./pushgateway &
8.3 然后在 Flink 的配置文件中添加:
cd /usr/local/soft/flink-1.11.2/conf vim flink-conf.yaml # 增加配置 metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter metrics.reporter.promgateway.host: master # promgateway 地址 metrics.reporter.promgateway.port: 9091 metrics.reporter.promgateway.jobName: shujia metrics.reporter.promgateway.randomJobNameSuffix: true metrics.reporter.promgateway.deleteOnShutdown: false
8.4、重启flink yarn session 启动flink任务
8.5 访问页面查看指标
http://master:9091/
8.6 部署Grafana上传解压启动
./grafana-server
访问页面 用户名:admin 密码: admin
http://master:3000/dashboard
增加prometheus 数据源
增加数据表
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)