Using explain to analyze broadcast

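I recently ran into a problem: I could see that a DataFrame was being broadcast, but not whether it was the left one or the right one. This post tries to determine from the execution plan which data source is broadcast.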

Preparing the data
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

case class People(id: Int, name: String)

case class Student(sid: Int, sname: String)

object BroadcastTest {

    def main(args: Array[String]): Unit = {
        val spark = SparkSession
            .builder()
            .master("local")
            .appName("BroadcastTest")
            .getOrCreate()
        import spark.implicits._
        val df1 = spark.createDataFrame(Seq(
            People(1, "darren"),
            People(2, "zhang"),
            People(3, "jonathan"),
            People(4, "li"),
            People(5, "joyce"),
            People(6, "chen"),
            People(7, "chaoshu"),
            People(8, "sha"),
            People(9, "cindy"),
            People(10, "xie"))).as[People]
        //df1.show()

        val df2 = spark.createDataFrame(Seq(Student(1, "darren"))).as[Student]
        //df2.show()

        // change this part to monitor broadcast behavior
        val df3 = df1.join(df2,  $"id" === $"sid", joinType = "inner")

        df3.show()
        df3.explain()
        spark.stop()
    }
}
Case 1: df1 inner join df2
val df3 = df1.join(df2,  $"id" === $"sid", joinType = "inner")
+---+------+---+------+
| id|  name|sid| sname|
+---+------+---+------+
|  1|darren|  1|darren|
+---+------+---+------+
== Physical Plan ==
*(1) BroadcastHashJoin [id#0], [sid#7], Inner, BuildRight
:- LocalTableScan [id#0, name#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [sid#7, sname#8]

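Nothing is conclusive yet, but there are some directional keywords such as BuildRight. There are also two LocalTableScan nodes, one for the left table People and one for the right table Student, and only the second sits under BroadcastExchange. That suggests the right table is the one being broadcast.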

Case 2: df1 inner join df2, with a broadcast hint on df2
val df3 = df1.join(broadcast(df2), $"id" === $"sid", joinType = "inner")
+---+------+---+------+
| id|  name|sid| sname|
+---+------+---+------+
|  1|darren|  1|darren|
+---+------+---+------+
== Physical Plan ==
*(1) BroadcastHashJoin [id#0], [sid#7], Inner, BuildRight
:- LocalTableScan [id#0, name#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [sid#7, sname#8]

Judging from this plan, the result is the same with or without the broadcast hint on df2.

Case 3: df1 inner join df2, with a broadcast hint on df1
val df3 = broadcast(df1).join(df2, $"id" === $"sid", joinType = "inner")
+---+------+---+------+
| id|  name|sid| sname|
+---+------+---+------+
|  1|darren|  1|darren|
+---+------+---+------+
== Physical Plan ==
*(1) BroadcastHashJoin [id#0], [sid#7], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:  +- LocalTableScan [id#0, name#1]
+- LocalTableScan [sid#7, sname#8]

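This plan clearly differs from the previous two: it says BuildLeft, and the left table People now sits under BroadcastExchange. So BuildLeft/BuildRight identifies the broadcast direction, and so does the position of BroadcastExchange, although the latter is only easy to read when the left and right tables have different schemas.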
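The reading rule above can be applied mechanically to the text that explain() prints. The following is a minimal sketch under stated assumptions: the names PlanBuildSide and broadcastSide are hypothetical, and the helper relies only on the plan format shown in the cases above (the BuildLeft/BuildRight token on the BroadcastHashJoin line).

```scala
object PlanBuildSide {
  // Hypothetical helper: scan the text of a physical plan and report which
  // side of the first BroadcastHashJoin is the build (broadcast) side.
  // Returns None when the plan has no BroadcastHashJoin line.
  def broadcastSide(planText: String): Option[String] =
    planText
      .split("\n")
      .find(_.contains("BroadcastHashJoin"))
      .flatMap { line =>
        if (line.contains("BuildLeft")) Some("left")
        else if (line.contains("BuildRight")) Some("right")
        else None
      }

  def main(args: Array[String]): Unit = {
    // Plan text copied from Case 3 above.
    val plan =
      """*(1) BroadcastHashJoin [id#0], [sid#7], Inner, BuildLeft
        |:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
        |:  +- LocalTableScan [id#0, name#1]
        |+- LocalTableScan [sid#7, sname#8]""".stripMargin
    println(broadcastSide(plan)) // prints Some(left)
  }
}
```

With a real DataFrame, the plan text could be taken from df3.queryExecution.executedPlan.toString instead of a pasted string.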

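Why was it BuildRight in the first case? Could it be because the left table is large and the right table is small? That is plausible, so let's swap the two tables and look at the result.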

Case 4: df2 inner join df1
val df3 = df2.join(df1, $"sid" === $"id", joinType = "inner")
+---+------+---+------+
|sid| sname| id|  name|
+---+------+---+------+
|  1|darren|  1|darren|
+---+------+---+------+
== Physical Plan ==
*(1) BroadcastHashJoin [sid#7], [id#0], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:  +- LocalTableScan [sid#7, sname#8]
+- LocalTableScan [id#0, name#1]

That does seem to be the case: the small table (the left table Student) is the one being broadcast.
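To see the sizes the planner is comparing, the optimizer's size estimate for each side can be printed next to the broadcast threshold. This is a rough sketch, assuming Spark 2.3 or later (where the logical plan exposes stats without arguments); the object name SizeEstimates and the shortened sample data are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

case class People(id: Int, name: String)
case class Student(sid: Int, sname: String)

object SizeEstimates {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("SizeEstimates")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq(People(1, "darren"), People(2, "zhang"), People(3, "jonathan")).toDS()
    val df2 = Seq(Student(1, "darren")).toDS()

    // Estimated size in bytes that the optimizer attaches to each side
    // (assumption: Spark 2.3+ API).
    println(df1.queryExecution.optimizedPlan.stats.sizeInBytes)
    println(df2.queryExecution.optimizedPlan.stats.sizeInBytes)

    // Threshold below which a side is eligible for automatic broadcast.
    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

    spark.stop()
  }
}
```

The side with the smaller estimate should be the one that appears under BroadcastExchange in an inner join without hints.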

Next, let's see what happens with a left join.

Case 5: df1 left join df2
val df3 = df1.join(df2, $"id" === $"sid", joinType = "left")
+---+--------+----+------+
| id|    name| sid| sname|
+---+--------+----+------+
|  1|  darren|   1|darren|
|  2|   zhang|null|  null|
|  3|jonathan|null|  null|
|  4|      li|null|  null|
|  5|   joyce|null|  null|
|  6|    chen|null|  null|
|  7| chaoshu|null|  null|
|  8|     sha|null|  null|
|  9|   cindy|null|  null|
| 10|     xie|null|  null|
+---+--------+----+------+
== Physical Plan ==
*(1) BroadcastHashJoin [id#0], [sid#7], LeftOuter, BuildRight
:- LocalTableScan [id#0, name#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [sid#7, sname#8]

The right table is broadcast.

Case 6: df2 left join df1
val df3 = df2.join(df1, $"sid" === $"id", joinType = "left")
+---+------+---+------+
|sid| sname| id|  name|
+---+------+---+------+
|  1|darren|  1|darren|
+---+------+---+------+
== Physical Plan ==
*(1) BroadcastHashJoin [sid#7], [id#0], LeftOuter, BuildRight
:- LocalTableScan [sid#7, sname#8]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [id#0, name#1]

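The right table is broadcast again, so for a left join the right table is always the one broadcast, regardless of size. Can it be forced to broadcast the left table instead?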

Case 7: df1 left join df2, with a broadcast hint on df1
val df3 = broadcast(df1).join(df2, $"id" === $"sid", joinType = "left")
+---+--------+----+------+
| id|    name| sid| sname|
+---+--------+----+------+
|  1|  darren|   1|darren|
|  2|   zhang|null|  null|
|  3|jonathan|null|  null|
|  4|      li|null|  null|
|  5|   joyce|null|  null|
|  6|    chen|null|  null|
|  7| chaoshu|null|  null|
|  8|     sha|null|  null|
|  9|   cindy|null|  null|
| 10|     xie|null|  null|
+---+--------+----+------+
== Physical Plan ==
*(1) BroadcastHashJoin [id#0], [sid#7], LeftOuter, BuildRight
:- LocalTableScan [id#0, name#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- LocalTableScan [sid#7, sname#8]

It looks like the left table cannot be forced to broadcast in a left join.

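Summary

Inner Join:

  • By default the smaller table is broadcast: the left table if it is smaller, the right table if it is smaller
  • broadcast() can designate either the left or the right table as the one to broadcast, regardless of size

Left Join:

  • Only the right table can be broadcast, regardless of size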
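The summary can be written down as a small rule table. This is only an illustrative sketch, not Spark's actual code: the names BuildSideRule, JoinKind, canBroadcastLeft and canBroadcastRight are hypothetical, and the RightOuter row is an assumption based on my understanding of the planner (it mirrors the left join case) rather than something tested in the cases above.

```scala
object BuildSideRule {
  sealed trait JoinKind
  case object Inner extends JoinKind
  case object LeftOuter extends JoinKind
  case object RightOuter extends JoinKind // assumption: not exercised in this post

  // An outer join must keep every row of its outer side, so that side is
  // streamed and only the opposite side can be built and broadcast.
  def canBroadcastLeft(j: JoinKind): Boolean = j match {
    case Inner | RightOuter => true
    case LeftOuter          => false
  }

  def canBroadcastRight(j: JoinKind): Boolean = j match {
    case Inner | LeftOuter => true
    case RightOuter        => false
  }

  def main(args: Array[String]): Unit = {
    println(canBroadcastLeft(LeftOuter))  // prints false, matching Case 7
    println(canBroadcastRight(LeftOuter)) // prints true, matching Cases 5 and 6
    println(canBroadcastLeft(Inner))      // prints true, matching Case 3
  }
}
```

Read this way, the broadcast(df1) hint in Case 7 is ignored because the left side of a left join is not an allowed build side.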
Disabling broadcast
.config("spark.sql.autoBroadcastJoinThreshold", -1)
val df3 = broadcast(df1).join(df2, $"id" === $"sid", joinType = "left")
+---+--------+----+------+
| id|    name| sid| sname|
+---+--------+----+------+
|  1|  darren|   1|darren|
|  6|    chen|null|  null|
|  3|jonathan|null|  null|
|  5|   joyce|null|  null|
|  9|   cindy|null|  null|
|  4|      li|null|  null|
|  8|     sha|null|  null|
|  7| chaoshu|null|  null|
| 10|     xie|null|  null|
|  2|   zhang|null|  null|
+---+--------+----+------+
== Physical Plan ==
SortMergeJoin [id#0], [sid#7], LeftOuter
:- *(1) Sort [id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#0, 200)
:     +- LocalTableScan [id#0, name#1]
+- *(2) Sort [sid#7 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(sid#7, 200)
      +- LocalTableScan [sid#7, sname#8]
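For reference, here is a minimal sketch of where the threshold setting goes, assuming the same local SparkSession setup as in the data preparation step; the object name DisableAutoBroadcast is made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object DisableAutoBroadcast {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("DisableAutoBroadcast")
      // -1 turns off size-based automatic broadcast joins.
      .config("spark.sql.autoBroadcastJoinThreshold", -1)
      .getOrCreate()

    // The same setting can also be changed on a running session.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

    spark.stop()
  }
}
```

As far as I understand, this setting only switches off the size-based choice. The plan above falls back to SortMergeJoin because the broadcast(df1) hint targets the left side of a left join, which cannot be built; a hint on a side that is allowed to be built would probably still produce a broadcast join.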

Original article: http://outofmemory.cn/zaji/4005739.html