package com.hongyaa.mr.mobile.member;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MemberLevelCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // (1)创建配置文件对象 Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://localhost:9000"); // (2)新建一个 job 任务 Job job = Job.getInstance(conf); // (3)将 job 所用到的那些类(class)文件,打成jar包 (打成jar包在集群运行必须写) job.setJarByClass(MemberLevelCount.class); // (4)指定 mapper 类和 reducer 类 job.setMapperClass(MemberLevelCountMapper.class); job.setReducerClass(MemberLevelCountReducer.class); // (5)指定 ReduceTask 的输出key-value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // (6)指定该 mapreduce 程序数据的输入和输出路径 Path inPath = new Path("/mobile/input"); Path outPath = new Path("/mobile/member_level"); // 获取 fs 对象 FileSystem fs = FileSystem.get(conf); if (fs.exists(outPath)) { fs.delete(outPath, true); } FileInputFormat.setInputPaths(job, inPath); FileOutputFormat.setOutputPath(job, outPath); // (7)最后给YARN来运行,等着集群运行完成返回反馈信息,客户端退出 boolean waitForCompletion = job.waitForCompletion(true); System.exit(waitForCompletion ? 0 : 1); } public static class MemberLevelCountMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { // (1)获取一行文本的内容 String line = value.toString(); // (2)根据分隔符“t”进行切分 String[] mobile = line.split("t"); // 会员等级 String memberLevel = mobile[3]; // (3)将会员等级作为key,将次数1作为value,分发给Reduce端 context.write(new Text(memberLevel), new IntWritable(1)); } } public static class MemberLevelCountReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { // 做每个会员等级的结果汇总 int sum = 0; // (1)遍历values for (IntWritable v : values) { // 累积求和 sum += v.get(); } // (2)将会员等级作为key,总数作为value,输出最终结果 context.write(key, new IntWritable(sum)); } }
}
// Originally published on the 内存溢出 blog ("sharing is welcome; please credit the source when reposting").