增量同步的方式有很多种,我使用的是: 快照表 + 触发器
需求:
当主库库表发生增删改时,从库库表与主库库表数据保持一致。
环境:
1、Mysql
2、kettle 7.1
思路:
1、在主库中,将需要同步的库表新建快照表,表结构一致。
2、在主库中,分别新增库表的增、删、改的触发器。
2、新建一个转换,该转换只针对一张表的增删改。
3、新建‘表输入’控件,查询主库的快照表
4、新建‘插入/更新’控件,插入数据到从库的库表,查询的关键字要求唯一。
5、新建‘删除’控件,将主库的快照表中的数据删除。
注意: 主库的库表,要新增针对增、删、改的三张表快照,三张表的步骤同上面的1 - 5 一致。
6、新建作业控件
7、配置发送邮件服务
8、完成
下载pdi-ce-4.4.0-stable.zip,解压到文件夹,打开data-integration中的Spoon.bat2
出现欢迎界面后来到Repository Connection窗口,选择建立一个新的repository,随后出现“资源库信息”窗口:
在“资源库信息”窗口中选择新建一个数据库连接,d出“Database Connection”窗口:
在其中输入Connection Name, Host Name, Database Name, Port Number, User Name,Password信息即可建立连接,完成之后在Repository Connection窗口以admin用户名登陆。
新建一个名为cscgTransTest的Transformation,从“核心对象”中将两个“表输入”和一个“插入/更新”拖入到cscgTransTest中,并建立它们之间的连接,如下图所示:
在cscgTransTest中建立一个新的数据库连接ttt,通过表输入“max_createtime”从目标数据库ttt中获取某个表中最新数据的建立时间:
SELECT max(trunc(createtime)) FROMumdata.toeventmedia
在cscgTransTest中建立一个新的数据库连接testdb,以表输入“max_createtime”的查询结果替代表输入“umdata.toeventmedia”中的变量,执行SQL语句从数据库testdb中获取需要插入或者更新到ttt数据库的数据
SELECT * FROMumdata.toeventmedia where trunc(createtime) >= trunc(?)
在“插入/更新”中选择“数据库连接”、“目标模式”、“目标表”等信息,“用来查询的关键字”中的字段用来查询某条记录是否在目标表中存在,不存在则插入记录;如果存在,则继续比较其他字段是否与流里的字段值相同,如果相同则不执行任何 *** 作,如果不同则更新“更新字段”中所列字段。
“用来查询的关键字”所列字段是该表的primarykey,从而可以唯一标识一条记录。
分别为每一个表建立一个如上模式的转换步骤。
新建一个名为“cscgJobTest”的Job,在核心对象中将“START”和“Transformation”拖入cscgJobTest中,并建立两者之间的连接。
选中START中的“重复执行”,类型为“不需要定时”;在Transformation中将转换名设置为之前建立的“cscgTransTest”.
点击“Run this Job”运行。Job和Transformation的执行结果如如下:
一、离线数据同步
DataX
阿里的Datax是比较优秀的产品,基于python,提供各种数据村塾的读写插件,多线程执行,使用起来也很简单, *** 作简单通常只需要两步;
创建作业的配置文件(json格式配置reader,writer);
启动执行配置作业。
非常适合离线数据,增量数据可以使用一些编码的方式实现,
缺点:仅仅针对insert数据比较有效,update数据就不适合。缺乏对增量更新的内置支持,因为DataX的灵活架构,可以通过shell脚本等方式方便实现增量同步。
参考资料:
github地址:https://github.com/alibaba/DataX
dataX3.0介绍:https://www.jianshu.com/p/65c440f9bce1
datax初体验:https://www.imooc.com/article/15640
文档:https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md
Sqoop
Sqoop(发音:skup)是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
地址:http://sqoop.apache.org/
Sqoop导入:导入工具从RDBMS到HDFS导入单个表。表中的每一行被视为HDFS的记录。所有记录被存储在文本文件的文本数据或者在Avro和序列文件的二进制数据。
Sqoop导出:导出工具从HDFS导出一组文件到一个RDBMS。作为输入到Sqoop文件包含记录,这被称为在表中的行。那些被读取并解析成一组记录和分隔使用用户指定的分隔符。
Sqoop支持全量数据导入和增量数据导入(增量数据导入分两种,一是基于递增列的增量数据导入(Append方式)。二是基于时间列的增量数据导入(LastModified方式)),同时可以指定数据是否以并发形式导入。
Kettle
Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,数据抽取高效稳定。
Kettle的Spoon有丰富的Steps可以组装开发出满足多种复杂应用场景的数据集成作业,方便实现全量、增量数据同步。缺点是通过定时运行,实时性相对较差。
NiFi
Apache NiFi 是一个易于使用、功能强大而且可靠的数据拉取、数据处理和分发系统,用于自动化管理系统间的数据流。它支持高度可配置的指示图的数据路由、转换和系统中介逻辑,支持从多种数据源动态拉取数据。
NiFi基于Web方式工作,后台在服务器上进行调度。 用户可以为数据处理定义为一个流程,然后进行处理,后台具有数据处理引擎、任务调度等组件。
几个核心概念:
Nifi 的设计理念接近于基于流的编程 Flow Based Programming。
FlowFile:表示通过系统移动的每个对象,包含数据流的基本属性
FlowFile Processor(处理器):负责实际对数据流执行工作
Connection(连接线):负责不同处理器之间的连接,是数据的有界缓冲区
Flow Controller(流量控制器):管理进程使用的线程及其分配
Process Group(过程组):进程组是一组特定的进程及其连接,允许组合其他组件创建新组件
参考资料
Nifi简介及核心概念整理
官方网站:http://nifi.apache.org/index.html
二、实时数据同步
实时同步最灵活的还是用kafka做中间转发,当数据发生变化时,记录变化到kafka,需要同步数据的程序订阅消息即可,需要研发编码支持。这里说个mysql数据库的同步组件,阿里的canal和otter
canal
https://github.com/alibaba/canal
数据抽取简单的来说,就是将一个表的数据提取到另一个表中。有很多的ETL工具可以帮助我们来进行数据的抽取和转换,ETL工具能进行一次性或者定时作业抽取数据,不过canal作为阿里巴巴提供的开源的数据抽取项目,能够做到实时抽取,原理就是伪装成mysql从节点,读取mysql的binlog,生成消息,客户端订阅这些数据变更消息,处理并存储。下面我们来一起搭建一下canal服务
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
ps. 目前内部版本已经支持mysql和oracle部分版本的日志解析,当前的canal开源版本支持5.7及以下的版本(阿里内部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)
基于日志增量订阅&消费支持的业务:
数据库镜像
数据库实时备份
多级索引 (卖家和买家各自分库索引)
search build
业务cache刷新
价格变化等重要业务消息
otter
https://github.com/alibaba/otter
otter是在canal基础上又重新实现了可配置的消费者,使用otter的话,刚才说过的消费者就不需要写了,而otter提供了一个web界面,可以自定义同步任务及map表。非常适合mysql库之间的同步。
另外:otter已在阿里云推出商业化版本 数据传输服务DTS, 开通即用,免去部署维护的昂贵使用成本。DTS针对阿里云RDS、DRDS等产品进行了适配,解决了Binlog日志回收,主备切换、VPC网络切换等场景下的同步高可用问题。同时,针对RDS进行了针对性的性能优化。出于稳定性、性能及成本的考虑,强烈推荐阿里云用户使用DTS产品。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)