今天,分布式计算引擎是许多分析、批处理和流应用程序的支柱。Spark提供了许多开箱即用的高级功能(pivot、分析窗口函数等)来转换数据。有时需要处理分层数据或执行分层计算。许多数据库供应商提供诸如“递归 CTE(公用表达式)”或“join” SQL 子句之类的功能来查询/转换分层数据。CTE 也称为递归查询或父子查询。在这篇文章中,我们将看看如何使用 Spark 解决这个问题。
分层数据概述 –存在分层关系,其中一项数据是另一项的父项。分层数据可以使用图形属性对象模型表示,其中每一行都是一个顶点(节点),连接是连接顶点的边(关系),列是顶点的属性。
一些用例- 财务计算 - 子账户一直累积到父账户直至最高账户
- 创建组织层次结构 - 经理与路径的员工关系
- 使用路径生成网页之间的链接图
- 任何类型的涉及链接数据的迭代计算
在分布式系统中查询分层数据有一些挑战
数据是连接的,但它分布在分区和节点之间。解决这个问题的实现应该针对执行迭代和根据需要移动数据(shuffle)进行优化。
图的深度会随着时间的推移而变化——解决方案应该处理不同的深度,并且不应该强制用户在处理之前定义它。
在 spark 中实现 CTE 的方法之一是使用Graphx Pregel API。
什么是 Graphx Pregel API?Graphx 是用于图形和图形并行计算的 Spark API。图算法本质上是迭代的,顶点的属性取决于它们直接或间接(通过其他顶点连接)连接顶点的属性。Pregel 是由 Google 和 spark graphX 开发的以顶点为中心的图处理模型,它提供了 pregel api 的优化变体。
Pregel API 如何工作?Pregel API 处理包括执行超级步骤
步骤 0:将初始消息传递给所有顶点
将值作为消息发送到其直接连接的顶点
接收来自前面步骤的消息
改变值
将值作为消息发送到其直接连接的顶点
重复 步骤 1 直到有消息传递,当没有更多消息传递时停止。
下表显示了我们将用于生成自上而下的层次结构的示例员工数据。这里员工的经理由具有 emp_id 值的 mgr_id 字段表示。
添加以下列作为处理的一部分
import org.apache.log4j.{Level, Logger} import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataframe, SparkSession} import scala.util.hashing.MurmurHash3 object PregelTest { // The code below demonstrates use of Graphx Pregel API - Scala 2.11+ // functions to build the top down hierarchy //setup & call the pregel api //设置并调用pregel api def calcTopLevelHierarcy(vertexDF: Dataframe, edgeDF: Dataframe): RDD[(Any,(Int,Any,String,Int,Int))] = { // create the vertex RDD // primary key, root, path val verticesRDD: RDD[(VertexId, (Any, Any, String))] = vertexDF .rdd .map{x=> (x.get(0),x.get(1) , x.get(2))} .map{ x => (MurmurHash3.stringHash(x._1.toString).toLong, ( x._1.asInstanceOf[Any], x._2.asInstanceOf[Any] , x._3.asInstanceOf[String]) ) } // create the edge RDD // top down relationship val EdgesRDD = edgeDF .rdd .map{x=> (x.get(0),x.get(1))} .map{ x => Edge(MurmurHash3.stringHash(x._1.toString).toLong, MurmurHash3.stringHash(x._2.toString).toLong,"topdown" )} // create graph val graph = Graph(verticesRDD, EdgesRDD).cache() val pathSeperator = """/""" // 初始化消息 // initialize id,level,root,path,iscyclic, isleaf val initialMsg = (0L,0,0.asInstanceOf[Any], List("dummy"),0,1) // add more dummy attributes to the vertices - id, level, root, path, isCyclic, existing value of current vertex to build path, isleaf, pk val initialGraph = graph.mapVertices((id, v) => (id, 0, v._2, List(v._3), 0, v._3, 1, v._1) ) val hrchyRDD = initialGraph.pregel( initialMsg, Int.MaxValue, // 迭代次数, 设置成当前表示无限迭代下去 EdgeDirection.Out)( setMsg, sendMsg, mergeMsg) // build the path from the list val hrchyOutRDD = hrchyRDD.vertices.map{case(id,v) => (v._8,(v._2,v._3,pathSeperator + v._4.reverse.mkString(pathSeperator),v._5, v._7 )) } hrchyOutRDD } //改变顶点的值 def setMsg(vertexId: VertexId, value: (Long,Int,Any,List[String], Int,String,Int,Any), message: (Long,Int, Any,List[String],Int,Int)): (Long,Int, Any,List[String],Int,String,Int,Any) = { // 第一次收到的消息是初始化的消息 initialMsg println(s"设置值: $value 收到消息: $message") if (message._2 < 1) { //superstep 0 - initialize (value._1,value._2+1,value._3,value._4,value._5,value._6,value._7,value._8) } else if ( message._5 == 1) { // set isCyclic (判断是不是一个环) (value._1, value._2, value._3, value._4, message._5, value._6, value._7,value._8) } else if ( message._6 == 0 ) { // set isleaf (value._1, value._2, value._3, value._4, value._5, value._6, message._6,value._8) } else { // set new values //( message._1,value._2+1, value._3, value._6 :: message._4 , value._5,value._6,value._7,value._8) ( message._1,value._2+1, message._3, value._6 :: message._4 , value._5,value._6,value._7,value._8) } } // 将值发送到顶点 def sendMsg(triplet: EdgeTriplet[(Long,Int,Any,List[String],Int,String,Int,Any), _]): Iterator[(VertexId, (Long,Int,Any,List[String],Int,Int))] = { val sourceVertex: (VertexId, Int, Any, List[String], Int, String, Int, Any) = triplet.srcAttr val destinationVertex: (VertexId, Int, Any, List[String], Int, String, Int, Any) = triplet.dstAttr println(s" 源头: $sourceVertex 目的地: $destinationVertex") // 检查是不是一个死环, 就是 a是b的领导, b是a的领导 // check for icyclic if (sourceVertex._1 == triplet.dstId || sourceVertex._1 == destinationVertex._1) { println(s"存在死环 源头: ${sourceVertex._1} 目的地: ${triplet.dstId}") if (destinationVertex._5 == 0) { //set iscyclic Iterator((triplet.dstId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 1, sourceVertex._7))) } else { Iterator.empty } } else { // 判断是不是叶子节点,就是没有子节点的节点,属于叶子节点,根节点不算 ,所以样例数据中的叶子节点是 3,8,10 if (sourceVertex._7==1) //is NOT leaf { Iterator((triplet.srcId, (sourceVertex._1,sourceVertex._2,sourceVertex._3, sourceVertex._4 ,0, 0 ))) } else { // set new values Iterator((triplet.dstId, (sourceVertex._1, sourceVertex._2, sourceVertex._3, sourceVertex._4, 0, 1))) } } } // 从所有连接的顶点接收值 def mergeMsg(msg1: (Long,Int,Any,List[String],Int,Int), msg2: (Long,Int, Any,List[String],Int,Int)): (Long,Int,Any,List[String],Int,Int) = { println(s"合并值: $msg1 $msg2") // dummy logic not applicable to the data in this usecase msg2 } // Test with some sample data def main(args: Array[String]): Unit = { // 屏蔽日志 Logger.getLogger("org.apache.spark").setLevel(Level.WARN) val spark: SparkSession = SparkSession .builder .appName(s"${this.getClass.getSimpleName}") .master("local[1]") .getOrCreate() val sc = spark.sparkContext // RDD 转 DF, 隐式转换 import spark.implicits._ val empData = Array( // 测试没有顶级的父节点,会出现空指针异常,构建图的时候,会根据边生成一个为null的顶点 ("EMP001", "Bob", "Baker", "CEO", null.asInstanceOf[String]) , ("EMP002", "Jim", "Lake", "CIO", "EMP001") , ("EMP003", "Tim", "Gorab", "MGR", "EMP002") , ("EMP004", "Rick", "Summer", "MGR", "EMP002") , ("EMP005", "Sam", "Cap", "Lead", "EMP004") , ("EMP006", "Ron", "Hubb", "Sr.Dev", "EMP005") , ("EMP007", "Cathy", "Watson", "Dev", "EMP006") , ("EMP008", "Samantha", "Lion", "Dev", "EMP007") , ("EMP009", "Jimmy", "Copper", "Dev", "EMP007") , ("EMP010", "Shon", "Taylor", "Intern", "EMP009") // 空指针和顶点数据重复没有关系 // 空指针和父节点在顶点中找不到有关系 (父顶点为null没有关系,需要父顶点能够在顶点列表中能找到) , ("EMP011", "zhang", "xiaoming", "CTO", null) ) // create dataframe with some partitions val empDF = sc.parallelize(empData, 3) .toDF("emp_id","first_name","last_name","title","mgr_id") .cache() // primary key , root, path - dataframe to graphx for vertices val empVertexDF = empDF.selectExpr("emp_id","concat(first_name,' ',last_name)","concat(last_name,' ',first_name)") // parent to child - dataframe to graphx for edges val empEdgeDF = empDF.selectExpr("mgr_id","emp_id").filter("mgr_id is not null") // call the function val empHirearchyExtDF: Dataframe = calcTopLevelHierarcy(empVertexDF,empEdgeDF) .map{ case(pk,(level,root,path,iscyclic,isleaf)) => (pk.asInstanceOf[String],level,root.asInstanceOf[String],path,iscyclic,isleaf)} .toDF("emp_id_pk","level","root","path","iscyclic","isleaf").cache() // extend original table with new columns val empHirearchyDF = empHirearchyExtDF.join(empDF , empDF.col("emp_id") === empHirearchyExtDF.col("emp_id_pk")) .selectExpr( "emp_id","first_name","last_name", "title","mgr_id", "level", "root", "path", "iscyclic","isleaf" ) // print empHirearchyDF.show() } }输出 任务执行
Spark 作业分解为作业、阶段和任务。由于其迭代性质,Pregel API 在内部生成多个作业。每次将消息传递到顶点时都会生成一个作业。由于数据可能位于不同的节点上,因此每个作业可能会以多次 shuffle 结束。
需要注意的是在处理大型数据集时创建的长 RDD 谱系。
Graphx Pregel API 非常强大,可用于解决迭代问题或任何图形计算。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)