MapReduce学习二:切片机制

MapReduce学习二:切片机制,第1张

MapReduce学习二:切片机制 一、InputFormat数据输入 1.1切片与MapTask并行度决定机制

问题引出:

MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。

思考:MapTask是否越多越好?

        MapTask并非越多越好,需要根据数据量合理设置MapTask的数量,否则不但不会提高集群性能,反而适得其反。

        举例:1k数据,根本不需要多个MapTask,反而启动MapTask花费的时间比处理数据多的多。

MapTask并行度决定机制

数据块:Block是HDFS物理上把数据分成一块一块。

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

 job提交流程源码关键部分解读

waitForCompletion()
submit();
//1建立连接
connect();
//1)创建提交Job的代理
newCluster(getConfiguration());
//(1)判断是本地yarn还是远程
initialize(jobTrackAddr,conf);
//2提交job
submitter.submitJobInternal(Job.this,cluster)
//1)创建给集群提交数据的Stag路径
PathjobStagingArea=
JobSubmissionFiles.getStagingDir(cluster,conf);
//2)获取jobid,并创建Job路径
JobIDjobId=submitClient.getNewJobID();
//3)拷贝jar包到集群
copyAndConfigureFiles(job,submitJobDir);
rUploader.uploadFiles(job,jobSubmitDir);
//4)计算切片,生成切片规划文件
writeSplits(job,submitJobDir);
maps=writeNewSplits(job,jobSubmitDir);
input.getSplits(job);
//5)向Stag路径写XML配置文件
writeConf(conf,submitJobFile);
conf.writeXml(out);
//6)提交Job,返回提交状态
status=submitClient.submitJob(jobId,
submitJobDir.toString(),job.getCredentials());

1.2FileInputFormat切片源码解析

(1)程序先找到你数据存储的目录

(2)开始遍历处理(规划切片)目录下的每个文件

(3)遍历第一个文件xx.txt

    获取文件大小fs.sizeOf(xx.txt)计算切片大小( Math.max(minSize, Math.min(maxSize, blockSize));)默认情况下,切片大小=blocksize开始切,形成第一个切片,然后第二个,依次往后(其中每切一片,都要将剩下部分的大小和块的1.1倍作比较,大于这个大小才会继续切,否则就不切了)将切片信息写到一个切片规划文件中。整个切片的核心过程 在getSplits(job)方法中InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等。

(4)提交切片规划文件到YARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。

1.3FileInputFormat切片机制

简单地按照文件的内容长度进行切片切片大小,默认等于Block的大小切片时不考虑数据集整体,而是逐个针对每个文件进行单独切片。

切片大小设置

        源码中计算切片大小的公式是:Math.max(minSize, Math.min(maxSize, blockSize));

若是想减小切片大小:调maxSize,降的比blockSize低就可以;

若是想调大:那就增大minSize,一样的道理,比blockSize大就可以。

获取切片信息API

//获取切片的文件名称

String name = inputSplit.getPath().getName();

//根据文件类型获取切片信息

FileSplit inputSplit = (FileSplit) context.getInputSplit();
1.4CombineTextInputFormat切片机制

        框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个Maptask,这样如果有大量小文件,就会产生大量MapTask,处理效率及其低下。

1、应用场景

        CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask。

2、虚拟存储切片最大值设置

        CombineTextInputFormat.setMaxInputSplitSize(job,4194304);//4M

注意:虚拟存储切片最大值设置最好根据实际情况的小文件情况来设置。

3、切片机制

        生成切片过程包括:虚拟存储过程和切片过程两部分。

 

4.使用CombineTextInputFormat

例如:上面的WordCount案例:(不使用的话,默认走TextInputFormat)

在Driver类中增加如下代码,设置虚拟存储切片最大值设置20M

job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job,20971520)
1.5FileInputFormat实现类

思考:在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的??

        FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputForamt和自定义InputFormat等。

1.5.1TextInputFormat

        TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),text类型。

 1.5.2KeyValueTextInputFormat

        每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SUPERATOR,"t");来设定分隔符。默认分隔符是tab(t)。

 案例:

 mapper

public class KVTextMapper extends Mapper {
    
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        //2.写出去
        context.write(key,v);
    }
}

reducer

public class KVTextReducer extends Reducer {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int sum = 0;

        //累加求和
        for (IntWritable value : values) {
            sum += value.get();
        }

        v.set(sum);

        //写出
        context.write(key, v);
    }
}

driver

public class KVTextDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        args = new String[]{"d:/work/input", "d:/output"};

        //1.获取job对象
        Configuration conf =new Configuration();
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR," ");
        Job job = Job.getInstance(conf);

        //2.设置jar存储路径
        job.setJarByClass(KVTextDriver.class);

        //3.管理mapper和reducer类
        job.setMapperClass(KVTextMapper.class);
        job.setReducerClass(KVTextReducer.class);

        //4.设置mapper输出的key和value类型
        job.setMapOutputValueClass(IntWritable.class);
        job.setMapOutputKeyClass(Text.class);

        //5.设置最终的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        //6.设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //7.提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
1.5.3NLineInputFormat

        若是使用NLineInputFormat,代表每一个map进程处理的InputSplit不再按Block块去划分,而是按照NLineInputFormat指定的行数N来划分。即输入文件的总行数/N=切片数,如果不整除,切片数=商+1;

 案例:

 1.5.4自定义InputFormat

        企业开发中自带的InputFormat无法满足需要,这时就得自定义InputFormat来解决实际问题。

自定义InputFormat步骤:

(1)自定义一个类继承FileInputFormat

(2)改写RecordReader,实现一次读取一个完整文件封装为KV

(3)在输出的时使用SequenceFileOutPutFormat输出合并文件。

案例:

        使用自定义InputFormat实现小文件合并

 

 代码实现:

自定义FileInputFormat类

package com.loong.inputformat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;


public class WholeFileInputFormat extends FileInputFormat {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split,context);
        return recordReader;
    }
}

改写RecordReader类

package com.loong.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;


public class WholeRecordReader extends RecordReader {

    private Configuration configuration;
    private FileSplit split;
    private boolean isProgress = true;
    private BytesWritable value = new BytesWritable();
    private Text k = new Text();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (isProgress) {
            //1定义缓存区
            byte[] contents = new byte[(int) split.getLength()];
            FileSystem fs = null;
            FSDataInputStream fis = null;
            try {
                //2获取文件系统
                Path path = split.getPath();
                fs = path.getFileSystem(configuration);
                //3读取数据
                fis = fs.open(path);
                //4读取文件内容
                IOUtils.readFully(fis, contents, 0, contents.length);
                //5输出文件内容
                value.set(contents, 0, contents.length);
                //6获取文件路径及名称
                String name = split.getPath().toString();
                //7设置输出的key值
                k.set(name);
            } catch (Exception e) {
            } finally {
                IOUtils.closeStream(fis);
            }
            isProgress = false;
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException,
            InterruptedException {
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException,
            InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {

    }
}

mapper

package com.loong.inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class SequenceFileMapper extends Mapper {
    @Override
    protected void map(Text key, BytesWritable value, Mapper.Context context) throws IOException, InterruptedException {
        context.write(key,value);
    }
}

reducer

package com.loong.inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;



public class SequenceFileReducer extends Reducer {
    @Override
    protected void reduce(Text key, Iterable
            values, Context context) throws IOException,
            InterruptedException, IOException {
        context.write(key, values.iterator().next());
    }
}

Driver

package com.loong.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;


public class SequenceFileDriver {
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
//输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[]{"e:/input/inputinputformat",
                "e:/output1"};
//1获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2设置jar包存储位置、关联自定义的mapper和reducer
        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);
        //7设置输入的inputFormat
        job.setInputFormatClass(WholeFileInputFormat.class);
        //8设置输出的outputFormat
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        //3设置map输出端的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        //4设置最终输出端的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        //5设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //6提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5709714.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存