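I recently ran into a question. I could see that a DataFrame was being broadcast in a join, but I could not tell whether it was the left side or the right side. So I tried to work out from the execution plan which input gets broadcast.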
Preparing the data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

case class People(id: Int, name: String)
case class Student(sid: Int, sname: String)

object BroadcastTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("BroadcastTest")
      .getOrCreate()
    import spark.implicits._

    // Left input: 10 rows
    val df1 = spark.createDataFrame(Seq(
      People(1, "darren"), People(2, "zhang"), People(3, "jonathan"),
      People(4, "li"), People(5, "joyce"), People(6, "chen"),
      People(7, "chaoshu"), People(8, "sha"), People(9, "cindy"),
      People(10, "xie"))).as[People]
    //df1.show()

    // Right input: 1 row
    val df2 = spark.createDataFrame(Seq(Student(1, "darren"))).as[Student]
    //df2.show()

    // change this part to monitor broadcast behavior
    val df3 = df1.join(df2, $"id" === $"sid", joinType = "inner")
    df3.show()
    df3.explain()

    spark.stop()
  }
}
```

Case 1: df1 inner join df2
```scala
val df3 = df1.join(df2, $"id" === $"sid", joinType = "inner")
```
```
+---+------+---+------+
| id|  name|sid| sname|
+---+------+---+------+
|  1|darren|  1|darren|
+---+------+---+------+
```
```
== Physical Plan ==
*(1) BroadcastHashJoin [id#0], [sid#7], Inner, BuildRight
:- LocalTableScan [id#0, name#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [sid#7, sname#8]
```
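Nothing can be concluded yet, but the plan contains some directional words, such as BuildRight. It also has two LocalTableScan nodes, one for the left table People and one for the right table Student, and only the second LocalTableScan sits under BroadcastExchange. That makes me suspect the right table is being broadcast.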
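The build side is printed on the BroadcastHashJoin line, so it can also be pulled out of the plan text mechanically. Below is a minimal sketch that assumes a plan string in the Spark 2.x format shown above (for example `df3.queryExecution.executedPlan.toString`). `PlanHeader` and `joinHeader` are hypothetical names, not Spark API. Matching on plan text is brittle, so treat this as a debugging aid only.

```scala
// Hypothetical helper (not a Spark API): reads the join type and build side
// from the BroadcastHashJoin line of a plan string. Assumes the Spark 2.x
// text format shown in this post, e.g.
// "*(1) BroadcastHashJoin [id#0], [sid#7], Inner, BuildRight"
object PlanHeader {
  private val Header =
    """BroadcastHashJoin \[[^\]]*\], \[[^\]]*\], (\w+), (Build(?:Left|Right))""".r

  /** (joinType, buildSide) of the first BroadcastHashJoin in the plan, if any. */
  def joinHeader(plan: String): Option[(String, String)] =
    Header.findFirstMatchIn(plan).map(m => (m.group(1), m.group(2)))
}
```

For the Case 1 plan this yields `Some(("Inner", "BuildRight"))`. For a plan without a broadcast join, such as a SortMergeJoin, it yields `None`.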
Case 2: df1 inner join df2, hint broadcast df2

```scala
val df3 = df1.join(broadcast(df2), $"id" === $"sid", joinType = "inner")
```
```
+---+------+---+------+
| id|  name|sid| sname|
+---+------+---+------+
|  1|darren|  1|darren|
+---+------+---+------+
```
```
== Physical Plan ==
*(1) BroadcastHashJoin [id#0], [sid#7], Inner, BuildRight
:- LocalTableScan [id#0, name#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [sid#7, sname#8]
```
The plan is identical to Case 1, so hinting broadcast on df2 gives the same result as not hinting it. df2 was already the broadcast side.
Case 3: df1 inner join df2, hint broadcast df1

```scala
val df3 = broadcast(df1).join(df2, $"id" === $"sid", joinType = "inner")
```
```
+---+------+---+------+
| id|  name|sid| sname|
+---+------+---+------+
|  1|darren|  1|darren|
+---+------+---+------+
```
```
== Physical Plan ==
*(1) BroadcastHashJoin [id#0], [sid#7], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:  +- LocalTableScan [id#0, name#1]
+- LocalTableScan [sid#7, sname#8]
```
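This plan is clearly different from the previous two. It says BuildLeft, and the left table People is now the one under BroadcastExchange. So BuildLeft or BuildRight tells you which side is broadcast. The position of BroadcastExchange tells you as well. However, you can only see at a glance whether the left or the right table is broadcast if the two tables have different schemas, because you recognize the scan under BroadcastExchange by its columns.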
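The broadcast input is the subtree under BroadcastExchange, so a second sketch can report the columns of the scan in that subtree. Those columns (`sid, sname` versus `id, name`) are what make the side recognizable, which is why distinct schemas matter. This sketch makes two assumptions. First, it assumes the tree string is printed in pre-order, so a node's subtree directly follows it. Second, it assumes that subtree contains a LocalTableScan, as in the two-table plans in this post. `BroadcastInput` and `broadcastColumns` are hypothetical names.

```scala
// Hypothetical helper (not a Spark API): returns the columns of the scan that
// feeds BroadcastExchange. Assumes the tree is printed in pre-order (a node's
// subtree directly follows it) and that the broadcast subtree contains a
// LocalTableScan, as in the two-table plans in this post.
object BroadcastInput {
  private val Scan = """.*LocalTableScan (\[[^\]]*\]).*""".r

  def broadcastColumns(plan: String): Option[String] =
    plan.split("\n").toList.dropWhile(!_.contains("BroadcastExchange")) match {
      case Nil       => None // no broadcast in this plan
      case _ :: rest => rest.collectFirst { case Scan(cols) => cols }
    }
}
```

On the Case 1 plan it returns `Some("[sid#7, sname#8]")` (Student). On the Case 3 plan it returns `Some("[id#0, name#1]")` (People).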
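Why was Case 1 BuildRight? Is it because the left table is large and the right table is small? That is possible, so let's swap the two tables and look at the result.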
Case 4: df2 inner join df1

```scala
val df3 = df2.join(df1, $"sid" === $"id", joinType = "inner")
```
```
+---+------+---+------+
|sid| sname| id|  name|
+---+------+---+------+
|  1|darren|  1|darren|
+---+------+---+------+
```
```
== Physical Plan ==
*(1) BroadcastHashJoin [sid#7], [id#0], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:  +- LocalTableScan [sid#7, sname#8]
+- LocalTableScan [id#0, name#1]
```
That does seem to be it. The smaller table, Student (now the left table), is broadcast.
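Cases 1 and 4 are consistent with a simple rule for inner joins: when either side may be broadcast, the side with the smaller estimated size becomes the build side. Here is a minimal sketch of that rule in plain Scala. It assumes the arguments are estimated input sizes in bytes. The tie-breaking toward the right side is an assumption based on my reading of Spark 2.4's JoinSelection. `InnerBuildSide` and `smallerSide` are hypothetical names, not Spark API.

```scala
// Toy model (not Spark source code): when a join type lets either side be
// broadcast, pick the input with the smaller estimated size.
object InnerBuildSide {
  sealed trait Side
  case object BuildLeft extends Side
  case object BuildRight extends Side

  /** Smaller estimate wins; ties go to the right side (an assumption of this sketch). */
  def smallerSide(leftBytes: Long, rightBytes: Long): Side =
    if (rightBytes <= leftBytes) BuildRight else BuildLeft
}
```

Both tables have the same row layout (an Int and a String), so assume their size estimates scale with row count (10 People rows versus 1 Student row). Under that assumption, `smallerSide(240, 24)` gives `BuildRight` as in Case 1, and swapping the arguments as in Case 4 gives `BuildLeft`. The byte values are illustrative only.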
Next, let's see what happens with a left join.
Case 5: df1 left join df2

```scala
val df3 = df1.join(df2, $"id" === $"sid", joinType = "left")
```
```
+---+--------+----+------+
| id|    name| sid| sname|
+---+--------+----+------+
|  1|  darren|   1|darren|
|  2|   zhang|null|  null|
|  3|jonathan|null|  null|
|  4|      li|null|  null|
|  5|   joyce|null|  null|
|  6|    chen|null|  null|
|  7| chaoshu|null|  null|
|  8|     sha|null|  null|
|  9|   cindy|null|  null|
| 10|     xie|null|  null|
+---+--------+----+------+
```
```
== Physical Plan ==
*(1) BroadcastHashJoin [id#0], [sid#7], LeftOuter, BuildRight
:- LocalTableScan [id#0, name#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [sid#7, sname#8]
```
The right table is broadcast.
Case 6: df2 left join df1

```scala
val df3 = df2.join(df1, $"sid" === $"id", joinType = "left")
```
```
+---+------+---+------+
|sid| sname| id|  name|
+---+------+---+------+
|  1|darren|  1|darren|
+---+------+---+------+
```
```
== Physical Plan ==
*(1) BroadcastHashJoin [sid#7], [id#0], LeftOuter, BuildRight
:- LocalTableScan [sid#7, sname#8]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [id#0, name#1]
```
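Again the right table is broadcast, even though this time it is the larger one (People, 10 rows, versus Student, 1 row). So for a left join the right table is always broadcast, regardless of size. Can we force Spark to broadcast the left table instead?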
Case 7: df1 left join df2, hint broadcast df1

```scala
val df3 = broadcast(df1).join(df2, $"id" === $"sid", joinType = "left")
```
```
+---+--------+----+------+
| id|    name| sid| sname|
+---+--------+----+------+
|  1|  darren|   1|darren|
|  2|   zhang|null|  null|
|  3|jonathan|null|  null|
|  4|      li|null|  null|
|  5|   joyce|null|  null|
|  6|    chen|null|  null|
|  7| chaoshu|null|  null|
|  8|     sha|null|  null|
|  9|   cindy|null|  null|
| 10|     xie|null|  null|
+---+--------+----+------+
```
```
== Physical Plan ==
*(1) BroadcastHashJoin [id#0], [sid#7], LeftOuter, BuildRight
:- LocalTableScan [id#0, name#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [sid#7, sname#8]
```
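It seems a left join cannot be forced to broadcast its left table. The broadcast(df1) hint is ignored, and the plan is identical to Case 5.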
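A common explanation for Cases 5 to 7 goes like this. In a broadcast hash join, each task streams its partition of one side and probes a hash table built from the broadcast copy of the other side. A left outer join must emit every left row, with nulls when there is no match. That is easy when the left side is streamed. If the left side were broadcast, however, no single task could tell that a left row matched nothing across all partitions of the right side. So the join type, not the size, restricts the build side.

The sketch below encodes those restrictions. It is based on my reading of Spark 2.4's `canBuildLeft`/`canBuildRight` in JoinSelection, not copied from it. Join types are the strings passed to `Dataset.join`, normalized by lower-casing and dropping underscores. `BuildableSides` is a hypothetical name.

```scala
// Sketch (not Spark source code): which side of a broadcast hash join may be
// the build (broadcast) side for a given join-type string.
object BuildableSides {
  // "left_outer", "leftouter" and "LEFT_OUTER" are treated alike.
  private def norm(joinType: String): String = joinType.toLowerCase.replace("_", "")

  def canBuildLeft(joinType: String): Boolean = norm(joinType) match {
    case "inner" | "cross" | "right" | "rightouter" => true
    case _                                          => false
  }

  def canBuildRight(joinType: String): Boolean = norm(joinType) match {
    case "inner" | "cross" | "left" | "leftouter" | "leftsemi" | "leftanti" => true
    case _                                                                  => false
  }
}
```

With this table, `"inner"` allows either side, `"left"` allows only the right side, and a full outer join allows neither. For a full outer join, a broadcast hash join is therefore never an option.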
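Summary

Inner Join:
- By default the smaller table is broadcast, as long as it is under spark.sql.autoBroadcastJoinThreshold. If the left table is smaller, the left table is broadcast; if the right table is smaller, the right table is broadcast.
- You can choose which table to broadcast, left or right, with broadcast(), regardless of size.
Left Join:
- Only the right table can be broadcast, regardless of size. A broadcast() hint on the left table is ignored.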
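One more experiment: what happens if automatic broadcasting is disabled? I added this setting to the SparkSession builder:

```scala
// In the SparkSession builder: -1 disables size-based broadcast joins
.config("spark.sql.autoBroadcastJoinThreshold", -1)
```

I then ran the left join again with the left table hinted:

```scala
val df3 = broadcast(df1).join(df2, $"id" === $"sid", joinType = "left")
```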
```
+---+--------+----+------+
| id|    name| sid| sname|
+---+--------+----+------+
|  1|  darren|   1|darren|
|  6|    chen|null|  null|
|  3|jonathan|null|  null|
|  5|   joyce|null|  null|
|  9|   cindy|null|  null|
|  4|      li|null|  null|
|  8|     sha|null|  null|
|  7| chaoshu|null|  null|
| 10|     xie|null|  null|
|  2|   zhang|null|  null|
+---+--------+----+------+
```
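```
== Physical Plan ==
SortMergeJoin [id#0], [sid#7], LeftOuter
:- *(1) Sort [id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#0, 200)
:     +- LocalTableScan [id#0, name#1]
+- *(2) Sort [sid#7 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(sid#7, 200)
      +- LocalTableScan [sid#7, sname#8]
```

With size-based broadcasting turned off, nothing is broadcast at all. The hint on df1 still cannot be used for a left join, and df2 no longer qualifies by size. Spark therefore falls back to a SortMergeJoin, which shuffles (Exchange hashpartitioning) and sorts both sides. The shuffle is also why the rows above are no longer in id order.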
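All eight runs fit one decision order. A broadcast() hint wins, but only on a side the join type allows to be built. Otherwise, a side whose estimated size is within spark.sql.autoBroadcastJoinThreshold (10 MB by default) is broadcast, and if both qualify the smaller one is chosen. Otherwise, Spark falls back to a sort-merge join. Below is a toy model of that order in plain Scala with no Spark dependency. It is my own simplification, loosely based on Spark 2.4's JoinSelection, and not Spark's implementation. It ignores shuffled hash join and the extra hint types added in Spark 3.x. `JoinStrategyModel`, `Input` and `choose` are hypothetical names, and the sizes you pass in are assumed estimates.

```scala
// Toy model (not Spark source code) of the join-strategy order observed in
// this post, loosely following Spark 2.4's JoinSelection. It ignores shuffled
// hash join, non-equi joins and the extra hint types added in Spark 3.x.
object JoinStrategyModel {
  sealed trait Strategy
  case object BroadcastBuildLeft extends Strategy
  case object BroadcastBuildRight extends Strategy
  case object SortMerge extends Strategy

  // One join input: an assumed size estimate plus whether broadcast() was applied.
  final case class Input(sizeInBytes: Long, broadcastHint: Boolean = false)

  private def norm(joinType: String): String = joinType.toLowerCase.replace("_", "")

  // Join types whose left / right side may become the broadcast (build) side.
  private def canBuildLeft(joinType: String): Boolean =
    Set("inner", "cross", "right", "rightouter").contains(norm(joinType))
  private def canBuildRight(joinType: String): Boolean =
    Set("inner", "cross", "left", "leftouter", "leftsemi", "leftanti").contains(norm(joinType))

  // Among the allowed sides, prefer the smaller input; ties go to the right (assumed).
  private def pick(left: Input, right: Input, buildLeft: Boolean, buildRight: Boolean): Strategy =
    if (buildLeft && buildRight) {
      if (right.sizeInBytes <= left.sizeInBytes) BroadcastBuildRight else BroadcastBuildLeft
    } else if (buildRight) BroadcastBuildRight
    else BroadcastBuildLeft

  def choose(joinType: String, left: Input, right: Input, thresholdBytes: Long): Strategy = {
    // 1. A broadcast() hint counts only on a side this join type can build.
    val hintLeft = canBuildLeft(joinType) && left.broadcastHint
    val hintRight = canBuildRight(joinType) && right.broadcastHint
    if (hintLeft || hintRight) pick(left, right, hintLeft, hintRight)
    else {
      // 2. Otherwise broadcast by size; a threshold of -1 makes fits() always false.
      def fits(in: Input): Boolean = in.sizeInBytes >= 0 && in.sizeInBytes <= thresholdBytes
      val sizeLeft = canBuildLeft(joinType) && fits(left)
      val sizeRight = canBuildRight(joinType) && fits(right)
      if (sizeLeft || sizeRight) pick(left, right, sizeLeft, sizeRight)
      else SortMerge // 3. Fall back to a sort-merge join.
    }
  }
}
```

As an example, take assumed sizes of 240 bytes for People and 24 bytes for Student and a 10 MB threshold. Then `choose("left", Input(240, broadcastHint = true), Input(24), 10L * 1024 * 1024)` returns `BroadcastBuildRight`, matching Case 7. The same call with a threshold of -1 returns `SortMerge`, matching the last run.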