Hadoop word frequency count (sorted by descending frequency, with words of equal frequency in alphabetical order)
The task is split into two steps: word counting and sorting. The first MapReduce job is the same as the stop-word filtering code; in the second MapReduce job, the map swaps the key and value, and because the shuffle between map and reduce automatically sorts keys in ascending order, that sort has to be overridden to produce descending order. The reduce then has to arrange words with the same frequency in ascending alphabetical order; the .sort() method sorts in ascending order by default.
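As a small illustration (hypothetical data): if the first job outputs apple 3, banana 3 and cat 5, then the second job's map emits (3, apple), (3, banana) and (5, cat); the descending key comparator makes the shuffle deliver frequency 5 before frequency 3, and the reducer sorts the two frequency-3 words alphabetically, so the final output is cat 5, apple 3, banana 3.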
Key points:
1. With two MapReduce jobs, both jobs have to be added to a job controller (JobControl).
2. Override the sort class used for the ordering performed during the shuffle:
public static class Sort extends IntWritable.Comparator{
public int compare(WritableComparable a, WritableComparable b){
return -super.compare(a, b);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
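This comparator only takes effect once it is registered on the second job via job2.setSortComparatorClass(Sort.class), as done in the full listing below.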
Compare this with the descending .sort() used in the knn code:
Collections.sort(sortvalue, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
// ascending:
// return o1.getAge() - o2.getAge();
double x = Double.parseDouble(o1.split(",")[1]);
double y = Double.parseDouble(o2.split(",")[1]);
return Double.compare(x, y);
// descending:
// return Double.compare(y, x);
}
});
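For comparison only, and assuming sortvalue is a List<String> of "label,distance" entries with Java 8 or later available (a sketch, not part of the original code), the same descending sort can be written more compactly:
sortvalue.sort(Comparator.comparingDouble((String s) -> Double.parseDouble(s.split(",")[1]))
.reversed()); // reversed() flips the natural ascending order into descending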
Code:
// Set global variables (the stop-word file path) in a distributed-cache style
package wordcount;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException; // exception class
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer; // StringTokenizer class, splits a string on whitespace delimiters
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration; // Hadoop class for reading configuration information
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path; // classes for file-system input and output
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable; // the IntWritable wrapper type
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text; // the Text wrapper type
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.Job; // the Job class
import org.apache.hadoop.mapreduce.Mapper; // the Mapper class
import org.apache.hadoop.mapreduce.Reducer; // the Reducer class
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; // used for file input
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // used for file output
import org.apache.hadoop.util.GenericOptionsParser; // GenericOptionsParser parses common hadoop command-line options and sets the corresponding values on the Configuration object
public class yixiangji{
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
{
private Set<String> stopwords;
private String localFiles;
public void setup(Context context) throws IOException, InterruptedException {
stopwords = new TreeSet<String>();
// Get the conf configuration set in the main function
Configuration conf = context.getConfiguration();
// Get the HDFS path of the stop-word list
localFiles = conf.getStrings("stopwords")[0];
FileSystem fs = FileSystem.get(URI.create(localFiles), conf);
FSDataInputStream hdfsInStream = fs.open(new Path(localFiles));
// Read from HDFS
InputStreamReader isr = new InputStreamReader(hdfsInStream, "utf-8");
String line;
BufferedReader br = new BufferedReader(isr);
while ((line = br.readLine()) != null) {
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
// Build the stop-word set
stopwords.add(itr.nextToken());
}
}
}
public void map(LongWritable key, Text value, Context context // the map method
) throws IOException, InterruptedException {
// A note on Context: it is an inner class of Mapper used to communicate with the MapReduce framework, for example to hand the map output on to reduce. It tracks the task's status during map or reduce execution, carries job configuration information, and acts as the bridge between the functions run in map and reduce, so we can read that information inside the map function.
StringTokenizer itr = new StringTokenizer(value.toString()); // tokenizer that splits the line on whitespace
String temp = new String();
final IntWritable one = new IntWritable(1);
while (itr.hasMoreTokens()) { // while there is another token
temp = itr.nextToken();
if (!stopwords.contains(temp)) {
Text word = new Text();
word.set(temp);
context.write(word, one);
}
}
}
}
public static class Reduce // custom reducer class, extending the Reducer class imported above
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable(); // IntWritable object holding the summed count
int minFrequency;
public void setup(Context context)
{
Configuration jobconf = context.getConfiguration();
minFrequency = jobconf.getInt("minFrequency", -1); // default minFrequency to -1 when it is not set
}
public void reduce(Text key, Iterable<IntWritable> values, Context context // the reduce method; the Iterable lets us walk the IntWritable counts for this key without knowing the underlying collection
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get(); // add up the occurrences of this word
}
if(sum>minFrequency)
{
this.result.set(sum);
context.write(key, this.result);
}
}
}
public static class Map2 extends Mapper<LongWritable, Text, IntWritable, Text>
{
@Override
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
{
// Read the output of the first MapReduce job; key and value are separated by a tab
String[] data = value.toString().split("\t");
// Emit the word frequency as the key and the word itself as the value
context.write(new IntWritable(Integer.parseInt(data[1])), new Text(data[0]));
}
}
// The second reducer
public static class Reduce2 extends Reducer<IntWritable, Text, Text, IntWritable> {
IntWritable result = new IntWritable();
@Override
public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
// Words with the same frequency are sent to the same reduce call, so they only need to be sorted alphabetically here
List<String> sort = new ArrayList<String>();
// iterate over the words sharing this frequency
for(Text value : values){
sort.add(value.toString());
}
String[] strings = new String[sort.size()];
sort.toArray(strings);
// Sort the words alphabetically
Arrays.sort(strings);
for (int i = 0; i < strings.length; i++) {
context.write(new Text(strings[i]), key);
}
}
}
// Sort the map output keys of the second job so that they come out in descending order
public static class Sort extends IntWritable.Comparator{
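// Both overloads negate the result of the parent IntWritable.Comparator:
// the object overload handles comparisons on deserialized keys, while the
// byte[] overload is the one the shuffle actually calls, since keys are
// compared in their serialized form during the sort/merge phase.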
public int compare(WritableComparable a, WritableComparable b){
return -super.compare(a, b);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
public static void main(String[] args ) throws Exception
{
Configuration conf1 = new Configuration(true);
// HDFS path of the stop-word list
conf1.setStrings("stopwords", "hdfs://localhost:9000/user/hadoop/shujuji/input/stopwords.txt");
// Word-frequency threshold: words whose count does not exceed it are not output (the key must match the one read in Reduce.setup())
conf1.set("minFrequency", args[0]);
// Input and output paths
String[] ars = new String[]{"hdfs://localhost:9000/user/hadoop/input/lala", "hdfs://localhost:9000/user/hadoop/yixiangji/output1", "hdfs://localhost:9000/user/hadoop/yixiangji/output2"};
String[] otherArgs = new GenericOptionsParser(conf1, ars).getRemainingArgs();
// job1: word frequency count
Job job1 = Job.getInstance(conf1, "yixiangji");
job1.setJarByClass(yixiangji.class);
job1.setMapperClass(Map.class);
job1.setReducerClass(Reduce.class);
FileInputFormat.addInputPath(job1, new Path(otherArgs[0]));
// job.setOutputFormatClass(TextOutputFormat.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
FileOutputFormat.setOutputPath(job1,new Path(otherArgs[1]));
// Add job1 to the controller
ControlledJob ctrlJob1 = new ControlledJob(conf1);
ctrlJob1.setJob(job1);
// job2: sort by descending frequency, with words of equal frequency in alphabetical order
Configuration conf2 = new Configuration(true);
Job job2 = Job.getInstance(conf2, "sort");
job2.setJarByClass(yixiangji.class);
job2.setMapperClass(Map2.class);
job2.setReducerClass(Reduce2.class);
FileInputFormat.addInputPath(job2,new Path(otherArgs[1]));
job2.setOutputKeyClass(IntWritable.class);
job2.setOutputValueClass(Text.class);
// Set the custom comparator class used to sort the map output keys
job2.setSortComparatorClass(Sort.class);
FileOutputFormat.setOutputPath(job2,new Path(otherArgs[2]));
// Add job2 to the controller
ControlledJob ctrlJob2 = new ControlledJob(conf2);
ctrlJob2.setJob(job2);
// Set the dependency between the jobs: job2's input depends on job1's output
ctrlJob2.addDependingJob(ctrlJob1);
// Set up the master controller that drives both job1 and job2
JobControl jobCtrl = new JobControl("lala");
// Add both controlled jobs to the overall JobControl
jobCtrl.addJob(ctrlJob1);
jobCtrl.addJob(ctrlJob2);
Thread thread = new Thread(jobCtrl);
// Start it in a separate thread; this step is required
thread.start();
while (true) {
if (jobCtrl.allFinished()) {
System.out.println(jobCtrl.getSuccessfulJobList());
jobCtrl.stop();
break;
}
}
}
}
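To run the chained jobs, an invocation along these lines can be used (a sketch: the jar name is an assumption, and the single command-line argument is the word-frequency threshold read from args[0], since the HDFS input and output paths are hard-coded in main):
hadoop jar yixiangji.jar wordcount.yixiangji 2
Note that the two output directories must not exist yet, otherwise FileOutputFormat will refuse to run the corresponding job.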
Result screenshot: