1. 程式人生 > >SparkSQL中的三種Join及其實現(broadcast join、shuffle hash join和sort merge join)

SparkSQL中的三種Join及其實現(broadcast join、shuffle hash join和sort merge join)

1.小表對大表(broadcast join)

將小表的資料分發到每個節點上,供大表使用。executor儲存小表的全部資料,一定程度上犧牲了空間,換取shuffle操作大量的耗時,這在SparkSQL中稱作Broadcast Join

Broadcast Join的條件有以下幾個:

*被廣播的表需要小於 spark.sql.autoBroadcastJoinThreshold 所配置的值,預設是10M (或者加了broadcast join的hint)

*基表不能被廣播,比如 left outer join 時,只能廣播右表 在這裡插入圖片描述

2.Shuffle Hash Join

因為被廣播的表首先被collect到driver段,然後被冗餘分發到每個executor上,所以當表比較大時,採用broadcast join會對driver端和executor端造成較大的壓力。

spark可以通過分割槽的形式將大批量的資料劃分成n份較小的資料集進行平行計算.

利用key相同必然分割槽相同的這個原理,SparkSQL將較大表的join分而治之,先將表劃分成n個分割槽,再對兩個表中相對應分割槽的資料分別進行Hash Join,

這樣即在一定程度上減少了driver廣播一側表的壓力,也減少了executor端取整張被廣播表的記憶體消耗。

*Shuffle Hash Join分為兩步:

對兩張表分別按照join keys進行重分割槽,即shuffle,目的是為了讓有相同join keys值的記錄分到對應的分割槽中

對對應分割槽中的資料進行join,此處先將小表分割槽構造為一張hash表,然後根據大表分割槽中記錄的join keys值拿出來進行匹配

*Shuffle Hash Join的條件有以下幾個:

分割槽的平均大小不超過spark.sql.autoBroadcastJoinThreshold所配置的值,預設是10M

基表不能被廣播,比如left outer join時,只能廣播右表

一側的表要明顯小於另外一側,小的一側將被廣播(明顯小於的定義為3倍小,此處為經驗值)     在這裡插入圖片描述

3.大表對大表(Sort Merge Join)

將兩張表按照join keys進行了重新shuffle,保證join keys值相同的記錄會被分在相應的分割槽。分割槽後對每個分割槽內的資料進行排序,排序後再對相應的分割槽內的記錄進行連線

因為兩個序列都是有序的,從頭遍歷,碰到key相同的就輸出;如果不同,左邊小就繼續取左邊,反之取右邊(即用即取即丟)   在這裡插入圖片描述

package day05

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * @author WangLeiKai
  *         2018/10/3  18:24
  */
object SparkSQLJoinDemo {
  def main(args: Array[String]): Unit = {
    //程式的入口
    val spark = SparkSession.builder().appName("SparkSQLJoinDemo").master("local[*]").getOrCreate()
    //匯入spark物件的隱式轉換
    import spark.implicits._



    //spark.sql.autoBroadcastJoinThreshold = -1
    //不限定小表的大小
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    // 每個分割槽的平均大小不超過spark.sql.autoBroadcastJoinThreshold設定的值
    spark.conf.set("spark.sql.join.preferSortMergeJoin", true)

    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))


    //定義兩個集合,轉換成dataframe
    val df1: DataFrame = Seq(
      ("0", "a"),
      ("1", "b"),
      ("2", "c")
    ).toDF("id", "name")


    val df2: DataFrame = Seq(
      ("0", "d"),
      ("1", "e"),
      ("2", "f")
    ).toDF("aid", "aname")

    //重新分割槽
    df2.repartition()

    //df1.cache().count()

    //進行連線
    val result = df1.join(df2,$"id" === $"aid")

    //檢視執行計劃
    result.explain()

    //展示結果
    result.show()

    //釋放資源
    spark.stop()

  }
}

執行結果都是一樣的

在這裡插入圖片描述

檢視執行計劃: 在這裡插入圖片描述 在這裡插入圖片描述

**

這三種join在傳統的資料庫中也有體現,只是現在是分散式的。

**