Flink CookBook- JDBC Table Source并发详解

Flink CookBook- JDBC Table Source并发详解,第1张

    Flink在读取JDBC表时,为了加快速度,通常可以并发的方式读取,只需要增加以下几个参数:

column:分区字段

lower-bound:分区字段值的下界

upper-bound:分区字段值的上界

num:分区数

根据以上参数,可以确定总的记录数(maxElemCount=maxVal-minVal),然后再基于分区数计算每个分区将要fetch的记录数(batchSize=maxElemCount / batchNum),即分区步长,接下来就是计算每个分区数据边界,算法很简单:

需要注意的是,分区列的最大值和最小值会作为过滤条件,因此如果设置的不合理,会导致数据查询不完整。

    在Sqoop中,从mysql import,如果指定了并发数,即map  task数,也要按照一定的分区方法,将数据split到多个map里。sqoop的算法和Flink的算法类似,本质上都是要对数据进行合理分片、分到多个task。sqoop算法如下,numSplits是数据分区数,同时也是map task数:

JDBC Table Source执行时,会先把split绑定到task、设置PreparedStatement where条件,接下来就是循环遍历ResultSet结果集了:

      我们还要关注下算子的并行度,因为算子的并行度和数据的分区还是不一样的,所以这里还有一步,怎么把分区数据分配给并行化的算子。算子没有设置并行度时,就用默认并行度:

    如果算子并行度设置的比分区数大,会有subtask空跑的情况,如果并行度设置的比分区数小,会有一个或多个subtask读取多个分区的情况。

    Sqoop和Flink不同的一个点是,分区列的最大、最小值是运行时决定的,不是指定的,就是说sqoop开始执行时,会根据指定的sql查询出最值;而且sqoop的map task数就是分区数,不会有一个map拉取多个分区数据或一个map没有拉取到分区数据的情况。

Sqoop是一款用于把关系型数据库中的数据导入到hdfs中或者hive中的工具,当然也支持把数据从hdfs或者hive导入到关系型数据库中。

Sqoop也是基于Mapreduce来做的数据导入。

关于sqoop的原理

sqoop的原理比较简单,就是根据用户指定的sql或者字段参数,从数据库中读取数据导入到hive或者hdfs中。也支持基于数据库导出工具导出,不过受限于数据库的版本。

在导出的过程中,sqoop会自动切分mapreduce任务。比如某个字段的主键是从1到1000,那么当设置num-mappers为2时,第一个mr任务会读取1-500的数据,第二个mr任务会读取500-1000的数据。如果是字符串还有其他的划分方法.

关于架构

sqoop目前有两个大版本,第一个版本比较简单,只能使用命令行

第二个版本引入了sqoop server,统一处理连接等信息,并提供多种连接方式,还引入了权限控制,另外规范了连接的各项配置。

直接在mysql里从本地文件系统导入数据

mysql》LOAD DATA LOCAL INFILE 'C:\\Users\\asys\\Documents\\Tencent Files\\13174605\\FileRecv\\2015082818' INTO TABLE track_log

FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' (注意这里文件是从linux导出的,以\n结尾)

sqoop从hive导出数据到mysql

先在mysql建立表

grant all privileges on *.* to 'root'@'%' identified by 'Nokia123' with grant option

CREATE TABLE `track_log` (

`id` varchar(1000) DEFAULT NULL,

`url` varchar(5000) DEFAULT NULL,

`referer` varchar(5000) DEFAULT NULL,

`keyword` varchar(5000) DEFAULT NULL,

`type` varchar(1000) DEFAULT NULL,

`guid` varchar(1000) DEFAULT NULL,

`pageId` varchar(1000) DEFAULT NULL,

`moduleId` varchar(1000) DEFAULT NULL,

`linkId` varchar(1000) DEFAULT NULL,

`attachedInfo` varchar(1000) DEFAULT NULL,

`sessionId` varchar(1000) DEFAULT NULL,

`trackerU` varchar(1000) DEFAULT NULL,

`trackerType` varchar(1000) DEFAULT NULL,

`ip` varchar(1000) DEFAULT NULL,

`trackerSrc` varchar(1000) DEFAULT NULL,

`cookie` varchar(5000) DEFAULT NULL,

`orderCode` varchar(1000) DEFAULT NULL,

`trackTime` varchar(1000) DEFAULT NULL,

`endUserId` varchar(1000) DEFAULT NULL,

`firstLink` varchar(1000) DEFAULT NULL,

`sessionViewNo` varchar(5000) DEFAULT NULL,

`productId` varchar(1000) DEFAULT NULL,

`curMerchantId` varchar(1000) DEFAULT NULL,

`provinceId` varchar(1000) DEFAULT NULL,

`cityId` varchar(1000) DEFAULT NULL,

`ds` varchar(20) DEFAULT NULL,

`hour` varchar(20) DEFAULT NULL

) ENGINE=InnoDB DEFAULT CHARSET=latin1

然后运行sqoop

jimmy》sqoop export --connect jdbc:mysql://localhost:3306/track_log --username root --password Nokia123 --table track_log --export-dir "/user/hive/warehouse/track_log/ds=20150828/hour=18" --fields-terminated-by '\t' --columns "id, url,referer,keyword,type,guid,pageId,moduleId,linkId,attachedInfo,sessionId,trackerU,trackerType,ip,trackerSrc,cookie,orderCode,endUserId,firstLink,sessionViewNo,productId,curMerchantId,provinceId,cityId,ds ,hour"

注意: 1)--table track_log 是目的mysql的表,库在链接指定(这里库名也叫track_log) --connect jdbc:mysql://localhost:3306/track_log

2) 不能直接从hive表里导出(不能--hive-table),只能从hdfs导出

3)必须指定字段--columns

4)字段分割符号要和hdfs文件里的一致 --fields-terminated-by '\t'


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/11559565.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-17
下一篇 2023-05-17

发表评论

登录后才能评论

评论列表(0条)

保存