大数据常用同步工具

大数据常用同步工具,第1张

一、离线数据同步

DataX

阿里的Datax是比较优秀的产品,基于python,提供各种数据村塾的读写插件,多线程执行,使用起来也很简单, *** 作简单通常只需要两步;

创建作业的配置文件(json格式配置reader,writer);

启动执行配置作业。

非常适合离线数据,增量数据可以使用一些编码的方式实现,

缺点:仅仅针对insert数据比较有效,update数据就不适合。缺乏对增量更新的内置支持,因为DataX的灵活架构,可以通过shell脚本等方式方便实现增量同步。

参考资料:

github地址:https://github.com/alibaba/DataX

dataX3.0介绍:https://www.jianshu.com/p/65c440f9bce1

datax初体验:https://www.imooc.com/article/15640

文档:https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

Sqoop

Sqoop(发音:skup)是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。

地址:http://sqoop.apache.org/

Sqoop导入:导入工具从RDBMS到HDFS导入单个表。表中的每一行被视为HDFS的记录。所有记录被存储在文本文件的文本数据或者在Avro和序列文件的二进制数据。

Sqoop导出:导出工具从HDFS导出一组文件到一个RDBMS。作为输入到Sqoop文件包含记录,这被称为在表中的行。那些被读取并解析成一组记录和分隔使用用户指定的分隔符。

Sqoop支持全量数据导入和增量数据导入(增量数据导入分两种,一是基于递增列的增量数据导入(Append方式)。二是基于时间列的增量数据导入(LastModified方式)),同时可以指定数据是否以并发形式导入。

Kettle

Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,数据抽取高效稳定。

Kettle的Spoon有丰富的Steps可以组装开发出满足多种复杂应用场景的数据集成作业,方便实现全量、增量数据同步。缺点是通过定时运行,实时性相对较差。

NiFi

Apache NiFi 是一个易于使用、功能强大而且可靠的数据拉取、数据处理和分发系统,用于自动化管理系统间的数据流。它支持高度可配置的指示图的数据路由、转换和系统中介逻辑,支持从多种数据源动态拉取数据。

NiFi基于Web方式工作,后台在服务器上进行调度。 用户可以为数据处理定义为一个流程,然后进行处理,后台具有数据处理引擎、任务调度等组件。

几个核心概念:

Nifi 的设计理念接近于基于流的编程 Flow Based Programming。

FlowFile:表示通过系统移动的每个对象,包含数据流的基本属性

FlowFile Processor(处理器):负责实际对数据流执行工作

Connection(连接线):负责不同处理器之间的连接,是数据的有界缓冲区

Flow Controller(流量控制器):管理进程使用的线程及其分配

Process Group(过程组):进程组是一组特定的进程及其连接,允许组合其他组件创建新组件

参考资料

Nifi简介及核心概念整理

官方网站:http://nifi.apache.org/index.html

二、实时数据同步

实时同步最灵活的还是用kafka做中间转发,当数据发生变化时,记录变化到kafka,需要同步数据的程序订阅消息即可,需要研发编码支持。这里说个mysql数据库的同步组件,阿里的canal和otter

canal

https://github.com/alibaba/canal

数据抽取简单的来说,就是将一个表的数据提取到另一个表中。有很多的ETL工具可以帮助我们来进行数据的抽取和转换,ETL工具能进行一次性或者定时作业抽取数据,不过canal作为阿里巴巴提供的开源的数据抽取项目,能够做到实时抽取,原理就是伪装成mysql从节点,读取mysql的binlog,生成消息,客户端订阅这些数据变更消息,处理并存储。下面我们来一起搭建一下canal服务

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

ps. 目前内部版本已经支持mysql和oracle部分版本的日志解析,当前的canal开源版本支持5.7及以下的版本(阿里内部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)

基于日志增量订阅&消费支持的业务:

数据库镜像

数据库实时备份

多级索引 (卖家和买家各自分库索引)

search build

业务cache刷新

价格变化等重要业务消息

otter

https://github.com/alibaba/otter

otter是在canal基础上又重新实现了可配置的消费者,使用otter的话,刚才说过的消费者就不需要写了,而otter提供了一个web界面,可以自定义同步任务及map表。非常适合mysql库之间的同步。

另外:otter已在阿里云推出商业化版本 数据传输服务DTS, 开通即用,免去部署维护的昂贵使用成本。DTS针对阿里云RDS、DRDS等产品进行了适配,解决了Binlog日志回收,主备切换、VPC网络切换等场景下的同步高可用问题。同时,针对RDS进行了针对性的性能优化。出于稳定性、性能及成本的考虑,强烈推荐阿里云用户使用DTS产品。

同步业务库的数据到ODS层,之前一直是全量同步数据,主要考虑IO太大,耗时太长,重复拉取同样的数据,现在考虑增量同步的方式实现,同时对库表数据做分区。

增量同步主要分为两步,第一步,存量数据一次性同步;第二步,在存量数据的基础之上,做增量;后期的每一次同步都是增量同步。以下是具体同步方案:

用Sqoop同步表中全部数据到Hive表中;

a.根据hive中最大更新时间,用Sqoop提取更新时间为这个时间之后的增量数据;

1)获取表的所有列,把datetime和timestamp类型,统一在java中映射成TIMESTAMP类型,脚本如下:

2) 用sqoop import拉取数据,脚本如下:

1)创建增量同步的sqoop job,脚本如下:

a、从hive中获取表的最大更新时间

b、以上面获取的最大更新时间,作为起点,创建sqoop job,脚本如下:

c、创建sqoop job之后,就是执行job了,脚本如下:

具体参数详解,参考: https://www.cnblogs.com/Alcesttt/p/11432547.html

sqoop使用hsql来存储job信息,开启metastor service将job信息共享,所有node上的sqoop都可以运行同一个job

一、sqoop的配置文件在sqoop.site.xml中:

1、sqoop.metastore.server.location

本地存储路径,默认在tmp下,改为其他路径

2、sqoop.metastore.server.port

metastore service端口号

3、sqoop.metastore.client.autoconnect.url

sqoop自动连接的metastore地址,默认是本地的metastore地址

4、sqoop.metastore.client.enable.autoconnect

开启自动连接。sqoop默认连接本地metastore。注释这个配置会开启自动连接。

二、开启metastore service

sqoop下,nohup bin/sqoop metastore

三、创建job

sqoop支持两种增量导入模式,

一种是 append,即通过指定一个递增的列,比如:

--incremental append --check-column num_iid --last-value 0

varchar类型的check字段也可以通过这种方式增量导入(id为varchar类型的递增数字):

--incremental append --check-column id --last-value 8

另种是可以根据时间戳,比如:

--incremental lastmodified --check-column created --last-value '2012-02-01 11:0:00'

就是只导入created 比'2012-02-01 11:0:00'更大的数据。

bin/sqoop job --meta-connect jdbc:hsqldb:hsql://10.106.1.234:16000/sqoop --create job_zyztest13 -- import --connect jdbc:oracle:thin:@10.106.1.236:1521:orcl --username SQOOP --password sqoop --table LXC_TEST_HBASE_TO_ORACLE --columns NAME,SEX,age,CSRQ -m 1 --hbase-table SQOOP_IMPORT_TEST3 --column-family info --hbase-row-key NAME --split-by NAME --incremental lastmodified --check-column CSRQ --last-value '2012-02-01 11:0:00' --verbose

nohup /opt/hadoopcluster/sqoop-1.4.2.bin__hadoop-2.0.0-alpha/bin/sqoop job --exec job_zyztest13 >job_zyztest13.out 2>&1 &

此时,在10.106.1.234上创建了job_zyztest13这个job,通过bin/sqoop job --meta-connect jdbc:hsqldb:hsql://10.106.1.234:16000/sqoop --list可以查看所有job

四、定时执行

使用linux定时器:crontab -e

编辑定时器,添加*/4 * * * * /opt/hadoopcluster/sqoop-1.4.2.bin__hadoop-2.0.0-alpha/bin/sqoop job --meta-connect jdbc:hsqldb:hsql://10.106.1.234:16000/sqoop --exec job_zyztest13 >job_zyztest13.out 2>&1 &任务将会每四分钟执行一次


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/6774446.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-03-28
下一篇 2023-03-28

发表评论

登录后才能评论

评论列表(0条)

保存