DorisDB 流批一体 实时架构

DorisDB 流批一体 实时架构,第1张

DorisDB 流批一体 实时架构
  • 场景:

在做大数据分析,报表项目的时候,一般会有实时和离线分析两种场景,一般情况下,是实时一套代码,离线一套代码,代码开发量比较大,怎么做到

  1. 实时和离线公用一份代码(流批一体)
  2. 性能满足实时场景的时效性(实时)

我们这里以统计 每天每个游戏的登录用户数 为例,登录明细表和最终结果表如下

CREATE TABLE `dwd_user_login` (
  `login_time` datetime NULL COMMENT "登陆时间",
  `game_id` bigint(20) NULL COMMENT "游戏ID",
  `user_id` bigint(20) NULL COMMENT "账户ID",
  `ip` varchar(39) NULL COMMENT "ip",
  `updated_at` datetime NULL COMMENT "更新时间"
) ENGINE=OLAP
UNIQUE KEY(`login_time`,  `game_id`,  `user_id`)
COMMENT "OLAP"
PARTITION BY RANGE(`login_time`)
(
PARTITION p20211129 VALUES [('1000-11-29 00:00:00'), ('2021-11-30 00:00:00')),
PARTITION p20211130 VALUES [('2021-11-30 00:00:00'), ('2021-12-01 00:00:00')),
PARTITION p20211201 VALUES [('2021-12-01 00:00:00'), ('2021-12-02 00:00:00')),
PARTITION p20211202 VALUES [('2021-12-02 00:00:00'), ('2021-12-03 00:00:00')),
PARTITION p20211203 VALUES [('2021-12-03 00:00:00'), ('2021-12-04 00:00:00')))
DISTRIBUTED BY HASH(`user_id`) BUCKETS 3
PROPERTIES (
"replication_num" = "3",
"colocate_with" = "user_id",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-365",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_num" = "3",
"dynamic_partition.buckets" = "3",
"in_memory" = "false",
"storage_format" = "V2"
);

CREATE TABLE `app_day_game_kpi` (
  `login_date` date NULL COMMENT "登陆时间",
  `game_id` bigint(20) NULL COMMENT "游戏ID",
  `login_count` bigint(20) NULL COMMENT "登录次数",
  `login_user_count` bigint(20) NULL COMMENT "登录用户数"
) ENGINE=OLAP
UNIQUE KEY(`login_date`,`game_id`)
COMMENT "OLAP"
PARTITION BY RANGE(`login_date`)
(
PARTITION p20211129 VALUES [('1000-11-29'), ('2021-11-30')),
PARTITION p20211130 VALUES [('2021-11-30'), ('2021-12-01')),
PARTITION p20211201 VALUES [('2021-12-01'), ('2021-12-02')),
PARTITION p20211202 VALUES [('2021-12-02'), ('2021-12-03')),
PARTITION p20211203 VALUES [('2021-12-03'), ('2021-12-04')))
DISTRIBUTED BY HASH(`game_id`) BUCKETS 3
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-365",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_num" = "3",
"dynamic_partition.buckets" = "3",
"in_memory" = "false",
"storage_format" = "V2"
);


  • hadoop生态圈经典架构

这是我们经常用到的hadoop生态圈,最经典的架构

实时分析:  数据->flume->kafka-> spark streaming/flink-> mysql 

离线分析: 数据->flume->kafka->hadoop-> spark sql -> mysql 

问题
  •  实时一套代码,离线一套代码 ,代码开发量比较大
  • 当实时出现问题的时候,需要用离线任务来修复数据,实时数据的offset 难以确定

  • DorisDB 常用架构 

 这是DorisDB 最基本的架构:

实时分析:  数据->flume->kafka-> DorisDB-> 直接查明细数据

离线分析:  数据->flume->kafka-> DorisDB->DorisDB统计计算->查统计好的数据

问题:

实时:虽然 DorisDB 是实时数据仓库,性能很优越,但是所有数据都查明细数据,查询时间会很长

离线: 每天的数据离线更新,数据时效性很差

  • DorisDB流批一体 实时架构

 优点: 
  • 流批一体,代码简洁: 先实现离线sql,再把离线sql 复用到实时代码
  • 兼容时效和性能: 今天的数据实时查询,所以基本算实时;今天以前的数据查统计好的代码,所以性能比较优越
实现SQL:
-- 离线sql,每天跑一次
insert  into  app_day_game_kpi
select date(login_time) login_date,game_id,count(1) login_count, count(distinct  user_id) login_user_count  from  dwd_user_login 
where  login_time < DATE_FORMAT(NOW(),'%Y-%m-%d 00:00:00')
group by date(login_time),game_id

-- 复用离线sql,把今天的统计创建成 view
CREATE  view  app_day_game_kpi_view as 
select date(login_time) login_date,game_id,count(1) login_count, count(distinct  user_id) login_user_count  from  dwd_user_login 
where  login_time>=DATE_FORMAT(NOW(),'%Y-%m-%d 00:00:00')
group by date(login_time),game_id

--  离线和实时数据结合,流批一体
select *  from  
(
SELECt  *  from  app_day_game_kpi
union
SELECt  *  from  app_day_game_kpi_view
) t

  • 结语
  • sql 很简单,只用于说明思路,思路通了,复杂的业务逻辑一样可以实现
  • 我太聪明了,哈哈哈 

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5634741.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存