Spark Analysis and Recommendation System
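
This post walks through a small item-based collaborative-filtering recommender built as a chain of Hadoop MapReduce jobs, followed by a short Spark word-frequency analysis. Step1 builds the rating matrix (one line per user holding all of that user's itemID:score pairs), Step2 builds the item co-occurrence matrix, Step3 transposes the rating matrix so it is keyed by item, and Step5 multiplies the two matrices and sums the products into per-user recommendation scores. The final Spark job uses HanLP to segment Chinese text and count word frequencies.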


Step1Driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

// Rating matrix
public class Step1Driver {


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // Get the Job object
        Configuration conf = new Configuration();
        conf.set("dfs.client.use.datanode.hostname", "true");
        conf.set("fs.defaultFS","hdfs://10.3.0.15:9000");
//        conf.set("fs.defaultFS","hdfs://localhost:9000");
        Job job = Job.getInstance(conf);

        // Set the jar by driver class
        job.setJarByClass(Step1Driver.class);

        // Wire up the mapper and reducer
        job.setMapperClass(Step1Mapper.class);
        job.setReducerClass(Step1Reducer.class);

        // Map output types
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Final output types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);


        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("file:///E:/workspace/workspace_BigData1/mrtest1/src/main/resources/data.csv"));
        FileOutputFormat.setOutputPath(job, new Path("file:///E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result4"));

        boolean result = job.waitForCompletion(true);

        System.exit(result? 0: 1);

    }
}

Step1Mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Step1Mapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    Text outV = new Text();
    IntWritable outK = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] strs = value.toString().split(",");
        int k = Integer.parseInt(strs[0]);
        int v1 = Integer.parseInt(strs[1]);
        double v2 = Double.parseDouble(strs[2]);
        outK.set(k);
        outV.set(v1+":"+v2);
        context.write(outK,outV);
    }
}

Step1Reducer

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class Step1Reducer extends Reducer<IntWritable, Text, IntWritable, Text> {

    Text outV = new Text();

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuffer stringBuffer = new StringBuffer("");

        for (Text value : values) {
            stringBuffer.append(","+value);
        }
        String result = stringBuffer.toString().replaceFirst(",","");

        outV.set(result);
        context.write(key,outV);
    }
}
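
To make the data flow concrete, here is a small hypothetical example (the values are not from the original post). If data.csv contains the lines 1,101,5.0 and 1,102,3.0 and 2,101,2.0 in the form userID,itemID,score, Step1 groups them by user and writes 1 [tab] 101:5.0,102:3.0 and 2 [tab] 101:2.0 to the output, i.e. one line per user holding all of that user's itemID:score pairs.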

Step2Driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
// Co-occurrence matrix
public class Step2Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Job job = Job.getInstance(new Configuration());


        job.setJarByClass(Step2Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Step2Mapper.class);
        job.setReducerClass(Step2Reducer.class);

        FileInputFormat.setInputPaths(job, new Path("E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result4/part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result6")); // original note: the 6th record was verified

        boolean b = job.waitForCompletion(true);

        System.exit(b? 0: 1 );
    }
}

Step2Mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Step2Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] tokens = value.toString().split("[\t,]"); // split on tab or comma (original note: test with the result7 data next time)
        for (int i = 1; i < tokens.length; i++) {
            String itemID = tokens[i].split(":")[0];
            for (int j = 1; j < tokens.length; j++) {
                String itemID2 = tokens[j].split(":")[0];
                k.set(itemID + ":" + itemID2);
                context.write(k,v);
            }
        }

    }
}

Step2Reducer

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Step2Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

         int sum = 0 ;

        for (IntWritable value : values) {
            sum+=value.get();
        }

        context.write(key,new IntWritable(sum));

    }
}
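
Continuing the hypothetical example above, Step2 splits each Step1 line on tabs and commas, pairs every item with every item rated by the same user (including itself), and counts the pairs, so the output would contain 101:101 [tab] 2, 101:102 [tab] 1, 102:101 [tab] 1 and 102:102 [tab] 1.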


Step3Driver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

// Transposing the rating matrix
public class Step3Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(Step3Driver.class);

        job.setMapperClass(Step3Mapper.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);


        FileInputFormat.setInputPaths(job, new Path("E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result4/part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result8"));

        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

Step3Mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Step3Mapper extends Mapper<LongWritable, Text, IntWritable, Text> {


    IntWritable outK =new IntWritable();
    Text outV=new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] tokens = value.toString().split("[\t,]"); // split on tab or comma

        // (reconstructed: the loop body was cut off in the original post)
        // emit (itemID, userID:score) pairs, the input format Step5 expects
        int userID = Integer.parseInt(tokens[0]);
        for (int i = 1; i < tokens.length; i++) {
            String[] pair = tokens[i].split(":");
            outK.set(Integer.parseInt(pair[0]));
            outV.set(userID + ":" + pair[1]);
            context.write(outK, outV);
        }
    }
}

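With the same hypothetical data, the reconstructed Step3 mapper would emit 101 [tab] 1:5.0, 101 [tab] 2:2.0 and 102 [tab] 1:3.0, i.e. the rating matrix keyed by item instead of by user.
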
Step5Driver 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Step5Driver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(Step5Driver.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step5Mapper.class);
        job.setReducerClass(Step5Reducer.class);

        FileInputFormat.setInputPaths(job, new Path("E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result8/part-r-00000"),
                new Path("E:/workspace/workspace_BigData1/mrtest1/src/main/resources/result7/part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("H:/data/result10"));

        boolean b = job.waitForCompletion(true);

        System.exit(b? 0:1  );

    }
}


Step5Mapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Step5Mapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    private final static IntWritable k = new IntWritable();
    private final static Text v = new Text();
    private final static Map<Integer, List<Cooccurrence>> cooccurrenceMatrix = new HashMap<>();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] tokens = value.toString().split("[\t,]"); // split on tab or comma
        String[] v1 = tokens[0].split(":");
        String[] v2 = tokens[1].split(":");

        // co-occurrence line, e.g. 100044:100044 [tab] 1
        if (v1.length > 1) { // co-occurrence matrix branch
            int itemID1 = Integer.parseInt(v1[0]);
            int itemID2 = Integer.parseInt(v1[1]);
            int num = Integer.parseInt(tokens[1]);

            List<Cooccurrence> list = null; // reuse the item's existing list from the HashMap if present, otherwise create a new one
            if(!cooccurrenceMatrix.containsKey(itemID1)){
                list=new ArrayList();
            }else{
                list= cooccurrenceMatrix.get(itemID1);
            }
            list.add(new Cooccurrence(itemID1, itemID2, num));
            cooccurrenceMatrix.put(itemID1,list);
        } // end of co-occurrence branch: the rows are only cached in memory here and used by the rating branch below

        // rating line, e.g. 1 [tab] 604:3.0 (itemID, then userID:score)
        if (v2.length > 1) { // rating matrix branch
            int itemID = Integer.parseInt(tokens[0]);
            int userID = Integer.parseInt(v2[0]);
            double score = Double.parseDouble(v2[1]);

            k.set(userID); // key by user; look up this item's co-occurrence row below
            for (Cooccurrence co : cooccurrenceMatrix.get(itemID)) {
                v.set(co.getItemID2()+","+score*co.getNum());
                context.write(k,v);
            }

        }
    }
}
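
Cooccurrence

The Cooccurrence helper class is referenced by Step5Mapper but never shown in the post. Below is a minimal sketch that matches the constructor and the getItemID2()/getNum() calls used above; anything beyond those members is an assumption.

// Minimal reconstruction of the Cooccurrence value object used in Step5Mapper (not from the original post)
public class Cooccurrence {

    private final int itemID1; // first item of the pair
    private final int itemID2; // second item of the pair
    private final int num;     // how many users rated both items

    public Cooccurrence(int itemID1, int itemID2, int num) {
        this.itemID1 = itemID1;
        this.itemID2 = itemID2;
        this.num = num;
    }

    public int getItemID1() {
        return itemID1;
    }

    public int getItemID2() {
        return itemID2;
    }

    public int getNum() {
        return num;
    }
}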

Step5Reducer

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class Step5Reducer extends Reducer<IntWritable, Text, IntWritable, Text> {

    private static final Text v = new Text();

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        Map<String, Double> result = new HashMap<>();

        for (Text value : values) {
            String[] str = value.toString().split(",");
            if (result.containsKey(str[0])) {
                result.put(str[0], result.get(str[0]) + Double.parseDouble(str[1]));
            } else {
                result.put(str[0], Double.parseDouble(str[1]));
            }
        }
        Iterator<String> iter = result.keySet().iterator();
        while (iter.hasNext()) {

            String itemID =  iter.next();
            double score = result.get(itemID);
            v.set(itemID + "," + score);
            context.write(key,v);
        }
    }
}
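
Continuing the hypothetical example, Step5 multiplies each rating by the matching co-occurrence counts and sums the products per user and item, so the final output for user 1 would be 1 [tab] 101,13.0 and 1 [tab] 102,8.0, and for user 2 it would be 2 [tab] 101,4.0 and 2 [tab] 102,2.0; higher scores mean stronger recommendations. This assumes the co-occurrence rows for an item are already cached when its rating lines arrive, since the mapper keeps them in a static in-memory map.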

Spark Analysis
import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.dictionary.stopword.CoreStopWordDictionary
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConversions._

object Step5 {

  def main(args: Array[String]): Unit = {

    val spark :SparkSession= SparkSession.builder().master("local[*]").appName("test5").getOrCreate()

    val data = spark.sparkContext.textFile("E:/workspace/workspace_BigData1/spark_mllib_test/src/main/resources/hanlp.csv")
    // Segment each line into words and filter out stop words
    val res1 = data.map(item => {

      val terms = HanLP.segment(item) // word segmentation
      CoreStopWordDictionary.apply(terms) // remove stop words in place
      terms.map(x => x.word.replaceAll(" ", ""))
    })

    // Flatten the word lists, count occurrences, and take the top 10 by frequency
    val res2 = res1.flatMap(x => x).map((_, 1)).reduceByKey(_ + _)
      .map(x => (x._2, x._1))
      .sortByKey(false)
      .take(10)

    // Convert the local array back to an RDD before saving
    spark.sparkContext.makeRDD(res2).saveAsTextFile("E:/workspace/workspace_BigData1/spark_mllib_test/src/main/resources/res2")
    res2.foreach(println)
  }

}
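
Each line of the saved res2 output is the toString of a (count, word) tuple, for example (42,Spark) for a hypothetical word that appears 42 times; res2.foreach(println) prints the same ten pairs to the console.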

Original article: http://outofmemory.cn/zaji/5669796.html
