关于graphx中vertexId为long所引发的小问题

关于graphx中vertexId为long所引发的小问题,第1张

graphx里节点ID是一个long类型的字段,如果从已有关系形数据加载graphx时,每个节点已有自己的唯一标识(String类型的code),想通过这个唯一的code生成唯一对应的long类型节点ID是个小坑,如果使用普通的hashcode.toLong()放在分布式环境下还是有些不妥,spark RDD有zipWithUniqueId方法可以根据分区索引和分区数量在分布式环境下建立唯一索引的方法。以此code记录,若有问题欢迎指出!

以下代码仅为demo,请结合实际使用。

package org.spark.demo;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.graphx.Edge;
import org.apache.spark.graphx.Graph;
import org.apache.spark.storage.StorageLevel;
import scala.Serializable;
import scala.Tuple2;
import scala.Tuple3;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

import java.util.ArrayList;
import java.util.List;


/**
 * @author qn
 */
public class HelloSparkGraphX implements Serializable {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark-graphx-kk").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // classTag
        ClassTag<String> stringTag = ClassTag$.MODULE$.apply(String.class);

        // 节点
        List<Tuple2<String, String>> vertex = new ArrayList<>();
        vertex.add(new Tuple2<>("1", "人物A"));
        vertex.add(new Tuple2<>("2", "人物B"));
        vertex.add(new Tuple2<>("3", "人物C"));
        vertex.add(new Tuple2<>("4", "公司1"));
        vertex.add(new Tuple2<>("5", "公司2"));
		
		// 构建节点RDD(命名比较随意,其实没必要拆开就瞎起名了)
        JavaRDD<Tuple2<String, String>> vertexRdd = sc.parallelize(vertex).rdd().toJavaRDD();
        JavaPairRDD<Tuple2<String, String>, Long> vertexRddNew = vertexRdd.distinct().zipWithUniqueId();
        JavaPairRDD<String, Long> vertexRddNewNew = vertexRddNew.mapToPair(e -> new Tuple2<>(e._1._1, e._2));

		// 构建边RDD
        List<Tuple3<String, String, String>> edges = new ArrayList<>();
        edges.add(new Tuple3<>("4", "1", "管理1"));
        edges.add(new Tuple3<>("4", "1", "管理2"));
        edges.add(new Tuple3<>("5", "2", "管理3"));
        edges.add(new Tuple3<>("5", "3", "管理4"));
        JavaRDD<Tuple3<String, String, String>> edgesRdd = sc.parallelize(edges).rdd().toJavaRDD();
        JavaRDD<Edge<String>> edgesRddNew = edgesRdd.mapToPair(e -> new Tuple2<>(e._1(), new Tuple2<>(e._2(), e._3())))
                .join(vertexRddNewNew)
                .mapToPair(e -> new Tuple2<>(e._2._1._1, new Tuple2<>(e._2._2, e._2._1._2)))
                .join(vertexRddNewNew)
                .map(e -> new Edge<>(e._2._2, e._2._1._1, e._2._1._2));

		// 构建图
        Graph<String, String> graph = Graph.fromEdges(edgesRddNew.rdd(),
                "",
                StorageLevel.MEMORY_ONLY(),
                StorageLevel.MEMORY_ONLY(),
                stringTag,
                stringTag);

        List<Edge<String>> e = graph.edges().toJavaRDD().collect();
        System.out.println(e);
    }

}


一直有个疑问,为什么graphx的节点ID必须使用64bit long类型,求问百度谷歌只有个别相关问题,答案无非就是“VertexID是Long的别名”、“底层实现就是Long”之类…你搁这搁这呢。想偷懒不看源码看来是找不到答案了,等板砖空袭去看看源码吧。
graphx是基于RDD的,GraphFrame是基于dataframe的,虽然它现在还未合并到spark graphx的api里,但毫无疑问它已经成长的很强大了。好奇GraphFrame是怎么实现支持string类型的code作为节点唯一标识的呢,scala api里有这样的一段话:

Note that vertex (and edge) attributes include vertex IDs (and source, destination IDs) in order to support non-Long vertex IDs. If the vertex IDs are not convertible to Long values, then the values are indexed in order to generate corresponding Long vertex IDs (which is an expensive operation).

也就是说,要不你的String就要支持强转为Long,要不就要给你建个索引表用来对应你的String到Long,而且悄咪咪告诉你成本还很高,最好别尝试(有说了白说的嫌疑,不过人家也不是为了解决Long单一的问题,就是为了让你好用些)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/727370.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)

保存