DataX
阿里的Datax是比较优秀的产品,基于python,提供各种数据村塾的读写插件,多线程执行,使用起来也很简单, *** 作简单通常只需要两步;
创建作业的配置文件(json格式配置reader,writer);
启动执行配置作业。
非常适合离线数据,增量数据可以使用一些编码的方式实现,
缺点:仅仅针对insert数据比较有效,update数据就不适合。缺乏对增量更新的内置支持,因为DataX的灵活架构,可以通过shell脚本等方式方便实现增量同步。
参考资料:
github地址:https://github.com/alibaba/DataX
dataX3.0介绍:https://www.jianshu.com/p/65c440f9bce1
datax初体验:https://www.imooc.com/article/15640
文档:https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md
Sqoop
Sqoop(发音:skup)是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。
地址:http://sqoop.apache.org/
Sqoop导入:导入工具从RDBMS到HDFS导入单个表。表中的每一行被视为HDFS的记录。所有记录被存储在文本文件的文本数据或者在Avro和序列文件的二进制数据。
Sqoop导出:导出工具从HDFS导出一组文件到一个RDBMS。作为输入到Sqoop文件包含记录,这被称为在表中的行。那些被读取并解析成一组记录和分隔使用用户指定的分隔符。
Sqoop支持全量数据导入和增量数据导入(增量数据导入分两种,一是基于递增列的增量数据导入(Append方式)。二是基于时间列的增量数据导入(LastModified方式)),同时可以指定数据是否以并发形式导入。
Kettle
Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,数据抽取高效稳定。
Kettle的Spoon有丰富的Steps可以组装开发出满足多种复杂应用场景的数据集成作业,方便实现全量、增量数据同步。缺点是通过定时运行,实时性相对较差。
NiFi
Apache NiFi 是一个易于使用、功能强大而且可靠的数据拉取、数据处理和分发系统,用于自动化管理系统间的数据流。它支持高度可配置的指示图的数据路由、转换和系统中介逻辑,支持从多种数据源动态拉取数据。
NiFi基于Web方式工作,后台在服务器上进行调度。 用户可以为数据处理定义为一个流程,然后进行处理,后台具有数据处理引擎、任务调度等组件。
几个核心概念:
Nifi 的设计理念接近于基于流的编程 Flow Based Programming。
FlowFile:表示通过系统移动的每个对象,包含数据流的基本属性
FlowFile Processor(处理器):负责实际对数据流执行工作
Connection(连接线):负责不同处理器之间的连接,是数据的有界缓冲区
Flow Controller(流量控制器):管理进程使用的线程及其分配
Process Group(过程组):进程组是一组特定的进程及其连接,允许组合其他组件创建新组件
参考资料
Nifi简介及核心概念整理
官方网站:http://nifi.apache.org/index.html
二、实时数据同步
实时同步最灵活的还是用kafka做中间转发,当数据发生变化时,记录变化到kafka,需要同步数据的程序订阅消息即可,需要研发编码支持。这里说个mysql数据库的同步组件,阿里的canal和otter
canal
https://github.com/alibaba/canal
数据抽取简单的来说,就是将一个表的数据提取到另一个表中。有很多的ETL工具可以帮助我们来进行数据的抽取和转换,ETL工具能进行一次性或者定时作业抽取数据,不过canal作为阿里巴巴提供的开源的数据抽取项目,能够做到实时抽取,原理就是伪装成mysql从节点,读取mysql的binlog,生成消息,客户端订阅这些数据变更消息,处理并存储。下面我们来一起搭建一下canal服务
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
ps. 目前内部版本已经支持mysql和oracle部分版本的日志解析,当前的canal开源版本支持5.7及以下的版本(阿里内部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)
基于日志增量订阅&消费支持的业务:
数据库镜像
数据库实时备份
多级索引 (卖家和买家各自分库索引)
search build
业务cache刷新
价格变化等重要业务消息
otter
https://github.com/alibaba/otter
otter是在canal基础上又重新实现了可配置的消费者,使用otter的话,刚才说过的消费者就不需要写了,而otter提供了一个web界面,可以自定义同步任务及map表。非常适合mysql库之间的同步。
另外:otter已在阿里云推出商业化版本 数据传输服务DTS, 开通即用,免去部署维护的昂贵使用成本。DTS针对阿里云RDS、DRDS等产品进行了适配,解决了Binlog日志回收,主备切换、VPC网络切换等场景下的同步高可用问题。同时,针对RDS进行了针对性的性能优化。出于稳定性、性能及成本的考虑,强烈推荐阿里云用户使用DTS产品。
同步业务库的数据到ODS层,之前一直是全量同步数据,主要考虑IO太大,耗时太长,重复拉取同样的数据,现在考虑增量同步的方式实现,同时对库表数据做分区。
增量同步主要分为两步,第一步,存量数据一次性同步;第二步,在存量数据的基础之上,做增量;后期的每一次同步都是增量同步。以下是具体同步方案:
用Sqoop同步表中全部数据到Hive表中;
a.根据hive中最大更新时间,用Sqoop提取更新时间为这个时间之后的增量数据;
1)获取表的所有列,把datetime和timestamp类型,统一在java中映射成TIMESTAMP类型,脚本如下:
2) 用sqoop import拉取数据,脚本如下:
1)创建增量同步的sqoop job,脚本如下:
a、从hive中获取表的最大更新时间
b、以上面获取的最大更新时间,作为起点,创建sqoop job,脚本如下:
c、创建sqoop job之后,就是执行job了,脚本如下:
具体参数详解,参考: https://www.cnblogs.com/Alcesttt/p/11432547.html
sqoop使用hsql来存储job信息,开启metastor service将job信息共享,所有node上的sqoop都可以运行同一个job一、sqoop的配置文件在sqoop.site.xml中:
1、sqoop.metastore.server.location
本地存储路径,默认在tmp下,改为其他路径
2、sqoop.metastore.server.port
metastore service端口号
3、sqoop.metastore.client.autoconnect.url
sqoop自动连接的metastore地址,默认是本地的metastore地址
4、sqoop.metastore.client.enable.autoconnect
开启自动连接。sqoop默认连接本地metastore。注释这个配置会开启自动连接。
二、开启metastore service
sqoop下,nohup bin/sqoop metastore
三、创建job
sqoop支持两种增量导入模式,
一种是 append,即通过指定一个递增的列,比如:
--incremental append --check-column num_iid --last-value 0
varchar类型的check字段也可以通过这种方式增量导入(id为varchar类型的递增数字):
--incremental append --check-column id --last-value 8
另种是可以根据时间戳,比如:
--incremental lastmodified --check-column created --last-value '2012-02-01 11:0:00'
就是只导入created 比'2012-02-01 11:0:00'更大的数据。
bin/sqoop job --meta-connect jdbc:hsqldb:hsql://10.106.1.234:16000/sqoop --create job_zyztest13 -- import --connect jdbc:oracle:thin:@10.106.1.236:1521:orcl --username SQOOP --password sqoop --table LXC_TEST_HBASE_TO_ORACLE --columns NAME,SEX,age,CSRQ -m 1 --hbase-table SQOOP_IMPORT_TEST3 --column-family info --hbase-row-key NAME --split-by NAME --incremental lastmodified --check-column CSRQ --last-value '2012-02-01 11:0:00' --verbose
nohup /opt/hadoopcluster/sqoop-1.4.2.bin__hadoop-2.0.0-alpha/bin/sqoop job --exec job_zyztest13 >job_zyztest13.out 2>&1 &
此时,在10.106.1.234上创建了job_zyztest13这个job,通过bin/sqoop job --meta-connect jdbc:hsqldb:hsql://10.106.1.234:16000/sqoop --list可以查看所有job
四、定时执行
使用linux定时器:crontab -e
编辑定时器,添加*/4 * * * * /opt/hadoopcluster/sqoop-1.4.2.bin__hadoop-2.0.0-alpha/bin/sqoop job --meta-connect jdbc:hsqldb:hsql://10.106.1.234:16000/sqoop --exec job_zyztest13 >job_zyztest13.out 2>&1 &任务将会每四分钟执行一次
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)