案例数据
手机使用的流量数据,每次手机上网记录一条信息
需求:统计每个手机号的上传总流量、下载总流量、 总流量
分析核心点:
希望哪些数据相同的合并在一起,map端就以它为key输出即可
# 案例数据 id 手机号 ip地址 上传 下载 状态码 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 # 期望结果 13726230503 上传流量:4962 下载流量:49362 总数据流量: 54324 13826544101 上传流量:528 下载流量:0 总数据流量: 528 13926251106 上传流量:480 下载流量:0 总数据流量: 480 13926435656 上传流量:264 下载流量:3024 总数据流量: 3288
# Hadoop序列化 MapReduce执行过程中,被处理的key-value数据,需要在网络中传输,就需要对象转化为字节,字节转化为对象,这就是序列化和反序列化过程; key和value都要经过序列化传输。 1. Java序列化(序列化数据+对象描述信息) (1).序列化会包含java的继承关系,验证信息,验证信息。(重量级) (2).不便于在网络中传输。 2. Hadoop序列化(仅关注数据序列化) (1)空间紧凑 (2)传输快速,网络开销小 (3)对象重用(反序列化的时候,只创建一个) 结论: MapReduce中所有key-value都要支持序列化。
Hadoop内置可序列化类型
# 自定义序列化类型 自定义一个类实现WritableComparable 1. 可以被Hadoop序列化传输。 2. 可以支持排序。
public class XxxxWritable implements WritableComparable{ private String id; private String name; private String date; 有参数和无参构造方法 //反序列化要调用无参构造方法 get和set方法 toString方法 //输出到文件的时候,默认按照tostring形式输出 compareTo方法 //接口必须实现,在shuffle排序阶段要使用 write方法 //接口方法,map和reduce中间需要通过网络发送该对象数据 readFields方法 //接口方法,将网络输出来的对象数据读取到,并赋值给该对象
注意序列化和反序列化的属性 *** 作顺序要完全一致
//序列化示例代码 public class PhoneLogWritable implements WritableComparable数据清洗{ private Logger log = Logger.getLogger(PhoneLogWritable.class); private int upload; private int download; private int sum; public PhoneLogWritable(int upload, int download, int sum) { this.upload = upload; this.download = download; this.sum = sum; } public PhoneLogWritable() { log.info("----对象创建----"); } public int compareTo(PhoneLogWritable o) { log.info("--比较--"); return this.sum-o.sum; } public void write(DataOutput dataOutput) throws IOException { log.info("------write---"); dataOutput.writeInt(upload); dataOutput.writeInt(download); dataOutput.writeInt(sum); } public void readFields(DataInput dataInput) throws IOException { log.info("--read---"); upload = dataInput.readInt(); download = dataInput.readInt(); sum = dataInput.readInt(); } @Override public boolean equals(Object o) { System.out.println("--equals---"); if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; PhoneLogWritable that = (PhoneLogWritable) o; return upload == that.upload && download == that.download && sum == that.sum; } @Override public int hashCode() { System.out.println("--hashcode---"); return Objects.hash(upload, download, sum); } public int getUpload() { return upload; } public void setUpload(int upload) { this.upload = upload; } public int getDownload() { return download; } public void setDownload(int download) { this.download = download; } public int getSum() { return sum; } public void setSum(int sum) { this.sum = sum; } @Override public String toString() { return "PhoneLogWritable{" + "upload=" + upload + ", download=" + download + ", sum=" + sum + '}'; } }
# MapReduce中可以没有reduce 效果:只进行map阶段的执行,执行完毕后即输出到文件中。 # 代码实现 1. 取消MapReduce的job有关reduce的所有设置 2. 保留并设置如下 job.setNumReduceTasks(0);//取消reduce TextOutputFormat.setOutputPath(job,new Path("hdfs路径"));
# 原数据 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 1363157995052 13826544109 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 1363157995052 null 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 240 0 200 1363157991076 13926435659 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 null null null
# 测试案例 删除其中手机号不符合要求,上传流量确实和下载流量缺失的数据,并仅保留手机号、上传流量、下载流量。(效果如下) 13726230503 2481 24681 13826544101 264 0 13926435656 132 1512 13926251106 240 0 13726230503 2481 24681 13826544101 264 0 13926435656 132 1512 13926251106 240 0计数器Counter
排序用来记录Hadoop执行过程的工具,可以理解为Hadoop的日志。
# 形式 group name 数量# 代码 context.getCounter("map阶段","map输出").increment(1L); # 效果如下
# 1. 简介 Shuffle期间,MapReduce会对map输出的数据,对key进行排序。 # 2. 时机: 1. map输出之后,shuffle过程中。 2. map输出之后的map端。 # 3. 规则: 1. key如果是Text类型按照字典顺序,进行字符串排序。 2. key如果是IntWritable、LongWritable则按照数字大小进行升序排序。
# 测试数据 用户id 观众人数 团团 300 小黑 200 哦吼 400 卢本伟 100 八戒 250 悟空 100 唐僧 100 # 需求:按照观众人数升序排序? 悟空 100 唐僧 100 卢本伟 100 小黑 200
# 默认排序
# 自定义排序 核心:排序所依据的字段作为map输出的key。
# 测试数据 用户id 观众人数 直播时长 团团 300 1000 小黑 200 2000 哦吼 400 7000 卢本伟 100 6000 八戒 250 5000 悟空 100 4000 唐僧 100 3000 # 需求:按照观众人数降序排序,如果观众人数相同,按照直播时长降序。
核心思路: 1. 排序所依据的字段,要作为key。 2. 实现Hadoop的序列化。 # 重写WritableComparable的compareTo方法
public class LivePlayLog implements WritableComparableMapTask局部计算并行度{ private int viewer;//观众人数 private long length;//直播时长 public LivePlayLog(){} @Override public int compareTo(LivePlayLog o) { if(this.viewer != o.viewer){ return -(this.viewer-o.viewer); }else if(this.length != o.length){ return -(int)(this.length-o.length); }else{ return 0; } } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(viewer); dataOutput.writeLong(length); } @Override public void readFields(DataInput dataInput) throws IOException { viewer = dataInput.readInt(); length = dataInput.readLong(); } ......
# 0. 问题: MapTask并行度,是不是越大越好?
# 1. MapTask的并行度的产生 1. InputFormat根据配置信息,获得hdfs中文件的split大小和位置。 2. 每个split就会启动一个MapTask,进行处理。 # 2. 总结MapTask并行度决定机制 split的个数 # 3. 概念: block:hdfs文件的最小单元。 split:文件切分信息,虚拟的文件切片。 1. 默认:blocksize的大小就是split的大小128M,也就是一个MapTask执行的任务。 这样能够减少多个节点的MapTask之间的网络IO。 2. 切片 *** 作是针对1个文件,多个文件的切片不会合并。InputFormat数据输入
# 1. 作用 1. 读取HDFS中的文件,将读入的结果交给map进行处理。 2. 对文件进行split切片
# 2. TextInputFormat 接口:org.apache.hadoop.mapreduce.InputFormat 实现类: org.apache.hadoop.mapreduce.lib.input.TextInputFormat 特点:逐行读入,并形成key-value,key是偏移量,value是当前行的数据。 1. 指定一个输入文件 TextInputFormat.addInputPath(job,new Path("/hdfs文件")); 2. 指定一个输入目录 TextInputFormat.addInputPath(job,new Path("/hdfs目录")); 3. 指定多个输入文件 job.setInputFormatClass(TextInputFormat.class); FileInputFormat.addInputPath(job,new Path("/hdfs/文件1.txt")); FileInputFormat.addInputPath(job,new Path("/hdfs/文件2.txt")); FileInputFormat.addInputPath(job,new Path("/hdfs/文件3.txt"));
# 3. CombineTextInputFormat 1. 将多个文件合并成1个split处理,设置切片大小为10M。 job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job,10485760);//10M,只要加起来不超过10M的数据,都会合并成1个split处理。 FileInputFormat.addInputPath(job,new Path("/hdfs/目录"));Combiner合并
ReduceTask汇总并行度
map端的局部reduce:对map输出的结果,进行一次reduce。
减少MapTask向ReduceTask通过网络传输的数据,减轻Reduce的工作压力。
默认不开启,开启方式
job.setCombinerClass(Reducer的类.class);
- 应用场景
仅适合统计支持迭代性的 *** 作:适合累加,统计个数、最大值、最小值等 *** 作,不适合平均值 *** 作
# 提高ReduceTask的数量,提高Reduce的并行度,提高效率 1. 增加ReduceTask的并行度(数量) ,可以启动多个程序处理map的汇总结果,可以提高效率。 2. 每个ReduceTask输出结果,都会单独的输出到1个文件。 # ReduceTask的数量是可以在程序中手动指定 默认数量为: 1个 Reduce 可以通过: job.setNumReduceTasks(数字);//0就是没有,数字是几就是几个
# 测试案例 2020年3月3日 www.baizhiedu.com /product/detail/10001.html iphoneSE 10001 30 2020年3月3日 www.baizhiedu.com /product/detail/10001.html iphoneSE 10001 60 2020年3月3日 www.baizhiedu.com /product/detail/10001.html iphoneSE 10001 100 2020年3月3日 www.baizhiedu.com /product/detail/10002.html xps15 10002 10 2020年3月3日 www.baizhiedu.com /product/detail/10003.html thinkpadx390 10003 200 2020年3月3日 www.baizhiedu.com /product/detail/10004.html iphoneX 10004 100 2020年3月3日 www.baizhiedu.com /product/detail/10003.html thinkpadx390 10003 100 2020年3月3日 www.baizhiedu.com /product/detail/10001.html iphoneSE 10001 120 2020年3月4日 www.baizhiedu.com /product/detail/10001.html iphoneSE 10001 200 2020年3月5日 www.baizhiedu.com /product/detail/10001.html iphoneSE 10001 25 2020年3月6日 www.baizhiedu.com /product/detail/10001.html iphoneSE 10001 20 # 期望结果 10001 7 10002 1 10003 2 10004 1
# 默认分区规则
# 分区流程(发生时机)
MapReduce分区的整个流程 1. 当MapTask任务中的mapper.map()输出结果后,会先根据map输出的key判断分区。(默认按照key.hashcode%reduceTasks) 不同的key-value进入不同的分区。(从此分道扬镳) 2. 对分区后数据各自做排序。(免去了分区之间数据的比较交换排序 *** 作) 3. 如果设置Combiner,会自动对各自分区做本地reduce汇总 *** 作。 4. 将结果输出mapTask机器本地。(分区存放:分区0、分区1) 5. ReduceTask阶段拷贝MapTask输出结果,按照分区拷贝。 a: ReduceTask0 从所有MapTask阶段拷贝所有的分区0的数据。(n多个分区0数据) b: 合并所有远程拷贝到的分区0的文件数据,排序(归并排序) c: 合并当前分区0中的key的value。(merge)[k-v1,v2,v3] d: 启动1个执行ReduceTask,输出到文件中。 6. ReduceTask阶段拷贝MapTask输出结果,按照分区拷贝。 a: ReduceTask1 从所有MapTask阶段拷贝所有的分区1的数据。(n多个分区1数据) b: 合并所有远程拷贝到的分区1的文件数据,排序(归并排序) c: 合并当前分区1中的key的value。(merge)[k-v1,v2,v3] d: 启动1个执行ReduceTask,输出到文件中。 5和6 reduce阶段各自处理各自分区的数据自定义Partition
# 自定义partition 将下面数据分区处理: 张三 语文 10 李四 数学 30 王五 语文 20 赵6 英语 40 张三 数据 50 李四 语文 10 张三 英语 70 李四 英语 80 王五 英语 45 王五 数学 10 赵6 数学 10 赵6 语文 100 期望结果: 按照科目分区,并按照成绩降序排序 赵6 语文 100 李四 英语 80 张三 英语 70 李四 语文 60 张三 数据 50 王五 英语 45 赵6 英语 40 李四 数学 30 王五 语文 20 王五 数学 10 赵6 数学 10 张三 语文 10 思路: 1:分区依据要作为key 2:排序字段也要作为key。 3:避免合并,key要唯一,不重复(所有key都不一样)
思路:通过修改Reduce的个数,设置分区的个数。
① 定义分区类
# 执行时机: Map输出key-value,后,会调用getPartition方法,决定当前key-value进入哪个分区。
② 使用分区类
job.setPartitionerClass(自定义Partitioner.class);
③ 设定reducer个数
job.setNumReduceTasks(数字);//reduceTask数量要和分区数量一样。
MapReduce工作原理 Spill溢写1. map输出的结果会存入环形缓冲区(从start下标开始写,写到80%,则启动溢写程序。环形缓冲区继续写入) 2. 当环形缓冲区中的数据,达到80%,则开始溢写。(每次写够80%,就开始溢写。) 3. 如果设置了分区,则对数据进行分区 4. 然后对分区后的数据各自做排序 5. 如果设置combiner,则执行map端的reduce合并处理 6. 将本次溢写的数据写入到本地的磁盘上。 7. 循环2~6,将多个文件溢写到磁盘上。 8. 将各个分区中,多次溢写的文件,再进行一次合并排序,然后将合并后的数据写入到对应的磁盘的分区上。全工作流程
# mapreduce工作流程 MapTask过程 1. 创建InputFormat,读取数据 ① 获得文件split ② 读取split范围内的数据,k-v。 2. Mapper.map()方法处理,InputFormat读取到k-v, 循环读取文件中k-v,每次读取,调用一次mapper.map(); while(读下一条){ mapper.map(k,v); } map执行结果context.write(ko,vo) 3. mapper输出结果 ① 获得ko-vo获得分区号。(Partitioner.getpartion()) ② 将ko-vo写出到环形缓冲区中。 4. 一旦环形缓冲区中数据达到溢写条件(80%,写完了) ① 读取环形缓冲区中的数据 ② 根据分区号,分区排序、(Combiner) ③ 将处理结果溢写到磁盘中文件中。 ④ 每次达到溢写条件(80%,写完了),①~③,在mapTask本地磁盘形成分区文件。 ⑤ 最后在本地完成一次分区内多个溢写文件并进行归并排序,产生1个文件(maptask处理结果)。 ReduceTask过程: 1. 根据分区号,启动ReduceTask,下载多个MapTask处理结果中的对应分区文件 MapTaskA(分区0)----ReduceTask0 MapTaskB(分区0)----ReduceTask0 2. 将当前分区中,来自不同MapTask的分区文件,归并排序。(为了reduce的merge *** 作效率) 产生1个大的本分区的文件,且内容key有序。 3. merge *** 作,将有序的结果,合并key的value。 4. 循环调用reducer的reduce方法,处理汇总的数据 while(xxx){ reduce.reduce(key,values); context.write(k,v) } 5. ReduceTask调用OutputFormat将结果写入到hdfs文件中。 Shuffle: # map阶段 1. mapper输出结果 ① 获得ko-vo获得分区号。(Partitioner.getpartion()) ② 将ko-vo写出到环形缓冲区中。 2. 一旦环形缓冲区中数据达到溢写条件(80%,写完了) ① 读取环形缓冲区中的数据 ② 根据分区号,分区排序、(Combiner) ③ 将处理结果溢写到磁盘中文件中。 ④ 每次达到溢写条件(80%,写完了),①~③,在mapTask本地磁盘形成分区文件。 ⑤ 最后在本地完成一次分区内多个溢写文件 归并排序,产生1个文件(maptask处理结果)。 # reduce阶段 3. 根据分区号,启动ReduceTask,下载多个MapTask处理结果中的对应分区文件 MapTaskA(分区0)----ReduceTask0 MapTaskB(分区0)----ReduceTask0 4. 将当前分区中,来自不同MapTask的分区文件,归并排序。(为了reduce的merge *** 作效率) 产生1个大的本分区的文件,且内容key有序。 5. merge *** 作,将有序的结果,合并key的value。源码分析
# 1. MapTask
public class MapTask{ // 1. 启动一个新的Mapper程序 privatevoid runNewMapper(){ TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter); //创建Mapper Mapper mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job); //创建InputFormat InputFormat inputFormat = (InputFormat)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); org.apache.hadoop.mapreduce.InputSplit split = null; //创建split split = (org.apache.hadoop.mapreduce.InputSplit)this.getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); LOG.info("Processing split: " + split); org.apache.hadoop.mapreduce.RecordReader input = new MapTask.NewTrackingRecordReader(split, inputFormat, reporter, taskContext); job.setBoolean("mapreduce.job.skiprecords", this.isSkipping()); RecordWriter output = null; if (job.getNumReduceTasks() == 0) { output = new MapTask.NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { output = new MapTask.NewOutputCollector(taskContext, job, umbilical, reporter); } MapContext mapContext = new MapContextImpl(job, this.getTaskID(), input, (RecordWriter)output, this.committer, reporter, split); org.apache.hadoop.mapreduce.Mapper.Context mapperContext = (new WrappedMapper()).getMapContext(mapContext); try { input.initialize(split, mapperContext); //执行Mapper处理数据 mapper.run(mapperContext); this.mapPhase.complete(); this.setPhase(Phase.SORT); this.statusUpdate(umbilical); input.close(); input = null; ((RecordWriter)output).close(mapperContext); output = null; } finally { this.closeQuietly((org.apache.hadoop.mapreduce.RecordReader)input); this.closeQuietly((RecordWriter)output, mapperContext); } } //2. 收集key-value进入环形缓冲区 public void close(TaskAttemptContext context) throws IOException, InterruptedException { //3. sortAndSpill() 溢写过程 } }
# mapper对象的代码机制 1. 每启动MapTask,执行一次runNewMapper方法,创建一个mapper类。 2. 每读取key-value,调用mapper.map();
# 2. InputFormat
public abstract class InputFormat//切片方法 public abstract List getSplits()... //根据split信息返回一个RecordReader(用来读取数据) public abstract RecordReader createRecordReader(InputSplit split... public abstract class FileInputFormat extends InputFormat { public List getSplits(JobContext job){ long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//获得最小值 long maxSize = getMaxSplitSize(job);//获得最大值,Long的最大值 ... long splitSize = computeSplitSize(blockSize, minSize, maxSize);//获取split的切片大小。对应配置文件(split.minsize) ... //当文件剩余大小大于split大小的1.1倍时,进行分片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //获取block块的索引位置 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); //分片 splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); //源文件减去已经分片大小 bytesRemaining -= splitSize; }
# 3. ReduceTask
privatevoid runNewReducer(JobConf job, TaskUmbilicalProtocol umbilical, final TaskReporter reporter, final RawKeyValueIterator rIter, RawComparator comparator, Class keyClass, Class valueClass) throws IOException, InterruptedException, ClassNotFoundException { rIter = new RawKeyValueIterator() { public void close() throws IOException { rIter.close(); } public DataInputBuffer getKey() throws IOException { return rIter.getKey(); } public Progress getProgress() { return rIter.getProgress(); } public DataInputBuffer getValue() throws IOException { return rIter.getValue(); } public boolean next() throws IOException { boolean ret = rIter.next(); reporter.setProgress(rIter.getProgress().getProgress()); return ret; } }; TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter); //创建reducer org.apache.hadoop.mapreduce.Reducer reducer = (org.apache.hadoop.mapreduce.Reducer)ReflectionUtils.newInstance(taskContext.getReducerClass(), job); org.apache.hadoop.mapreduce.RecordWriter trackedRW = new ReduceTask.NewTrackingRecordWriter(this, taskContext); job.setBoolean("mapred.skip.on", this.isSkipping()); job.setBoolean("mapreduce.job.skiprecords", this.isSkipping()); org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, this.getTaskID(), rIter, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedRW, this.committer, reporter, comparator, keyClass, valueClass); try { //调用reducer的run方法 reducer.run(reducerContext); } finally { trackedRW.close(reducerContext); } }
# 4. OutputFormat
public class TextOutputFormat{ // 将key-value写出到文件中。 public synchronized void write(K key, V value) }Yarn分布式集群搭建
# 0:保证HDFS分布式集群搭建环境确保正确 1. jps看到如下结果 NameNode DataNode SecondaryNameNode 2. 查看hadoop10:50070. 在datanode标签页看到4个正常的datanode节点信息。 # 关闭所有NameNode节点和DataNode节点 stop-dfs.sh
# 1:初始化yarn相关配置 1. mapred-site.xml2. yarn-site.xml mapreduce.framework.name yarn yarn.nodemanager.aux-services mapreduce_shuffle # 2:同步该配置到其他节点服务器上。 scp -r 本地Hadoop配置文件 root@远程linuxip:/远程linux的Hadoop配置文件路径 yarn.resourcemanager.hostname Hadoop
# 3:启动yarn集群 1. 启动HDFS集群 start-dfs.sh 2. 启动yarn集群 start-yarn.sh
# 4:验证 1. jps
[root@hadoop10 ~]# jps 6160 DataNode 6513 ResourceManager 6614 NodeManager 6056 NameNode 6349 SecondaryNameNode 6831 Jps
2. 访问yarn的资源调度器web网页。 http://ip:8088
# 关闭集群 1. 先关闭yarn stop-yarn.sh 2. 在关闭hdfs stop-hdfs.sh案例-TOPN
需求:获得主播观众人数前3名的信息。 # 原始数据 团团 2345 1000 小黑 67123 2000 哦吼 3456 7000 卢本伟 912345 6000 八戒 1234 5000 悟空 456 4000 唐僧 123345 3000 # 期望结果 卢本伟 912345 6000 唐僧 123345 3000 小黑 67123 2000
# 方案1 1.按照观众人数,降序排序。 2.reduce端输出前3个。
# 方案2 1.每个MapTask端先各自计算Top3,并只输出Top3。 2.Reduce端只需要统计多个MapTask的Top3中的Top3。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)