上周我们分析了org.apache.hadoop.mapreduce.Cluster中的的核心代码,本周将继续对mapreduce部分进行分析。在对Cluster类有初步了解的基础上,我们继续分析与Cluster相关的org.apache.hadoop.mapreduce.ClusterMetrics。
我们首先来看ClusterMetrics的全局变量与构造方法。
@InterfaceAudience.Public @InterfaceStability.Evolving public class ClusterMetrics implements Writable { private int runningMaps; private int runningReduces; private int occupiedMapSlots; private int occupiedReduceSlots; private int reservedMapSlots; private int reservedReduceSlots; private int totalMapSlots; private int totalReduceSlots; private int totalJobSubmissions; private int numTrackers; private int numBlacklistedTrackers; private int numGraylistedTrackers; private int numDecommissionedTrackers; public ClusterMetrics() { } public ClusterMetrics(int runningMaps, int runningReduces, int occupiedMapSlots, int occupiedReduceSlots, int reservedMapSlots, int reservedReduceSlots, int mapSlots, int reduceSlots, int totalJobSubmissions, int numTrackers, int numBlacklistedTrackers, int numDecommissionedNodes) { this(runningMaps, runningReduces, occupiedMapSlots, occupiedReduceSlots, reservedMapSlots, reservedReduceSlots, mapSlots, reduceSlots, totalJobSubmissions, numTrackers, numBlacklistedTrackers, 0, numDecommissionedNodes); } public ClusterMetrics(int runningMaps, int runningReduces, int occupiedMapSlots, int occupiedReduceSlots, int reservedMapSlots, int reservedReduceSlots, int mapSlots, int reduceSlots, int totalJobSubmissions, int numTrackers, int numBlacklistedTrackers, int numGraylistedTrackers, int numDecommissionedNodes) { this.runningMaps = runningMaps; this.runningReduces = runningReduces; this.occupiedMapSlots = occupiedMapSlots; this.occupiedReduceSlots = occupiedReduceSlots; this.reservedMapSlots = reservedMapSlots; this.reservedReduceSlots = reservedReduceSlots; this.totalMapSlots = mapSlots; this.totalReduceSlots = reduceSlots; this.totalJobSubmissions = totalJobSubmissions; this.numTrackers = numTrackers; this.numBlacklistedTrackers = numBlacklistedTrackers; this.numGraylistedTrackers = numGraylistedTrackers; this.numDecommissionedTrackers = numDecommissionedNodes; }
可以看出在ClusterMetrics中定义了许多的与cluster相关的变量,例如:
集群的大小。
列入黑名单和退役的跟踪器数量。
集群的槽位容量。
当前占用/保留的map和reduce槽的数量。
当前运行的 map 和 reduce 任务的数量。
作业提交的数量。
因此不难推测ClusterMetrics主要是用于记录cluster的相关信息,提供给用户。
在官网提供的api中也可以看到对ClusterMetrics类的描述。
Status information on the current state of the Map-Reduce cluster.,即用于记录Map-Reduce cluster的具体信息,在上次一的博客中我们已经学习了cluster的相关内容,这里便不展开赘述。
我们还可以看到Clients can query for the latest ClusterMetrics, via Cluster.getClusterStatus(),也就是通过调用getClusterStatus即可得到对应的ClusterMetrics信息。
关于ClusterMetrics就大致看到这里,接下来我们继续学习mapreduce中的其他类。
org.apache.hadoop.mapreduce.Counters源码分析Counters是mapreduce中极其重要的一个类。
计数器(Counter)是 MapReduce 应用程序报告其统计数据的设施。Mapper 和 Reducer 实现可以使用计数器报告统计数据。
MapReduce Counter为提供我们一个窗口:观察MapReduce job运行期的各种细节数据。MapReduce自带了许多默认Counter。
Counters 是全局计数器,由MapReduce框架或者Application定义。每一个Counter可以是任何枚举类型。特定计数器按枚举类型进行分组。
用户可以在map/reduce方法里通过context.getCounter(Enum> counterName)来获取定义好的计数器。然后通过counter.increment(long incr)的方式计数。
MapReduce框架本身就提供了很多内置的计数器,如File System Counters、Job Counters、Map-Reduce framework、File Input Format Counters、File Output Format Counters。可在运行MR任务后的控制台打印信息里看到,如下图。其中SecondarySort Counters是自定义的Counter,不是MR自带的。
我们首先附上整个Counters类的源码,以便进一步分析。
package org.apache.hadoop.mapreduce; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.counters.Limits; import org.apache.hadoop.mapreduce.counters.GenericCounter; import org.apache.hadoop.mapreduce.counters.AbstractCounterGroup; import org.apache.hadoop.mapreduce.counters.CounterGroupbase; import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup; import org.apache.hadoop.mapreduce.counters.AbstractCounters; import org.apache.hadoop.mapreduce.counters.CounterGroupFactory; import org.apache.hadoop.mapreduce.counters.frameworkCounterGroup; @InterfaceAudience.Public @InterfaceStability.Stable public class Counters extends AbstractCounters{ // Mix framework group implementation into CounterGroup interface private static class frameworkGroupImpl > extends frameworkCounterGroup implements CounterGroup { frameworkGroupImpl(Class cls) { super(cls); } @Override protected frameworkCounter newCounter(T key) { return new frameworkCounter (key, getName()); } @Override public CounterGroupbase getUnderlyingGroup() { return this; } } // Mix generic group implementation into CounterGroup interface // and provide some mandatory group factory methods. private static class GenericGroup extends AbstractCounterGroup implements CounterGroup { GenericGroup(String name, String displayName, Limits limits) { super(name, displayName, limits); } @Override protected Counter newCounter(String name, String displayName, long value) { return new GenericCounter(name, displayName, value); } @Override protected Counter newCounter() { return new GenericCounter(); } @Override public CounterGroupbase getUnderlyingGroup() { return this; } } // Mix file system group implementation into the CounterGroup interface private static class FileSystemGroup extends FileSystemCounterGroup implements CounterGroup { @Override protected Counter newCounter(String scheme, FileSystemCounter key) { return new FSCounter(scheme, key); } @Override public CounterGroupbase getUnderlyingGroup() { return this; } } private static class GroupFactory extends CounterGroupFactory { @Override protected > frameworkGroupFactory newframeworkGroupFactory(final Class cls) { return new frameworkGroupFactory () { @Override public CounterGroup newGroup(String name) { return new frameworkGroupImpl (cls); // impl in this package } }; } @Override protected CounterGroup newGenericGroup(String name, String displayName, Limits limits) { return new GenericGroup(name, displayName, limits); } @Override protected CounterGroup newFileSystemGroup() { return new FileSystemGroup(); } } private static final GroupFactory groupFactory = new GroupFactory(); public Counters() { super(groupFactory); } public > Counters(AbstractCounters counters) { super(counters, groupFactory); } }
那么Counters类是用来做什么的呢?我们首先来看官方对Class Counters的定义:
可以看出,Counters包括了每个作业/任务计数器,由 Map-Reduce 框架或应用程序定义。每个Counter都可以是任何Enum类型。
Counters被聚集成CounterGroups,每个都包含来自特定Enum类的计数器。
我们看到,Counters继承自AbstractCounters类
打开AbstractCounters相关源码,它其实是一个抽象类,用于为 mapred 和 mapreduce 包中的 Counters 容器提供通用实现。
它提供的函数也不难理解,主要是为了方便计数器的使用,包括构造计数器、统计组内计数器总数、返回计数器名称等基本功能。
Counters提供了两个构造函数,分别是有参和无参构造:
有参构造器利用了我们上面提到的AbstractCounters提供的计数器构造方法,从另一个计数器对象构造新的计数器对象。别传入两个参数,一个是AbstractCounters,一个是旧的counters对象。AbstractCounters
我们上面提到了计数器组的概念,那么什么是计数器组呢?通过查询相关资料,我了解到
Counter有"组group"的概念,用于表示逻辑上相同范围的所有数值。MapReduce job提供的默认Counter分为三个组:
Map-Reduce frameword
Map input records,Map skipped records,Map input bytes,Map output records,Map output bytes,Combine input records,Combine output records,Reduce input records,Reduce input groups,Reduce output records,Reduce skipped groups,Reduce skipped records,Spilled records
File Systems
FileSystem bytes read,FileSystem bytes written
Job Counters
Launched map tasks,Launched reduce tasks,Failed map tasks,Failed reduce tasks,Data-local map tasks,Rack-local map tasks,Other local map tasks
了解了counters的概念与构造函数,那么我们希望进一步了解counters的使用方法。
比如,用户可能想快速实现文件行数,以及其中错误记录的统计。
为了使用这样的特性,用户代码创建一个叫作 Counter 的对象,并且在适当的时候,Map 和 Reduce 函数中增加 Counter 的值。
这些 Counter 的值,会定时从各个单独的 Worker 机器上传递给 Master(通过 Ping 的应答包传递)。
Master 把执行成功的 Map 或者 Reduce 任务的 Counter 值进行累计,并且当 MapReduce *** 作完成之后,返回给用户代码。
当前 Counter 值也会显示在 Master 的状态页面,这样用户可以看到计算现场的进度。
当累计 Counter 的值的时侯, Master 会检查是否有对同一个 Map 或者 Reduce 任务的相同累计,避免重复累计。
下面的代码就可以通过counters实现这样的一个目标。
package com.shockang.study.bigdata.mapreduce.counter; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class MyMapperWithCounter extends Mapper{ public static enum FileRecorder { ErrorRecorder, TotalRecorder } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if ("error".equals(value.toString())) { context.getCounter(FileRecorder.ErrorRecorder).increment(1); } context.getCounter(FileRecorder.TotalRecorder).increment(1); } }
package com.shockang.study.bigdata.mapreduce.counter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class JobMain { public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.setInt("mapreduce.input.lineinputformat.linespermap", 5); Job job = new Job(configuration, "counter-job"); job.setInputFormatClass(NLineInputFormat.class); job.setJarByClass(JobMain.class); job.setMapperClass(MyMapperWithCounter.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); Path outputDir = new Path(args[1]); FileSystem fs = FileSystem.get(configuration); if (fs.exists(outputDir)) { fs.delete(outputDir, true); } FileOutputFormat.setOutputPath(job, outputDir); if (job.waitForCompletion(true)) { System.out.println("Error num:" + job.getCounters().findCounter(MyMapperWithCounter.FileRecorder.ErrorRecorder).getValue()); System.out.println("Total num:" + job.getCounters().findCounter(MyMapperWithCounter.FileRecorder.TotalRecorder).getValue()); } } }总结
本次我们首先分析了ClusterMetrics类,完善了对Cluster集群类的了解,然后开始了对mapreduce核心类counters计数器的分析,初步探讨了它的作用、构造函数,并通过编写简单的自定义Counters完成了对文件错误记录与全部记录的统计,加深了了解。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)