PageRank 算法在Hadoop和Spark上的实现

PageRank 算法在Hadoop和Spark上的实现,第1张

背景和目的

        PageRank 网页排名的算法,曾是 Google 关键核心技术。用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度。通过对 PageRank 的编程在Hadoop 和 Spark 上的实现,熟练掌握 MapReduce 程序与 Spark 程序在集群上的提交与执行过程,加深对 MapReduce 与 Spark 的理解。

要求 1.在本地编写程序和调试

        在本地 eclipse 上使用 MapReduce 、 Spark 实现 PageRank 算法,Spark程序可采用 Java、Python、Scala 等语言进行编程,编程工具、语言自由选定。

1.1 数据格式

        每一行内容的格式:网页+‘\t’+该网页链接到的网页的集合(相互之间用英文逗号分开)。

        page        page1,page2,page3...

        图中是截取了数据集的一部分,看起来似乎一段一段,但实际上一行的数据,因为一行显示不出来所以使用多行显示,图中不同颜色代表了不同行。

1.2 输出格式

        输出结果为"("+"page"+","+"PR"+")"

(page,PR)

        同时我们可以发现,输出是按照PR的降序排列,同时PR保存到小数点后10位,不足补0.

1.3 参数要求

        要求能够利用 PageRank 算法的思想计算出每个网页的 PR 值,迭代 10 次.

,(初始)

2.在集群上提交作业并执行

        将编写好的MapReduce程序和Spark程序分别打包成jar包提交到集群上执行。集群上有HDFS文件系统(给定了输入数据集),配置好的Spark环境(可以直接通过spark指令运行jar包)

PageRank基础知识

        PageRank将互联网上的网页之间的链接关系看作一个有向图,对于任何网页的PR(u)值可以表示为:

          表示所有链接到网页的网页集合,网页是集合里的一个网页,是网页外链接的网页数。可以理解为一个网页总PR值平均分给每一个链接外面网页的PR值。事实上从定义上来看,一个网页的PR值就是其他网页的PR值平均分流后传入到的此网页的总PR值。

          PR值的计算就是经过多次迭代不断更新PR值直到满足一定的收敛条件。

         上面的简单模型并不能解决实际问题。主要是存在排名泄露和排名下沉问题。

        排名泄露指存在网页出度为0,那么网页总的PR值在迭代过程中,指向这一个网页的有向边会不断流失PR值(该网页X的PR值(不妨设为PR=a)在迭代中用不上,因为没有出去的有向边传递这个a;同时该网页迭代之后的PR值是由其他指向它本身的网页PR值求和而得到的,而这一个求和是所有网页的PR总和扣除掉a后,计算流向网页X的边PR流量和。这样就会导致每一次迭代总会有部分的a值流失,即不会在之后迭代中用到)。最终整个图的PR值都是0;

        排名下沉是指存在网页入度为0,同时存在出度大于0。如果这个网页出度不为0,那么其本身的PR值就会不断流失到其他网页,而没有其他网页的PR值能流向自己,相当于迭代过程中该网页一直在付出却没有回报。导致自己PR值为0(我认为这并不是一种问题,因为如果有一个网页没有其他网页指向自己,那这个网页一定程度上就是不太重要的,PR值自然是低的)

         为了解决以上问题,引入随机浏览模型的PageRank公式为:

        从模型上来看就是增加了的部分。此时一个网页的PR值不仅仅取决于指向自己的网页这部分网页,这部分所占权值为d,还有另外一部分来自于任意一个网页,可以认为有概率是来自于其他网页随即浏览的跳转。值为,描述的是从正在浏览的某个网页(总网页数为N)随机跳转到此网页这一事实,因此PR值为。

        在实际处理中用的是以下公式,已经被证明两个公式得出来的PageRank值在相对顺序上没有区别。

        本次实验使用中可以不用刻意理解转移矩阵的概念,但是理解之后能更加深刻的理解PR值计算传递过程。上面的公式是计算一个节点的PR值,更符合Reduce过程的计算;而对于Map过程,应转化为:对于每一个网页它所能贡献给其他网页的PR值,也就是。

PageRank的基本思路        

        结合一开始提到的输入数据的格式,每一行格式为:

<,{}>

        指的是所指向的。因此我们思路就是对于的每一个外链接的网页,都输出分摊到的PR值的给,因此Map阶段输出的是

<>

        到了Reduce阶段,对于同一个key为的都输入到一个Reduce节点,输入格式为:

<,{…}>

        因此将求和,然后乘上d再加上(1-d),就完成了第一次迭代。

        事实上我们可以发现,Reduce输出的是:

<>

        而我们要求的是至少10次迭代,也就是这一次Reduce还要作为下一次迭代Map的输入。

        那么就会出现输出的格式和输入的格式不统一,各自无法满足需求。因为Map的输入是一个图结构,即<,{}>,而Map阶段需要用到迭代过程中的PR值(第一次因为初始PR为1我们可以直接指定,但以后就没法指定了),并且Reduce只给出了PR,需要输出图结构,否则下一次Map阶段没法进行。

        将两者兼顾,Map阶段的输入修改为:

<,        {}>

        也就是value变为了:

 '\t' {

        这样Map阶段需要的图结构和PR值都有。而这也意味着Reduce必须输出这种格式,而Reduce输入是:

<,{…}>

       也就是reduce根本不能了解到图的边信息。那么图结构哪里来?只能是Map输出传递过去。

        因此Map输出除了网页的外连接的信息<, >,还应该传递它本身的图关系:

<,{}>

        这样,在Reduce输入格式应该为

<,{ {},…}>

        这样Reduce就可以根据后面的算出PR值,同时将图结构也传递输出给下一次迭代的Map使用。Reduce输出格式为:

<,        {}>

在Eclipse上使用MapReduce实现

        我通过三个部分来实现MapReduce的PageRank。第一部分用于处理原始输入得到所需输入格式,第二部分用于迭代计算PageRank,第三部分用于将最后一次迭代排序并输出成所需的格式。

①规范化输入 (FormatInput.java)

        由前面描述可以知道,Map输入需要的格式是:

<,        {}>

        而数据集输入为:

<,{}> 

        因此这一部分(或者说job)所要做的就只需要在value里面加上一个初始的PR值1.

	public static class Map extends Mapper {
		// map输入:<行偏移,page page1,page2,page3……>
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			String[] tuple = value.toString().split("\t");
			// tuple[0]就是page,tuple[1]是page指向的网页集合,以","为分隔符的list
			// 默认所有网络的PR初值为1
			if (tuple.length > 1)// 有外联接,输出
				context.write(new Text(tuple[0]), new Text("1.0\t" + tuple[1]));
			else// 无外链接,输出
				context.write(new Text(tuple[0]), new Text("1.0\t"));// 输出的中间结果为(一行数据,null)
		}
	}

         要注意的是读取出来的key不是page,而是行偏移。

        value是整个<, {}>。对其通过‘\t’分割得到真正的key和value,然后value前面加上1.

        而Reduce什么也不需要干,因此可以不写Reduce,直接将Map的输出作为最后的输出即可。

②迭代计算PageRank (PageRank.java)  Map阶段:
	public static class Map extends Mapper {
		// map的输入是<行偏移,page PR page1,page2,page3……>
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			String[] tuple = value.toString().split("\t");// 将value以‘\t’分割
			String pageName = tuple[0];// tuple[0]是page
			double PR = Double.parseDouble(tuple[1]);// tuple[1]是PR
			if (tuple.length > 2)// 存在外链接
			{
				String[] linkPages = tuple[2].split(",");
				for (String linkPage : linkPages) {// 取出每一个指向的网页
					String PR_to = String.valueOf(PR / linkPages.length);// 求出指向的网页所分配到的PR值
					// 设所指向的网页为u
					context.write(new Text(linkPage), new Text(PR_to));// 输出
				}
				// 传递原本的网页图结构,如果不传递的话reduce输出失去了图结构信息,无法作为下一次迭代map的输入
				context.write(new Text(pageName), new Text("|" + tuple[2]));
			}
		}
	}

        整体过程就是将value按‘\t’分割得到tuple数组。要注意此时分割后的tuple[0]是page,tuple[1]是PR,tuple[2]是pageList。

        tuple[0]直接作为key输出即可。

        tuple[1]结合pageList的长度得到应该分给每一个流出的网页值pr,然后对于pageList里的每一个page都输出。最后将所有出边tuple[2]传递到Reduce,这里用一个‘|’作为分割,方便后续判断是哪一种输入类型,即value是pr还是pageList。

 Reduce阶段:
	public static class Reduce extends Reducer {
		// 设置d为0.85
		private static double d = 0.85;

		// 输入为
		public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
			String links = "";
			double pageRank = 0;// 用于累加PR
			for (Text value : values) {
				String tmp = value.toString();
				if (tmp.startsWith("|")) {// 图结构
					links = "\t" + tmp.substring(tmp.indexOf("|") + 1);
				} else {// PR
					pageRank += Double.parseDouble(tmp);
				}
			}
			// 计算最后的PR
			pageRank = (double) (1 - d) + d * pageRank;
			context.write(new Text(key), new Text(String.valueOf(pageRank) + links));
			// 输出,与map读取的输入一致,以便进行迭代
		}
	}

        此时输入是:

<,{ {  '|'  },…}>

        对输入的value首字符进行判断,如果是'|'表示这是一个图结构,那么就保存下来便于待会Reduce输出;如果不是'|'那么说明是pr值,进行累加。

        最后将累加后的pr按照公式计算得到随机浏览模型的PR,然后连同图结构一起输出。最后输出就是:

<,PR {}>

        与Map输入格式保持一致。

Main阶段

        上面的Map加上Reduce也只是一次迭代(或者说Job),需要进行多次迭代,因此需要创建多个Job来执行多次迭代。

        以下是迭代过程:

// 迭代10次,输出的目录作为下一次迭代输入的目录
		for (int i = 0; i < 10; i++) {
			Job job = new Job(conf, "PageRank");// 新建Job
			job.setJarByClass(PageRank.class);// 设置执行任务的jar
			job.setMapperClass(Map.class);// 设置Maper类
			job.setReducerClass(Reduce.class);// 设置Reduce类
			job.setOutputKeyClass(Text.class);// 设置job输出的key
			job.setOutputValueClass(Text.class);// job输出的value

			FileInputFormat.addInputPath(job, new Path(otherArgs[0]));// 输入文件的路径

			Path path = new Path(otherArgs[1] + i);
			// 加载配置文件
			FileSystem fileSystem = path.getFileSystem(conf);
			// 输出目录若存在则删除
			if (fileSystem.exists(new Path(otherArgs[1] + i))) {
				fileSystem.delete(new Path(otherArgs[1] + i), true);
			}
			FileOutputFormat.setOutputPath(job, new Path(otherArgs[1] + i));// 输出文件的路径

//			System.out.println(otherArgs[0]);
//			System.out.println(otherArgs[1] + i);
			// 输入目录变为此次输出的目录
			otherArgs[0] = otherArgs[1] + i + "/part-r-00000";
//			System.out.println(otherArgs[0]);
			job.waitForCompletion(true);// 提交任务等待任务完成
		}

        每一次循环都新建一个Job,设置这个Job的各种参数,大部分参数与正常Job设置一样。

        重点在于输入和输出的文件路径。第一次输入来自于第一阶段规范化输入后的输出目录,后面几次输入都不断更换为当次的输出目录的part-r-00000文件。

        有10次迭代,就会生成10个迭代过程的中间结果目录,它们又是下一次迭代的输入目录。

③输出结果按PR值排序,并且转化为所需格式(SortPageRank.java)

        这一阶段就是将最后一轮迭代的输出转化为标准答案的格式。因为要通过Sort阶段实现排序,因此key必须是PR。而我们输出的格式为:

<,        {}>

        因此,将PR提出来变为key,page作为value(图结构不需要用了,可以抛弃掉了)。所以Map过程如下:

	public static class Map extends Mapper {
		// PageRank输出的是
		// 需要用到的是前面的page和PR
		// 因为要按PR降序排序,所以将PR作为key,page作为value
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			String[] tuple = value.toString().split("\t");
			context.write(new DoubleWritable(Double.parseDouble(tuple[1])), new Text(tuple[0]));
		}
	}

        此时排序后得到的预期结果是<,{ }>(这里表示有多个网页经过10次迭代之后PR值相同,但是实验中发现不存在这种情况,保险起见仍然对每个进行处理)

        我们期望的标准格式是

(page,PR)

        因此这里又要将key和value的值换回来,同时加上两个小括号和逗号输出。

        这里有两个小坑。一个是输出要保存至10位小数,这里使用过很多方法,但是最后输出的格式都和标准的不一样,要么大部分一样个别不一样,要么个别输出直接相同,原因很大程度是设置的是FloatWritable而不是DoubleWritable,或者格式化输出前值就已经被四舍五入过造成数据不准确。另一个小坑是输出的value为“”(也就是空)并不是输出结果就是只有一个key,此时会在输出key之后多输出一个'\t'分隔符,而这个分隔符人眼是看不出来的,只有使用diff指令才会判断出错。输出完全为空的方法是NullWritable。

	// 将输入key只保留10位小数
	// 同时将输出的格式改成标准result的格式
	// 这里有个坑就是value不能设为“”,虽然也是空,但是输出会在key和“”多一个‘\t’
	// 设value为NullWritable就能只输出所需要的key
	public static class Reduce extends Reducer {
		public void reduce(DoubleWritable key, Iterable values, Context context)
				throws IOException, InterruptedException {
			for (Text value : values)
				context.write(new Text("(" + value.toString() + "," + String.format("%.10f", key.get()) + ")"),
						NullWritable.get());
		}
	}

        同时要注意的是,给定的标准输出是按照降序排序,而Map默认是升序排序,因此需要自定义一个排序函数。基本就是继承原有的类将输出变成相反数即可。

	// 自定义的降序排序,用于同一个reduce节点的排序
	private static class DoubleWritableDecreasingComparator extends DoubleWritable.Comparator {
		public int compare(DoubleWritable a, DoubleWritable b) {
			return -super.compare(a, b);
		}

		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			return -super.compare(b1, s1, l1, b2, s2, l2);
		}

	}
④将三个阶段合并成一个过程

        我的实现方法是设置第二阶段为主类,第二阶段调用第一阶段和第三阶段的main函数。

        以下是实现方法:

	public static void main(String[] args) throws Exception {
		// 命令行参数,包括了输入的文件和要输出的文件目录
		FormatInput.main(null);

		args = new String[] { "hdfs://localhost:9000/ex3/mapInput/part-r-00000", "hdfs://localhost:9000/ex3/iter" };
		Configuration conf = new Configuration();// 为任务设定配置文件
		conf.set("fs.defaultFS", "hdfs://localhost:9000");

		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();// 其它参数
		if (otherArgs.length != 2) {// 参数长度不等于2则异常推出
			System.err.println("Usage: wordcount  ");
			System.exit(2);
		}
		// 迭代10次,输出的目录作为下一次迭代输入的目录
		for (int i = 0; i < 10; i++) {
                ...
		}
		SortPageRank.main(null);
		System.exit(0);

	}
⑤运行结果

        在主类PageRank.java直接通过Eclipse的运行按钮进行测试,生成如下结果:

        将output里面的part-r-00000与标准输出比对:

        输出一致,因此可以将其打包到集群上运行。

        事实上,我是为了调试方便将3个过程分置在3个java程序里。如果觉得上面分别写3个java程序有些麻烦,可以将3个class都写在1个class,可以参考这一篇博客。

⑥上传至集群运行

        在本机调试时是基于本地的hdfs文件系统路径,上传到集群需要修改部分输入输出路径的参数和conf配置。

        在FormatInput.java里面修改参数如下:

		// 命令行参数,包括了输入的文件和要输出的文件目录
		args = new String[] { "hdfs://10.102.0.198:9000/ex3/input",
"hdfs://10.102.0.198:9000/user/bigdata_201900130047/Experiment_3_Hadoop/mapInput" };
		Configuration conf = new Configuration();// 为任务设定配置文件
		conf.set("fs.defaultFS", "hdfs://10.102.0.198:9000");

        在PageRank.java里面修改参数如下:

		// 命令行参数,包括了输入的文件和要输出的文件目录
		FormatInput_cluster.main(null);
		args = new String[] {"hdfs://10.102.0.198:9000/user/bigdata_201900130047/Experiment_3_Hadoop/mapInput/part-r-00000","hdfs://10.102.0.198:9000/user/bigdata_201900130047/Experiment_3_Hadoop/iter" };
		Configuration conf = new Configuration();// 为任务设定配置文件
		conf.set("fs.defaultFS", "hdfs://10.102.0.198:9000");

        在SortedPageRank里面修改参数如下:

		// 命令行参数,包括了输入的文件和要输出的文件目录
		args = new String[] { "hdfs://10.102.0.198:9000/user/bigdata_201900130047/Experiment_3_Hadoop/iter9",
"hdfs://10.102.0.198:9000/user/bigdata_201900130047/Experiment_3_Hadoop/output" };
		Configuration conf = new Configuration();// 为任务设定配置文件
		conf.set("fs.defaultFS", "hdfs://10.102.0.198:9000");

        然后将程序打包成jar包。

        ①先使用命令:

scp PageRank_Hadoop.jar 用户名@服务器IP:/home/用户名

        将本地程序提交到 Hadoop 集群,然后通过命令:

ssh 用户名@服务器IP

        远程登录到 Hadoop 集群进行 *** 作。

        ②在集群上使用命令:

hadoop jar PageRank_Hadoop.jar 输入目录 输出目录

        在集群上运行Hadoop作业,这里我在代码中已经指定输出目录为集群hdfs 目录下的 Experiment_3_Hadoop,因此直接使用命令:

hadoop jar PageRank_Hadoop.jar

        执行完成后可以在集群的hdfs上查看自己的输出文件:

附源代码

FormatInput.java:

package org.apache.hadoop.examples;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class FormatInput {
	public static class Map extends Mapper {
		// map输入:<行偏移,page page1,page2,page3……>
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			String[] tuple = value.toString().split("\t");
			// tuple[0]就是page,tuple[1]是page指向的网页集合,以","为分隔符的list
			// 默认所有网络的PR初值为1
			if (tuple.length > 1)// 有外联接,输出
				context.write(new Text(tuple[0]), new Text("1.0\t" + tuple[1]));
			else// 无外链接,输出
				context.write(new Text(tuple[0]), new Text("1.0\t"));// 输出的中间结果为(一行数据,null)
		}
	}

	public static void main(String[] args) throws Exception {
		// 命令行参数,包括了输入的文件和要输出的文件目录
		args = new String[] { "hdfs://localhost:9000/ex3/input", "hdfs://localhost:9000/ex3/mapInput" };

		Configuration conf = new Configuration();// 为任务设定配置文件
		conf.set("fs.defaultFS", "hdfs://localhost:9000");

		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();// 其它参数
		if (otherArgs.length != 2) {// 参数长度不等于2则异常推出
			System.err.println("Usage: wordcount  ");
			System.exit(2);
		}
		Path path = new Path(args[1]);
		// 加载配置文件
		FileSystem fileSystem = path.getFileSystem(conf);
		// 输出目录若存在则删除
		if (fileSystem.exists(new Path(args[1]))) {
			fileSystem.delete(new Path(args[1]), true);
		}

		Job job = new Job(conf, "FormatInput");// 新建Job
		job.setJarByClass(FormatInput.class);// 设置执行任务的jar

		job.setMapperClass(Map.class);// 设置Maper类

		job.setOutputKeyClass(Text.class);// 设置job输出的key
		job.setOutputValueClass(Text.class);// job输出的value
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));// 输入文件的路径
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));// 输出文件的路径
		job.waitForCompletion(true);// 提交任务等待任务完成
	}
}

PageRank.java

package org.apache.hadoop.examples;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PageRank {
	// 重载map函数
	public static class Map extends Mapper {
		// map的输入是<行偏移,page PR page1,page2,page3……>
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			String[] tuple = value.toString().split("\t");// 将value以‘\t’分割
			String pageName = tuple[0];// tuple[0]是page
			double PR = Double.parseDouble(tuple[1]);// tuple[1]是PR
			if (tuple.length > 2)// 存在外链接
			{
				String[] linkPages = tuple[2].split(",");
				for (String linkPage : linkPages) {// 取出每一个指向的网页
					String PR_to = String.valueOf(PR / linkPages.length);// 求出指向的网页所分配到的PR值
					// 设所指向的网页为u
					context.write(new Text(linkPage), new Text(PR_to));// 输出
				}
				// 传递原本的网页图结构,如果不传递的话reduce输出失去了图结构信息,无法作为下一次迭代map的输入
				context.write(new Text(pageName), new Text("|" + tuple[2]));
			}
		}
	}

	public static class Reduce extends Reducer {
		// 设置d为0.85
		private static double d = 0.85;

		// 输入为
		public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
			String links = "";
			double pageRank = 0;// 用于累加PR
			for (Text value : values) {
				String tmp = value.toString();
				if (tmp.startsWith("|")) {// 图结构
					links = "\t" + tmp.substring(tmp.indexOf("|") + 1);
				} else {// PR
					pageRank += Double.parseDouble(tmp);
				}
			}
			// 计算最后的PR
			pageRank = (double) (1 - d) + d * pageRank;
			context.write(new Text(key), new Text(String.valueOf(pageRank) + links));
			// 输出,与map读取的输入一致,以便进行迭代
		}
	}

	public static void main(String[] args) throws Exception {
		// 命令行参数,包括了输入的文件和要输出的文件目录

		FormatInput.main(null);

		args = new String[] { "hdfs://localhost:9000/ex3/mapInput/part-r-00000", "hdfs://localhost:9000/ex3/iter" };
		Configuration conf = new Configuration();// 为任务设定配置文件
		conf.set("fs.defaultFS", "hdfs://localhost:9000");

		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();// 其它参数
		if (otherArgs.length != 2) {// 参数长度不等于2则异常推出
			System.err.println("Usage: wordcount  ");
			System.exit(2);
		}
		// 迭代10次,输出的目录作为下一次迭代输入的目录
		for (int i = 0; i < 10; i++) {
			Job job = new Job(conf, "PageRank");// 新建Job
			job.setJarByClass(PageRank.class);// 设置执行任务的jar
			job.setMapperClass(Map.class);// 设置Maper类
			job.setReducerClass(Reduce.class);// 设置Reduce类
			job.setOutputKeyClass(Text.class);// 设置job输出的key
			job.setOutputValueClass(Text.class);// job输出的value

			FileInputFormat.addInputPath(job, new Path(otherArgs[0]));// 输入文件的路径

			Path path = new Path(otherArgs[1] + i);
			// 加载配置文件
			FileSystem fileSystem = path.getFileSystem(conf);
			// 输出目录若存在则删除
			if (fileSystem.exists(new Path(otherArgs[1] + i))) {
				fileSystem.delete(new Path(otherArgs[1] + i), true);
			}
			FileOutputFormat.setOutputPath(job, new Path(otherArgs[1] + i));// 输出文件的路径

//			System.out.println(otherArgs[0]);
//			System.out.println(otherArgs[1] + i);
			// 输入目录变为此次输出的目录
			otherArgs[0] = otherArgs[1] + i + "/part-r-00000";
//			System.out.println(otherArgs[0]);
			job.waitForCompletion(true);// 提交任务等待任务完成
		}
		SortPageRank.main(null);
		System.exit(0);
//		job.setMapOutputKeyClass(Text.class);
//		job.setMapOutputValueClass(Text.class);

	}
}

SortPageRank.java

package org.apache.hadoop.examples;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SortPageRank {

	public static class Map extends Mapper {
		// PageRank输出的是
		// 需要用到的是前面的page和PR
		// 因为要按PR降序排序,所以将PR作为key,page作为value
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			String[] tuple = value.toString().split("\t");
			context.write(new DoubleWritable(Double.parseDouble(tuple[1])), new Text(tuple[0]));
		}
	}

	// 将输入key只保留10位小数
	// 同时将输出的格式改成标准result的格式
	// 这里有个坑就是value不能设为“”,虽然也是空,但是输出会在key和“”多一个‘\t’
	// 设value为NullWritable就能只输出所需要的key
	public static class Reduce extends Reducer {
		public void reduce(DoubleWritable key, Iterable values, Context context)
				throws IOException, InterruptedException {
			for (Text value : values)
				context.write(new Text("(" + value.toString() + "," + String.format("%.10f", key.get()) + ")"),
						NullWritable.get());
		}
	}

	// 自定义的降序排序,用于同一个reduce节点的排序
	private static class DoubleWritableDecreasingComparator extends DoubleWritable.Comparator {
		public int compare(DoubleWritable a, DoubleWritable b) {
			return -super.compare(a, b);
		}

		public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
			return -super.compare(b1, s1, l1, b2, s2, l2);
		}

	}

	public static void main(String[] args) throws Exception {
		// 命令行参数,包括了输入的文件和要输出的文件目录
		args = new String[] { "hdfs://localhost:9000/ex3/iter9", "hdfs://localhost:9000/ex3/output" };

		Configuration conf = new Configuration();// 为任务设定配置文件
		conf.set("fs.defaultFS", "hdfs://localhost:9000");
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();// 其它参数
		if (otherArgs.length != 2) {// 参数长度不等于2则异常推出
			System.err.println("Usage: wordcount  ");
			System.exit(2);
		}
		Path path = new Path(args[1]);
		// 加载配置文件
		FileSystem fileSystem = path.getFileSystem(conf);
		// 输出目录若存在则删除
		if (fileSystem.exists(new Path(args[1]))) {
			fileSystem.delete(new Path(args[1]), true);
		}

		Job job = new Job(conf, "SortPageRank");// 新建Job
		job.setJarByClass(SortPageRank.class);// 设置执行任务的jar

		job.setMapperClass(Map.class);// 设置Maper类
		job.setReducerClass(Reduce.class);// 设置Reduce类

		job.setMapOutputKeyClass(DoubleWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setSortComparatorClass(DoubleWritableDecreasingComparator.class);
		job.setOutputKeyClass(Text.class);// 设置job输出的key
		job.setOutputValueClass(Text.class);// job输出的value
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));// 输入文件的路径
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));// 输出文件的路径
		job.waitForCompletion(true);// 提交任务等待任务完成
	}
}
在intelliJ上使用spark实现(scala)

        因为对spark也是刚接触一两天,所以可能对知识的理解有一些纰漏。这里仅仅描述一下我学习的一些经验,仅仅作为零基础学习的参考。

        使用spark的最大问题就是intelliJ的安装问题。intelliJ需要依次配置好scala插件,scala-sdk。然后在library引入scala-sdk和spark的库。

        下面是我参考过的教程:

        [1]比较易懂的配置过程

        [2]实用性比较广的Maven配置,但比较难理解

        配置完成后,先测试自动生成的hello world程序是否能运行。如果能运行说明大概率scala环境配置成功。因为网上的intelliJ版本不一样,教程里面按钮位置也不一样。这里给出发文为止最新版的intelliJ,scala和sparl的简易快速配置(默认此时已经安装好scala插件,scala-sdk,且电脑上已经配置好spark)。

       

        然后再使用wordcount实例程序测试spark环境是否配置完成。

        这里我推荐以下两篇博客,它们简单实现了PageRank算法,也可以先初步了解如何用scala写PageRank,测试spark环境是否配置完整。如果两篇博客的程序都能运行,基本就能运行spark程序。

        [1]PageRank的简易Spark实现(1)

        [2]PageRank的简易Spark实现(2)

        然后还要解决scala编程语言问题,scala整体与java相似,学的比较快。

        [1]厦大林子雨老师对scala和spark入门的讲解

        [2]菜鸟scala教程

        接着解决使用spark库编程的问题。需要理解spark的框架,RDD的设计原理。

        然后就能使用RDD算子进行编程,推荐以下入门博客:

        [1]RDD编程简易入门

        [2]RDD算子的查询与使用

单机环境

        spark编程使用与类似于hdfs的conf设置:

    val conf = new SparkConf().setAppName("PageRank").setMaster("local")
    val sc = new SparkContext(conf)

        通过SparkContext得到Spark的上下文,可以连接到文件系统,主要还是得到RDD算子进行 *** 作。

        以下是进行迭代前的一些准备:

    //参数定义
    val d = 0.85
    val iterCnt = 10
    //从HDFS读取图结构,并把图结构存入内存
    val lines = sc.textFile("hdfs://localhost:9000/ex3/input")
    //得到
    val links = lines.map(line => (line.split("\t")(0), line.split("\t")(1).split(","))).cache()
    //初始化PR值
    var ranks = links.mapValues(_ => 1.0)

        通过SparkContext的textFile获得输入文件的RDD,对此RDD进行Map映射。

        输入文件是通过'\t'划分得到page和page1,page2,…

        split("\t")(0)就是page, split("\t")(1)就是{page1,page2,…}(与java不同,这里使用的是小括号不是中括号),split("\t")(1)再通过split(",")划分得到String类型的Array。

        line=>(line1,line2)表示将linesRDD描述的一行行文件内容被映射成键值对。这时候得到的links是一个一个如下所示的键值对:

        最后加个cache表示将此linksRDD存放内存。如果不存放内存,程序也能运行,但是spark内存计算的优势就没体现出来。因为每一次reduce触发action之后,都需要重新去hdfs文件系统取出文件到RDD进行一系列transform,相比mapreduce实现的虽然少了第一阶段规范化的磁盘io,但是后续每一次迭代的reduce都需要1次磁盘io。如果能读取到内存,那么以后基于此RDD的transform都不用进行磁盘io。

        ranks是将links的values(原本是{page1,page2,page3…})重新map映射,保留key的基础上将value初始化为1,即初始各个网页的PR值为1。rank里面此时的元素是

        接下来进行迭代的 *** 作。

    //迭代
    for (i <- 0 until iterCnt) {
      //得到
      val mapInput = links.join(ranks)
      //计算输出的pr
      val contributions = mapInput.flatMap {
        //类型匹配,有外链接的才输出pr值
        //由一个page输出多个
        case (_, (linkList, rank)) => linkList.map(pageTo => (pageTo, rank / linkList.size))
      }
      //reduce,先求和得到总的pr,再加权
      val pagePR = contributions.reduceByKey((x, y) => x + y)
      ranks = pagePR.mapValues(v => (1 - d) + d * v)
    }

        通过links.join将links和rank连接在一起得到mapInput,连接后mapInput的元素为:

        这就是我们之前在MapReduce时Map的输入,只不过value里面PR和{page1,page2,page3…}的位置反了。为了书写方便,我们令{page1,page2,page3…}为linkList。按照之前Map的 *** 作,我们要对linkList里的每一个都要输出<,pr>,同时输出图结构。这里图结构不需要传递了,因为已经读取到linksRDD,之后的RDD可以直接使用。

        因此对于每一个元素,我们要输出这个page所指向的linkList中的每一个。也就是下面过程:

linkList.map(pageTo => (pageTo, rank / linkList.size))

此时产生的是{(page1,pr1),(page2,pr2),(page3,pr3)…}

        而mapInput是由很多个 元素组成的,因此对于最后map会得到:

{        {(page1,pr1),(page2,pr2),(page3,pr3)},

        {(page1,pr1),(page2,pr2),(page3,pr3)}},

        {(page1,pr1),(page2,pr2),(page3,pr3)}…        }

        一个List嵌套了多个List,因此同flatMap进行扁平化。这里还有一个问题就是不是所有的page都有外链接(实验数据每个page都有),因此先用case进行模式匹配,如果value键值对里面存在List才有外链接,才进行flatMap。

        接下来就是Reduce阶段。Map所做的一些针对RDD的 *** 作都没有立即执行,只有action类 *** 作如Reduce时,才会进行RDD之间的计算与转换。Reduce的输入是:

        

        直接使用ReduceByKey将value依次累加即可。有一个注意点是:

        reduceByKey((x, y) => x + y)里面的x+y不是key+value,而是对于values里面的每一个value进行依次累加,如1,2,3,4,5就是1+2=3,3+3=6,6+4=10,10+5=15。

        最后就是排序阶段,使用sortBy对输入的value进行降序排序。排完序之后要把输出输出的PR设置为保留10位小数,不足补0。

    val result = ranks.sortBy(x => x._2, false).mapValues(x => x.formatted("%.10f"))
    //    此时输出已经是标准答案的格式"(page,PR)"
    //    可以调用如下函数进行验证
    //    result.foreach(println)

        这里要注意的是,result此时格式已经是标准了,也可以此时加上一个result.foreach(println)来验证一下。

        Spark与MapReduce不同在于,当我们输入输出的是键值对时,输出的格式就是:

(key,value)

        而MapReduce输出的键值对是:

key \t value

        最后将RDD写回到文件系统。

    //    输出到 HDFS文件系统
    val SavePath = "hdfs://localhost:9000/ex3/outputSpark"
    hdfsDel(sc, SavePath)
    result.saveAsTextFile(SavePath)

        写回时需要作一个判断,如果目标目录已经存在,那么再写到该目录会出错,因此需要先将存在的目录删除。也就是hdfsDel(sc, SavePath)。

        函数过程如下:

  def hdfsDel(sc: SparkContext, filePath: String): Unit = {
    //也可以通过scala来获取
    //这里直接用spark自带的hadoopconf *** 作

    //String转为Path
    val output = new Path(filePath)
    //获得conf
    val conf = sc.hadoopConfiguration
    //获得hdfs文件系统
    //  采用下面这一种方式会报错
    //    val hdfs = FileSystem.get(conf)
    val hdfs = output.getFileSystem(conf);
    //判断文件是否存在
    if (hdfs.exists(output))
      hdfs.delete(output, true)
  }

        传入SparkContext,通过它获得hadoopConfiguration。在单机环境下,需要通过Path的getFileSystem获得文件系统FileSystem,然后通过FileSystem的exists判断输出路径是否存在,若存在则递归删除目录及其子文件。

运行过程:

附源代码
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.hadoop.fs.{FileSystem, Path}
import java.io.File

object PageRank {
  def dirDel(path: File): Unit = {
    if (!path.exists()) //文件路径不存在
      return
    else if (path.isFile()) { //要删除的是一个文件
      path.delete()
      return
    }
    //要删除的是目录
    val file: Array[File] = path.listFiles()
    for (d <- file) {
      dirDel(d)
    }
    path.delete()
  }
  def hdfsDel(sc: SparkContext, filePath: String): Unit = {
    //也可以通过scala来获取
    //这里直接用spark自带的hadoopconf操作

    //String转为Path
    val output = new Path(filePath)
    //获得conf
    val conf = sc.hadoopConfiguration
    //获得hdfs文件系统
    //  采用下面这一种方式会报错
    //    val hdfs = FileSystem.get(conf)
    val hdfs = output.getFileSystem(conf);
    //判断文件是否存在
    if (hdfs.exists(output))
      hdfs.delete(output, true)
  }
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PageRank").setMaster("local")
    val sc = new SparkContext(conf)
    //参数定义
    val d = 0.85
    val iterCnt = 10
    //从HDFS读取图结构,并把图结构存入内存
    val lines = sc.textFile("hdfs://localhost:9000/ex3/input")
    //得到
    val links = lines.map(line => (line.split("\t")(0), line.split("\t")(1).split(","))).cache()
    //初始化PR值
    var ranks = links.mapValues(_ => 1.0)

    //迭代
    for (i <- 0 until iterCnt) {
      //得到
      val mapInput = links.join(ranks)
      //计算输出的pr
      val contributions = mapInput.flatMap {
        //类型匹配,有外链接的才输出pr值
        //由一个page输出多个
        case (_, (linkList, rank)) => linkList.map(pageTo => (pageTo, rank / linkList.size))
      }
      //reduce,先求和得到总的pr,再加权
      val pagePR = contributions.reduceByKey((x, y) => x + y)
      ranks = pagePR.mapValues(v => (1 - d) + d * v)
    }

    val result = ranks.sortBy(x => x._2, false).mapValues(x => x.formatted("%.10f"))
    //    此时输出已经是标准答案的格式"(page,PR)"
    //    可以调用如下函数进行验证
    //    result.foreach(println)

    //    输出到本地文件系统看看是否正确
    //    val SavePath = "/home/hadoop/test3/spark"
    //    val file = new File(SavePath)
    //    if (file.exists()) //文件路径存在
    //      dirDel(file)
    //    result.saveAsTextFile("file://" + SavePath)

    //    输出到 HDFS文件系统
    val SavePath = "hdfs://localhost:9000/ex3/outputSpark"
    hdfsDel(sc, SavePath)
    result.saveAsTextFile(SavePath)
  }
}
集群环境

        上传至集群时,需要对参数以及个别函数进行修改。

        首先是Master的参数的设置,原本是local,现在需要改成集群。可以不设置,我这里使用了集群的Master端口。对于setMaster的参数可以参考这里。

    val conf = new SparkConf().setAppName("PageRank_byX01").setMaster("spark://Master:7077")
    val sc = new SparkContext(conf)

        输入文件目录相应的修改为集群的hdfs文件系统:

    //从HDFS读取图结构,并把图结构存入内存
    val lines = sc.textFile("hdfs://10.102.0.198:9000/ex3/input", 1)

        输出目录也相应地修改:

    //    输出到 HDFS文件系统
    val SavePath = "hdfs://10.102.0.198:9000/user/bigdata_201900130047/Experiment_3_Spark"
    //若文件目录已经存在则删除
    hdfsDel(sc, SavePath)
    //保存到文件系统
    result.saveAsTextFile(SavePath)

        同时hdfsDel(sc, SavePath)函数也需要修改:

  def hdfsDel(sc: SparkContext, filePath: String): Unit = {
    //也可以通过scala来获取
    //这里直接用spark自带的hadoopconf *** 作

    //String转为Path
    val output = new Path(filePath)
    //获得conf
    val conf = sc.hadoopConfiguration
    //获得hdfs文件系统
    //  采用下面这一种方式会报错
    //    val hdfs = output.getFileSystem(conf);
    //两个刚好是相反,现在这个适用于集群环境
    val hdfs = FileSystem.get(conf)
    //判断文件是否存在
    if (hdfs.exists(output))
      hdfs.delete(output, true)
  }

        修改原因也是因为实验测试中发现如果不修改会报错,具体可以参考此解释。

        函数功能不变,但是获取文件系统的方式变了。原本是单机环境获取hdfs文件系统,现在集群下,变成了通过FileSystem.get(conf)获得文件系统。

        在集群的配置下,默认会将文件分区为2.这样就会导致结果会输出两个文件,不好进行比对。我这里为了简单起见,仅仅设置了读取文件时强制分区为1.这样map分区就为1,同时reduce不做说明时会保持map的分区,所以最后结果就只有一个part文件。

    val lines = sc.textFile("hdfs://10.102.0.198:9000/ex3/input", 1)

        使用intelliJ的项目构建,选择需要的编译输出和jar包、去掉不需要包含的依赖库打包成jar包上传到集群。上传方法也是使用scp上传。

        在集群上运行spark的jar包与MapReduce直接使用hadoop jar运行不同。我所上传的服务器集群上已经安装了spark环境,因此进入 spark 的 bin 目录下,使用命令:

 spark-submit --class PageRank(主类名) PageRank_Spark.jar

        运行 Spark 作业。

        在Spark集群网页查看任务执行状态:

        可以看到任务执行成功。

        将输出的文件与标准答案比对:

        出现图中所示即说明答案与标准输出一致。

集群环境下分区的思考

        为了让结果文件只有一个,我在读文件时指定了分区数目为1.

        在spark.default.parallelism默认设置为2,而代码中的优先级高于外部配置文件,所以最后分区还是1.

        这样输出只有一个文件了,但实际上效率和并行度也下降了。一个RDD的每个partition分给了一个task去执行,每个Executor都带1核,每次最多只能执行一个task。这样后续的RDD *** 作,Map包括后面Reduce(没有额外指定分区不为1的话)都是只有一个task执行一个partition。即使这时候空闲的Executor很多但实际执行的Executor仍只有1个,如果数据量很大的话,那么任务执行的就很慢。

        查询资料后发现,存在这样一种方法:

        在RDD上调用coalesce(1,true).saveAsTextFile(),意味着做完计算之后将数据汇集到一个分区,然后再执行保存的动作,显然,一个分区,Spark自然只起一个task来执行保存的动作,也就只有一个文件产生了。

        又或者,可以调用repartition(1),它其实是coalesce的一个包装,默认第二个参数为true。这样在最后执行要保存时才执行合成1个分区,之前计算仍然按多个分区计算,多个task并行。

        再或者,在sortBy参数里面多加一个分区数。

        但这样做存在一些问题。因为Spark面对的是大量的数据,并且是并行执行的,如果强行要求最后只有一个分区,必然导致大量的磁盘IO和网络IO产生,原因是将分布将各个机器上的RDD partition 合并到单一主机后再读入磁盘。并且最终执行reduce *** 作的节点的内存也会承受很大考验。Spark程序会很慢,甚至死掉。

        存在一种安全的方法是,将最后合成文件的任务交给hdfs来完成。

        通过命令:

hadoop fs -getmerge /hdfs/output   /local/file.txt

        把HDFS 上output里的多个文件合并成一个file.txt本地文件。但这样可能需要处理各个分区局部有序与全局有序的问题。

        关于分区的参考文章如下:

        [1]Spark中使用saveAsTextFile生成一个文件

        [2]RDD默认分区数及分区数目的设置

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/922934.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存