数据集成工具 —— datax与flinkx的使用

数据集成工具 —— datax与flinkx的使用,第1张

数据集成工具 —— datax与flinkx的使用 datax

datax只要上传到linux本地,解压即可使用,如果不想每次执行的时候都要输入路径,可以配置到环境变量里面

DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、Oceanbase、SqlServer、Postgre、HDFS、Hive、ADS、Hbase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。

datax的具体使用方式,可以通过github上面的中文文档来学习,由于github连接不稳定,因此建议在gitee中,导入github上datax的官方库。

点进doc文件,里面有阅读文档,点进阅读文档

这里面会有配置样例,根据配置样例就可以自己写具体执行的json文件了,另外下面还有各个参数的含义说明。

因此想在哪两个数据源之间进行数据同步,先在仓库里面找到具体的reader和writer,然后点进去查看官方文档的样例模板和下面的参数说明,即可开始自己编写

下面举一个例子,从mysql的一张表,写入到mysql 的另一张表
去mysqlreader文档中找到reader的部分

"reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "splitPk": "db_id",
                        "connection": [
                            {
                                "table": [
                                    "table"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }

再到mysqlwriter里面找到writer的部分

"writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from test"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "test"
                                ]
                            }
                        ]
                    }

光有这些还不够,还要加上外层的job和content
另外官方文档里面的只是模板,是不能直接拿来用的,因此需要根据自己的实际需要进行相应的修改

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age"
                        ],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "teacher"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
                            "age"
                        ],
                        "preSql": [
                            "truncate teacher2"
                        ],                        
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://master:3306/student?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "teacher2"
                                ]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}

在linux本地编辑一个mysqlTomysql.json文件,将上面的内容粘贴进去
并且在mysql数据库中准备好相应的的表和数据

teacher2只需要创建表,不需要往里面添加数据

然后执行

datax.py mysql2mysql.json


执行完毕后,到数据库中查询一下

与teacher表里面的数据一样,说明成功从mysql的teacher表中获取数据写入mysql的teacher2表中

flinkx

flinkx与datax一样,在github上有官方文档,按照文档来编写json文件,然后用给定的执行器执行json文件

同样举一个例子,这次在flinkx的安装目录下,新建一个jsonConf文件夹,然后在里面创建一个mysqlToHDFS.json文件
将下面的代码粘贴进去

{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/student?characterEncoding=utf8"
                                ],
                                "table": [
                                    "teacher"
                                ]
                            }
                        ],
                        "column": [
                            "*"
                        ],
                        "customSql": "",
                        "splitPk": "",
                        "queryTimeOut": 1000,
                        "requestAccumulatorInterval": 2
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "path": "hdfs://master:9000/flinkx/teacher",
                        "defaultFS": "hdfs://master:9000",
                        "column": [
                            {
                                "name": "col1",
                                "index": 0,
                                "type": "string"
                            },
                            {
                                "name": "col2",
                                "index": 1,
                                "type": "string"
                            }
                        ],
                        "fieldDelimiter": ",",
                        "fileType": "text",
                        "writeMode": "overwrite"
                    }
                }
            }
        ],
        "setting": {
            "restore": {
                "isRestore": false,
                "isStream": false
            },
            "errorLimit": {},
            "speed": {
                "channel": 1
            }
        }
    }
}

启动flinkx

flinkx -mode local -job /usr/local/soft/flinkx-1.10/jsonConf/mysqlToHDFS.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/



下载文件并查看,发现内容与mysql的teacher表里面的内容相同,说明从mysql写入到hdfs成功

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5653569.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存