- MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
- MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并行运行在一个Hadoop集群上。
- 易于编程:它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。
- 易于扩展:当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
- 高容错性:hadoop集群中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
- 离线处理:可以实现上千台服务器集群并发工作,提供数据处理能力
- 不擅长实时计算:MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。
- 不擅长流式计算:流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。
- 不擅长DAG(有向无环图)计算:多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出,中间结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
- 分布式的运算程序往往需要分成至少2个阶段。
- 第一个阶段的MapTask并发实例,完全并行运行,互不相干。
- 第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
- MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
- MrAppMaster:负责整个程序的过程调度及状态协调。
- MapTask:负责Map阶段的整个数据处理流程。
- ReduceTask:负责Reduce阶段的整个数据处理流程。
- 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
- 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
为什么不用Java的序列化Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
Hadoop序列化特点:
(1)紧凑 :高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)可扩展:随着通信协议的升级而可升级
(4)互 *** 作:支持多语言的交互
在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
具体实现bean对象序列化步骤如下7步。
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
(3)重写序列化方法
(4)重写反序列化方法
(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写toString(),可用”t”分开,方便后续用。
(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。详见后面排序案例。
Mapper阶段:MapTask进程对每一个
Raducer阶段:RaduceTask进程对每一组相同K的
Driver阶段:相当于YARN集群的客户端,提交程序及其运行参数的JOB对象到YARN集群
MapReduce中context的作用
Java8-反射与注解
jvm类加载器,类加载机制详解,看这一篇就够了
Java 8 并发教程:线程和执行器
Java中Runnable和Thread的区别
Synchronized原理和jdk1.8后的优化
Mapper(处理过后的数据以字典序落盘(字符串))
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper{ @Override protected void setup(Context context) throws IOException, InterruptedException { System.out.println("setup方法执行了"); } // 复用两个变量 private Text outk = new Text(); private IntWritable outv = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 获取当前输入的数据(复用java实现的String处理方法) String line = value.toString(); // 切割数据 String[] datas = line.split(" "); // 遍历集合 封装 输出数据的key和value for (String data : datas) { outk.set(data); context.write(outk, outv); } System.out.println("map方法执行了"); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { System.out.println("cleanup执行了"); } }
Reduce
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer{ // 复用两个变量 private Text outk = new Text(); private IntWritable outv = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int total = 0; // 遍历values for (IntWritable value : values) { // 对value累加进行累加 输出结果 total+=value.get(); } // 封装key和value outk.set(key); outv.set(total); context.write(outk, outv); } }
Driver
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 声明配置对象 Configuration conf = new Configuration(); // 声明Job对象 Job job = Job.getInstance(conf); // 指定当前Job的驱动类(通过反射获得对象的实例) job.setJarByClass(WordCountDriver.class); // 指定当前Job的 Mapper和Reducer job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 指定Map段输出数据的key的类型和输出数据value的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 指定最终输出结果的key的类型和value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 指定输入数据的目录 和 输出数据的目录 // FileInputFormat.setInputPaths(job, new Path("F:\in\wcinput")); // FileOutputFormat.setOutputPath(job, new Path("F:\out\wcoutput2")); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 提交Job job.waitForCompletion(true); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)