第一题链接
2021年安徽省大数据与人工智能应用竞赛——MapReduce(数据预处理)题目解答
题目:请使用MapReduce统计 calls.txt中的每个手机号码的,呼叫时长和呼叫次数,被叫时长,被叫次数 ,并输出格式 为 手机号码,呼叫时长,呼叫次数,被叫时长,被叫次数;
calls.txt 通话记录
样例:18620192711,15733218050,1506628174,1506628265,650000,810000
字段分别为:
呼叫者手机号,接受者手机号,开始时间戳,结束时间戳,呼叫者地址省份编码,接受者地址省份编码
package Demo.mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.log4j.BasicConfigurator; import java.io.IOException; import java.net.URI; import java.util.Date; public class subject2 { public static class demoMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split(","); String send_phone = split[0]; String receive_phone = split[1]; Date time1 = new Date(Long.parseLong(split[2]) * 1000L); Date time2 = new Date(Long.parseLong(split[3]) * 1000L); long talk_time = (time2.getTime() - time1.getTime())/1000; context.write(new Text(send_phone),new Text("send,"+talk_time)); context.write(new Text(receive_phone),new Text("receive,"+talk_time)); } } public static class demoReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int send_time = 0; int receive_time = 0; int send_count = 0; int receive_count = 0; for (Text value : values) { String string = value.toString(); String[] split = string.split(","); if("send".equals(split[0])){ send_time += Integer.parseInt(split[1]); send_count++; }else{ receive_time += Integer.parseInt(split[1]); receive_count++; } } context.write(new Text(key),new Text(","+send_time+"秒,"+send_count+"次,"+receive_time+"秒,"+receive_count+"次")); } } public static void main(String[] args) throws Exception{ BasicConfigurator.configure(); // 配置mapreduce Job job = Job.getInstance(); job.setJobName("zhang"); job.setJarByClass(subject2.class); job.setMapperClass(demoMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(demoReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //指定路径 Path input1 = new Path("hdfs://master:9000/data/calls.txt"); FileInputFormat.addInputPath(job,input1); Path output = new Path("hdfs://master:9000/output");//输出路径不能已存在 //获取文件系统对象fs,利用fs来对hdfs中的文件进行 *** 作 FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"),new Configuration()); if(fs.exists(output)){ fs.delete(output,true); } FileOutputFormat.setOutputPath(job,output); //启动 job.waitForCompletion(true); } }
结果为
这题主要的难点在于能不能想到,在Map端写两个 context.write()语句。即传入两个字段的值作为key,同时还要区分value的值
这题要传到reduce端里面的有两个字段,但是这两个字段的值其实是一致的,都是手机号码。因此即使写了两个 context.write() 语句,具体传入的时候也是以一个个的手机号码来送入reduce端的。相同的key值,即相同的手机号会被组合在一起,但是value值需要区分。因为同一个手机号有两个身份,一个身份是呼叫者,对应呼叫时长和呼叫次数。另一个身份是被呼叫者,对应被叫时长和被叫次数。
呼叫次数和和被叫次数,可以通过在reduce端遍历values的数量时用count++的方式来统计
但是map端要传给reduce端,通话时长。因此给value值,也就是通话时长,加上一个前缀。然后在reduce端用equals匹配这个前缀,这样就区分了同一个手机号的呼叫时长与被呼时长
另外注意呼叫时长与被呼时长应该累加,而不是直接输出,因为values里面会有多个通话时间,按照前缀分成两个分区,呼叫时长分区和被呼时长分区,每个分区里面依旧会有多个通话时间的值,此时应该累加得到最终结果。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)