大数据系统与大规模数据分析课程第二次作业

大数据系统与大规模数据分析课程第二次作业,第1张

大数据系统与大规模数据分析第二次作业(Graph Coloring)

一、Part1 作业内容

WordCount 的变体,主要考察对 Hadoop MapReduce 框架编程的熟练程度,几乎没有坑点。

MapReduce 的 AC 代码如下:

/**
 * @ClassName Hw2Part1
 * @Description: homework2
 * @Author: cgg
 * @CreateDate: 2022/4/9 20:24
 * @UpdateUser: cgg
 * @UpdateDate: 2022/4/9 20:24
 * @UpdateRemark: null
 * @Version: 1.0
 */
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
@SuppressWarnings("all")
public class Hw2Part1 {
    /**
     * @auther: cgg
     * @Description: map class
     * @date: 2022/4/9 20:30
     */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, DoubleWritable>{
    
    private final static DoubleWritable DoubleTime = new DoubleWritable();
    private Text word = new Text();
    //map
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] args = value.toString().trim().split("\s+");
        if(args.length!=3) return ;
        word.set(args[0]+" "+args[1]);
        DoubleTime.set(Double.parseDouble(args[2]));
        context.write(word, DoubleTime);
    }
  }
  /**
     * @auther: cgg
     * @Description: reduce class
     * @date: 2022/4/9 20:50
     */
  public static class GetCountandAvgReducer extends Reducer<Text,DoubleWritable,Text,Text> {
    //key value
    private Text result_key= new Text();
    private Text result_value= new Text();
    // reduce
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context
                       ) throws IOException, InterruptedException {
      double doublesum = 0.0;
      int cnt =0;
      double avg = 0.0;
      for (DoubleWritable val : values) {
        doublesum += val.get();
        cnt++;
      }
      avg = doublesum*1.0/cnt;
      // generate result key
      result_key.set(key);
      // generate result value
      result_value.set(Integer.toString(cnt));
      result_value.append(" ".getBytes(), 0, 1);
      result_value.append(String.format("%.3f", avg).getBytes(), 0, String.format("%.3f", avg).length());
      context.write(result_key, result_value);
    }
  }
  //main
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount  [...] ");
      System.out.println("hello, no args!!");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "Hw2Part1");
    //invoke
    job.setJarByClass(Hw2Part1.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(GetCountandAvgReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // add the input paths as given by command line
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    // add the output path as given by the command line
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

二、Part2作业内容

考察内容:

1. 同步图计算框架 GraphLite 的使用(C/C++ 实现)
2. 对超步(superstep)的理解
3. 对聚合器(Aggregator)的理解

坑点较多,例如:

1. 一定要返回上一级目录运行程序,要不然会一直拒绝连接
2. 如果把顶点值类型改为 int,getVertexValueSize() 返回的大小也要相应修改,聚合器 Aggregator 中的类型指针也要相应修改;否则虽然能编译出 .so,运行时却会一直拒绝连接
3. 按照老师的思路走的话,选择不冲突颜色值时一定要随机(十分重要!!!),要不然超步会不收敛
4. 程序运行中按 Ctrl+C 并不能让它停止,需要执行 killall graphlite 才能关闭所有端口

思路与上图中老师给出的思路基本一致。用 STL 容器 set 存放邻居颜色,查找速度更快,程序 AC 用时会少一些。程序主要分两部分:1. compute() 函数的编写;2. 超步结束条件的编写。

AC 代码如下:

/**
 * @file GraphColor.cc
 * @author  cgg
 * @version 0.1
 * @section DESCRIPTION
 * homework 2 :GraphColor
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
#include <set>

#include "GraphLite.h"

// Expands to the concrete class names used below, e.g.
// VERTEX_CLASS_NAME(Graph) -> GraphColorGraph, VERTEX_CLASS_NAME() -> GraphColor.
#define VERTEX_CLASS_NAME(name) GraphColor##name
// Tolerance for the halting check on the aggregated value in compute().
#define EPS 1e-6

// Id of the vertex that receives color 0 in superstep 0 (parsed from argv[3]).
long long v0 = 0;
// Number of available colors; colors are drawn from [0, color_num) (parsed from argv[4]).
long long color_num = 0;

// Reads the input graph for GraphLite. The header lines give the total vertex
// and edge counts; each following edge line is parsed as "<from> <to> <weight>".
// Vertex values (colors) start at -1 = uncolored; edge and message values are ints.
class VERTEX_CLASS_NAME(InputFormatter): public InputFormatter {
public:
    // Parses the total number of vertices from the vertex-count header line.
    int64_t getVertexNum() {
        unsigned long long n = 0;
        // %llu matches the unsigned long long argument.
        sscanf(m_ptotal_vertex_line, "%llu", &n);
        m_total_vertex = n;
        return m_total_vertex;
    }
    // Parses the total number of edges from the edge-count header line.
    int64_t getEdgeNum() {
        unsigned long long n = 0;
        sscanf(m_ptotal_edge_line, "%llu", &n);
        m_total_edge = n;
        return m_total_edge;
    }
    // Vertex value = the vertex's color.
    int getVertexValueSize() {
        m_n_value_size = sizeof(int);
        return m_n_value_size;
    }
    // Edge value = the weight column (compute() never reads it).
    int getEdgeValueSize() {
        m_e_value_size = sizeof(int);
        return m_e_value_size;
    }
    // Messages carry a neighbor's color.
    int getMessageValueSize() {
        m_m_value_size = sizeof(int);
        return m_m_value_size;
    }
    // Adds every edge and, for each source vertex, one vertex with its out-degree.
    // NOTE(review): assumes edge lines are grouped by source vertex; a source that
    // appears in two separate runs would be added twice. Vertices that never
    // appear as a source are not added here.
    void loadGraph() {
        // With no edges there is no edge line to read.
        if (m_total_edge <= 0) return;

        unsigned long long last_vertex;
        unsigned long long from = 0;
        unsigned long long to = 0;
        int weight = 0;
        int value = -1;  // every vertex starts uncolored
        int outdegree = 0;

        const char *line = getEdgeLine();
        sscanf(line, "%llu %llu %d", &from, &to, &weight);
        addEdge(from, to, &weight);

        last_vertex = from;
        ++outdegree;
        for (int64_t i = 1; i < m_total_edge; ++i) {
            line = getEdgeLine();
            // A line without a weight column gets 0, not the previous line's weight.
            weight = 0;
            sscanf(line, "%llu %llu %d", &from, &to, &weight);
            if (last_vertex != from) {
                // Source changed: the previous vertex's out-degree is complete.
                addVertex(last_vertex, &value, outdegree);
                last_vertex = from;
                outdegree = 1;
            } else {
                ++outdegree;
            }
            addEdge(from, to, &weight);
        }
        addVertex(last_vertex, &value, outdegree);
    }
};

// Writes the result: one "<vertex id>: <color>" line per vertex.
class VERTEX_CLASS_NAME(OutputFormatter): public OutputFormatter {
public:
    void writeResult() {
        int64_t vid;
        int value;
        char s[1024];

        for (ResultIterator r_iter; !r_iter.done(); r_iter.next()) {
            r_iter.getIdValue(vid, &value);
            // %lld is paired with a (long long) argument; snprintf bounds the write to s.
            int n = snprintf(s, sizeof(s), "%lld: %d\n", (long long)vid, value);
            writeNextResLine(s, n);
        }
    }
};

// Integer-sum aggregator. accumulate() adds a value into the local sum and
// merge() adds a value into the global sum; compute() reads the global sum as
// its halting signal.
class VERTEX_CLASS_NAME(Aggregator): public Aggregator<int> {
public:
    // Resets both sums to zero.
    void init() {
        m_local = 0;
        m_global = 0;
    }
    void* getGlobal() { return &m_global; }
    // Overwrites the global sum with the int pointed to by p.
    void setGlobal(const void* p) { m_global = *static_cast<const int*>(p); }
    void* getLocal() { return &m_local; }
    // Adds the int pointed to by p to the global sum.
    void merge(const void* p) { m_global += *static_cast<const int*>(p); }
    // Adds the int pointed to by p to the local sum.
    void accumulate(const void* p) { m_local += *static_cast<const int*>(p); }
};

// Graph-coloring vertex. Vertex value = current color (-1 = uncolored);
// each message carries the sending neighbor's color.
//
// Superstep 0:  vertex v0 takes color 0, all others stay -1; everyone
//               broadcasts its color.
// Superstep >=1: a vertex that is uncolored or shares its color with an
//               incoming neighbor color picks a new random free color.
//               Aggregator 0 counts vertices that changed color or are still
//               in conflict.
// Superstep >=2: if that count from the previous superstep is 0, the coloring
//               is stable and conflict-free, so every vertex votes to halt.
class VERTEX_CLASS_NAME(): public Vertex <int, int, int> {
public:
    // Returns true if `color` is assigned (!= -1) and no remaining message in
    // `pmsgs` carries the same color. NOTE: advances `pmsgs` to its end, so the
    // messages cannot be read again afterwards. Not used by compute().
    bool IsOk(int color , MessageIterator* pmsgs){
        if (color == -1) return false;
        for ( ; !pmsgs->done(); pmsgs->next()) {
            if (pmsgs->getValue() == color) return false;
        }
        return true;
    }

    void compute(MessageIterator* pmsgs) {
        int val = -1;
        if (getSuperstep() == 0) {
            // Seed the coloring: only v0 is colored in the first superstep.
            if (m_pme->m_v_id == v0) val = 0;
        } else {
            // Superstep 1 is skipped: nothing is accumulated in superstep 0.
            if (getSuperstep() >= 2) {
                // Number of unsettled vertices in the previous superstep
                // (an int compared with EPS, i.e. effectively == 0).
                int global_val = *(int *)getAggrGlobal(0);
                if (global_val < EPS) {
                    voteToHalt();
                    return;
                }
            }

            // Colors announced by neighbors in the previous superstep.
            std::set<int> neighborColors;
            for ( ; !pmsgs->done(); pmsgs->next()) {
                neighborColors.insert(pmsgs->getValue());
            }

            int currentcolor = getValue();
            if (currentcolor == -1 || neighborColors.count(currentcolor) != 0) {
                val = pickRandomColor(neighborColors);
            } else {
                val = currentcolor;  // no conflict: keep the color
            }

            // A vertex is unsettled if it changed color, or if no free color was
            // available and it is still in conflict (retried next superstep).
            bool conflict = (val != -1 && neighborColors.count(val) != 0);
            int unsettled = (val != currentcolor || conflict) ? 1 : 0;
            accumulateAggr(0, &unsettled);
        }
        *mutableValue() = val;
        sendMessageToAllNeighbors(val);
    }

private:
    // Returns a uniformly random color in [0, color_num) that is not in `taken`.
    // Randomness matters: per the write-up, a deterministic choice can keep two
    // conflicting neighbors picking the same replacement and never converge.
    // If every color in range is taken, returns an arbitrary random color (the
    // conflict is retried in a later superstep) instead of looping forever.
    // Returns -1 (uncolored) when color_num <= 0, avoiding a modulo by zero.
    int pickRandomColor(const std::set<int>& taken) {
        if (color_num <= 0) return -1;

        long long takenInRange = 0;
        for (std::set<int>::const_iterator it = taken.begin(); it != taken.end(); ++it) {
            if (*it >= 0 && *it < color_num) ++takenInRange;
        }

        int color = (int)(rand() % color_num);
        if (takenInRange >= color_num) return color;  // no free color exists
        // Rejection sampling: terminates because at least one free color exists.
        while (taken.count(color) != 0) {
            color = (int)(rand() % color_num);
        }
        return color;
    }
};

// Job configuration: worker hosts, input/output paths, the coloring parameters
// (v0, color_num) and the single int aggregator used by compute().
class VERTEX_CLASS_NAME(Graph): public Graph {
public:
    VERTEX_CLASS_NAME(Aggregator)* aggregator;

public:
    // argv[0]: this .so file (GraphLite convention — confirm)
    // argv[1]: input path
    // argv[2]: output path
    // argv[3]: v0, id of the vertex colored 0 in superstep 0
    // argv[4]: color num, colors are drawn from [0, color num)
    void init(int argc, char* argv[]) {
        setNumHosts(5);

        setHost(0, "localhost", 1411);
        setHost(1, "localhost", 1421);
        setHost(2, "localhost", 1431);
        setHost(3, "localhost", 1441);
        setHost(4, "localhost", 1451);

        if (argc < 5) {
           printf("Usage: %s <input path> <output path> <v0> <color num>\n", argv[0]);
           exit(1);
        }

        m_pin_path = argv[1];
        m_pout_path = argv[2];

        // Parse both numbers as signed decimals and reject empty input or trailing
        // garbage. color_num must be positive because compute() draws colors
        // with rand() % color_num.
        char* end = NULL;
        v0 = strtoll(argv[3], &end, 10);
        if (end == argv[3] || *end != '\0') {
            fprintf(stderr, "Invalid v0: %s\n", argv[3]);
            exit(1);
        }
        color_num = strtoll(argv[4], &end, 10);
        if (end == argv[4] || *end != '\0' || color_num <= 0) {
            fprintf(stderr, "Invalid color num (must be a positive integer): %s\n", argv[4]);
            exit(1);
        }

        aggregator = new VERTEX_CLASS_NAME(Aggregator)[1];
        regNumAggr(1);
        regAggr(0, &aggregator[0]);
    }

    // Releases the aggregator array allocated in init().
    void term() {
        delete[] aggregator;
    }
};

/* STOP: do not change the code below. */
// Factory with C linkage (unmangled symbol name), presumably looked up by the
// GraphLite runtime when it loads this .so. Allocates the Graph together with
// the input formatter, output formatter and vertex prototype defined above;
// destroy_graph() frees all four objects.
extern "C" Graph* create_graph() {
    Graph* pgraph = new VERTEX_CLASS_NAME(Graph);

    pgraph->m_pin_formatter = new VERTEX_CLASS_NAME(InputFormatter);
    pgraph->m_pout_formatter = new VERTEX_CLASS_NAME(OutputFormatter);
    pgraph->m_pver_base = new VERTEX_CLASS_NAME();

    return pgraph;
}

// Counterpart of create_graph(): deletes the vertex prototype, both formatters
// and the graph itself. Each pointer is cast back to its concrete type before
// delete — NOTE(review): presumably because the GraphLite base classes do not
// declare virtual destructors; confirm against GraphLite.h.
extern "C" void destroy_graph(Graph* pobject) {
    delete ( VERTEX_CLASS_NAME()* )(pobject->m_pver_base);
    delete ( VERTEX_CLASS_NAME(OutputFormatter)* )(pobject->m_pout_formatter);
    delete ( VERTEX_CLASS_NAME(InputFormatter)* )(pobject->m_pin_formatter);
    delete ( VERTEX_CLASS_NAME(Graph)* )pobject;
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/707167.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-24
下一篇 2022-04-24

发表评论

登录后才能评论

评论列表(0条)

保存