1 首先登入hadoop 集群里面的一个节点, 创建一个java源文件, 偷懒起见, 基本盗用官方的word count (因为本文的目的是教会你如何快编写和运行一个MapReduce程序, 而不是如何写好一个功能齐全的MapReduce程序)
内容如下:
import javaioIOException;
import javautilStringTokenizer;
import orgapachehadoopconfConfiguration;
import orgapachehadoopfsPath;
import orgapachehadoopioIntWritable;
import orgapachehadoopioText;
import orgapachehadoopmapreduceJob;
import orgapachehadoopmapreduceMapper;
import orgapachehadoopmapreduceReducer;
import orgapachehadoopmapreducelibinputFileInputFormat;
import orgapachehadoopmapreduceliboutputFileOutputFormat;
import orgapachehadooputilGenericOptionsParser;
public class myword {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(valuetoString());
while (itrhasMoreTokens()) {
wordset(itrnextToken());
contextwrite(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += valget();
}
resultset(sum);
contextwrite(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)getRemainingArgs();
if (otherArgslength != 2) {
Systemerrprintln('Usage: wordcount <in> <out>');
Systemexit(2);
}
Job job = new Job(conf, 'word count');
jobsetJarByClass(mywordclass);
jobsetMapperClass(TokenizerMapperclass);
jobsetCombinerClass(IntSumReducerclass);
jobsetReducerClass(IntSumReducerclass);
jobsetOutputKeyClass(Textclass);
jobsetOutputValueClass(IntWritableclass);
FileInputFormataddInputPath(job, new Path(otherArgs[0]));
FileOutputFormatsetOutputPath(job, new Path(otherArgs[1]));
Systemexit(jobwaitForCompletion(true) 0 : 1);
}
}
与官方版本相比, 主要做了两处修改
1) 为了简单起见,去掉了开头的 package orgapachehadoopexamples;
2) 将类名从 WordCount 改为 myword, 以体现是我们自己的工作成果 :)
2 拿到hadoop 运行的class path, 主要为编译所用
运行命令
hadoop classpath
保存打出的结果,本文用的hadoop 版本是Pivotal 公司的Pivotal hadoop, 例子:
/etc/gphd/hadoop/conf:/usr/lib/gphd/hadoop/lib/:/usr/lib/gphd/hadoop///:/usr/lib/gphd/hadoop-hdfs//:/usr/lib/gphd/hadoop-hdfs/lib/:/usr/lib/gphd/hadoop-hdfs///:/usr/lib/gphd/hadoop-yarn/lib/:/usr/lib/gphd/hadoop-yarn///:/usr/lib/gphd/hadoop-mapreduce/lib/:/usr/lib/gphd/hadoop-mapreduce///::/etc/gphd/pxf/conf::/usr/lib/gphd/pxf/pxf-corejar:/usr/lib/gphd/pxf/pxf-apijar:/usr/lib/gphd/publicstage:/usr/lib/gphd/gfxd/lib/gemfirexdjar::/usr/lib/gphd/zookeeper/zookeeperjar:/usr/lib/gphd/hbase/lib/hbase-commonjar:/usr/lib/gphd/hbase/lib/hbase-protocoljar:/usr/lib/gphd/hbase/lib/hbase-clientjar:/usr/lib/gphd/hbase/lib/hbase-thriftjar:/usr/lib/gphd/hbase/lib/htrace-core-201jar:/etc/gphd/hbase/conf::/usr/lib/gphd/hive/lib/hive-servicejar:/usr/lib/gphd/hive/lib/libthrift-090jar:/usr/lib/gphd/hive/lib/hive-metastorejar:/usr/lib/gphd/hive/lib/libfb303-090jar:/usr/lib/gphd/hive/lib/hive-commonjar:/usr/lib/gphd/hive/lib/hive-execjar:/usr/lib/gphd/hive/lib/postgresql-jdbcjar:/etc/gphd/hive/conf::/usr/lib/gphd/sm-plugins/:
3 编译
运行命令
javac -classpath xxx /mywordjava
xxx部分就是上一步里面取到的class path
运行完此命令后, 当前目录下会生成一些class 文件, 例如:
mywordclass myword$IntSumReducerclass myword$TokenizerMapperclass
4 将class文件打包成jar文件
运行命令
jar -cvf mywordjar /class
至此, 目标jar 文件成功生成
5 准备一些文本文件, 上传到hdfs, 以做word count的input
例子:
随意创建一些文本文件, 保存到mapred_test 文件夹
运行命令
hadoop fs -put /mapred_test/
确保此文件夹成功上传到hdfs 当前用户根目录下
6 运行我们的程序
运行命令
hadoop jar /mywordjar myword mapred_test output
顺利的话, 此命令会正常进行, 一个MapReduce job 会开始工作, 输出的结果会保存在 hdfs 当前用户根目录下的output 文件夹里面。
至此大功告成!
如果还需要更多的功能, 我们可以修改前面的源文件以达到一个真正有用的MapReduce job。
但是原理大同小异, 练手的话, 基本够了。
一个抛砖引玉的简单例子, 欢迎板砖。
楼主你好,下面这篇博客介绍了在Hadoop上编写MapReduce程序的基本方法,包括MapReduce程序的构成,不同语言开发MapReduce的方法等。
因为涉及了很多代码,直接看原文会比较方便。
>
图2-1 MapReduce 体系结构
• 用户编写的MapReduce程序通过Client提交到JobTracker端
• 用户可通过Client提供的一些接口查看作业运行状态
• JobTracker负责资源监控和作业调度
• JobTracker 监控所有TaskTracker与Job的健康状况,一旦发现失败,就将相应的任务转移到其他节点
• JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器(TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源
• TaskTracker 会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的 *** 作(如启动新任务、杀死任务等)
• TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot 分为Map slot和Reduce slot两种,分别供MapTask 和Reduce Task 使用,且两种slot不能互相使用。
Task分为Map Task 和Reduce Task 两种,均由TaskTracker启动,一个节点可以同时进行map任务和task任务。
split分片是一个逻辑概念,是MapReduce的处理单位,记录元数据信息(文件的起始位置、长度、数据所在节点等)。split一般设计为一个block大小,这是因为: 如果split太小,启动map任务会过多,占用过多资源;如果split太大,一是影响并行度,二是由于split跨越不同block,可能增加数据传输成本。
图3-3 MapReduce任务执行流程示意图
图3-4 Shuffle过程基本流程图
输入的数据执行map任务后,会先写入到本地缓存中(缓存默认大小是100M),缓存数据达到溢写比(默认是08)后,会溢写到本地磁盘中。写入到磁盘之前,会进行数据的分区、排序和可能的合并。由于每次溢写都会形成一个文件,最后需要对所有文件进行归并。归并后的文件数据都是已经分区和排好序的。
图3-6 Reduce端Shuffle过程流程图
public static void main(String[] args) { //在本地windows平台eclipse运行mapreduce程序 //创建远程用户,以指定的用户来运行程序 //把要运行的程序代码放到run方法体里 UserGroupInformation ugi = UserGroupInformationcreateRemoteUser("hadoop"); ugidoAs(new PrivilegedAction<Void>() { public Void run() { try{ //设置引用jar的分割符,linux一般用,和:,但windows一般是用;, //所以要设置一下本地的分割符 SystemsetProperty("pathseparator", ":"); Configuration conf = new Configuration(); //可以设置用到的第三方jar //confset("tmpjars", "/tmp/jars/hbase-0945jar,/tmp/jars/protobuf-java-240ajar,/tmp/jars/zookeeper-343jar"); confset("mapredjobtracker", "172168854:9001"); confset("fsdefaultname", "hdfs://172168854:9000"); confset("hadoopjobugi", "hadoop"); confset("hbasezookeeperquorum","172168855,172168856,172168857"); confset("hbasezookeeperpropertyclientPort", "2181"); Job job = new Job(conf); jobsetJobName("ReadHbaseToHdfsAction"); jobsetJarByClass(ReadHbaseToHdfsActionclass); jobsetNumReduceTasks(1); jobsetReducerClass(ReadHbaseToHdfsReduceclass); FileSystem fs=FileSystemget(conf); Path outPath=new Path("/tmp/2/1"); if(fsexists(outPath)){ fsdelete(outPath, true); } FileOutputFormatsetOutputPath(job, outPath); jobsetOutputKeyClass(Textclass); jobsetOutputValueClass(Textclass); Scan scan = new Scan(); TableMapReduceUtilinitTableMapperJob("misdn_catetory22", scan, ReadHbaseToHdfsMapperclass, Textclass, Textclass, job); jobwaitForCompletion(true); }catch(Exception e){ eprintStackTrace(); } return null; }}); }bigs
1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。2、为了控制reduce的处理过程,map的输出键的格式为组合键格式。与常规的不同,这里变为了,TextPair的格式为。3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个reduce容器中。这样,当相同key1的数据进入reduce容器后,key2起到了一个数据标识的作用
以上就是关于如何在Windows下面运行hadoop的MapReduce程序全部的内容,包括:如何在Windows下面运行hadoop的MapReduce程序、如何在Hadoop上编写MapReduce程序、MapReduce等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)