SparkSQL中的三種Join及其實現(broadcast join、shuffle hash join和sort merge join)
1.小表對大表(broadcast join)
將小表的資料分發到每個節點上,供大表使用。executor儲存小表的全部資料,一定程度上犧牲了空間,換取shuffle操作大量的耗時,這在SparkSQL中稱作Broadcast Join
Broadcast Join的條件有以下幾個:
*被廣播的表需要小於 spark.sql.autoBroadcastJoinThreshold 所配置的值,預設是10M (或者加了broadcast join的hint)
*基表不能被廣播,比如 left outer join 時,只能廣播右表
2.Shuffle Hash Join
因為被廣播的表首先被collect到driver段,然後被冗餘分發到每個executor上,所以當表比較大時,採用broadcast join會對driver端和executor端造成較大的壓力。
spark可以通過分割槽的形式將大批量的資料劃分成n份較小的資料集進行平行計算.
利用key相同必然分割槽相同的這個原理,SparkSQL將較大表的join分而治之,先將表劃分成n個分割槽,再對兩個表中相對應分割槽的資料分別進行Hash Join,
這樣即在一定程度上減少了driver廣播一側表的壓力,也減少了executor端取整張被廣播表的記憶體消耗。
*Shuffle Hash Join分為兩步:
對兩張表分別按照join keys進行重分割槽,即shuffle,目的是為了讓有相同join keys值的記錄分到對應的分割槽中
對對應分割槽中的資料進行join,此處先將小表分割槽構造為一張hash表,然後根據大表分割槽中記錄的join keys值拿出來進行匹配
*Shuffle Hash Join的條件有以下幾個:
分割槽的平均大小不超過spark.sql.autoBroadcastJoinThreshold所配置的值,預設是10M
基表不能被廣播,比如left outer join時,只能廣播右表
一側的表要明顯小於另外一側,小的一側將被廣播(明顯小於的定義為3倍小,此處為經驗值)
3.大表對大表(Sort Merge Join)
將兩張表按照join keys進行了重新shuffle,保證join keys值相同的記錄會被分在相應的分割槽。分割槽後對每個分割槽內的資料進行排序,排序後再對相應的分割槽內的記錄進行連線
因為兩個序列都是有序的,從頭遍歷,碰到key相同的就輸出;如果不同,左邊小就繼續取左邊,反之取右邊(即用即取即丟)
package day05
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* @author WangLeiKai
* 2018/10/3 18:24
*/
object SparkSQLJoinDemo {
def main(args: Array[String]): Unit = {
//程式的入口
val spark = SparkSession.builder().appName("SparkSQLJoinDemo").master("local[*]").getOrCreate()
//匯入spark物件的隱式轉換
import spark.implicits._
//spark.sql.autoBroadcastJoinThreshold = -1
//不限定小表的大小
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
// 每個分割槽的平均大小不超過spark.sql.autoBroadcastJoinThreshold設定的值
spark.conf.set("spark.sql.join.preferSortMergeJoin", true)
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
//定義兩個集合,轉換成dataframe
val df1: DataFrame = Seq(
("0", "a"),
("1", "b"),
("2", "c")
).toDF("id", "name")
val df2: DataFrame = Seq(
("0", "d"),
("1", "e"),
("2", "f")
).toDF("aid", "aname")
//重新分割槽
df2.repartition()
//df1.cache().count()
//進行連線
val result = df1.join(df2,$"id" === $"aid")
//檢視執行計劃
result.explain()
//展示結果
result.show()
//釋放資源
spark.stop()
}
}
執行結果都是一樣的
檢視執行計劃:
**
這三種join在傳統的資料庫中也有體現,只是現在是分散式的。
**