调度工具(ETL+任务流)

调度工具(ETL+任务流),第1张

kettle是一个ETL工具,ETL(Extract-Transform-Load的缩写,即数据抽取、转换、装载的过程)。
kettle中文名称叫水壶,该项目的主程序员MATT 希望把各种数据放到一个壶里,然后以一种指定的格式流出。
所以他的重心是用于数据
oozie是一个工作流,Oozie工作流是放置在控制依赖DAG(有向无环图 Direct Acyclic Graph)中的一组动作(例如,Hadoop的Map/Reduce作业、Pig作业等),其中指定了动作执行的顺序。
oozie工作流中是有数据流动的,但是重心是在于工作流的定义。
二者虽然都有相关功能及数据的流动,但是其实用途是不一样的。

查看帮助

列举出所有linux上的数据库

列举出所有Window上的数据库

查看数据库下的所有表

(1)确定mysql服务启动正常

查询控制端口和查询进程来确定,一下两种办法可以确认mysql是否在启动状态

办法1:查询端口

MySQL监控的TCP的3306端口,如果显示3306,证明MySQL服务在运行中

办法二:查询进程

可以看见mysql的进程

没有指定数据导入到哪个目录,默认是/user/root/表名

原因:

如果表中有主键,m的值可以设置大于1的值;如果没有主键只能将m值设置成为1;或者要将m值大于1,需要使用--split-by指定一个字段

设置了-m 1 说明只有一个maptask执行数据导入,默认是4个maptask执行导入 *** 作,但是必须指定一个列来作为划分依据

导入数据到指定目录

在导入表数据到HDFS使用Sqoop导入工具,我们可以指定目标目录。使用参数 --target-dir来指定导出目的地,使用参数—delete-target-dir来判断导出目录是否存在,如果存在就删掉

查询导入

提示:must contain '$CONDITIONS' in WHERE clause。

where id <=1 匹配条件

$CONDITIONS:传递作用。

如果 query 后使用的是双引号,则 $CONDITIONS前必须加转义符,防止 shell 识别为自己的变量。

--query时不能使用--table一起使用

需要指定--target-dir路径

导入到hdfs指定目录并指定要求

数据导出储存方式(数据存储文件格式---( textfil parquet)--as-textfileImports data as plain text (default)--as-parquetfile Imports data to Parquet Files)

导入表数据子集到HDFS

sqoop导入blob数据到hive

对于CLOB,如xml文本,sqoop可以迁移到Hive表,对应字段存储为字符类型。
对于BLOB,如jpg,sqoop无法直接迁移到Hive表,只能先迁移到HDFS路径,然后再使用Hive命令加载到Hive表。迁移到HDFS后BLOB字段存储为16进制形式。

213导入关系表到Hive

第一步:导入需要的jar包

将我们mysql表当中的数据直接导入到hive表中的话,我们需要将hive的一个叫做hive-exec-110-cdh5140jar的jar包拷贝到sqoop的lib目录下

第二步:开始导入

导入关系表到hive并自动创建hive表

们也可以通过命令来将我们的mysql的表直接导入到hive表当中去

通过这个命令,我们可以直接将我们mysql表当中的数据以及表结构一起倒入到hive当中去

--incremental 增量模式。

append id 是获取一个某一列的某个值。

lastmodified “2016-12-15 15:47:35” 获取某个时间后修改的所有数据

-append 附加模式

-merge-key id 合并模式

--check-column 用来指定一些列,可以去指定多个列;通常的是指定主键id

--last -value 从哪个值开始增量

==注意:增量导入的时候,一定不能加参数--delete-target-dir 否则会报错==

第一种增量导入方式(不常用)

1Append方式

使用场景:有个订单表,里面每个订单有一个唯一标识的自增列id,在关系型数据库中以主键的形式存在。之前已经将id在0-1000之间的编号的订单导入到HDFS 中;如果在产生新的订单,此时我们只需指定incremental参数为append,--last-value参数为1000即可,表示只从id大于1000后开始导入。

(1)创建一个MySQL表

(2)创建一个hive表(表结构与mysql一致)

注意:

append 模式不支持写入到hive表中

2lastModify方式

此方式要求原有表有time字段,它能指定一个时间戳,让sqoop把该时间戳之后的数据导入到HDFS;因为后续订单可能状体会变化,变化后time字段时间戳也会变化,此时sqoop依然会将相同状态更改后的订单导入HDFS,当然我们可以只当merge-key参数为order-id,表示将后续新的记录和原有记录合并。

# 将时间列大于等于阈值的数据增量导入HDFS

使用 lastmodified 方式导入数据,要指定增量数据是要 --append(追加)还是要 --merge-key(合并)last-value 指定的值是会包含于增量导入的数据中。

第二种增量导入方式(推荐)

==通过where条件选取数据更加精准==

215从RDBMS到HBase

会报错

原因:sqoop146 只支持 HBase101 之前的版本的自动创建 HBase 表的功能。

解决方案:手动创建 HBase 表

导出前,目标表必须存在与目标数据库中

默认 *** 作是将文件中的数据使用insert语句插入到表中

数据是在HDFS当中的如下目录/sqoop/emp,数据内容如下

第一步:创建MySQL表

第二步:执行导出命令

通过export来实现数据的导出,将hdfs的数据导出到mysql当中去

全量导出

增量导出

更新导出

总结:

参数介绍
--update-key 后面也可以接多个关键字列名,可以使用逗号隔开,Sqoop将会匹配多个关键字后再执行更新 *** 作。
--export-dir 参数配合--table或者--call参数使用,指定了HDFS上需要将数据导入到MySQL中的文件集目录。
--update-mode updateonly和allowinsert。 默认模式为updateonly,如果指定--update-mode模式为allowinsert,可以将目标数据库中原来不存在的数据也导入到数据库表中。即将存在的数据更新,不存在数据插入。
组合测试及说明
1、当指定update-key,且关系型数据库表存在主键时:
A、allowinsert模式时,为更新目标数据库表存的内容,并且原来不存在的数据也导入到数据库表;
B、updateonly模式时,为更新目标数据库表存的内容,并且原来不存在的数据也不导入到数据库表;
2、当指定update-key,且关系型数据库表不存在主键时:
A、allowinsert模式时,为全部数据追加导入到数据库表;
B、updateonly模式时,为更新目标数据库表存的内容,并且原来不存在的数据也不导入到数据库表;
3、当不指定update-key,且关系型数据库表存在主键时:
A、allowinsert模式时,报主键冲突,数据无变化;
B、updateonly模式时,报主键冲突,数据无变化;
4、当不指定update-key,且关系型数据库表不存在主键时:
A、allowinsert模式时,为全部数据追加导入到数据库表;
B、updateonly模式时,为全部数据追加导入到数据库表;

实际案例:

(1)mysql批量导入hive

使用shell脚本:

笔者目前用sqoop把mysql数据导入到Hive中,最后实现命令行导入,sqoop版本147,实现如下

最后需要把这个导入搞成job,每天定时去跑,实现数据的自动化增量导入,sqoop支持job的管理,可以把导入创建成job重复去跑,并且它会在metastore中记录增值,每次执行增量导入之前去查询

创建job命令如下

创建完job就可以去执行它了

sqoop job --exec users

可以把该指令设为Linux定时任务,或者用Azkaban定时去执行它

hive导出到MySQL时,date类型数据发生变化?

问题原因:时区设置问题,date -R查看服务器时间,show VARIABLES LIKE "%time_zone"查看Mysql时间,system并不表示中国的标准时间,要将时间设置为东八区

(1):对市面上最流行的两种调度器,给出以下详细对比,以供技术选型参考。总体来说,ooize相比azkaban是一个重量级的任务调度系统,功能全面,但配置使用也更复杂。如果可以不在意某些功能的缺失,轻量级调度器azkaban是很不错的候选对象。

(2):功能:

两者均可以调度mapreduce,pig,java,脚本工作流任务;

两者均可以定时执行工作流任务;

(3):工作流定义:

Azkaban使用Properties文件定义工作流;

Oozie使用XML文件定义工作流;

(4):工作流传参:

Azkaban支持直接传参,例如${input};

Oozie支持参数和EL表达式,例如${fs:dirSize(myInputDir)};

(5):定时执行:

Azkaban的定时执行任务是基于时间的;

Oozie的定时执行任务基于时间和输入数据;

(6):资源管理:

Azkaban有较严格的权限控制,如用户对工作流进行读/写/执行等 *** 作;

Oozie暂无严格的权限控制;

(7):工作流执行:

Azkaban有两种运行模式,分别是solo server mode(executor server和web server部署在同一台节点)和multi server mode(executor server和web server可以部署在不同节点);

Oozie作为工作流服务器运行,支持多用户和多工作流;

(8):工作流管理:

Azkaban支持浏览器以及ajax方式 *** 作工作流;

Oozie支持命令行、>1、专用服务器
数据库服务器要求每个用户拥有一个专用服务器进程,当用户比较多的时候,则其对服务器的硬件资源,特别是内存,会产生比较大的压力。
适用环境:
1、 只有少数客户端。
2、 为数据仓库搭建的数据库系统。
3、 联机事务处理系统。(大事务的处理,若使用共享服务器模式,很有可能会造成有些事务需要进入队列排队,响应时间拉长)
2、共享服务器进程
多个用户程序可以并发共用一个服务器进程,客户端程序通过调用调度程序与服务器进程相连
如何查看是否是共享服务器模式?
1 查看调度程序
SQL> show parameter dispatchers;
NAME TYPE VALUE
------------------------------------ ----------- -----------------------------
dispatchers string (PROTOCOL=TCP) (SERVICE=sdecp
yXDB)
max_dispatchers integer 5
mts_dispatchers string (PROTOCOL=TCP) (SERVICE=sdecp
yXDB)
mts_max_dispatchers integer 5
2、查看共享服务器进程数
SQL> show parameter shared_servers;
NAME TYPE VALUE
------------------------------------ ----------- -----------------------------
max_shared_servers integer 20
shared_servers integer 1
3、预留
SQL> show parameter shared_server_session;
NAME TYPE VALUE
------------------------------------ ----------- ---------
shared_server_sessions integer 165
其中
dispatchers:调度程序服务器进程
max_shared_servers :指定同时运行的最大服务器进程数
shared_servers :启动实例时可以创建的服务器进程数
shared_server_sessions:指定用于用户会话的总数,配置此参数可为专用服务器保留用户会话
4改变进程数
SQL> alter system set shared_servers=2;
系统已更改。
SQL> show parameter shared_servers;
NAME TYPE VALUE
------------------------------------ ----------- -------------------------
max_shared_servers integer 20
shared_servers integer 2
SQL>
优点是客户端进程多对一,增加了数据库可以支持的用户数。缺点就是各个用户共享一个进程,对用户访问数据库的性能有所影响。

背景

随着互联网的发展,应用服务中的定时任务数量日益增加,常规的垂直应用架构已无法应对,分布式服务架构势在必行。同时,也迫切需要一个分布式任务调度系统来管理分布式服务中的定时任务。

单一应用架构

当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。此时,在该应用中的定时任务如果不多还好,但是一旦比较多,则意味着每次更改一个定时任务的执行时间,就需要重新部署一遍整个应用,导致整个应用停滞一段时间。

垂直应用架构

当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,将应用拆分为互不相干的几个应用来提升效率。此时,相应的任务也会被垂直拆分,每次更改任务带来的影响相应减少。

分布式服务架构

当垂直应用越来越多,应用之间可能会出现不可避免的交互,此时,将核心业务抽取出来,形成单独的服务,各式各样的服务逐渐形成稳定的服务中心,使得前端应用能更快地响应多变的市场需求。此时,用于提高业务复用及整合的分布式服务框架是关键,同时,由于服务独立,则一般能做到定时任务独立的情况,因此,任务的更改对于整体系统的影响小之又小。

分布式任务调度

在分布式服务架构的基础上,由于独立业务的数量可能很多,此时如果定时任务单独在该服务中实现,很可能会出现难以管理的情况,且避免不了定时任务更改导致的业务重启,因此,一个独立的分布式任务调度系统是很必要的,可以用来全局统筹管理所有的定时任务,同时,将任务的配置单独抽离出来作为该分布式任务调度系统的功能,就能做到定时任务的更改不影响任何业务,也不影响整个系统。

架构设计

设计思想

以Dubbo核心,将调度单独抽象出来,成为一个调度中心,调度中心本身不承担实现任何业务逻辑,只是单纯依据调度配置来发起调度请求
将任务抽象成为ExecutorService,由任务执行者来实现具体的任务,并且负责接收调度请求并执行,这样设计可以将任务与调度中心完全解耦,提高整个系统的扩展性,方便接入
将调度中心对任务执行者的一些调用 *** 作提取出来,形成一个单独的管理控制台,可以用来查看任务执行情况,同时该管理控制台通过H5实现,并提供对外Restful API,方便扩展与接入
通过各种中间件,实现一些必须的 *** 作,例如告警,监控,日志收集统计等 *** 作,完全对整个系统安全性,稳定性的保障

调度中心集群通过Zookeeper存储每个Schedular的一致性HashCode,以此来分配Job与Schedular之间的关系:新增的Job将会通过每个Schedular的一致性HashCode获取其对应的Job数,依据Job数的大小以及Schedular的相关属性来计算每个Schedular的权重,根据权重的大小来确定当前新增的Job应该被分配到哪个Schedular上。
当新增Schedular之后,该新增的Schedular的Job正常为0,因此,正常状态下,该新增的Schedular的权重会比较大。
同时,调度中心通过Zookeeper对Schedular实现主备切换,确保系统稳定性

系统组成

Schedular

基于Quartz实现调度,提供对执行者 *** 作的接口,用于 *** 作任务调度配置,调度触发等 *** 作;自身不参与任务逻辑的实现,不会受限于任务

执行者

负责接收调度中心发起的调度请求,实现相应业务逻辑,完成任务执行,同时会对任务逻辑进行切面处理,记录相应日志并在任务结束后发送给调度中心

管理控制台

管理控制台负责展示任务状态,执行情况,任务执行日志等报表数据,同时可以通过管理控制台配置新增任务, *** 作任务的状态,暂停/恢复等;另外,提供对外Restful API与H5的接入

Dubbo Monitor

实时监控调度中心接口调用情况,统计调度频次,成功失败,QPS等,能通过这些报表数据来优化任务调度,优化系统

ELK

通过ELK(ElasticSearch+Logstash+Kibana)来收集调度中心以及各执行机的执行日志,并加以分析统计,形成报表,可以方便提供观察

Alarm

报警系统,通过Chronograf控制台配置告警规则,在出现问题时第一时间通过Kapacitor进行邮件与短信报警,可以有效提高错误提示的及时性并且降低错误发生到错误解决过程中消耗的时间,降低生产环境造成的损失,告警数据通过业务仪表盘获取。例如:可以配置线上机器的cpu当前使用率,设置阀值50%,策略为超过设置阀值时短信告警,此时当线上某台机器cpu超过50%时,即会发送短信告警

业务仪表盘

通过打点的方式,来实时收集接口监控数据,通过logstash传输到kafka,通过kafka再分发到jstorm进行处理,处理完之后再存储到influxdb,形成业务仪表盘,最后通过Grafana控制台产生监控报表

配置中心

整个系统各服务,通过配置中心统一管理相应配置,形成分布式配置管理机制,方便系统内各服务的配置一致性以及准确性

在配置完之后,就可以实现自己的任务逻辑了

接入之后,可以通过日志管理控制台线上实时查看任务执行状态

另外,由于有告警系统,在任务执行异常时,会产生告警邮件与短信,实时发送,告知任务接入者与相应研发人员
配置中心属性配置

特性

支持动态暂停/恢复任务

任务状态停止时,任务将不再被触发,若任务在执行过程中被暂停,则正在执行的任务不会被阻塞(由于任务执行结果状态中存在超时失败状态,因此如果点击暂停按钮时阻塞了当前正在执行的任务,会使当前任务的执行状态变为超时,不符合超时状态真正的意义),会延迟停止,即等到当前任务执行完再真正停止任务
调度中心基于Quartz实现,通过Zookeeper实现主备隔离,保证调度中心HA
当活跃节点宕机,冷备节点就会载入所有活跃节点中正在调度状态的任务,成为新的活跃节点,保证任务调度的准确执行
执行机支持集群部署,任务分布式执行,通过调度中心统一调度
执行机负载均衡,默认根据任务在某个执行机上的执行次数计算执行机调度权重,按照权重来选择本次任务调度分发给哪台执行机,实现负载均衡,可手动更改执行机选择策略

执行机集群方式,

failover,failfast,failsafe,failback,forking,默认为failover(故障切换),调用失败时,重试其他服务器;failfast(快速失败),只会发起一次调用,不会重试,失败立即报错;failsafe(失败安全),出现异常时,直接忽略;failback(失败恢复),调用失败时,定时重发,直到成功,重启会丢失; forking,并行调用多个执行机,只要一个成功即返回

分片任务,支持任务分片,通过参数发送给任务执行机,执行机可以通过判断参数来进行分片作业的开发,同时支持动态分片,以分片参数和执行机为纬度进行分片,支持动态扩容执行机以及分片参数

例如,某张订单表按照订单ID来进行简单纬度分片,且以取模3来进行分片,则可以设置三个分片参数,(0,1,2),执行机在通过context拿到其中的一个分片参数之后可以对该分片参数做判断,来做具体的数据 *** 作,例如当拿到的分片参数为0时,则对于0相应的分片做数据查询 *** 作,依次类推
再例如,某张订单表按照订单ID和Status来进行二维分片,ID取模以3来进行分片,状态以成功,失败两种状态来进行分片,此时就可以设置6个分片参数,({‘id’:0,’status’:0},{‘id’:1,’status’:0},{‘id’:2,’status’:0},{‘id’:0,’status’:1},{‘id’:1,’status’:1},{‘id’:2,’status’:1}),执行机在通过context拿到其中的一个分片参数之后,就可以对其json参数进行判断,来实现分片 *** 作

任务执行一致性,每次任务只会被一个执行机所执行;对于分片任务,在执行机集群部署时,一次任务调度将会广播触发对应集群中相应数量的执行器执行一次任务,同时传递分片参数,可根据分片参数开发分片任务。

当分片参数大于执行器数量时,将会按照执行器路由策略,使得当前分片任务的某个或者某几个执行机执行多个分片的任务

例如:当前分片任务分片参数为(a,b,c),当前任务执行机有3台(A,B,C)时,则会均匀随机得将分片参数发送给某一台执行机,且三台执行机一次只接收一个分片参数,做一次任务处理,即(a->A,b->B,c->C)|(a->A,b->C,c->B)|(a->B,b->A,c->C)|(a->B,b->C,c->A)|(a->C,b->A,c->B) |(a->C,b->B,c->A);

当任务执行机只有2台(A,B)时,每次任务调度时,某台执行机会收到两个分片参数,并分别处理这两个分片参数,即(a->A,b->B,c->A)|(a->A,b->B,c->B)|(a->B,b->A,c->A)|(a->B,b->A,c->B);

而当任务执行机大于分片参数个数,为4台(A,B,C,D)时,(a->A,b->B,c->C)|(a->A,b->C,c->D)|(a->A,b->D,c->B)| (a->B,b->C,c->D)|(a->B,b->D,c->A)|(a->B,b->A,c->C)|(a->C,b->A,c->B)|(a->C,b->B,c->D)| (a->C,b->D,c->A)|(a->D,b->A,c->B)|(a->D,b->B,c->C)|(a->D,b->C,c->A),而统计下来,不管是执行机数目大于或者等于或者小于分片参数数目,被分发给执行机的分片参数始终是保持一致的(每台执行机接收到的总的分片参数是均匀的)

告警系统,系统接入内部告警系统,任务失败时支持邮件,短信,钉钉,电话等告警

d性扩容缩容,调度中心将会实时探测任务执行机,因此一旦有执行机上线或者下线,都将会被探测到,而如果未被调度中心探测到,则可以进行手动探测执行机,而在探测到执行机之后,下次调度将会重新分配任务
任务依赖,支持配置任务依赖关系,当父任务在执行完成之后会自动触发子任务的执行例如:有两个任务,分别为A和B,而B的执行条件为确认A任务执行完,B才能执行,否则,B任务不执行,此时的依赖关系就是B任务依赖A任务,此时在A任务配置完之后,设置B任务为依赖任务,而依赖关系则是A;又例如有6个任务,分别是A1,A2,A3,A4,A5,B,而B任务的执行需要确保A1-5这5个任务都执行完,B任务才执行,此时,B的依赖关系就是B任务依赖于A1-5,此时在配置完A1-5这5个任务之后再设置B为依赖任务,依赖关系则是A1-5

支持运行时查看任务执行情况,任务数量,调用次数,执行器数量等统计信息异常执行恢复机制,有时会遇到不可控情况,即执行机在执行后的执行结果因为网络断开等不可控因素导致不能发送给调度中心,此时能通过异常执行恢复机制临时记录,在下次执行机正常启动时重试发送给调度中心调度手动触发手动执行,特殊需求下,可能会要求调度可以手动执行,例如调度任务失败之后可能需要手动执行一次调度来补偿
并行/串行策略,当定时时间远大于任务执行时间时,可以使用并行策略,任务异步调用执行,提高任务调度精确度;当任务执行时间可能大于定时时间,却需要任务按照某个定时规则定时调度时,可以使用串行策略,调度中心调度的当前任务的上一次触发,如果没有执行完,则当前执行机的下一次定时时间点时不会被触发,当且仅当任务执行结束,以防止某些持续性定时任务的时间不确定性导致异步触发时的数据混乱的情况发生,例如:某一任务的定时时间为10秒钟,但是任务本身可能会出现执行超过10秒的情况,而超过时,如果出现两个时间点的任务并行执行时会出现数据混乱的情况,此时就可以使用串行策略,确保当前执行机上一个任务未执行完,不会触发新的执行
支持调度接口数据监控,产生监控报表,便于观测。

总结

对于互联网公司来说,时间就是金钱,效率决定一切。本系统在接入到8月初将近3个月的时间内,表现不凡,调度了约100万次,给公司内部各服务实现任务调度提供了便利。
原文


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zz/13495277.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-08-18
下一篇 2023-08-18

发表评论

登录后才能评论

评论列表(0条)

保存