The Road to Hadoop Mastery -- 03 -- The MapReduce Distributed Computing Framework

I. Overview

MapReduce is one of the core components of the Hadoop system. It is a computing model, framework, and platform for parallel processing of big data, designed primarily for computation over massive datasets, and it is currently one of the most widely used distributed computing models.

II. Local Mode

1. Create a new Maven project


2. Configure the local Maven mirror

3. Add the Maven dependencies by modifying the pom.xml file
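
A minimal sketch of the dependency section a MapReduce project typically needs is shown below; the hadoop-client version here is an assumption and should be changed to match your Hadoop installation:

<dependencies>
    <!-- Hadoop client libraries (HDFS + MapReduce APIs); the version must match your cluster -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.4</version>
    </dependency>
</dependencies>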

4. Create three new packages and the three components (mapper, reducer, and driver)

5. Write the mapper component
package cn.edu.hgu.mapreduce.mapper;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // receive one line of input text
        String line = value.toString();
        // split the line into individual words
        String[] words = line.split(" ");
        // emit each word as a <word, 1> key-value pair, which becomes the reducer's input
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }

}



6. Write the reducer component
package cn.edu.hgu.mapreduce.reducer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;


public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // define a counter
        int count = 0;
        // accumulate the values from the iterator
        for (IntWritable value : values) {
            // get() extracts the int wrapped by the IntWritable
            count += value.get();
        }
        // emit <word, total count>
        context.write(key, new IntWritable(count));
    }

}


7. Write the driver component

Note that the input and output paths below must be adjusted to match your environment.

package cn.edu.hgu.mapreduce.driver;

import cn.edu.hgu.mapreduce.mapper.WordCountMapper;
import cn.edu.hgu.mapreduce.reducer.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // set the MapReduce run mode to local (this is the default when no cluster configuration is on the classpath)
//        conf.set("mapreduce.framework.name", "local");
        // create the MapReduce job
        Job job = Job.getInstance(conf);
        // wire up the individual components
        // specify the main class of the job
        job.setJarByClass(WordCountDriver.class);
        // specify the mapper component of the job
        job.setMapperClass(WordCountMapper.class);
        // specify the reducer component of the job
        job.setReducerClass(WordCountReducer.class);
        // set the key and value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // set the key and value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // specify the local-mode input source
        FileInputFormat.setInputPaths(job, "E:/wordcount/input");
        // specify the local-mode output destination
        FileOutputFormat.setOutputPath(job, new Path("E:/wordcount/output"));
        // submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
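
One practical detail: FileOutputFormat requires that the output directory does not already exist, so a second run against E:/wordcount/output fails until that directory is removed. A minimal sketch of one way to guard against this inside the driver, before setting the output path (this guard is an extra suggestion, not part of the original walkthrough, and it needs an additional import of org.apache.hadoop.fs.FileSystem):

        // delete a leftover output directory from a previous run
        FileSystem fs = FileSystem.get(conf);
        Path outputPath = new Path("E:/wordcount/output");
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);   // true = delete recursively
        }
        FileOutputFormat.setOutputPath(job, outputPath);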



8. Build the dataset


Then place the downloaded file in the E:\wordcount\input folder.

Then run the driver program

and check the results under E:\wordcount\output:
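
For example, with a hypothetical input file under E:\wordcount\input containing the single line "hello world hello hadoop", the job produces a part-r-00000 file under E:\wordcount\output whose content (word, tab, count, sorted by word) would be:

hadoop	1
hello	2
world	1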


III. Cluster Mode

First package the project into a jar, then submit it to the Hadoop cluster to run.

1. Create a new Maven project

2. Add the Maven dependencies

3. Add the mapper, reducer, and driver components

Mapper component


Reducer component


Driver component

4. Prepare the files and directories used for the word count

Create the input directory on the Hadoop cluster

Upload a.txt and b.txt to the Hadoop cluster, as sketched below
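
For example, assuming the HDFS input directory is /wordcount/input (a hypothetical path), the directory can be created and the files uploaded with:

hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -put a.txt b.txt /wordcount/input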

5. Debug the project locally

First confirm that the file paths are local paths


Then run it, check the results, and confirm they are correct

6. Package the project into a jar
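
With Maven installed, the jar can be built from the project root with the standard packaging command (the resulting jar lands in the target/ directory, named after the artifactId and version in your pom.xml):

mvn clean package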

7. Upload the jar to the virtual machine

The cluster has no E: drive, so the input and output paths must be changed to cluster (HDFS) paths, as sketched below.
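
A minimal sketch of what the driver's path settings might look like for the cluster run, assuming the hypothetical HDFS directories /wordcount/input and /wordcount/output (reading the paths from the command line via args is an equally common choice):

        // HDFS paths instead of the local E: drive paths
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
        // or, more flexibly, take them from the submission command:
        // FileInputFormat.setInputPaths(job, new Path(args[0]));
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));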


8. Submit the job to run on the Hadoop cluster
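
Assuming the uploaded jar is named wordcount.jar (a hypothetical file name), the job is submitted with the hadoop jar command, giving the fully qualified driver class:

hadoop jar wordcount.jar cn.edu.hgu.mapreduce.driver.WordCountDriver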


9. Check the results

IV. Classic Cases

1. Inverted Index

(1) Create the package cn.edu.hgu.invertedIndex and four classes

(2) Map phase implementation
package cn.edu.hgu.invertedIndex;


import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    private static Text keyInfo = new Text();               // stores the word-and-file-name combination
    private static final Text valueInfo = new Text("1");    // stores the word frequency, initialized to 1
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = StringUtils.split(line, " ");              // split the line into fields
        FileSplit fileSplit = (FileSplit) context.getInputSplit();   // get the file split this line belongs to
        String fileName = fileSplit.getPath().getName();             // get the file name from the split
        for (String field : fields) {
            // the key consists of the word and the file name, e.g. "MapReduce:file1"
            keyInfo.set(field + ":" + fileName);
            context.write(keyInfo, valueInfo);
        }
    }
}


(3) Combine phase implementation
package cn.edu.hgu.invertedIndex;


import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
    private static Text info = new Text();
    // input:  <"word:fileName", {1, 1, ...}>
    // output: <"word", "fileName:count">
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int sum = 0;   // count the word frequency
        for (Text value : values) {
            sum += Integer.parseInt(value.toString());
        }
        int splitIndex = key.toString().indexOf(":");
        // rebuild the value from the file name and the frequency
        info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
        // reset the key to just the word
        key.set(key.toString().substring(0, splitIndex));
        context.write(key, info);
    }
}

(4) Reduce phase implementation
package cn.edu.hgu.invertedIndex;


import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
    private static Text result = new Text();
    // input:  <"word", {"fileName1:count1", "fileName2:count2", ...}>
    // output: <"word", "fileName1:count1;fileName2:count2;...">
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // build the document list
        String fileList = "";
        for (Text value : values) {
            fileList += value.toString() + ";";
        }
        result.set(fileList);
        context.write(key, result);
    }
}
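
To make the three stages concrete, here is a trace with two hypothetical input files, file1 containing "MapReduce is simple" and file2 containing "MapReduce is powerful". For the word "MapReduce", the mappers emit <"MapReduce:file1", "1"> and <"MapReduce:file2", "1">; the combiner running after each map task turns these into <"MapReduce", "file1:1"> and <"MapReduce", "file2:1">; the reducer then concatenates the values for the shared key and writes a single line such as (the order of the file entries is not guaranteed):

MapReduce	file1:1;file2:1;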


(5) Driver main class implementation
package cn.edu.hgu.invertedIndex;


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class InvertedIndexDriver {
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(InvertedIndexDriver.class);
        job.setMapperClass(InvertedIndexMapper.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("E:/InvertedIndex/input"));
        // specify where the results are saved after processing completes
        FileOutputFormat.setOutputPath(job, new Path("E:/InvertedIndex/output"));
        // submit the job and wait for completion
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

(6) Prepare the files and directories


Then create three new txt files in the input directory

(7) Run the Driver main program and check the results


Checking the results shows that the run succeeded

2. Data Deduplication

(1) Create the package dedup and three new components

(2) Map phase implementation
package cn.edu.hgu.dedup;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private static Text field = new Text();
    // input: <0, "2020-9-3 c">  <11, "2020-9-4 d">  ...
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // use the whole line as the output key
        field = value;
        // NullWritable.get() supplies an empty value
        context.write(field, NullWritable.get());
    }
}

(3) Reduce phase implementation
package cn.edu.hgu.dedup;

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    // input: <"2020-9-3 c", null>  <"2020-9-4 d", {null, null}>  ...
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // identical lines arrive grouped under one key, so writing each key once removes the duplicates
        context.write(key, NullWritable.get());
    }
}
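
The deduplication happens in the shuffle: because the mapper uses the entire line as the key, identical lines are grouped under a single key, and the reducer writes each key exactly once. For example, with the hypothetical records used in the comments above, an input of

2020-9-3 c
2020-9-4 d
2020-9-4 d

produces the deduplicated output

2020-9-3 c
2020-9-4 d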

(4) Driver main class implementation
package cn.edu.hgu.dedup;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class DedupDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DedupDriver.class);
        job.setMapperClass(DedupMapper.class);
        job.setReducerClass(DedupReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("E:/Dedup/input"));
        // specify where the results are saved after processing completes
        FileOutputFormat.setOutputPath(job, new Path("E:/Dedup/output"));
        job.waitForCompletion(true);
    }
}

(5) Prepare the files and directories

Create a Dedup/input folder on the E: drive and create three txt files inside it, as shown below:

(6) Run the Driver main program and check the results

The results are:

3. TopN

(1) Create the package topN and three components

(2) Map phase implementation
package cn.edu.hgu.topN;



import java.util.TreeMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TopNMapper extends Mapper<LongWritable, Text, NullWritable, IntWritable> {
    // a TreeMap sorts its keys in ascending order by default
    private TreeMap<Integer, String> treemap = new TreeMap<>();
    private IntWritable iw = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String val = value.toString();
        String[] vals = val.split(" ");
        for (String v : vals) {
            treemap.put(Integer.parseInt(v), v);
            if (treemap.size() > 5) {
                // if the TreeMap holds more than 5 entries, drop the first (smallest) key
                treemap.remove(treemap.firstKey());
            }
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // once all lines of this split are processed, emit the local top 5
        for (Integer i : treemap.keySet()) {
            iw.set(i);
            context.write(NullWritable.get(), iw);
        }
    }
}

(3) Reduce phase implementation
package cn.edu.hgu.topN;



import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;


public class TopNReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable> {
    private IntWritable iw = new IntWritable();
    // sort in descending order so the largest values come first
    private TreeMap<Integer, String> treemap = new TreeMap<>(new Comparator<Integer>() {
        @Override
        public int compare(Integer o1, Integer o2) {
            return o2 - o1;
        }
    });

    @Override
    protected void reduce(NullWritable key, Iterable<IntWritable> value,
                          Context context)
            throws IOException, InterruptedException {
        for (IntWritable iw : value) {
            // the map value is only a placeholder; the key carries the number
            treemap.put(iw.get(), NullWritable.get() + " ");
            if (treemap.size() > 5) {
                // drop the last (smallest) key once more than 5 entries are held
                treemap.remove(treemap.lastKey());
            }
        }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // emit the global top 5 in descending order
        for (Integer i : treemap.keySet()) {
            iw.set(i);
            context.write(NullWritable.get(), iw);
        }
    }
}


(4) Driver main class implementation
package cn.edu.hgu.topN;


import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class TopNDriver {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(TopNDriver.class);
        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("E:/topN/input"));
        FileOutputFormat.setOutputPath(job, new Path("E:/topN/output"));
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }

}
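
Both the mapper and the reducer keep a TreeMap of at most five entries, so each map task emits only its local top 5, and, because every record is keyed by NullWritable, all of those local lists arrive in a single reduce call that merges them into the global top 5. For example, with a hypothetical input file containing the single line "10 3 8 20 7 15 6 2 9 11", the job writes the five largest values in descending order:

20
15
11
10
9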

(5) Prepare the files and directories

Create a topN/input folder on the E: drive and create the txt file(s) inside it with the content shown below

(6) Run the Driver main program and check the results

The results are as follows:

Source: https://outofmemory.cn/zaji/5701842.html