day11MapReduce

I. Splits

1. By default, one split corresponds to one HDFS block (128 MB).

2. Source code

//MapReduce: compute logical splits over the input blocks (FileInputFormat.getSplits)
public List<InputSplit> getSplits(JobContext job) throws IOException {
        StopWatch sw = (new StopWatch()).start();
        //minimum split size: Math.max(1L, 1L) = 1
        long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
        //maximum split size: Long.MAX_VALUE by default, i.e. effectively unbounded
        long maxSize = getMaxSplitSize(job);
        //split-size formula, see computeSplitSize below:
        //    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
        //        return Math.max(minSize, Math.min(maxSize, blockSize));
        //    }
        //with the defaults this is Math.max(1L, Math.min(Long.MAX_VALUE, 128 MB)) = 128 MB.
        //To make splits smaller, lower maxSize; to make them larger, raise minSize.
        List<InputSplit> splits = new ArrayList<>();
        List<FileStatus> files = this.listStatus(job);
        Iterator<FileStatus> var9 = files.iterator();

        while(true) {
            while(true) {
                while(var9.hasNext()) {
                    FileStatus file = (FileStatus)var9.next();
                    Path path = file.getPath();
                    long length = file.getLen();
                    if (length != 0L) {
                        BlockLocation[] blkLocations;
                        if (file instanceof LocatedFileStatus) {
                            blkLocations = ((LocatedFileStatus)file).getBlockLocations();
                        } else {
                            FileSystem fs = path.getFileSystem(job.getConfiguration());
                            blkLocations = fs.getFileBlockLocations(file, 0L, length);
                        }

                        if (this.isSplitable(job, path)) {
                            long blockSize = file.getBlockSize();
                            long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);

                            long bytesRemaining;
                            int blkIndex;
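                            // 1.1D is FileInputFormat's SPLIT_SLOP: keep carving off full splits while more
                            // than 1.1x splitSize remains, so the leftover never becomes a tiny trailing split.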
                            for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                            }

                            if (bytesRemaining != 0L) {
                                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                            }
                        } else {
                            splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
                        }
                    } else {
                        splits.add(this.makeSplit(path, 0L, length, new String[0]));
                    }
                }

                job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
                sw.stop();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", Timetaken: " + sw.now(TimeUnit.MILLISECONDS));
                }

                return splits;
            }
        }
    }
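To make the formula concrete, here is a minimal standalone sketch (plain Java, not part of the Hadoop source; the sizes are made-up examples) showing how moving minSize and maxSize changes the resulting split size:

public class SplitSizeDemo {
    // Same formula as computeSplitSize in the source quoted above.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;  // 128 MB HDFS block

        // defaults: minSize = 1, maxSize = Long.MAX_VALUE  ->  split = block = 128 MB
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));

        // lowering maxSize shrinks the split: 64 MB
        System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));

        // raising minSize enlarges the split: 256 MB
        System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE));
    }
}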

Changing minSize and maxSize

Configuration con = new Configuration();
//only affects the current job; the values are byte counts
con.set("mapred.min.split.size", "200");
con.set("mapred.max.split.size", "100");

To make the change permanent, edit the configuration file (vim mapred-site.xml):


<property>
    <name>mapred.min.split.size</name>
    <value><!-- size in bytes --></value>
</property>
<property>
    <name>mapred.max.split.size</name>
    <value><!-- size in bytes --></value>
</property>

3. Reading data when a single line spans two blocks

When LineRecordReader reads a split that is not the first one, it skips the first (possibly partial) line by default:

   if (this.start != 0L) { // not the first split
            this.start += (long)this.in.readLine(new Text(), 0, this.maxBytesToConsume(this.start));
        }

When LineRecordReader reaches the end of its split, it always reads one extra line past the split boundary. Together, these two rules guarantee that a line straddling a block boundary is read exactly once, by the task whose split contains the start of that line:

while(this.getFilePosition() <= this.end || this.in.needAdditionalRecordAfterSplit()) {
            if (this.pos == 0L) {
                newSize = this.skipUtfByteOrderMark();
            } else {
                newSize = this.in.readLine(this.value, this.maxLineLength, this.maxBytesToConsume(this.pos));
                this.pos += (long)newSize;
            }

            if (newSize == 0 || newSize < this.maxLineLength) {
                break;
            }

            LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - (long)newSize));
        }
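To see how the two rules fit together, here is a toy simulation in plain Java (not the real LineRecordReader; the sample data and the boundary position are made up for illustration):

import java.util.ArrayList;
import java.util.List;

public class BoundaryDemo {
    public static void main(String[] args) {
        String data = "aaa\nbbbbbb\nccc\n";  // "bbbbbb\n" straddles the byte-7 boundary
        int boundary = 7;                     // pretend block/split boundary
        System.out.println(readSplit(data, 0, boundary));             // [aaa, bbbbbb]
        System.out.println(readSplit(data, boundary, data.length())); // [ccc]
    }

    // Rule 1: a split that does not start at byte 0 discards everything up to the first '\n'.
    // Rule 2: a split keeps reading until it has finished the last line that starts inside it,
    //         even if that line runs past the split's end.
    static List<String> readSplit(String data, int start, int end) {
        int pos = start;
        if (start != 0) {
            pos = data.indexOf('\n', start) + 1;    // rule 1: skip the partial first line
        }
        List<String> lines = new ArrayList<>();
        while (pos < data.length() && pos <= end) { // rule 2: a line starting at or before 'end' is read fully
            int nl = data.indexOf('\n', pos);
            if (nl < 0) {
                nl = data.length();
            }
            lines.add(data.substring(pos, nl));
            pos = nl + 1;
        }
        return lines;
    }
}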

II. Partitioning

The idea of partitioning: every k2/v2 pair is assigned a partition number, so that (in this example) records whose sixth field is less than 15 and all remaining records end up in two separate output files. The code:

Maven dependencies (pom.xml):


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hlzq</groupId>
    <artifactId>day10</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>
</project>



package com.hlzq;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// The mapper is a pass-through: the whole input line becomes k2, and v2 is an empty NullWritable.
// k1: the byte offset, e.g. 0
// v1: one tab-separated line, e.g. 1  0  1  2017-07-31 23:10:12  837255  6  4+1+1=6  小,双  0  0.00  0.00  1  0.00  1  1
// k2: that same line    v2: NullWritable
public class MyPartiMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}

package com.hlzq;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// The reducer is also a pass-through: each distinct k2 is written out once as k3.
public class MyPartiReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

package com.hlzq;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Custom partition rule: extend the Partitioner class and override getPartition.
// text is k2, nullWritable is v2, and i is the number of reduce tasks set in Jobmain.
public class MyParti extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        String[] split = text.toString().split("\t");  // the input records are tab-separated
        int num = Integer.parseInt(split[5]);          // the sixth field
        if (num < 15) {
            return 0;   // sent to reduce task 0
        } else {
            return 1;   // sent to reduce task 1
        }
    }
}
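For comparison, when no custom partitioner is set, Hadoop falls back to HashPartitioner, whose logic is essentially the following (paraphrased here for Text/NullWritable; treat it as a sketch rather than the exact source):

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class HashLikePartitioner extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text key, NullWritable value, int numReduceTasks) {
        // masking with Integer.MAX_VALUE keeps the hash non-negative before the modulo;
        // the returned value must always lie in [0, numReduceTasks)
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

A custom partitioner such as MyParti only has to respect the same contract: return a value between 0 and numReduceTasks - 1.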
package com.hlzq;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class Jobmain {
    public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
        // 1. Create a job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "mypartioner");
        // 2. Specify the jar that contains this job
        job.setJarByClass(Jobmain.class);
        // 3. Specify the input format and the input path
        job.setInputFormatClass(TextInputFormat.class);           // read line by line
        TextInputFormat.addInputPath(job, new Path("file:///G:/input/partition.csv"));  // source file location

        // 4. Specify the custom mapper class and the k2/v2 types
        job.setMapperClass(MyPartiMapper.class);                  // mapper class
        job.setMapOutputKeyClass(Text.class);                     // k2 type
        job.setMapOutputValueClass(NullWritable.class);           // v2 type

        // 5. Specify the custom partitioner (if there is one)
        job.setPartitionerClass(MyParti.class);
        // set the number of reduce tasks; the default is one
        job.setNumReduceTasks(2);

        // 6. Specify a custom grouping class (if there is one)

        // 7. Specify the custom reducer class and the k3/v3 types
        job.setReducerClass(MyPartiReducer.class);                // reducer class
        job.setOutputKeyClass(Text.class);                        // k3 type
        job.setOutputValueClass(NullWritable.class);              // v3 type

        // delete the output directory if it already exists
        Path outputPath = new Path("file:///G:/output");
        FileSystem fileSystem = FileSystem.get(new URI("file:///"), new Configuration());
        boolean flag = fileSystem.exists(outputPath);
        if (flag) {
            fileSystem.delete(outputPath, true);
        }

        // 8. Specify the output format and the output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, outputPath);

        // 9. Submit the job
        boolean bl = job.waitForCompletion(true);                 // true: print the job's progress

        // 10. Exit with the job's status
        System.exit(bl ? 0 : 1);




    }
}
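With job.setNumReduceTasks(2) and the partitioner above, the output directory should contain two result files, normally part-r-00000 (the records whose sixth field is below 15) and part-r-00001 (all the others), plus an empty _SUCCESS marker file.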

