「开源」数据同步ETL工具,支持多数据源间的增、删、改数据同步

「开源」数据同步ETL工具,支持多数据源间的增、删、改数据同步,第1张

bboss数据同步可以方便地实现多种数据源之间的数据同步功能,支持增、删、改数据同步,本文为大家程序各种数据同步案例。

使用Apache-2.0开源协议

通过bboss,可以非常方便地采集database/mongodb/Elasticsearch/kafka/hbase/本地或者Ftp日志文件源数据,经过数据转换处理后,再推送到目标库elasticsearch/database/file/ftp/kafka/dummy/logger。

数据导入的方式

支持各种主流数据库、各种es版本以及本地/Ftp日志文件数据采集和同步、加工处理

支持从kafka接收数据;经过加工处理的数据亦可以发送到kafka;

支持将单条记录切割为多条记录;

可以将加工后的数据写入File并上传到ftp/sftp服务器;

支持备份采集完毕日志文件功能,可以指定备份文件保存时长,定期清理超过时长文件;

支持自动清理下载完毕后ftp服务器上的文件

支持excel、csv文件采集(本地和ftp/sftp)

支持导出数据到excel和csv文件,并支持上传到ftp/sftp服务器

提供自定义处理采集数据功能,可以自行将采集的数据按照自己的要求进行处理到目的地,支持数据来源包括:database,elasticsearch,kafka,mongodb,hbase,file,ftp等,想把采集的数据保存到什么地方,有自己实现CustomOutPut接口处理即可。

支持的数据库: mysql,maridb,postgress,oracle ,sqlserver,db2,tidb,hive,mongodb、HBase等

支持的Elasticsearch版本: 1.x,2.x,5.x,6.x,7.x,8.x,+

支持海量PB级数据同步导入功能

支持将ip转换为对应的运营商和城市地理坐标位置信息

支持设置数据bulk导入任务结果处理回调函数,对每次bulk任务的结果进行成功和失败反馈,然后针对失败的bulk任务通过error和exception方法进行相应处理

支持以下三种作业调度机制:

bboss另一个显著的特色就是直接基于java语言来编写数据同步作业程序,基于强大的java语言和第三方工具包,能够非常方便地加工和处理需要同步的源数据,然后将最终的数据保存到目标库(Elasticsearch或者数据库);同时也可以非常方便地在idea或者eclipse中调试和运行同步作业程序,调试无误后,通过bboss提供的gradle脚本,即可构建和发布出可部署到生产环境的同步作业包。因此,对广大的java程序员来说,bboss无疑是一个轻易快速上手的数据同步利器。

如果需要增量导入,还需要导入sqlite驱动:

如果需要使用xxjob来调度作业任务,还需要导入坐标:

本文从mysql数据库表td_cms_document导入数据到es中,除了导入上述maven坐标,还需要额外导入mysql驱动坐标(其他数据库驱动程序自行导入): mysql 5.x驱动依赖包

mysql 8.x驱动依赖包(mysql 8必须采用相应版本的驱动,否则不能正确运行)

私信回复:数据同步ETL工具

或访问一飞开源:https://code.exmay.com/

kettle是一个ETL工具,ETL(Extract-Transform-Load的缩写,即数据抽取、转换、装载的过程)。

kettle中文名称叫水壶,该项目的主程序员MATT 希望把各种数据放到一个壶里,然后以一种指定的格式流出。

所以他的重心是用于数据

oozie是一个工作流,Oozie工作流是放置在控制依赖DAG(有向无环图 Direct Acyclic Graph)中的一组动作(例如,Hadoop的Map/Reduce作业、Pig作业等),其中指定了动作执行的顺序。

oozie工作流中是有数据流动的,但是重心是在于工作流的定义。

二者虽然都有相关功能及数据的流动,但是其实用途是不一样的。

查看帮助

列举出所有linux上的数据库

列举出所有Window上的数据库

查看数据库下的所有表

(1)确定mysql服务启动正常

查询控制端口和查询进程来确定,一下两种办法可以确认mysql是否在启动状态

办法1:查询端口

MySQL监控的TCP的3306端口,如果显示3306,证明MySQL服务在运行中

办法二:查询进程

可以看见mysql的进程

没有指定数据导入到哪个目录,默认是/user/root/表名

原因:

如果表中有主键,m的值可以设置大于1的值如果没有主键只能将m值设置成为1或者要将m值大于1,需要使用--split-by指定一个字段

设置了-m 1 说明只有一个maptask执行数据导入,默认是4个maptask执行导入 *** 作,但是必须指定一个列来作为划分依据

导入数据到指定目录

在导入表数据到HDFS使用Sqoop导入工具,我们可以指定目标目录。使用参数 --target-dir来指定导出目的地,使用参数—delete-target-dir来判断导出目录是否存在,如果存在就删掉

查询导入

提示:must contain '$CONDITIONS' in WHERE clause。

where id <=1 匹配条件

$CONDITIONS:传递作用。

如果 query 后使用的是双引号,则 $CONDITIONS前必须加转义符,防止 shell 识别为自己的变量。

--query时不能使用--table一起使用

需要指定--target-dir路径

导入到hdfs指定目录并指定要求

数据导出储存方式(数据存储文件格式---( textfil parquet)--as-textfileImports data as plain text (default)--as-parquetfile Imports data to Parquet Files)

导入表数据子集到HDFS

sqoop导入blob数据到hive

对于CLOB,如xml文本,sqoop可以迁移到Hive表,对应字段存储为字符类型。

对于BLOB,如jpg图片,sqoop无法直接迁移到Hive表,只能先迁移到HDFS路径,然后再使用Hive命令加载到Hive表。迁移到HDFS后BLOB字段存储为16进制形式。

2.1.3导入关系表到Hive

第一步:导入需要的jar包

将我们mysql表当中的数据直接导入到hive表中的话,我们需要将hive的一个叫做hive-exec-1.1.0-cdh5.14.0.jar的jar包拷贝到sqoop的lib目录下

第二步:开始导入

导入关系表到hive并自动创建hive表

们也可以通过命令来将我们的mysql的表直接导入到hive表当中去

通过这个命令,我们可以直接将我们mysql表当中的数据以及表结构一起倒入到hive当中去

--incremental 增量模式。

append id 是获取一个某一列的某个值。

lastmodified “2016-12-15 15:47:35” 获取某个时间后修改的所有数据

-append 附加模式

-merge-key id 合并模式

--check-column 用来指定一些列,可以去指定多个列;通常的是指定主键id

--last -value 从哪个值开始增量

==注意:增量导入的时候,一定不能加参数--delete-target-dir 否则会报错==

第一种增量导入方式(不常用)

1.Append方式

使用场景:有个订单表,里面每个订单有一个唯一标识的自增列id,在关系型数据库中以主键的形式存在。之前已经将id在0-1000之间的编号的订单导入到HDFS 中;如果在产生新的订单,此时我们只需指定incremental参数为append,--last-value参数为1000即可,表示只从id大于1000后开始导入。

(1)创建一个MySQL表

(2)创建一个hive表(表结构与mysql一致)

注意:

append 模式不支持写入到hive表中

2.lastModify方式

此方式要求原有表有time字段,它能指定一个时间戳,让sqoop把该时间戳之后的数据导入到HDFS;因为后续订单可能状体会变化,变化后time字段时间戳也会变化,此时sqoop依然会将相同状态更改后的订单导入HDFS,当然我们可以只当merge-key参数为order-id,表示将后续新的记录和原有记录合并。

# 将时间列大于等于阈值的数据增量导入HDFS

使用 lastmodified 方式导入数据,要指定增量数据是要 --append(追加)还是要 --merge-key(合并)last-value 指定的值是会包含于增量导入的数据中。

第二种增量导入方式(推荐)

==通过where条件选取数据更加精准==

2.1.5从RDBMS到HBase

会报错

原因:sqoop1.4.6 只支持 HBase1.0.1 之前的版本的自动创建 HBase 表的功能。

解决方案:手动创建 HBase 表

导出前,目标表必须存在与目标数据库中

默认 *** 作是将文件中的数据使用insert语句插入到表中

数据是在HDFS当中的如下目录/sqoop/emp,数据内容如下

第一步:创建MySQL表

第二步:执行导出命令

通过export来实现数据的导出,将hdfs的数据导出到mysql当中去

全量导出

增量导出

更新导出

总结:

参数介绍

--update-key 后面也可以接多个关键字列名,可以使用逗号隔开,Sqoop将会匹配多个关键字后再执行更新 *** 作。

--export-dir 参数配合--table或者--call参数使用,指定了HDFS上需要将数据导入到MySQL中的文件集目录。

--update-mode updateonly和allowinsert。 默认模式为updateonly,如果指定--update-mode模式为allowinsert,可以将目标数据库中原来不存在的数据也导入到数据库表中。即将存在的数据更新,不存在数据插入。

组合测试及说明

1、当指定update-key,且关系型数据库表存在主键时:

A、allowinsert模式时,为更新目标数据库表存的内容,并且原来不存在的数据也导入到数据库表;

B、updateonly模式时,为更新目标数据库表存的内容,并且原来不存在的数据也不导入到数据库表;

2、当指定update-key,且关系型数据库表不存在主键时:

A、allowinsert模式时,为全部数据追加导入到数据库表;

B、updateonly模式时,为更新目标数据库表存的内容,并且原来不存在的数据也不导入到数据库表;

3、当不指定update-key,且关系型数据库表存在主键时:

A、allowinsert模式时,报主键冲突,数据无变化;

B、updateonly模式时,报主键冲突,数据无变化;

4、当不指定update-key,且关系型数据库表不存在主键时:

A、allowinsert模式时,为全部数据追加导入到数据库表;

B、updateonly模式时,为全部数据追加导入到数据库表;

实际案例:

(1)mysql批量导入hive

使用shell脚本:

笔者目前用sqoop把mysql数据导入到Hive中,最后实现命令行导入,sqoop版本1.4.7,实现如下

最后需要把这个导入搞成job,每天定时去跑,实现数据的自动化增量导入,sqoop支持job的管理,可以把导入创建成job重复去跑,并且它会在metastore中记录增值,每次执行增量导入之前去查询

创建job命令如下

创建完job就可以去执行它了

sqoop job --exec users

可以把该指令设为Linux定时任务,或者用Azkaban定时去执行它

hive导出到MySQL时,date类型数据发生变化?

问题原因:时区设置问题,date -R查看服务器时间,show VARIABLES LIKE "%time_zone"查看Mysql时间,system并不表示中国的标准时间,要将时间设置为东八区

(1):对市面上最流行的两种调度器,给出以下详细对比,以供技术选型参考。总体来说,ooize相比azkaban是一个重量级的任务调度系统,功能全面,但配置使用也更复杂。如果可以不在意某些功能的缺失,轻量级调度器azkaban是很不错的候选对象。

(2):功能:

两者均可以调度mapreduce,pig,java,脚本工作流任务;

两者均可以定时执行工作流任务;

(3):工作流定义:

Azkaban使用Properties文件定义工作流;

Oozie使用XML文件定义工作流;

(4):工作流传参:

Azkaban支持直接传参,例如${input};

Oozie支持参数和EL表达式,例如${fs:dirSize(myInputDir)};

(5):定时执行:

Azkaban的定时执行任务是基于时间的;

Oozie的定时执行任务基于时间和输入数据;

(6):资源管理:

Azkaban有较严格的权限控制,如用户对工作流进行读/写/执行等 *** 作;

Oozie暂无严格的权限控制;

(7):工作流执行:

Azkaban有两种运行模式,分别是solo server mode(executor server和web server部署在同一台节点)和multi server mode(executor server和web server可以部署在不同节点);

Oozie作为工作流服务器运行,支持多用户和多工作流;

(8):工作流管理:

Azkaban支持浏览器以及ajax方式 *** 作工作流;

Oozie支持命令行、HTTP REST、Java API、浏览器 *** 作工作流;

浏览器页面访问

http://node03:8081/

使用Oozie时通常整合hue,用户数据仓库调度

就是刚才选择的脚本

脚本里需要的参数,尽量设置为动态自动获取,如 ${date}

第一步的参数是所有文件和当天日期,后面的只需要日期,最后一步是导出所有结果,相应填入

添加文件和设置相应参数

运行后会有状态提示页面,可以看到任务进度

点击调度任务的页面情况

修改定时任务名和描述

添加需要定时调度的任务

sm-workflow的参数都是写死的,没有设置动态,这里的下拉列表就不会有可选项。

设置参数

将sm-workflow的日期修改为 ${do_date},保存

进入定时计划sm-dw中,会看到有参数 do_date

填入相应参数,前一天日期

Oozie常用系统常量

当然,也可以通过这样将参数传入workflow任务中,代码或者shell中需要的参数。

如,修改sm-workflow 中的 sqoop_import.sh,添加一个参数 ${num}。

编辑文件(需要登陆Hue的用户有对HDFS *** 作的权限),修改shell中的一个值为参数,保存。

在workflow中,编辑添加参数 ${num} ,或者num=${num} 保存。

进入schedule中,可以看到添加的参数,编辑输入相应参数即可。

Bundle统一管理所有定时调度,阶段划分:Bundle >Schedule >workflow

时间 :2019-08-23

背景 :业务需求,需将Mysql源库中一表A同步到数仓oracle中,工具依然选用 pentaho kettle 8.2。

问题 :没想到被遇到的一个小问题耽误了许久。

步入正题。

报错如下: Invalid column type:16 

很显然的字段类型不匹配,但因此表的字段过多(40个左右),数据量颇大(近2000W)。通过目测判断可能出错的字段类型: date,time,timestamp,并且一一尝试,但是都一直报错。

在观察报错日志,显然是Boolean,显然是布尔型的报错.....

我在oracle端create此表的时候, 没有根据kettle自动生成的建表SQL 。因为自动生成的往往会有一些小问题。

下图是我在MySQL使用show create table A ,展示的建表语句。 注意我标注的字段 ,就是此字段导致的问题。

如下图,相同的表,相同的字段。

mysql 认为是char(1),但是kettle认为是Boolean。

使用 cast(deleted as char) 此函数处理,就是把deleted转为字符型。 

问题解决。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/7339579.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-04
下一篇 2023-04-04

发表评论

登录后才能评论

评论列表(0条)

保存