/*
 * MapReduce
 *
 * MapReduce, figure 1 (第1张) — the image itself is not included in this copy.
 */

package com.hongyaa.mr.mobile.member;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * MapReduce job that counts how many input records belong to each member level.
 *
 * <p>Each input line is split on the TAB character and the fourth field (index 3) is taken as the
 * member level. The job writes one {@code memberLevel<TAB>count} line per distinct level.
 */
public class MemberLevelCount {

    /**
     * Configures and runs the job, then exits with status 0 on success or 1 on failure.
     *
     * <p>The HDFS address and the input/output paths are hard-coded. Any existing output
     * directory is deleted recursively before the job is submitted.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // (1) Create the configuration object and point it at the HDFS namenode.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        // (2) Create a new job.
        Job job = Job.getInstance(conf);

        // (3) Locate the jar that contains this class (required when running on a cluster).
        job.setJarByClass(MemberLevelCount.class);

        // (4) Mapper, combiner and reducer. Summing counts is associative and commutative,
        // so the reducer can also run map-side as a combiner to shrink the shuffle.
        job.setMapperClass(MemberLevelCountMapper.class);
        job.setCombinerClass(MemberLevelCountReducer.class);
        job.setReducerClass(MemberLevelCountReducer.class);

        // (5) Final output key/value types. The map output types are identical, so no
        // separate setMapOutputKeyClass/setMapOutputValueClass calls are needed.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // (6) Input and output paths.
        Path inPath = new Path("/mobile/input");
        Path outPath = new Path("/mobile/member_level");

        // The job fails at submission if the output directory already exists, so remove a
        // stale one from a previous run.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        // (7) Submit, block until the job finishes (printing progress), then exit with its status.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }

    /**
     * Emits {@code (memberLevel, 1)} for every tab-separated input line.
     *
     * <p>Lines with fewer than four fields have no member level. They are skipped rather than
     * failing the task, and each one increments the {@code MemberLevelCount/MALFORMED_LINES}
     * counter.
     */
    public static class MemberLevelCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final IntWritable ONE = new IntWritable(1);

        // Reused across map() calls, as in Hadoop's WordCount example: the map output
        // collector serializes the key on write(), so mutating it afterwards is safe.
        private final Text outKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // (1) Read one line of text.
            String line = value.toString();
            // (2) Split on the TAB character ("\t", not the letter "t").
            String[] fields = line.split("\t");
            if (fields.length < 4) {
                // Blank or malformed line: record it instead of crashing on fields[3].
                context.getCounter("MemberLevelCount", "MALFORMED_LINES").increment(1);
                return;
            }
            // (3) The member level is the fourth field; emit it with a count of 1.
            outKey.set(fields[3]);
            context.write(outKey, ONE);
        }
    }

    /**
     * Sums the counts emitted for each member level and writes {@code (memberLevel, total)}.
     * Also used as the combiner, so its input may already be partial sums rather than 1s.
     */
    public static class MemberLevelCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Reused output value; it is serialized on write().
        private final IntWritable total = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // (1) Accumulate every count seen for this member level.
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            // (2) Emit the member level with its total.
            total.set(sum);
            context.write(key, total);
        }
    }
}

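/*
 * 欢迎分享,转载请注明来源:内存溢出
 * (Sharing is welcome; if you repost this, please credit the source: 内存溢出 / outofmemory.cn.)
 *
 * 原文地址 (original article): http://outofmemory.cn/zaji/4667074.html
 *
 * Residue from the original web page (tip/share widgets, navigation, comment section):
 * (0)
 * 打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
 * 上一篇 2022-11-06
 * 下一篇 2022-11-06
 * 发表评论
 * 登录后才能评论
 * 评论列表(0条)
 * 保存
 */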