如何在Windows下面运行hadoop的MapReduce程序

如何在Windows下面运行hadoop的MapReduce程序,第1张

1 首先登入hadoop 集群里面的一个节点, 创建一个java源文件, 偷懒起见, 基本盗用官方的word count (因为本文的目的是教会你如何快编写和运行一个MapReduce程序, 而不是如何写好一个功能齐全的MapReduce程序)

内容如下:

import javaioIOException;

import javautilStringTokenizer;

import orgapachehadoopconfConfiguration;

import orgapachehadoopfsPath;

import orgapachehadoopioIntWritable;

import orgapachehadoopioText;

import orgapachehadoopmapreduceJob;

import orgapachehadoopmapreduceMapper;

import orgapachehadoopmapreduceReducer;

import orgapachehadoopmapreducelibinputFileInputFormat;

import orgapachehadoopmapreduceliboutputFileOutputFormat;

import orgapachehadooputilGenericOptionsParser;

public class myword {

public static class TokenizerMapper

extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

public void map(Object key, Text value, Context context

) throws IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer(valuetoString());

while (itrhasMoreTokens()) {

wordset(itrnextToken());

contextwrite(word, one);

}

}

}

public static class IntSumReducer

extends Reducer<Text,IntWritable,Text,IntWritable> {

private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,

Context context

) throws IOException, InterruptedException {

int sum = 0;

for (IntWritable val : values) {

sum += valget();

}

resultset(sum);

contextwrite(key, result);

}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

String[] otherArgs = new GenericOptionsParser(conf, args)getRemainingArgs();

if (otherArgslength != 2) {

Systemerrprintln('Usage: wordcount <in> <out>');

Systemexit(2);

}

Job job = new Job(conf, 'word count');

jobsetJarByClass(mywordclass);

jobsetMapperClass(TokenizerMapperclass);

jobsetCombinerClass(IntSumReducerclass);

jobsetReducerClass(IntSumReducerclass);

jobsetOutputKeyClass(Textclass);

jobsetOutputValueClass(IntWritableclass);

FileInputFormataddInputPath(job, new Path(otherArgs[0]));

FileOutputFormatsetOutputPath(job, new Path(otherArgs[1]));

Systemexit(jobwaitForCompletion(true) 0 : 1);

}

}

与官方版本相比, 主要做了两处修改

1) 为了简单起见,去掉了开头的 package orgapachehadoopexamples;

2) 将类名从 WordCount 改为 myword, 以体现是我们自己的工作成果 :)

2 拿到hadoop 运行的class path, 主要为编译所用

运行命令

hadoop classpath

保存打出的结果,本文用的hadoop 版本是Pivotal 公司的Pivotal hadoop, 例子:

/etc/gphd/hadoop/conf:/usr/lib/gphd/hadoop/lib/:/usr/lib/gphd/hadoop///:/usr/lib/gphd/hadoop-hdfs//:/usr/lib/gphd/hadoop-hdfs/lib/:/usr/lib/gphd/hadoop-hdfs///:/usr/lib/gphd/hadoop-yarn/lib/:/usr/lib/gphd/hadoop-yarn///:/usr/lib/gphd/hadoop-mapreduce/lib/:/usr/lib/gphd/hadoop-mapreduce///::/etc/gphd/pxf/conf::/usr/lib/gphd/pxf/pxf-corejar:/usr/lib/gphd/pxf/pxf-apijar:/usr/lib/gphd/publicstage:/usr/lib/gphd/gfxd/lib/gemfirexdjar::/usr/lib/gphd/zookeeper/zookeeperjar:/usr/lib/gphd/hbase/lib/hbase-commonjar:/usr/lib/gphd/hbase/lib/hbase-protocoljar:/usr/lib/gphd/hbase/lib/hbase-clientjar:/usr/lib/gphd/hbase/lib/hbase-thriftjar:/usr/lib/gphd/hbase/lib/htrace-core-201jar:/etc/gphd/hbase/conf::/usr/lib/gphd/hive/lib/hive-servicejar:/usr/lib/gphd/hive/lib/libthrift-090jar:/usr/lib/gphd/hive/lib/hive-metastorejar:/usr/lib/gphd/hive/lib/libfb303-090jar:/usr/lib/gphd/hive/lib/hive-commonjar:/usr/lib/gphd/hive/lib/hive-execjar:/usr/lib/gphd/hive/lib/postgresql-jdbcjar:/etc/gphd/hive/conf::/usr/lib/gphd/sm-plugins/:

3 编译

运行命令

javac -classpath xxx /mywordjava

xxx部分就是上一步里面取到的class path

运行完此命令后, 当前目录下会生成一些class 文件, 例如:

mywordclass myword$IntSumReducerclass myword$TokenizerMapperclass

4 将class文件打包成jar文件

运行命令

jar -cvf mywordjar /class

至此, 目标jar 文件成功生成

5 准备一些文本文件, 上传到hdfs, 以做word count的input

例子:

随意创建一些文本文件, 保存到mapred_test 文件夹

运行命令

hadoop fs -put /mapred_test/

确保此文件夹成功上传到hdfs 当前用户根目录下

6 运行我们的程序

运行命令

hadoop jar /mywordjar myword mapred_test output

顺利的话, 此命令会正常进行, 一个MapReduce job 会开始工作, 输出的结果会保存在 hdfs 当前用户根目录下的output 文件夹里面。

至此大功告成!

如果还需要更多的功能, 我们可以修改前面的源文件以达到一个真正有用的MapReduce job。

但是原理大同小异, 练手的话, 基本够了。

一个抛砖引玉的简单例子, 欢迎板砖。

楼主你好,下面这篇博客介绍了在Hadoop上编写MapReduce程序的基本方法,包括MapReduce程序的构成,不同语言开发MapReduce的方法等。

因为涉及了很多代码,直接看原文会比较方便。

>

图2-1 MapReduce 体系结构

• 用户编写的MapReduce程序通过Client提交到JobTracker端

• 用户可通过Client提供的一些接口查看作业运行状态

• JobTracker负责资源监控和作业调度

• JobTracker 监控所有TaskTracker与Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点

• JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源

• TaskTracker 会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的 *** 作(如启动新任务、杀死任务等)

• TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot 分为Map slot和Reduce slot两种,分别供MapTask 和Reduce Task 使用,且两种slot不能互相使用。

Task分为Map Task 和Reduce Task 两种,均由TaskTracker启动,一个节点可以同时进行map任务和task任务。

split分片是一个逻辑概念,是MapReduce的处理单位,记录元数据信息(文件的起始位置、长度、数据所在节点等)。split一般设计为一个block大小,这是因为: 如果split太小,启动map任务会过多,占用过多资源;如果split太大,一是影响并行度,二是由于split跨越不同block,可能增加数据传输成本。

图3-3 MapReduce任务执行流程示意图

图3-4 Shuffle过程基本流程图

输入的数据执行map任务后,会先写入到本地缓存中(缓存默认大小是100M),缓存数据达到溢写比(默认是08)后,会溢写到本地磁盘中。写入到磁盘之前,会进行数据的分区、排序和可能的合并。由于每次溢写都会形成一个文件,最后需要对所有文件进行归并。归并后的文件数据都是已经分区和排好序的。

图3-6 Reduce端Shuffle过程流程图

public static void main(String[] args) { //在本地windows平台eclipse运行mapreduce程序 //创建远程用户,以指定的用户来运行程序 //把要运行的程序代码放到run方法体里 UserGroupInformation ugi = UserGroupInformationcreateRemoteUser("hadoop"); ugidoAs(new PrivilegedAction<Void>() { public Void run() { try{ //设置引用jar的分割符,linux一般用,和:,但windows一般是用;, //所以要设置一下本地的分割符 SystemsetProperty("pathseparator", ":"); Configuration conf = new Configuration(); //可以设置用到的第三方jar //confset("tmpjars", "/tmp/jars/hbase-0945jar,/tmp/jars/protobuf-java-240ajar,/tmp/jars/zookeeper-343jar"); confset("mapredjobtracker", "172168854:9001"); confset("fsdefaultname", "hdfs://172168854:9000"); confset("hadoopjobugi", "hadoop"); confset("hbasezookeeperquorum","172168855,172168856,172168857"); confset("hbasezookeeperpropertyclientPort", "2181"); Job job = new Job(conf); jobsetJobName("ReadHbaseToHdfsAction"); jobsetJarByClass(ReadHbaseToHdfsActionclass); jobsetNumReduceTasks(1); jobsetReducerClass(ReadHbaseToHdfsReduceclass); FileSystem fs=FileSystemget(conf); Path outPath=new Path("/tmp/2/1"); if(fsexists(outPath)){ fsdelete(outPath, true); } FileOutputFormatsetOutputPath(job, outPath); jobsetOutputKeyClass(Textclass); jobsetOutputValueClass(Textclass); Scan scan = new Scan(); TableMapReduceUtilinitTableMapperJob("misdn_catetory22", scan, ReadHbaseToHdfsMapperclass, Textclass, Textclass, job); jobwaitForCompletion(true); }catch(Exception e){ eprintStackTrace(); } return null; }}); }bigs

1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。2、为了控制reduce的处理过程,map的输出键的格式为组合键格式。与常规的不同,这里变为了,TextPair的格式为。3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个reduce容器中。这样,当相同key1的数据进入reduce容器后,key2起到了一个数据标识的作用

以上就是关于如何在Windows下面运行hadoop的MapReduce程序全部的内容,包括:如何在Windows下面运行hadoop的MapReduce程序、如何在Hadoop上编写MapReduce程序、MapReduce等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zz/9704435.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-01
下一篇 2023-05-01

发表评论

登录后才能评论

评论列表(0条)

保存