1. By default, one split corresponds to one block: 128 MB
2. Source code (decompiled FileInputFormat.getSplits)
// MapReduce: logical splitting of blocks
public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    // lower bound for the split size: Math.max(1L, 1L) = 1 by default
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // upper bound: effectively infinite (Long.MAX_VALUE) by default
    long maxSize = getMaxSplitSize(job);
    // The formula that decides the split size: to shrink splits, lower maxSize;
    // to enlarge them, raise minSize.
    // long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    // protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    //     // Math.max(1L, Math.min(Long.MAX_VALUE, 128M))
    //     return Math.max(minSize, Math.min(maxSize, blockSize));
    // }
    List<InputSplit> splits = new ArrayList<>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file : files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0L) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                blkLocations = fs.getFileBlockLocations(file, 0L, length);
            }
            if (isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);
                long bytesRemaining = length;
                // keep carving full splits while the remainder exceeds 110% of
                // splitSize, which avoids producing a tiny trailing split
                while ((double) bytesRemaining / (double) splitSize > 1.1D) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                    bytesRemaining -= splitSize;
                }
                if (bytesRemaining != 0L) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                }
            } else {
                // not splittable: the whole file becomes one split
                splits.add(makeSplit(path, 0L, length,
                        blkLocations[0].getHosts(),
                        blkLocations[0].getCachedHosts()));
            }
        } else {
            // zero-length file
            splits.add(makeSplit(path, 0L, length, new String[0]));
        }
    }
    job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Total # of splits generated by getSplits: " + splits.size()
                + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
}
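To make the tuning rule concrete, here is a small standalone sketch that re-implements the computeSplitSize formula quoted above (the class name and the sample numbers are ours, purely for illustration):

public class SplitSizeDemo {
    // Same formula as FileInputFormat.computeSplitSize
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // 128 MB

        // Defaults: minSize = 1, maxSize = Long.MAX_VALUE -> splitSize == blockSize
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));                 // 134217728

        // Lowering maxSize shrinks the split: a 64 MB cap gives 64 MB splits
        System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));              // 67108864

        // Raising minSize enlarges the split: a 256 MB floor gives 256 MB splits
        System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE)); // 268435456
    }
}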
Modifying minSize and maxSize
// Takes effect only for the current program
Configuration con = new Configuration();
con.set("mapred.min.split.size", "200"); // legacy name; Hadoop 2.x also accepts mapreduce.input.fileinputformat.split.minsize
con.set("mapred.max.split.size", "100"); // legacy name; Hadoop 2.x also accepts mapreduce.input.fileinputformat.split.maxsize
To make the change permanent, edit the configuration file instead (vim mapred-site.xml) and set mapred.min.split.size and mapred.max.split.size there; both values are given in bytes.
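For reference, a minimal sketch of what the corresponding mapred-site.xml entries could look like (the byte values below are placeholders, and these are the legacy property names that Hadoop 2.x still accepts):

<!-- mapred-site.xml: example values only -->
<property>
    <name>mapred.min.split.size</name>
    <value>134217728</value> <!-- 128 MB, in bytes -->
</property>
<property>
    <name>mapred.max.split.size</name>
    <value>268435456</value> <!-- 256 MB, in bytes -->
</property>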
3. Reading data when a single line spans two blocks
When LineRecordReader reads data, any split other than the first one skips its first line by default:
if (this.start != 0L) { // not the first split: discard everything up to the first newline
    this.start += (long) this.in.readLine(new Text(), 0, this.maxBytesToConsume(this.start));
}
And when it reaches the end of its split, LineRecordReader always reads one extra line past the boundary:
while (this.getFilePosition() <= this.end || this.in.needAdditionalRecordAfterSplit()) {
    if (this.pos == 0L) {
        newSize = this.skipUtfByteOrderMark();
    } else {
        newSize = this.in.readLine(this.value, this.maxLineLength, this.maxBytesToConsume(this.pos));
        this.pos += (long) newSize;
    }
    if (newSize == 0 || newSize < this.maxLineLength) {
        break;
    }
    LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - (long) newSize));
}

Together, these two rules guarantee that a line straddling a block boundary is processed exactly once: the split where the line begins reads it through to the end, and the next split skips it.
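To see the two rules cooperating, here is a minimal standalone simulation (our own sketch of the skip-first-line and read-past-the-boundary policy, not the real LineRecordReader) that shows every line landing in exactly one split:

import java.util.ArrayList;
import java.util.List;

public class SplitBoundaryDemo {
    public static void main(String[] args) {
        byte[] data = "alpha\nbravo\ncharlie\ndelta\n".getBytes();
        int splitSize = 10; // tiny splits so lines straddle the boundaries

        for (int start = 0; start < data.length; start += splitSize) {
            int end = Math.min(start + splitSize, data.length);
            List<String> lines = new ArrayList<>();
            int pos = start;
            // Rule 1: a split that begins mid-line skips forward to the next
            // line start, because the previous split already consumed that line.
            if (start != 0) {
                while (pos < data.length && data[pos - 1] != '\n') pos++;
            }
            // Rule 2: any line that STARTS inside this split is read to its end,
            // even if that runs past the split boundary.
            while (pos < data.length && pos < end) {
                int eol = pos;
                while (eol < data.length && data[eol] != '\n') eol++;
                lines.add(new String(data, pos, eol - pos));
                pos = eol + 1;
            }
            System.out.println("split [" + start + "," + end + ") -> " + lines);
        }
    }
}

Running it, split [0,10) owns "alpha" and "bravo" (the line crossing the first boundary), while the later splits pick up only "charlie" and "delta": no line is read twice and none is lost.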
II. Partitioning

- The idea of partitioning: stamp each K2/V2 pair with a partition number so that records land in different output files; here, rows whose sixth tab-separated field is less than 15 go into one file and all other rows into a second file. Code:
Add the dependencies to the pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hlzq</groupId>
    <artifactId>day10</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>
</project>
package com.hlzq;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyPartiMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    // k1: the byte offset, e.g. 0
    // v1: the whole line, e.g. "1 0 1 2017-07-31 23:10:12 837255 6 4+1+1=6 小,双 0 0.00 0.00 1 0.00 1 1"
    //     (fields are tab-separated in the actual file)
    // k2: the same line, unchanged
    // v2: NullWritable
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}
package com.hlzq;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyPartiReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Identity reduce: pass each line straight through to the output file
        context.write(key, NullWritable.get());
    }
}
package com.hlzq;

// Custom partitioning rule: extend the Partitioner class and override getPartition
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyParti extends Partitioner<Text, NullWritable> {
    // text: k2, nullWritable: v2, i: the number of reduce tasks (set in Jobmain)
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        String[] split = text.toString().split("\t");
        int num = Integer.parseInt(split[5]); // the sixth tab-separated field
        if (num < 15) {
            return 0;
        } else {
            return 1;
        }
    }
}
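For comparison, Hadoop's built-in default is HashPartitioner, which spreads keys by hash code; MyParti above simply replaces that hash rule with an explicit value test. The default's core logic amounts to the following (shown here as our own generic class using the same formula as org.apache.hadoop.mapreduce.lib.partition.HashPartitioner):

import org.apache.hadoop.mapreduce.Partitioner;

public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the result is non-negative, then bucket by modulo
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

Whichever rule is used, getPartition must return a value in [0, numReduceTasks); with setNumReduceTasks(2) below, returning 0 or 1 maps the two groups onto part-r-00000 and part-r-00001.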
package com.hlzq;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class Jobmain {
    public static void main(String[] args) throws IOException, URISyntaxException,
            InterruptedException, ClassNotFoundException {
        // 1. Create a job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "mypartioner");

        // 2. Point the job at the jar that contains it
        job.setJarByClass(Jobmain.class);

        // 3. Specify how the input is read and where it lives
        job.setInputFormatClass(TextInputFormat.class); // read line by line
        TextInputFormat.addInputPath(job, new Path("file:///G:/input/partition.csv"));

        // 4. Specify the custom mapper class and the k2/v2 types
        job.setMapperClass(MyPartiMapper.class);
        job.setMapOutputKeyClass(Text.class);           // k2
        job.setMapOutputValueClass(NullWritable.class); // v2

        // 5. Specify the custom partitioner (if any)
        job.setPartitionerClass(MyParti.class);
        // One reduce task per partition (the default is a single reduce task)
        job.setNumReduceTasks(2);

        // 6. Specify a custom grouping class (none here)

        // 7. Specify the custom reducer class and the k3/v3 types
        job.setReducerClass(MyPartiReducer.class);
        job.setOutputKeyClass(Text.class);           // k3
        job.setOutputValueClass(NullWritable.class); // v3

        // Delete the output directory if it already exists
        Path outputPath = new Path("file:///G:/output");
        FileSystem fileSystem = FileSystem.get(new URI("file:///"), new Configuration());
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // 8. Specify the output format and the result path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, outputPath);

        // 9. Submit the job; true means progress is printed while it runs
        boolean bl = job.waitForCompletion(true);

        // 10. Exit with the job's status
        System.exit(bl ? 0 : 1);
    }
}
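After a successful run with two reduce tasks, the output directory should contain one file per partition plus the standard success marker (the G:\output path follows the example above):

G:\output\_SUCCESS
G:\output\part-r-00000    partition 0: rows whose sixth field is < 15
G:\output\part-r-00001    partition 1: all remaining rows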