lower-bound:分区字段值的下界
upper-bound:分区字段值的上界
num:分区数
根据以上参数,可以确定总的记录数(maxElemCount=maxVal-minVal),然后再基于分区数计算每个分区将要fetch的记录数(batchSize=maxElemCount / batchNum),即分区步长,接下来就是计算每个分区数据边界,算法很简单:
需要注意的是,分区列的最大值和最小值会作为过滤条件,因此如果设置的不合理,会导致数据查询不完整。
在Sqoop中,从mysql import,如果指定了并发数,即map task数,也要按照一定的分区方法,将数据split到多个map里。sqoop的算法和Flink的算法类似,本质上都是要对数据进行合理分片、分到多个task。sqoop算法如下,numSplits是数据分区数,同时也是map task数:
JDBC Table Source执行时,会先把split绑定到task、设置PreparedStatement where条件,接下来就是循环遍历ResultSet结果集了:
我们还要关注下算子的并行度,因为算子的并行度和数据的分区还是不一样的,所以这里还有一步,怎么把分区数据分配给并行化的算子。算子没有设置并行度时,就用默认并行度:
如果算子并行度设置的比分区数大,会有subtask空跑的情况,如果并行度设置的比分区数小,会有一个或多个subtask读取多个分区的情况。
Sqoop和Flink不同的一个点是,分区列的最大、最小值是运行时决定的,不是指定的,就是说sqoop开始执行时,会根据指定的sql查询出最值;而且sqoop的map task数就是分区数,不会有一个map拉取多个分区数据或一个map没有拉取到分区数据的情况。
Sqoop是一款用于把关系型数据库中的数据导入到hdfs中或者hive中的工具,当然也支持把数据从hdfs或者hive导入到关系型数据库中。Sqoop也是基于Mapreduce来做的数据导入。
关于sqoop的原理
sqoop的原理比较简单,就是根据用户指定的sql或者字段参数,从数据库中读取数据导入到hive或者hdfs中。也支持基于数据库导出工具导出,不过受限于数据库的版本。
在导出的过程中,sqoop会自动切分mapreduce任务。比如某个字段的主键是从1到1000,那么当设置num-mappers为2时,第一个mr任务会读取1-500的数据,第二个mr任务会读取500-1000的数据。如果是字符串还有其他的划分方法.
关于架构
sqoop目前有两个大版本,第一个版本比较简单,只能使用命令行
第二个版本引入了sqoop server,统一处理连接等信息,并提供多种连接方式,还引入了权限控制,另外规范了连接的各项配置。
直接在mysql里从本地文件系统导入数据mysql》LOAD DATA LOCAL INFILE 'C:\\Users\\asys\\Documents\\Tencent Files\\13174605\\FileRecv\\2015082818' INTO TABLE track_log
FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' (注意这里文件是从linux导出的,以\n结尾)
sqoop从hive导出数据到mysql
先在mysql建立表
grant all privileges on *.* to 'root'@'%' identified by 'Nokia123' with grant option
CREATE TABLE `track_log` (
`id` varchar(1000) DEFAULT NULL,
`url` varchar(5000) DEFAULT NULL,
`referer` varchar(5000) DEFAULT NULL,
`keyword` varchar(5000) DEFAULT NULL,
`type` varchar(1000) DEFAULT NULL,
`guid` varchar(1000) DEFAULT NULL,
`pageId` varchar(1000) DEFAULT NULL,
`moduleId` varchar(1000) DEFAULT NULL,
`linkId` varchar(1000) DEFAULT NULL,
`attachedInfo` varchar(1000) DEFAULT NULL,
`sessionId` varchar(1000) DEFAULT NULL,
`trackerU` varchar(1000) DEFAULT NULL,
`trackerType` varchar(1000) DEFAULT NULL,
`ip` varchar(1000) DEFAULT NULL,
`trackerSrc` varchar(1000) DEFAULT NULL,
`cookie` varchar(5000) DEFAULT NULL,
`orderCode` varchar(1000) DEFAULT NULL,
`trackTime` varchar(1000) DEFAULT NULL,
`endUserId` varchar(1000) DEFAULT NULL,
`firstLink` varchar(1000) DEFAULT NULL,
`sessionViewNo` varchar(5000) DEFAULT NULL,
`productId` varchar(1000) DEFAULT NULL,
`curMerchantId` varchar(1000) DEFAULT NULL,
`provinceId` varchar(1000) DEFAULT NULL,
`cityId` varchar(1000) DEFAULT NULL,
`ds` varchar(20) DEFAULT NULL,
`hour` varchar(20) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1
然后运行sqoop
jimmy》sqoop export --connect jdbc:mysql://localhost:3306/track_log --username root --password Nokia123 --table track_log --export-dir "/user/hive/warehouse/track_log/ds=20150828/hour=18" --fields-terminated-by '\t' --columns "id, url,referer,keyword,type,guid,pageId,moduleId,linkId,attachedInfo,sessionId,trackerU,trackerType,ip,trackerSrc,cookie,orderCode,endUserId,firstLink,sessionViewNo,productId,curMerchantId,provinceId,cityId,ds ,hour"
注意: 1)--table track_log 是目的mysql的表,库在链接指定(这里库名也叫track_log) --connect jdbc:mysql://localhost:3306/track_log
2) 不能直接从hive表里导出(不能--hive-table),只能从hdfs导出
3)必须指定字段--columns
4)字段分割符号要和hdfs文件里的一致 --fields-terminated-by '\t'
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)