DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、Hbase、FTP等各种异构数据源之间稳定高效的数据同步功能。
官网地址:https://github.com/alibaba/DataX
DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图。
为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。
DataX本身作为离线数据同步框架,采用framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
下面用一个DataX作业生命周期的时序图说明DataX的运行流程、核心概念以及每个概念之间的关系。
举例来说,用户提交了一个DataX作业,并且配置了总的并发度为20,目的是对一个有100张分表的mysql数据源进行同步。DataX的调度决策思路是:
1)DataX Job根据分库分表切分策略,将同步工作分成100个Task。
2)根据配置的总的并发度20,以及每个Task Group的并发度5,DataX计算共需要分配4个TaskGroup。
3)4个TaskGroup平分100个Task,每一个TaskGroup负责运行25个Task。
1)下载DataX安装包并上传到node1的/opt/software
下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
2)解压datax.tar.gz到/opt/server/
[linux@node1 software]$ tar -zxvf datax.tar.gz -C /opt/server/
3)自检,执行如下命令
[linux@node1 software]$ python /opt/server/datax/bin/datax.py /opt/server/datax/job/job.json
出现如下内容,则表明安装成功 …… 2021-10-12 21:51:12.335 [job-0] INFO JobContainer - 任务启动时刻 : 2021-10-12 21:51:02 任务结束时刻 : 2021-10-12 21:51:12 任务总计耗时 : 10s 任务平均流量 : 253.91KB/s 记录写入速度 : 10000rec/s 读出记录总数 : 100000 读写失败总数 : 0第4章 DataX使用 4.1 DataX使用概述
4.1.1 DataX任务提交命令
DataX的使用十分简单,用户只需根据自己同步数据的数据源和目的地选择相应的Reader和Writer,并将Reader和Writer的信息配置在一个json文件中,然后执行如下命令提交数据同步任务即可。
[linux@node1 software]$ python bin/datax.py path/to/your/job.json
4.2.2 DataX配置文件格式
可以使用如下命名查看DataX配置文件模板。
[linux@node1 software]$ python bin/datax.py -r mysqlreader -w hdfswriter
配置文件模板如下,json最外层是一个job,job包含setting和content两部分,其中setting用于对整个job进行配置,content用户配置数据源和目的地。
Reader和Writer的具体参数可参考官方文档,地址如下:
https://github.com/alibaba/DataX/blob/master/README.md4.2 同步MySQL数据到HDFS案例
案例要求:同步gmall数据库中base_province表数据到HDFS的/base_province目录
需求分析:要实现该功能,需选用MySQLReader和HDFSWriter,MySQLReader具有两种模式分别是TableMode和QuerySQLMode,前者使用table,column,where等属性声明需要同步的数据;后者使用一条SQL查询语句声明需要同步的数据。
下面分别使用两种模式进行演示。
1)编写配置文件
(1)创建配置文件base_province.json
[linux@node1 software]$ vim /opt/server/datax/job/base_province.json
(2)配置文件内容如下
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [ "id", "name", "region_id", "area_code", "iso_code", "iso_3166_2" ], "where": "id>=3", "connection": [ { "jdbcUrl": [ "jdbc:mysql://hadoop102:3306/gmall" ], "table": [ "base_province" ] } ], "password": "000000", "splitPk": "", "username": "root" } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "id", "type": "bigint" }, { "name": "name", "type": "string" }, { "name": "region_id", "type": "string" }, { "name": "area_code", "type": "string" }, { "name": "iso_code", "type": "string" }, { "name": "iso_3166_2", "type": "string" } ], "compress": "gzip", "defaultFS": "hdfs://hadoop102:8020", "fieldDelimiter": "t", "fileName": "base_province", "fileType": "text", "path": "/base_province", "writeMode": "append" } } } ], "setting": { "speed": { "channel": 1 } } } }
2)配置文件说明
(1)Reader参数说明
(2)Writer参数说明
注意事项:
HFDS Writer并未提供nullFormat参数:也就是用户并不能自定义null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(’’),而Hive默认的null值存储格式为N。所以后期将DataX同步的文件导入Hive表就会出现问题。
解决该问题的方案有两个:
一是修改DataX HDFS Writer的源码,增加自定义null值存储格式的逻辑,可参考https://blog.csdn.net/u010834071/article/details/105506580
二是在Hive中建表时指定null值存储格式为空字符串(’’),例如:
DROp TABLE IF EXISTS base_province; CREATE EXTERNAL TABLE base_province ( `id` STRING COMMENT '编号', `name` STRING COMMENT '省份名称', `region_id` STRING COMMENT '地区ID', `area_code` STRING COMMENT '地区编码', `iso_code` STRING COMMENT '旧版ISO-3166-2编码,供可视化使用', `iso_3166_2` STRING COMMENT '新版IOS-3166-2编码,供可视化使用' ) COMMENT '省份表' ROW FORMAT DELIMITED FIELDS TERMINATED BY 't' NULL DEFINED AS '' LOCATION '/base_province/';
(3)Setting参数说明
3)提交任务
(1)在HDFS创建/base_province目录
使用DataX向HDFS同步数据时,需确保目标路径已存在
[linux@node1 ~]$ hadoop fs -mkdir /base_province
(2)进入DataX根目录
[linux@node1 ~]$ cd /opt/server/datax
(3)执行如下命令
[linux@node1 datax]$ python bin/datax.py job/base_province.json
4)查看结果
(1)DataX打印日志
2021-10-13 11:13:14.930 [job-0] INFO JobContainer - 任务启动时刻 : 2021-10-13 11:13:03 任务结束时刻 : 2021-10-13 11:13:14 任务总计耗时 : 11s 任务平均流量 : 66B/s 记录写入速度 : 3rec/s 读出记录总数 : 32 读写失败总数 : 0
(2)查看HDFS文件
[linux@node1 datax]$ hadoop fs -cat /base_province/* | zcat
3 山西 1 140000 CN-14 CN-SX 4 内蒙古 1 150000 CN-15 CN-NM 5 河北 1 130000 CN-13 CN-HE 6 上海 2 310000 CN-31 CN-SH 7 江苏 2 320000 CN-32 CN-JS 8 浙江 2 330000 CN-33 CN-ZJ 9 安徽 2 340000 CN-34 CN-AH 10 福建 2 350000 CN-35 CN-FJ 11 江西 2 360000 CN-36 CN-JX 12 山东 2 370000 CN-37 CN-SD 14 台湾 2 710000 CN-71 CN-TW 15 黑龙江 3 230000 CN-23 CN-HL 16 吉林 3 220000 CN-22 CN-JL 17 辽宁 3 210000 CN-21 CN-LN 18 陕西 7 610000 CN-61 CN-SN 19 甘肃 7 620000 CN-62 CN-GS 20 青海 7 630000 CN-63 CN-QH 21 宁夏 7 640000 CN-64 CN-NX 22 新疆 7 650000 CN-65 CN-XJ 23 河南 4 410000 CN-41 CN-HA 24 湖北 4 420000 CN-42 CN-HB 25 湖南 4 430000 CN-43 CN-HN 26 广东 5 440000 CN-44 CN-GD 27 广西 5 450000 CN-45 CN-GX 28 海南 5 460000 CN-46 CN-HI 29 香港 5 810000 CN-91 CN-HK 30 澳门 5 820000 CN-92 CN-MO 31 四川 6 510000 CN-51 CN-SC 32 贵州 6 520000 CN-52 CN-GZ 33 云南 6 530000 CN-53 CN-YN 13 重庆 6 500000 CN-50 CN-CQ 34 西藏 6 540000 CN-54 CN-XZ4.2.2 MySQLReader之QuerySQLMode
1)编写配置文件
(1)修改配置文件base_province.json
[linux@node1 ~]$ vim /opt/server/datax/job/base_province.json
(2)配置文件内容如下
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "connection": [ { "jdbcUrl": [ "jdbc:mysql://hadoop102:3306/gmall" ], "querySql": [ "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3" ] } ], "password": "000000", "username": "root" } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "id", "type": "bigint" }, { "name": "name", "type": "string" }, { "name": "region_id", "type": "string" }, { "name": "area_code", "type": "string" }, { "name": "iso_code", "type": "string" }, { "name": "iso_3166_2", "type": "string" } ], "compress": "gzip", "defaultFS": "hdfs://hadoop102:8020", "fieldDelimiter": "t", "fileName": "base_province", "fileType": "text", "path": "/base_province", "writeMode": "append" } } } ], "setting": { "speed": { "channel": 1 } } } }
2)配置文件说明
(1)Reader参数说明
3)提交任务
(1)清空历史数据
[linux@node1 datax]$ hadoop fs -rm -r -f /base_province/*
(2)进入DataX根目录
[linux@node1 datax]$ cd /opt/server/datax
(3)执行如下命令
[linux@node1 datax]$ python bin/datax.py job/base_province.json
4)查看结果
(1)DataX打印日志
2021-10-13 11:13:14.930 [job-0] INFO JobContainer - 任务启动时刻 : 2021-10-13 11:13:03 任务结束时刻 : 2021-10-13 11:13:14 任务总计耗时 : 11s 任务平均流量 : 66B/s 记录写入速度 : 3rec/s 读出记录总数 : 32 读写失败总数 : 0
(2)查看HDFS文件
[linux@node1 datax]$ hadoop fs -cat /base_province/* | zcat
3 山西 1 140000 CN-14 CN-SX 4 内蒙古 1 150000 CN-15 CN-NM 5 河北 1 130000 CN-13 CN-HE 6 上海 2 310000 CN-31 CN-SH 7 江苏 2 320000 CN-32 CN-JS 8 浙江 2 330000 CN-33 CN-ZJ 9 安徽 2 340000 CN-34 CN-AH 10 福建 2 350000 CN-35 CN-FJ 11 江西 2 360000 CN-36 CN-JX 12 山东 2 370000 CN-37 CN-SD 14 台湾 2 710000 CN-71 CN-TW 15 黑龙江 3 230000 CN-23 CN-HL 16 吉林 3 220000 CN-22 CN-JL 17 辽宁 3 210000 CN-21 CN-LN 18 陕西 7 610000 CN-61 CN-SN 19 甘肃 7 620000 CN-62 CN-GS 20 青海 7 630000 CN-63 CN-QH 21 宁夏 7 640000 CN-64 CN-NX 22 新疆 7 650000 CN-65 CN-XJ 23 河南 4 410000 CN-41 CN-HA 24 湖北 4 420000 CN-42 CN-HB 25 湖南 4 430000 CN-43 CN-HN 26 广东 5 440000 CN-44 CN-GD 27 广西 5 450000 CN-45 CN-GX 28 海南 5 460000 CN-46 CN-HI 29 香港 5 810000 CN-91 CN-HK 30 澳门 5 820000 CN-92 CN-MO 31 四川 6 510000 CN-51 CN-SC 32 贵州 6 520000 CN-52 CN-GZ 33 云南 6 530000 CN-53 CN-YN 13 重庆 6 500000 CN-50 CN-CQ 34 西藏 6 540000 CN-54 CN-XZ4.2.3 DataX传参
通常情况下,离线数据同步任务需要每日定时重复执行,故HDFS上的目标路径通常会包含一层日期,以对每日同步的数据加以区分,也就是说每日同步数据的目标路径不是固定不变的,因此DataX配置文件中HDFS Writer的path参数的值应该是动态的。为实现这一效果,就需要使用DataX传参的功能。
DataX传参的用法如下,在JSON配置文件中使用${param}引用参数,在提交任务时使用-p"-Dparam=value"传入参数值,具体示例如下。
1)编写配置文件
(1)修改配置文件base_province.json
[linux@node1 ~]$ vim /opt/server/datax/job/base_province.json
(2)配置文件内容如下
{ "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "connection": [ { "jdbcUrl": [ "jdbc:mysql://hadoop102:3306/gmall" ], "querySql": [ "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3" ] } ], "password": "000000", "username": "root" } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "id", "type": "bigint" }, { "name": "name", "type": "string" }, { "name": "region_id", "type": "string" }, { "name": "area_code", "type": "string" }, { "name": "iso_code", "type": "string" }, { "name": "iso_3166_2", "type": "string" } ], "compress": "gzip", "defaultFS": "hdfs://hadoop102:8020", "fieldDelimiter": "t", "fileName": "base_province", "fileType": "text", "path": "/base_province/${dt}", "writeMode": "append" } } } ], "setting": { "speed": { "channel": 1 } } } }
2)提交任务
(1)创建目标路径
[linux@node1 datax]$ hadoop fs -mkdir /base_province/2020-06-14
(2)进入DataX根目录
[linux@node1 datax]$ cd /opt/module/datax
(3)执行如下命令
[linux@node1 datax]$ python bin/datax.py -p"-Ddt=2020-06-14" job/base_province.json
3)查看结果
[linux@node1 datax]$ hadoop fs -ls /base_province Found 2 items drwxr-xr-x - linux supergroup 0 2021-10-15 21:41 /base_province/2020-06-144.3 同步HDFS数据到MySQL案例
案例要求:同步HDFS上的/base_province目录下的数据到MySQL gmall 数据库下的test_province表。
需求分析:要实现该功能,需选用HDFSReader和MySQLWriter。
1)编写配置文件
(1)创建配置文件test_province.json
[linux@node1 ~]$ vim /opt/server/datax/job/base_province.json
(2)配置文件内容如下
{ "job": { "content": [ { "reader": { "name": "hdfsreader", "parameter": { "defaultFS": "hdfs://hadoop102:8020", "path": "/base_province", "column": [ "*" ], "fileType": "text", "compress": "gzip", "encoding": "UTF-8", "nullFormat": "\N", "fieldDelimiter": "t", } }, "writer": { "name": "mysqlwriter", "parameter": { "username": "root", "password": "000000", "connection": [ { "table": [ "test_province" ], "jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8" } ], "column": [ "id", "name", "region_id", "area_code", "iso_code", "iso_3166_2" ], "writeMode": "replace" } } } ], "setting": { "speed": { "channel": 1 } } } }
2)配置文件说明
(1)Reader参数说明
(2)Writer参数说明
3)提交任务
(1)在MySQL中创建gmall.test_province表
DROP TABLE IF EXISTS `test_province`; CREATE TABLE `test_province` ( `id` bigint(20) NOT NULL, `name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
(2)进入DataX根目录
[linux@node1 datax]$ cd /opt/server/datax
(3)执行如下命令
[linux@node1 datax]$ python bin/datax.py job/test_province.json
4)查看结果
(1)DataX打印日志
2021-10-13 15:21:35.006 [job-0] INFO JobContainer - 任务启动时刻 : 2021-10-13 15:21:23 任务结束时刻 : 2021-10-13 15:21:35 任务总计耗时 : 11s 任务平均流量 : 70B/s 记录写入速度 : 3rec/s 读出记录总数 : 34 读写失败总数 : 0
(2)查看MySQL目标表数据
DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在数据库可以承受的范围内达到最佳的同步速度。
关键优化参数如下:
注意事项:
1.若配置了总record限速,则必须配置单个channel的record限速
2.若配置了总byte限速,则必须配置单个channe的byte限速
3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:
计算公式为:
min(总byte限速/单个channle的byte限速,总record限速/单个channel的record限速)
配置示例:
{ "core": { "transport": { "channel": { "speed": { "byte": 1048576 //单个channel byte限速1M/s } } } }, "job": { "setting": { "speed": { "byte" : 5242880 //总byte限速5M/s } }, ... } }5.2 内存调整
当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。
建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。
调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)