提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
目录
前言
一、题目要求
二、数据解析
student.txt文件部分数据
三、需求分析及代码编写
总体的思路:
需求1:求DataBase课程的选修人数
需求2:求每个同学不及格的门数
需求3:求各个课程的选课人数
四、打包代码,上传到集群上运行
五、附带一张实现需求2过程详解
前言
学习的记录,有误请指正
需要已经配置好IDEA和hadoop的环境
一、题目要求1、求DataBase课程的选修人数
2、求每个同学的不及格的门数
3、求各个课程的选课人数
二、数据解析 student.txt文件部分数据Alger,Algorithm,50
Alger,OperatingSystem,32
Alger,Python,96
Alger,ComputerNetwork,20
Alger,Software,74
Allen,Algorithm,76
Allen,OperatingSystem,70
Allen,Python,10
Allen,Software,76
Alston,Algorithm,78
Alston,DataStructure,74
Alston,Python,96
Alston,Software,28
Alva,DataBase,72
Alva,DataStructure,64
Alva,CLanguage,0
Alva,ComputerNetwork,58
Alva,Software,82
Alvin,DataBase,88
Alvin,Algorithm,96
Alvin,OperatingSystem,26
Alvin,Python,84
Alvin,ComputerNetwork,76
Alvis,Algorithm,18
Alvis,DataStructure,56
该文件分为三部分
第一部分为学生姓名
第二部分为选修课程
第三部分为该学生对应选修个课程的成绩。
很明显的可以看出他们是以英文标点 "," 相隔。
三、需求分析及代码编写 总体的思路: 先通过InputFormat的子方法FileInputFormat将文件上传到hadoop集群当中,这是我们的文本数据将会被按照行划分,形成一个
第二步,map阶段,在这个阶段,要继承Mapper重写map函数。我们要做的就是对v1进行一些 *** 作,比如按照什么分隔符进行分割文本数据,分割之后选取那些我们需要的数据,并形成一个新的
第三步,Reducer阶段,这个阶段就是对map阶段过来的数据进行一些汇并进一步汇总,并进一步精细。这要继承Reducer重写reduce函数。
最后,通过OutputFormat的子方法FileOutputFormat将Reducer段的结果输出。
需求1:求DataBase课程的选修人数Map端:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Mapper extends Mapper {
@Override
protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
String data = value1.toString();
String[] names = data.split(",");//按照","对行数据进行分割
if(names[0].equals("DataBase")) {
context.write(new Text(names[1]), new IntWritable(Integer.parseInt(names[2])));
}
}
}
Reducer端:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Reducer extends Reducer {
@Override
protected void reduce(Text key2, Iterable value2, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable v : value2) {
count++;
}
context.write(new Text(key2),new IntWritable(count));
}
}
Main:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Main {
public static void main(String[] agrs) throws Exception{
Job job=Job.getInstance(new Configuration());//创建job工作
job.setJarByClass(Main.class);//主类是sum.class
job.setMapperClass(Mapper.class);//指定job的mapper和k,v的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(Reducer.class);//指定job的reducer和k,v的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定文件路径
FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.219.131:8020/user/input/data01.txt"));
Path outpath = new Path("hdfs://192.168.219.131:8020/user/output");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}//如果outpath存在,则将他删除
FileOutputFormat.setOutputPath(job, outpath);
job.waitForCompletion(true);
}
}
需求2:求每个同学不及格的门数
Map阶段:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Mapper extends Mapper {
@Override
protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
String data = value1.toString();
String[] names = data.split(",");//按照","对行数据进行分割
//names[0]:学生的姓名 names[2]: 学生的成绩
context.write(new Text(names[0]), new IntWritable(Integer.parseInt(names[2])));
}
}
}
Reducer阶段:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Reducer extends Reducer {
@Override
protected void reduce(Text key2, Iterable value2, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable v : value2) {
if (v.get()>=60)
count++;
}
context.write(new Text(key2),new IntWritable(count));
}
}
Main方法同上
需求3:求各个课程的选课人数Map阶段:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Mapper extends Mapper {
@Override
protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
String data = value1.toString();
String[] names = data.split(",");//按照","对行数据进行分割
//names[]:课程名
context.write(new Text(names[]), new IntWritable(1));
}
}
}
Reducer阶段:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Reducer extends Reducer {
@Override
protected void reduce(Text key2, Iterable value2, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable v : value2) {
count++;
}
context.write(new Text(key2),new IntWritable(count));
}
}
Main方法同上
四、打包代码,上传到集群上运行将target下面的...jar包保存下来,放到虚拟机中
打开集群:
start-all.sh
使用命令运行我们上传上去的...jar包
hadoop jar Index1.0-SNAPSHOT.jar Index.Main
该命令解释:
Index1.0-SNAPSHOT.jar :为你所打包的名字,以及他存放的路径,我是将所打的包放在了我的Hadoop目录下面,这样比较方便
Index.Main : 是你所写的程序的主类
五、附带一张实现需求2过程详解欢迎分享,转载请注明来源:内存溢出
评论列表(0条)