输入为网页有向图的邻接表:
通过统计输入文件的行数,即可得之网页总数为4
每个网页的初值为1/N,即0.25
第一行输入经过map处理后,得到如下结果:
B 0.0833 C 0.0833 D 0.0833
同理,第二三四行经过map处理后,得到:
A 0.125 D 0.125 C 0.25 B 0.125 C 0.125
系统会自动对map的输出进行shuflle处理,即对key进行排序,将相同key的value合并成一个列表。
即
A 0.125 B 0.0833 0.125 C 0.0833 0.25 0.125 D 0.0833 0.125 0.125
此时出现一个疑问:
为什么要进行这一步,而不是直接将相同key的value进行加和呢?
是为了MapReduce编程的可扩展性,在已知PageRank任务的前提下,我们知道要对相同key的value进行加和,如果是求最大值的任务呢?
所以把对value列表的 *** 作交给reduce,我们要怎么 *** 作这些列表,只要对reduce进行编写即可。
为解决网页间的终止点问题和陷阱问题,需要在reduce中进行如下处理(网页没有出链或者出链只有自己,pr值迭代后只增不减)
假设:上网者通过出链访问其他网页的概率为a,通过地址栏随机访问页面的概率为(1-a)
所以,在reduce过程,某网页pr变换为:
a *(接收其他网页发送来的pr值) + (1-a) * 1/N
经过reduce处理后,网页的pr值为
A = 0.8 * 0.125 + 0.2 * 0.25 = 0.15
B = 0.8 * (0.0833 + 0.125) + 0.2 * 0.25 = 0.216
C = 0.8 * (0.0833 + 0.25) + 0.2 * 0.25 = 0.416
D = 0.8 * (0.0833 + 0.125 + 0.125) + 0.2 * 0.25 = 0.216
此时一轮迭代结束,将reduce的结果输出
那么何时停止迭代呢?
要么到达最大迭代次数,要么pr值的变化已经收敛(pr值的曲线图趋于水平)
如何判断pr值收敛:
设置一个参数epi,若 max | Pi j - P i j-1| < epi ,则说明pr值的变化已经收敛。
完整的程序如下:(支持eclipse Run on Hadoop,不支持yarn -jar运行,因为yarn -jar运行时,只能访问类中static变量的初始值,若在程序运行时对static变量的值进行更改,则map/reduce中得到的变量值还是旧值)
package test02; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.HashMap; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class PageRank_02 { private static int N = 1; private static float a = 0.8f; private static int maxIteration = 40; private static float epi = 0.000001f; private static HashMapmap; private static HashMap old_map; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: [max_iteration] [epi]"); System.err.println(" max_iteration -- Integer, default 40"); System.err.println(" epi -- Float, default 0.000001f"); System.exit(2); } String input = ""; if (otherArgs.length > 0) input = otherArgs[0]; String output = ""; if (otherArgs.length > 1) output = otherArgs[1]; if (otherArgs.length > 2) setMaxIteration(Integer.parseInt(otherArgs[2])); if (otherArgs.length > 3) setEpi(Float.parseFloat(otherArgs[3])); // 统计input文件行数,即网页个数 FileSystem fs = FileSystem.get(conf); FSDataInputStream in = fs.open(new Path(input)); BufferedReader d = new BufferedReader(new InputStreamReader(in)); int count = 0; String line; while ((line = d.readLine()) != null) { count += 1; } System.err.println("Numbers of pages: " + count); setN(count); d.close(); in.close(); for (int i = 0; i < getMaxIteration(); i++) { map = new HashMap (); Job job = Job.getInstance(conf, "page rank"); job.setJarByClass(PageRank_02.class); job.setMapperClass(PageRankMapper.class); job.setReducerClass(PageRankReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(input)); String new_output = output + (i + 1);// 把下一次的输出设置成一个新地址。 // 输出路径存在,则删除 Path new_output_path = new Path(new_output); if (fs.exists(new_output_path)) { fs.delete(new_output_path, true); } FileOutputFormat.setOutputPath(job, new_output_path); job.waitForCompletion(true); float max_delta = -1.0f; if (i > 0) { for (String key : map.keySet()) { max_delta = Math.max(max_delta, Math.abs(map.get(key) - old_map.get(key))); } } System.err.println("iteration: " + i + " , MaxIteration: " + getMaxIteration()); System.err.println("N: " + getN()); System.err.println("a: " + getA()); System.err.println("max_delta: " + max_delta); System.err.println("epi: " + getEpi()); if (max_delta < epi && i > 0) break; old_map = map; } System.exit(0); } public static class PageRankMapper extends Mapper
程序输入参数分别为:输入文件 输出文件 Max_iteration epi
Run Configurations设置如下
按照如图配置运行程序
在iteration: 14时,程序退出循环
pr值变化的最大值:
max_delta = 0.0000846
设置的参数epi:
epi = 0.0001
max_delta < epi
即pr值已收敛
参考文献:
1.MapReduce 之PageRank算法概述、设计思路和源码分析https://blog.csdn.net/u010414589/article/details/51404971
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)