hadoop2.6.5 Mapper类、Reducer类源码解析

hadoop2.6.5 Mapper类、Reducer类源码解析,第1张

hadoop2.6.5 Mapper类、Reducer类源码解析 Mapper类
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public class Mapper {
    public Mapper() {
    }

    protected void setup(Mapper.Context context) throws IOException, InterruptedException {
    }

    protected void map(KEYIN key, VALUEIN value, Mapper.Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }

    protected void cleanup(Mapper.Context context) throws IOException, InterruptedException {
    }

    public void run(Mapper.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKeyValue()) {
                this.map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            this.cleanup(context);
        }

    }

    public abstract class Context implements MapContext {
        public Context() {
        }
    }
}

Mapper类主要有五种方法

protected void setup(Mapper.Context context)protected void map(KEYIN key, VALUEIN value, Mapper.Context context)protected void cleanup(Mapper.Context context)public void run(Mapper.Context context)public abstract class Context implements MapContext 1. setup()

protected void setup(Mapper.Context context) throws IOException, InterruptedException {
}

初始时调用,用来加载一些初始化的工作,像全局文件、建立数据库的链接等等,执行一次。

2. cleanup()
protected void cleanup(Mapper.Context context) throws IOException, InterruptedException {
}

结束时调用,收尾工作,如关闭文件、关闭数据库连接、执行map()后的键值分发等等,执行一次。

3. map()
protected void map(KEYIN key, VALUEIN value, Mapper.Context context) throws IOException, InterruptedException {
	context.write(key, value);
}

它继承Mapper类,是一个需要重点重写的方法,将map工作的处理逻辑都要放在这个方法中。

map方法有三个参数,Mapper有四个泛型变量KEYIN, VALUEIN, KEIOUT, VALUEOUT,它们分别代表输入的键值类型和输出的键值类型,context是上下文,用于保存Job的配置信息、状态和map处理结果,用于最后的write方法。

map中有重要的四个方法

context.nextKeyValue();
负责读取数据,但是方法的返回值却不是读取到的key-value,而是返回了一个标识有没有读取到数据的布尔值context.getCurrentKey();
负责获取context.nextKeyValue() 读取到的keycontext.getCurrentValue();
负责获取context.nextKeyValue() 读取到的valuecontext.write(key,value);
负责输出mapper阶段输出的数据 4. run()

public void run(Mapper.Context context) throws IOException, InterruptedException {
	this.setup(context);

	try {
		while(context.nextKeyValue()) {
			this.map(context.getCurrentKey(), context.getCurrentValue(), context);
		}
	} finally {
		this.cleanup(context);
    }

}

run方法中先执行setup函数,然后是用map处理数据,当处理完数据后用cleanup的收尾工作。值得注意的是,setup函数和cleanup函数由系统作为回调函数只做一次,并不像map函数那样执行多次。

实现的是设计模式中的模板方法模式。Mapper类中定义了一些方法,用户可以继承这个类重写方法以应对不同的需求,但是这些方法内部的执行顺序是确定好的。它封装了程序的算法,让用户能集中精力处理每一部分的具体逻辑。

run方法在程序执行中会默认调用,从他的执行流程来看也给常符合我们的预期,先进行初始化,如果还有输入的数据,那么调用map方法处理每一个键值对,最终执行结束方法。

模板方法模式:
模板模式中分基本方法和模板方法,上述的run可看做基本方法,需要调用其它方法实现整体逻辑,不会被修改,其它几个方法可看做模板方法。基本方法一般会用final修饰,保证其不会被子类修改,而模板方法使用protect修饰,表示需要在子类中实现。

此外,模板模式中还有一个叫钩子方法的概念,即给子类一个授权,允许子类通过重写钩子方法来颠覆基本逻辑的执行,这有时候是非常有用的,可以用来完善具体的逻辑。

5. Context
public abstract class Context implements MapContext {
	public Context() {
	}
}

Context类,它实现了MapContext接口,而MapContext继承于TaskInputOutputContext

Reducer
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapreduce.ReduceContext.ValueIterator;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

@Checkpointable
@Public
@Stable
public class Reducer {
    public Reducer() {
    }

    protected void setup(Reducer.Context context) throws IOException, InterruptedException {
    }

    protected void reduce(KEYIN key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
        Iterator i$ = values.iterator();

        while(i$.hasNext()) {
            VALUEIN value = i$.next();
            context.write(key, value);
        }

    }

    protected void cleanup(Reducer.Context context) throws IOException, InterruptedException {
    }

    public void run(Reducer.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKey()) {
                this.reduce(context.getCurrentKey(), context.getValues(), context);
                Iterator iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator)iter).resetBackupStore();
                }
            }
        } finally {
            this.cleanup(context);
        }

    }

    public abstract class Context implements ReduceContext {
        public Context() {
        }
    }
}

Reducer中的类的方法与Mapper类相同,重点区别主要是在reduce方法的参数,它将map处理后的结果作为它的输入参数。回想一下wordcount的处理流程,经过map任务的处理后,变成了每个单词对应一个list,每个list是一系列的1,1,1,1,表明这个单词出现的记录。输入的键当然是单词,输入的值实际上是一个列表,java集合中对于列表提供了一个迭代器用于遍历,使用迭代器进行遍历在速度上会快很多,因此这里的参数是一个迭代器也不难理解。而reduce方法的默认实现也是通过迭代器去遍历每个输入的结果。

Hadoop的迭代器(Iterable values)中使用了对象重用,即迭代时value始终指向一个内存地址(引用值始终不变)改变的是引用指向的内存地址中的数据。

hadoop迭代器原理(通俗易懂)
初学hadoop程序之---------------Iterable迭代器

参考资料

hadoop之mapper类妙用

Hadoop学习笔记(六)实战wordcount

Java设计模式中的模板模式

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5715400.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存