Step1Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Rating matrix
public class Step1Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Get the Job instance
        Configuration conf = new Configuration();
        conf.set("dfs.client.use.datanode.hostname", "true");
        conf.set("fs.defaultFS", "hdfs://10.3.0.15:9000");
        // conf.set("fs.defaultFS", "hdfs://localhost:9000");
        Job job = Job.getInstance(conf);

        // Set the jar by class
        job.setJarByClass(Step1Driver.class);

        // Wire up the mapper and reducer
        job.setMapperClass(Step1Mapper.class);
        job.setReducerClass(Step1Reducer.class);

        // Map output types
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Final output types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("file:///E:/workspace/workspace_BigData1/mrtest1/src/main/resources/data.csv"));
        FileOutputFormat.setOutputPath(job, new Path("file:///E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result4"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
Step1Mapper
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Step1Mapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    Text outV = new Text();
    IntWritable outK = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line: userID,itemID,rating
        String[] strs = value.toString().split(",");
        int k = Integer.parseInt(strs[0]);
        int v1 = Integer.parseInt(strs[1]);
        double v2 = Double.parseDouble(strs[2]);
        outK.set(k);
        outV.set(v1 + ":" + v2);
        context.write(outK, outV);
    }
}
Step1Reducer
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Step1Reducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    Text outV = new Text();

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Join all itemID:score pairs of one user into a single comma-separated value
        StringBuffer stringBuffer = new StringBuffer("");
        for (Text value : values) {
            stringBuffer.append("," + value);
        }
        String result = stringBuffer.toString().replaceFirst(",", "");
        outV.set(result);
        context.write(key, outV);
    }
}
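Assuming data.csv holds one rating per line in the form userID,itemID,score (the format implied by the mapper's split on ","), Step1 groups the ratings by user, so each line of result4 looks roughly like:

1	101:5.0,102:3.0,103:2.5

The IDs and scores above are made-up values for illustration; only the layout matters for the later steps: the user ID, a tab, then comma-separated itemID:score pairs.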
Step2Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Co-occurrence matrix
public class Step2Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(Step2Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Step2Mapper.class);
        job.setReducerClass(Step2Reducer.class);

        FileInputFormat.setInputPaths(job, new Path("E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result4/part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result6")); // the result6 data has been verified

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
Step2Mapper
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Step2Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split on tab and comma: tokens[0] is the userID, the rest are itemID:score pairs
        String[] tokens = value.toString().split("[\t,]"); // next time, test the data in result7 with the [\t,] split
        // Emit every ordered item pair (including item:item) rated by this user
        for (int i = 1; i < tokens.length; i++) {
            String itemID = tokens[i].split(":")[0];
            for (int j = 1; j < tokens.length; j++) {
                String itemID2 = tokens[j].split(":")[0];
                k.set(itemID + ":" + itemID2);
                context.write(k, v);
            }
        }
    }
}
Step2Reducer
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Step2Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the occurrences of each item pair
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
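Because the mapper emits every ordered pair of items a user has rated (including each item paired with itself), the reducer's output in result6 is the co-occurrence matrix, one cell per line, e.g.:

101:102	3
101:101	5

read as "3 users rated both item 101 and item 102" and "5 users rated item 101 at all" (the item:item entries form the diagonal). The IDs and counts are illustrative; the itemA:itemB, tab, count layout is what the Step5 mapper later parses.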
Step3Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Transpose of the rating matrix
public class Step3Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(Step3Driver.class);

        // Map-only transpose: no reducer class is set, so the default identity reducer passes the pairs through
        job.setMapperClass(Step3Mapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result4/part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result8"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
Step3Mapper
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Step3Mapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    IntWritable outK = new IntWritable();
    Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line: userID, tab, itemID:score,itemID:score,...
        String[] tokens = value.toString().split("[\t,]");
        // Transpose: emit itemID as the key and userID:score as the value,
        // the format Step5Mapper reads back from result8
        for (int i = 1; i < tokens.length; i++) {
            String[] split = tokens[i].split(":");
            outK.set(Integer.parseInt(split[0]));
            outV.set(tokens[0] + ":" + split[1]);
            context.write(outK, outV);
        }
    }
}

Step5Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Step5Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(Step5Driver.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step5Mapper.class);
        job.setReducerClass(Step5Reducer.class);

        // Two inputs: the transposed rating matrix (result8) and the co-occurrence matrix (result7)
        FileInputFormat.setInputPaths(job,
                new Path("E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result8/part-r-00000"),
                new Path("E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result7/part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("H:/data/result10"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
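Step5 multiplies the co-occurrence matrix by the rating matrix. The mapper below reads lines from both inputs, caches the co-occurrence rows in a static in-memory map, and for every rating emits, keyed by user, a partial product of the form itemID2,score*count; the reducer then sums those partial products per candidate item to get each user's recommendation scores. Two details worth noticing in the code as written: the driver reads the co-occurrence data from result7 although Step2 above writes it to result6, so the co-occurrence output has to end up under result7 before this job runs, and the static cache assumes the co-occurrence lines are processed before the ratings that look them up.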
Step5Mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Step5Mapper extends Mapper<LongWritable, Text, IntWritable, Text> {
    private final static IntWritable k = new IntWritable();
    private final static Text v = new Text();
    // Co-occurrence rows cached in memory: itemID1 -> list of (itemID1, itemID2, count)
    private final static Map<Integer, List<Cooccurrence>> cooccurrenceMatrix = new HashMap<>();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] tokens = value.toString().split("[\t,]");
        String[] v1 = tokens[0].split(":");
        String[] v2 = tokens[1].split(":");

        // Co-occurrence line, e.g. 100044:100044	1
        if (v1.length > 1) { // co-occurrence matrix
            int itemID1 = Integer.parseInt(v1[0]);
            int itemID2 = Integer.parseInt(v1[1]);
            int num = Integer.parseInt(tokens[1]);

            // If the key already exists, fetch its list from the HashMap; otherwise create a new one
            List<Cooccurrence> list = null;
            if (!cooccurrenceMatrix.containsKey(itemID1)) {
                list = new ArrayList<>();
            } else {
                list = cooccurrenceMatrix.get(itemID1);
            }
            list.add(new Cooccurrence(itemID1, itemID2, num));
            cooccurrenceMatrix.put(itemID1, list);
        } // note: the rating branch below is what writes data out

        // Rating line, e.g. 1	604:3.0
        if (v2.length > 1) { // rating matrix
            int itemID = Integer.parseInt(tokens[0]);
            int userID = Integer.parseInt(v2[0]);
            double score = Double.parseDouble(v2[1]);

            k.set(userID);
            // Look up the items co-occurring with this itemID and emit score * count partial products
            for (Cooccurrence co : cooccurrenceMatrix.get(itemID)) {
                v.set(co.getItemID2() + "," + score * co.getNum());
                context.write(k, v);
            }
        }
    }
}
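The mapper relies on a Cooccurrence helper class that the article never shows. A minimal sketch matching the constructor and getters used above (field names are assumptions) could look like this:

import java.io.Serializable;

public class Cooccurrence implements Serializable {
    // One cell of the co-occurrence matrix: itemID1 co-occurs with itemID2 num times
    private final int itemID1;
    private final int itemID2;
    private final int num;

    public Cooccurrence(int itemID1, int itemID2, int num) {
        this.itemID1 = itemID1;
        this.itemID2 = itemID2;
        this.num = num;
    }

    public int getItemID1() {
        return itemID1;
    }

    public int getItemID2() {
        return itemID2;
    }

    public int getNum() {
        return num;
    }
}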
Step5Reducer

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class Step5Reducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    private static final Text v = new Text();

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Sum the partial products per candidate item for this user
        Map<String, Double> result = new HashMap<>();
        for (Text value : values) {
            String[] str = value.toString().split(",");
            if (result.containsKey(str[0])) {
                result.put(str[0], result.get(str[0]) + Double.parseDouble(str[1]));
            } else {
                result.put(str[0], Double.parseDouble(str[1]));
            }
        }
        // Emit userID -> itemID,score for every candidate item
        Iterator<String> iter = result.keySet().iterator();
        while (iter.hasNext()) {
            String itemID = iter.next();
            double score = result.get(itemID);
            v.set(itemID + "," + score);
            context.write(key, v);
        }
    }
}

Spark analysis

import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.dictionary.stopword.CoreStopWordDictionary
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConversions._

object Step5 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("test5").getOrCreate()
    val data = spark.sparkContext.textFile("E:/workspace/workspace_BigData1/spark_mllib_test/src/main/resources/hanlp.csv")

    // Segment each line and filter out stop words
    val res1 = data.map(item => {
      val terms = HanLP.segment(item)       // word segmentation
      CoreStopWordDictionary.apply(terms)   // remove stop words
      terms.map(x => x.word.replaceAll(" ", ""))
    })

    // Flatten, count each word, sort by count and take the top 10
    val res2 = res1.flatMap(x => x).map((_, 1)).reduceByKey(_ + _)
      .map(x => (x._2, x._1))
      .sortByKey(false)
      .take(10)

    // Convert the array back to an RDD before saving
    spark.sparkContext.makeRDD(res2).saveAsTextFile("E:/workspace/workspace_BigData1/spark_mllib_test/src/main/resources/res2")
    res2.foreach(println)
  }
}
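To build the Scala job above, HanLP and Spark SQL need to be on the classpath; the exact coordinates and versions are assumptions to adapt to your own environment, but roughly com.hankcs:hanlp (a portable release such as portable-1.7.8) together with org.apache.spark:spark-sql for the Scala version in use. The scala.collection.JavaConversions import, used here so HanLP's Java term list can be mapped over directly, is deprecated in newer Scala versions, where scala.collection.JavaConverters with .asScala is the usual replacement.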