hadoop之MapReduce统计选修课程人数,不及格门数,选课人数

hadoop之MapReduce统计选修课程人数,不及格门数,选课人数,第1张

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


目录

前言

一、题目要求

二、数据解析

        student.txt文件部分数据

三、需求分析及代码编写

        总体的思路:

需求1:求DataBase课程的选修人数

需求2:求每个同学不及格的门数

需求3:求各个课程的选课人数

四、打包代码,上传到集群上运行

五、附带一张实现需求2过程详解



前言

        学习的记录,有误请指正

        需要已经配置好IDEA和hadoop的环境

一、题目要求

1、求DataBase课程的选修人数

2、求每个同学的不及格的门数

3、求各个课程的选课人数

二、数据解析

        student.txt文件部分数据

Alger,Algorithm,50
Alger,OperatingSystem,32
Alger,Python,96
Alger,ComputerNetwork,20
Alger,Software,74
Allen,Algorithm,76
Allen,OperatingSystem,70
Allen,Python,10
Allen,Software,76
Alston,Algorithm,78
Alston,DataStructure,74
Alston,Python,96
Alston,Software,28
Alva,DataBase,72
Alva,DataStructure,64
Alva,CLanguage,0
Alva,ComputerNetwork,58
Alva,Software,82
Alvin,DataBase,88
Alvin,Algorithm,96
Alvin,OperatingSystem,26
Alvin,Python,84
Alvin,ComputerNetwork,76
Alvis,Algorithm,18
Alvis,DataStructure,56

该文件分为三部分

第一部分为学生姓名

第二部分为选修课程

第三部分为该学生对应选修个课程的成绩。

很明显的可以看出他们是以英文标点 "," 相隔。

三、需求分析及代码编写         总体的思路:

        先通过InputFormat的子方法FileInputFormat将文件上传到hadoop集群当中,这是我们的文本数据将会被按照行划分,形成一个的一个键值对。其中k1相当于偏移量,v1则是改行的数据。demo:Alger,Algorithm,50被划分成<0 Alger,Algorithm,50>。此时k1的类型时LongWritable,v1的类型是Text

        第二步,map阶段,在这个阶段,要继承Mapper重写map函数。我们要做的就是对v1进行一些 *** 作,比如按照什么分隔符进行分割文本数据,分割之后选取那些我们需要的数据,并形成一个新的,传入到Reducer阶段

        第三步,Reducer阶段,这个阶段就是对map阶段过来的数据进行一些汇并进一步汇总,并进一步精细。这要继承Reducer重写reduce函数。

        最后,通过OutputFormat的子方法FileOutputFormat将Reducer段的结果输出。

需求1:求DataBase课程的选修人数

        Map端:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Mapper extends Mapper {
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        String data = value1.toString();
        String[] names = data.split(",");//按照","对行数据进行分割
        if(names[0].equals("DataBase")) {
            context.write(new Text(names[1]), new IntWritable(Integer.parseInt(names[2])));
        }

    }


}

Reducer端:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;


public class Reducer extends Reducer {

    @Override
    protected void reduce(Text key2, Iterable value2, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable v : value2) {
            count++;
            }
        context.write(new Text(key2),new IntWritable(count));
    }
}

Main:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class Main {
    public static void main(String[] agrs) throws Exception{
        Job job=Job.getInstance(new Configuration());//创建job工作
        job.setJarByClass(Main.class);//主类是sum.class
        job.setMapperClass(Mapper.class);//指定job的mapper和k,v的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(Reducer.class);//指定job的reducer和k,v的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //指定文件路径
        FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.219.131:8020/user/input/data01.txt"));

        Path outpath = new Path("hdfs://192.168.219.131:8020/user/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outpath)) {
            fs.delete(outpath, true);
        }//如果outpath存在,则将他删除
        FileOutputFormat.setOutputPath(job, outpath);
        job.waitForCompletion(true);

    }
}

需求2:求每个同学不及格的门数

Map阶段:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Mapper extends Mapper {
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        String data = value1.toString();
        String[] names = data.split(",");//按照","对行数据进行分割

                 //names[0]:学生的姓名 names[2]: 学生的成绩       
            context.write(new Text(names[0]), new IntWritable(Integer.parseInt(names[2])));
        }

    }


}

Reducer阶段:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;


public class Reducer extends Reducer {

    @Override
    protected void reduce(Text key2, Iterable value2, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable v : value2) {
                if (v.get()>=60)
                    count++;
            }
        context.write(new Text(key2),new IntWritable(count));
    }
}

Main方法同上

需求3:求各个课程的选课人数

Map阶段:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Mapper extends Mapper {
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        String data = value1.toString();
        String[] names = data.split(",");//按照","对行数据进行分割

                 //names[]:课程名        
            context.write(new Text(names[]), new IntWritable(1));
        }

    }


}

Reducer阶段:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;


public class Reducer extends Reducer {

    @Override
    protected void reduce(Text key2, Iterable value2, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable v : value2) {
            count++;
            }
        context.write(new Text(key2),new IntWritable(count));
    }
}

Main方法同上 

四、打包代码,上传到集群上运行

将target下面的...jar包保存下来,放到虚拟机中

打开集群:

start-all.sh

 使用命令运行我们上传上去的...jar包

hadoop jar Index1.0-SNAPSHOT.jar Index.Main

该命令解释:

Index1.0-SNAPSHOT.jar :为你所打包的名字,以及他存放的路径,我是将所打的包放在了我的Hadoop目录下面,这样比较方便

Index.Main : 是你所写的程序的主类

五、附带一张实现需求2过程详解

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/921102.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存