问题引出:
MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。
思考:MapTask是否越多越好?
MapTask并非越多越好,需要根据数据量合理设置MapTask的数量,否则不但不会提高集群性能,反而适得其反。
举例:1k数据,根本不需要多个MapTask,反而启动MapTask花费的时间比处理数据多的多。
MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
job提交流程源码关键部分解读
waitForCompletion() submit(); //1建立连接 connect(); //1)创建提交Job的代理 newCluster(getConfiguration()); //(1)判断是本地yarn还是远程 initialize(jobTrackAddr,conf); //2提交job submitter.submitJobInternal(Job.this,cluster) //1)创建给集群提交数据的Stag路径 PathjobStagingArea= JobSubmissionFiles.getStagingDir(cluster,conf); //2)获取jobid,并创建Job路径 JobIDjobId=submitClient.getNewJobID(); //3)拷贝jar包到集群 copyAndConfigureFiles(job,submitJobDir); rUploader.uploadFiles(job,jobSubmitDir); //4)计算切片,生成切片规划文件 writeSplits(job,submitJobDir); maps=writeNewSplits(job,jobSubmitDir); input.getSplits(job); //5)向Stag路径写XML配置文件 writeConf(conf,submitJobFile); conf.writeXml(out); //6)提交Job,返回提交状态 status=submitClient.submitJob(jobId, submitJobDir.toString(),job.getCredentials());1.2FileInputFormat切片源码解析
(1)程序先找到你数据存储的目录
(2)开始遍历处理(规划切片)目录下的每个文件
(3)遍历第一个文件xx.txt
- 获取文件大小fs.sizeOf(xx.txt)计算切片大小( Math.max(minSize, Math.min(maxSize, blockSize));)默认情况下,切片大小=blocksize开始切,形成第一个切片,然后第二个,依次往后(其中每切一片,都要将剩下部分的大小和块的1.1倍作比较,大于这个大小才会继续切,否则就不切了)将切片信息写到一个切片规划文件中。整个切片的核心过程 在getSplits(job)方法中InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等。
(4)提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。
1.3FileInputFormat切片机制简单地按照文件的内容长度进行切片切片大小,默认等于Block的大小切片时不考虑数据集整体,而是逐个针对每个文件进行单独切片。
切片大小设置
源码中计算切片大小的公式是:Math.max(minSize, Math.min(maxSize, blockSize));
若是想减小切片大小:调maxSize,降的比blockSize低就可以;
若是想调大:那就增大minSize,一样的道理,比blockSize大就可以。
获取切片信息API
//获取切片的文件名称
String name = inputSplit.getPath().getName();
//根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();1.4CombineTextInputFormat切片机制
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个Maptask,这样如果有大量小文件,就会产生大量MapTask,处理效率及其低下。
1、应用场景
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask。
2、虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job,4194304);//4M
注意:虚拟存储切片最大值设置最好根据实际情况的小文件情况来设置。
3、切片机制
生成切片过程包括:虚拟存储过程和切片过程两部分。
4.使用CombineTextInputFormat
例如:上面的WordCount案例:(不使用的话,默认走TextInputFormat)
在Driver类中增加如下代码,设置虚拟存储切片最大值设置20M
job.setInputFormatClass(CombineTextInputFormat.class) CombineTextInputFormat.setMaxInputSplitSize(job,20971520)1.5FileInputFormat实现类
思考:在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的??
FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputForamt和自定义InputFormat等。
1.5.1TextInputFormatTextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),text类型。
1.5.2KeyValueTextInputFormat每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SUPERATOR,"t");来设定分隔符。默认分隔符是tab(t)。
案例:
mapper
public class KVTextMapper extends Mapper{ IntWritable v = new IntWritable(1); @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { //2.写出去 context.write(key,v); } }
reducer
public class KVTextReducer extends Reducer{ IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; //累加求和 for (IntWritable value : values) { sum += value.get(); } v.set(sum); //写出 context.write(key, v); } }
driver
public class KVTextDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { args = new String[]{"d:/work/input", "d:/output"}; //1.获取job对象 Configuration conf =new Configuration(); conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR," "); Job job = Job.getInstance(conf); //2.设置jar存储路径 job.setJarByClass(KVTextDriver.class); //3.管理mapper和reducer类 job.setMapperClass(KVTextMapper.class); job.setReducerClass(KVTextReducer.class); //4.设置mapper输出的key和value类型 job.setMapOutputValueClass(IntWritable.class); job.setMapOutputKeyClass(Text.class); //5.设置最终的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(KeyValueTextInputFormat.class); //6.设置输入输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //7.提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }1.5.3NLineInputFormat
若是使用NLineInputFormat,代表每一个map进程处理的InputSplit不再按Block块去划分,而是按照NLineInputFormat指定的行数N来划分。即输入文件的总行数/N=切片数,如果不整除,切片数=商+1;
案例:
1.5.4自定义InputFormat企业开发中自带的InputFormat无法满足需要,这时就得自定义InputFormat来解决实际问题。
自定义InputFormat步骤:
(1)自定义一个类继承FileInputFormat
(2)改写RecordReader,实现一次读取一个完整文件封装为KV
(3)在输出的时使用SequenceFileOutPutFormat输出合并文件。
案例:
使用自定义InputFormat实现小文件合并
代码实现:
自定义FileInputFormat类
package com.loong.inputformat; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; public class WholeFileInputFormat extends FileInputFormat
改写RecordReader类
package com.loong.inputformat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class WholeRecordReader extends RecordReader { private Configuration configuration; private FileSplit split; private boolean isProgress = true; private BytesWritable value = new BytesWritable(); private Text k = new Text(); @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.split = (FileSplit) split; configuration = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (isProgress) { //1定义缓存区 byte[] contents = new byte[(int) split.getLength()]; FileSystem fs = null; FSDataInputStream fis = null; try { //2获取文件系统 Path path = split.getPath(); fs = path.getFileSystem(configuration); //3读取数据 fis = fs.open(path); //4读取文件内容 IOUtils.readFully(fis, contents, 0, contents.length); //5输出文件内容 value.set(contents, 0, contents.length); //6获取文件路径及名称 String name = split.getPath().toString(); //7设置输出的key值 k.set(name); } catch (Exception e) { } finally { IOUtils.closeStream(fis); } isProgress = false; return true; } return false; } @Override public Text getCurrentKey() throws IOException, InterruptedException { return k; } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { } }
mapper
package com.loong.inputformat; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class SequenceFileMapper extends Mapper{ @Override protected void map(Text key, BytesWritable value, Mapper .Context context) throws IOException, InterruptedException { context.write(key,value); } }
reducer
package com.loong.inputformat; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class SequenceFileReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException, IOException { context.write(key, values.iterator().next()); } }
Driver
package com.loong.inputformat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import java.io.IOException; public class SequenceFileDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[]{"e:/input/inputinputformat", "e:/output1"}; //1获取job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //2设置jar包存储位置、关联自定义的mapper和reducer job.setJarByClass(SequenceFileDriver.class); job.setMapperClass(SequenceFileMapper.class); job.setReducerClass(SequenceFileReducer.class); //7设置输入的inputFormat job.setInputFormatClass(WholeFileInputFormat.class); //8设置输出的outputFormat job.setOutputFormatClass(SequenceFileOutputFormat.class); //3设置map输出端的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); //4设置最终输出端的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); //5设置输入输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //6提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)