1. 接入层
Canal、Flume、Kafka
针对业务系统数据,Canal监控Binlog日志,发送至kafka;
针对日志数据,由Flume来进行统一收集,并发送至kafka。
消息队列的数据既是离线数仓的原始数据,也是实时计算的原始数据,这样可以保证实时和离线的原始数据是统一的。
2. 计算层
Flink
有了源数据,在 计算层 经过Flink实时计算引擎做一些加工处理,然后落地到存储层中不同存储介质当中。
3. 存储层
HBase、Kafka、ES、Mysql、Hive、Redis
不同的 存储介质 是通过不同的应用场景来选择。
4. 数据应用层
风控、模型、图谱、大屏展示
通过存储层应用于不同的 数据应用 ,数据应用可能是我们的正式产品或者直接的业务系统
二、技术实现
1. 计算引擎
实时计算引擎的功能要求
提供高级 API,支持常见的数据 *** 作比如关联聚合,最好是能支持 SQL
具有状态管理和自动支持久化方案,减少对存储的依赖
可靠的容错机制,低延时,最好能够保证Exactly-once
Flink的优势
Flink的API、容错机制与状态管理都满足实时数仓计算引擎的需求
Flink高吞吐、低延时的特性
端到端的Exactly-once
WaterMark&Event Time的支持
Flink 不仅支持了大量常用的 SQL 语句,还有丰富的数据类型、内置函数以及灵活的自定义函数,基本覆盖了我们的开发场景
2. 存储引擎
根据不同的业务场景,使用最适合的存储引擎:
Kafka主要用于中间数据表的存储
ES主要针对日志数据的存储和分析
HBase、Redis可用于维表存储
Hive用于数据校验
Mysql可以用于指标计算结果的存储
三、数据分层
数据源:目前数据源主要是Binlog,通过Canal监控各个业务系统的Mysql,将binlog发送至kafka。
ODS层:主要将Binlog数据存储至Kafka,这一层不对数据进行任何 *** 作,存储最原始的数据,Binlog 日志在这一层为库级别,即:一个库的变更数据存放在同一个 Kafka Topic 中。
DWD层:主要对数据进行简单的清洗。拆分主题,将库级别的主题拆分为表级别;打平数据,将data数组格式打平。
DWS层:主要根据不同的业务的需求,将该需求所涉及到的表进行join所得。
APP层:根据指标计算需求,对数据进行处理后,存储HBase,为了方便模型查询,主要将表存储为索引表和明细表,直接对数据进行指标计算后,将计算结果存储到HBase。
四、数据监控及校验
1. 数据监控
目前数据的监控的架构是pushgateway + Prometheus + Grafana
数据监控主要是接入Flink的Metric,通过Grafana对Flink系统指标及自定义指标进行图形化界面的展示,对关键指标进行监控报警
2. 数据校验
目前数据的监控的架构是Grafana + Mysql
Grafana用于监控指标的展示及相关阈值数据的报警,Mysql主要用于监控数据的存储
将每个服务的source收到的数据、sink发出的数据,根据表的不同将数据关键字段写入mysql中,通过统计各个阶段各个表中的数据条数,对数据完整性进行监控校验,若出现数据缺时,先查找原因,然后指定时间戳重启服务
五、系统管理
元数据管理
表,字段元数据管理,实时感知元数据的变化,大幅度降低使用数据的成本。
系统配置
对应用启动参数及相关配置参数的管理,对任务进行灵活配置及管理。
血缘管理
主要是梳理实时计算平台中数据依赖关系,以及实时任务的依赖关系,从底层ODS到DWD再到DWS,以及APP层用到哪些数据,将整个链度串联起来。
六、问题及解决方案
1. 数据倾斜
由于要拆分主题,要以table为key对数据进行keyBy,但是由于每个表的数据量相差较大,会出现数据倾斜
解决方案:
加盐,给key加前缀
前缀不能随便加,为了保证同一id的数据在相同的分区中,所以根据id_table进行keyBy
2. 数据重复
任务在进行自动或手动重启时,为了保证数据不丢失,数据会出现重复计算的问题,如果下游只是对数据进行HBase存储的话,由于幂等性,这种重复可以解。但是,如果下游要对数据进行聚合,这样会导致数据被计算多次,影响计算结果的准确性
解决方案:
上游在对数据进行发送时,对kafka producer 进行 exactly once的设置
在对数据统计时进行数据去重
3. 数据延时
由于所处理的数据表的大小不一样,处理大表时,会出现数据延时的问题。
解决方案:
针对大表数据增加并行度
4.数据乱序
由于Flink kafka producer默认是根据hash对数据进行随机分区,kafka consumer在对数据进行消费时,每个分区消费速度不同,这样最终在存储数据时,就会出现乱序即相同的id会出现老数据覆盖新数据的问题
解决方案:
对kafka每个阶段进行自定义分区,将id相同的数据分到同一个分区,保证同一id的数据的有序性
由于整个数据处理过程中可能会出现shuffle,导数数据重新乱序,所以在对数据存储前对数据进行排序
对数据进行排序的关键点时要保证每条数据的唯一性,即要有标记数据先后顺序的字段
5 . 数据唯一标记(很重要)
由于要对数据进行去重或者排序,所以要保证数据的唯一性
解决办法:
使用时间戳不可以,因为数据量很大的情况下,同一时间会处理上百条数据
在最初发出数据的时候,为数据打上标记,使用 partition + offset + idx 的组合来确认数据的唯一性及顺序性
6. 数据可靠性
我们对服务重启或对服务升级时,可能会出现数据的丢失
解决方案:
结合Flink 的checkpoint及savepoint机制保证数据的可靠性
开启Flink的checkpoint机制,服务进行自动重启时,会自动读取上次保存在checkpoint中offset,或者我们指定offset进行数据消费
对服务进行升级时,先将服务的状态保存至savepoint中,重启时指定savepoint进行服务启动,保证数据不丢失
7. 无感升级
由于我们目前数据量比较庞大,且在对服务进行升级时,耗时较长,会影响调用方的使用。
解决办法:
在对服务进行升级时,将数据写入备用库,等数据追上且服务稳定运行后,再将存储库进行切换
链接地址: FlinkX
FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL binlog,Kafka等
1、前置
需要安装maven、java8、配置好github相关参数
2、Fork FlinX项目到自己的仓库中
2、Clone项目到本地
git clone https://github.com/liukunyuan/flinkx.git
3、安装额外的jar包
1)、cd flinkx/bin
2)、执行sh ./install_jars.sh(windows执行install_jars.bat脚本)
4、打包
1)、回到flinkx目录:cd ..
2)、执行打包命令:mvn clean package -Dmaven.test.skip=true
1、配置flink conf文件(暂时不需要安装flink)
1)、进入flinkconf目录
cd flinkconf
2)、修改flink-conf.yaml文件添加一行
rest.bind-port: 8888
2、配置mysqltomysql的json文件,路径:/Users/jack/Documents/jack-project/flinkx/flinkconf/mysql2mysql.json
3、运行任务
4、查看监控网页和log.txt文件: http://localhost:8888/
目前业务上有同步数据到sqlServer的需求,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,科学上网后发现袋鼠云的flinkStreamSql已经有支持sqlserver,那就开始动手,参考实现一波
主要实现getUpsertStatement的方法,本来以为能直接copy一波flinkStreamSql 的实现,结果发现
报错 SQL statement must not contain ? character.
查看源码发现, flink在构建mysql的Statement,是先把需要替换的字段前面拼接了 : fieldNames,然后在org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl类的parseNamedStatement 替换成 ?号, 既然如此,就针对了buildDualQueryStatement进行修改
完整的SqlServerDialect文件
最后替换原有的flink-jar包后,就可以用类似flink定义mysql的ddl进行定义表了
注意url写法为:jdbc:jtds:sqlserver://xxx:1433databaseName=master
[flinkStreamSQL链接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)