Friend Recommendation with MapReduce


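Environment: a Hadoop and ZooKeeper cluster is already set up on three virtual machines, with HDFS and MapReduce configured (see the earlier posts for the setup). This example implements friend recommendation with the Java API. The approach, which the original post illustrates with a diagram, is to recommend to each other two people who are not yet friends but share friends, ranked by how many friends they share.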


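1. Start the environment
# Start ZooKeeper on each of the three nodes
zkServer.sh start
# Start HDFS (start-all.sh also starts YARN)
start-all.sh

Once everything is up, open the master node and create a folder named mrxx to hold the data.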


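2. Implementation with the Java API

First create a Java project, then use a utility class to randomly generate friend-relationship data as test data. Next, write subclasses of Mapper and Reducer that override map and reduce to implement the logic, and finally submit the task through the Job class.


① Dependencies, configuration, and utility class

Dependencies in pom.xml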


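  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <!-- Hadoop version -->
    <hadoop.version>3.1.2</hadoop.version>
    <!-- commons-io version -->
    <commons-io.version>2.4</commons-io.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>${commons-io.version}</version>
    </dependency>
    <dependency>
      <groupId>com.janeluo</groupId>
      <artifactId>ikanalyzer</artifactId>
      <version>2012_u6</version>
    </dependency>
  </dependencies>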


Configuration files:


Utility class FriendRandomUtil

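import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.io.FileUtils;

public class FriendRandomUtil {

    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws IOException {
        // Read the student names (one per line) from students.txt on the classpath
        List<String> studentList = FileUtils.readLines(
                new File(FriendRandomUtil.class.getResource("/students.txt").getPath()), StandardCharsets.UTF_8);
        // Map every student to an initially empty set of friends
        Map<String, Set<String>> friendMap = studentList.stream()
                .collect(Collectors.toMap(e -> e, e -> new HashSet<String>()));
        // Build the relationships
        for (String student : friendMap.keySet()) {
            // Pick 10 to 39 random friends with reservoir sampling
            List<String> sampleList = reservoirSampling(studentList, RANDOM.nextInt(30) + 10);
            // Remove the student if the sample happened to include them
            sampleList.remove(student);
            // Add the sample to the student's friend set
            friendMap.get(student).addAll(sampleList);
            // Friendship is mutual, so add the student to each friend's set as well
            for (String friend : sampleList) {
                friendMap.get(friend).add(student);
            }
        }
        // Print one line per student: the name followed by the friends, tab-separated
        for (String student : friendMap.keySet()) {
            System.out.print(student + "\t");
            friendMap.get(student).forEach(e -> System.out.print(e + "\t"));
            System.out.println();
        }
    }

    /**
     * Draws num elements uniformly at random from studentList using reservoir sampling.
     */
    public static List<String> reservoirSampling(List<String> studentList, int num) {
        // Never ask for more elements than the list holds
        num = Math.min(num, studentList.size());
        // Copy the first num elements into the reservoir; subList alone is only a view,
        // so writing to it would overwrite entries of studentList
        List<String> sampleList = new ArrayList<>(studentList.subList(0, num));
        // Walk over the remaining elements
        for (int i = num; i < studentList.size(); i++) {
            // Pick a random index in [0, i]
            int r = RANDOM.nextInt(i + 1);
            if (r < num) {
                // The index falls inside the reservoir, so replace that slot
                sampleList.set(r, studentList.get(i));
            }
        }
        return sampleList;
    }
}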

Run the utility class to generate the test data, then upload the data to the mrxx folder on HDFS so the job can read it as its input.



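② Override the map and reduce methods

Overriding map

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FriendMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line: friends[0] is the person, the rest are that person's friends
        String[] friends = value.toString().split("\\s+");
        // Direct friends: the person paired with each listed friend, flagged 0
        for (int i = 1; i < friends.length; i++) {
            context.write(new Text(sortFriendName(friends[0], friends[i])), new IntWritable(0));
        }
        // Indirect friends: every two listed friends share this person, flagged 1
        for (int i = 1; i < friends.length; i++) {
            for (int j = i + 1; j < friends.length; j++) {
                context.write(new Text(sortFriendName(friends[i], friends[j])), new IntWritable(1));
            }
        }
    }

    // Order the two names so that a:b and b:a produce the same key
    private String sortFriendName(String f1, String f2) {
        return f1.compareTo(f2) < 0 ? f1 + ":" + f2 : f2 + ":" + f1;
    }
}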
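
To make the emitted records concrete, here is a minimal sketch that replays the mapper logic on one line without Hadoop. The class name MapStepSketch, the "key=value" string format, and the sample names are assumptions made for illustration; only the pairing rules come from FriendMapper.

```java
import java.util.ArrayList;
import java.util.List;

class MapStepSketch {

    // Order the two names so that "a:b" and "b:a" collapse into one key.
    static String pairKey(String f1, String f2) {
        return f1.compareTo(f2) < 0 ? f1 + ":" + f2 : f2 + ":" + f1;
    }

    // Returns "key=value" strings standing in for context.write(key, value).
    static List<String> emit(String line) {
        String[] friends = line.trim().split("\\s+");
        List<String> out = new ArrayList<>();
        // The person and each listed friend are already friends: flag 0.
        for (int i = 1; i < friends.length; i++) {
            out.add(pairKey(friends[0], friends[i]) + "=0");
        }
        // Any two listed friends have the person in common: flag 1.
        for (int i = 1; i < friends.length; i++) {
            for (int j = i + 1; j < friends.length; j++) {
                out.add(pairKey(friends[i], friends[j]) + "=1");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical input line: tom is friends with cat, bob and amy.
        for (String record : emit("tom\tcat\tbob\tamy")) {
            System.out.println(record);
        }
    }
}
```

For this line the sketch yields three records flagged 0 (tom paired with each friend) and three flagged 1 (each pair among cat, bob and amy), so a person with n friends contributes n direct records and n(n-1)/2 indirect ones.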

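Overriding reduce

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FriendReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Running total of the recommendation score
        int count = 0;
        for (IntWritable value : values) {
            int recommendation = value.get();
            if (recommendation == 0) {
                // A 0 means the two are already direct friends, so skip the pair
                return;
            }
            count += recommendation;
        }
        // Write the pair and its recommendation score to HDFS
        context.write(key, new IntWritable(count));
    }
}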
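
The reduce rule can be checked on its own with plain lists. The following is a sketch only: ReduceStepSketch and score are hypothetical names, and the list of integers stands in for the values Hadoop groups under one pair key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;

class ReduceStepSketch {

    // Returns the score for one pair, or empty when the pair must be dropped.
    static OptionalInt score(List<Integer> flags) {
        int count = 0;
        for (int flag : flags) {
            if (flag == 0) {
                // A single 0 marks the pair as direct friends: no recommendation.
                return OptionalInt.empty();
            }
            count += flag;
        }
        return OptionalInt.of(count);
    }

    public static void main(String[] args) {
        // Three people list both members of the pair, and the two are not friends.
        System.out.println(score(Arrays.asList(1, 1, 1)));
        // Same pair, but one record says they are already friends.
        System.out.println(score(Arrays.asList(1, 0, 1)));
    }
}
```

The first call produces a score of 3, the second produces an empty result, which mirrors the early return in the reducer.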

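③ Write the job class FriendsJob

This class submits the job: it holds the job configuration, registers the Mapper and Reducer written above, and starts the run.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FriendsJob {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Load the configuration
        Configuration configuration = new Configuration();
        // Run the job with the local runner
        configuration.set("mapreduce.framework.name", "local");
        // Create the job
        Job job = Job.getInstance(configuration);
        // Basic job settings
        job.setJarByClass(FriendsJob.class);
        job.setJobName("FriendsJob" + System.currentTimeMillis());
        job.setNumReduceTasks(2);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("/mrxx/friends.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/mrxx/result/" + job.getJobName()));
        // Mapper, Reducer, and the key/value types they exchange
        job.setMapperClass(FriendMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(FriendReducer.class);
        // Declare the final output types explicitly
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}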


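Test result:



This completes the friend recommendation. The number at the end of each output line is the recommendation score, that is, how many friends the two people have in common; the higher it is, the stronger the recommendation.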
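
As an illustration of how these scores might be consumed, the sketch below ranks the candidates for one person. It assumes each result line has the form a:b, a tab, then the score (the tab is the default separator of Hadoop's text output); the class name, method name, and sample lines are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TopRecommendationsSketch {

    // Returns up to `limit` candidates for `person` as "name(score)", best first.
    static List<String> recommend(List<String> resultLines, String person, int limit) {
        List<String[]> candidates = new ArrayList<>();
        for (String line : resultLines) {
            String[] keyAndScore = line.split("\t");
            String[] pair = keyAndScore[0].split(":");
            // The person may sit on either side of the sorted pair key.
            if (pair[0].equals(person)) {
                candidates.add(new String[] {pair[1], keyAndScore[1]});
            } else if (pair[1].equals(person)) {
                candidates.add(new String[] {pair[0], keyAndScore[1]});
            }
        }
        // Sort by score, highest first.
        candidates.sort((a, b) -> Integer.compare(Integer.parseInt(b[1]), Integer.parseInt(a[1])));
        List<String> top = new ArrayList<>();
        for (String[] c : candidates.subList(0, Math.min(limit, candidates.size()))) {
            top.add(c[0] + "(" + c[1] + ")");
        }
        return top;
    }

    public static void main(String[] args) {
        // Hypothetical result lines, not real job output.
        List<String> lines = Arrays.asList("amy:bob\t2", "amy:cat\t5", "bob:cat\t1", "amy:tom\t3");
        System.out.println(recommend(lines, "amy", 2));
    }
}
```

With the sample lines above, amy's two strongest candidates are cat (score 5) and tom (score 3).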


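3. Store the recommendation results in MySQL

Building on the code above, add a Friend class and the MySQL dependency, connect to the MySQL database on the virtual machine with Navicat, and create a t_friend table to store the results.


① Friend class, dependency, and table structure

Friend class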

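import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Date;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class Friend implements Writable, DBWritable {

    private String id;
    private String person;
    private String friend;
    private Integer count;
    private Date createtime;

    // Hadoop creates instances reflectively, so keep a no-argument constructor
    public Friend() {
    }

    public Friend(String id, String person, String friend, Integer count, Date createtime) {
        this.id = id;
        this.person = person;
        this.friend = friend;
        this.count = count;
        this.createtime = createtime;
    }

    // Remember to add the getters, setters, and toString

    // Hadoop serialization: write the fields in a fixed order
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.id);
        dataOutput.writeUTF(this.person);
        dataOutput.writeUTF(this.friend);
        dataOutput.writeInt(this.count);
        dataOutput.writeLong(this.createtime.getTime());
    }

    // Hadoop deserialization: read the fields back in the same order
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.person = dataInput.readUTF();
        this.friend = dataInput.readUTF();
        this.count = dataInput.readInt();
        this.createtime = new Date(dataInput.readLong());
    }

    // Database write: bind the fields to the INSERT statement parameters
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, this.id);
        preparedStatement.setString(2, this.person);
        preparedStatement.setString(3, this.friend);
        preparedStatement.setInt(4, this.count);
        preparedStatement.setTimestamp(5, new Timestamp(this.createtime.getTime()));
    }

    // Database read: fill the fields from a result row
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.id = resultSet.getString(1);
        this.person = resultSet.getString(2);
        this.friend = resultSet.getString(3);
        this.count = resultSet.getInt(4);
        this.createtime = resultSet.getTimestamp(5);
    }
}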
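
The two Writable methods must write and read the fields in the same order, and this can be checked with java.io streams alone. The sketch below uses a hypothetical stand-in class, FriendRecordSketch, that copies the five fields and the two methods but drops the Hadoop interfaces so it runs without the cluster libraries; the roundTrip helper and the sample values are assumptions as well.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Date;

class FriendRecordSketch {
    String id;
    String person;
    String friend;
    int count;
    Date createtime;

    // Same field order as Friend.write(DataOutput).
    void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(person);
        out.writeUTF(friend);
        out.writeInt(count);
        out.writeLong(createtime.getTime());
    }

    // Same field order as Friend.readFields(DataInput).
    void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        person = in.readUTF();
        friend = in.readUTF();
        count = in.readInt();
        createtime = new Date(in.readLong());
    }

    // Serializes the record to bytes and reads it back into a fresh object.
    static FriendRecordSketch roundTrip(FriendRecordSketch source) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            source.write(new DataOutputStream(buffer));
            FriendRecordSketch copy = new FriendRecordSketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FriendRecordSketch record = new FriendRecordSketch();
        record.id = "demo-1";           // hypothetical values
        record.person = "amy";
        record.friend = "cat";
        record.count = 5;
        record.createtime = new Date();
        FriendRecordSketch copy = roundTrip(record);
        System.out.println(copy.person + " -> " + copy.friend + " : " + copy.count);
    }
}
```

If the read order ever drifts from the write order, for example reading count before friend, the round trip fails or returns scrambled fields, which is the usual symptom of a broken Writable.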

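Structure of the t_friend table (its columns match the fields array in FriendJob: id, person, friend, count, createtime):


pom.xml

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.32</version>
    </dependency>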

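② Modify FriendReducer and the job class

FriendReducer

import java.io.IOException;
import java.util.Date;
import java.util.UUID;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FriendReducer extends Reducer<Text, IntWritable, Friend, NullWritable> {

    private String jobName;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Remember the job name; it becomes the prefix of every row id
        jobName = context.getJobName();
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Running total of the recommendation score
        int count = 0;
        for (IntWritable value : values) {
            if (value.get() == 0) {
                // Already direct friends, so skip the pair
                return;
            } else if (value.get() == 1) {
                count += value.get();
            }
        }
        // Build one Friend row per direction: person -> friend and friend -> person
        String[] pf = key.toString().split(":");
        Friend f1 = new Friend(jobName + UUID.randomUUID().toString().substring(0, 9), pf[0], pf[1], count, new Date());
        Friend f2 = new Friend(jobName + UUID.randomUUID().toString().substring(0, 9), pf[1], pf[0], count, new Date());
        // Write the rows out; the value is unused
        context.write(f1, NullWritable.get());
        context.write(f2, NullWritable.get());
    }
}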

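FriendJob

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class FriendJob {

    private static String driverClass = "com.mysql.jdbc.Driver";
    private static String url = "jdbc:mysql://192.168.168.101:3306/mrxx?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true&useSSL=false";
    private static String username = "root";
    private static String password = "123456";
    private static String tableName = "t_friend";
    private static String[] fields = {"id", "person", "friend", "count", "createtime"};

    public static void main(String[] args) throws Exception {
        // Load the configuration files
        Configuration configuration = new Configuration(true);
        configuration.set("mapreduce.framework.name", "local");

        // Register the database connection settings
        DBConfiguration.configureDB(configuration, driverClass, url, username, password);

        // Create the job
        Job job = Job.getInstance(configuration);
        // Basic job settings
        job.setJarByClass(FriendJob.class);
        job.setNumReduceTasks(1);
        job.setJobName("FriendsJob" + System.currentTimeMillis());
        // Read text from HDFS and write rows to the database
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("/mrxx/friends.txt"));
        DBOutputFormat.setOutput(job, tableName, fields);
        // Mapper, Reducer, and the key/value types they exchange
        job.setMapperClass(FriendMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(FriendReducer.class);
        // Declare the final output types explicitly
        job.setOutputKeyClass(Friend.class);
        job.setOutputValueClass(NullWritable.class);
        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}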

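With these changes in place, run the class to submit the job. MySQL on the Linux VM must already be running, so check first that Navicat can connect to it. Once the job finishes, the results are stored in the MySQL database.


③ Run the job and check the result

All done!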


Feel free to share; when reposting, please credit the source: 内存溢出

Original article: http://outofmemory.cn/zaji/5691335.html
