Join優化
概述
跟傳統的關係型資料庫類似,分散式環境中的join在提供優化器“hint”(提示)以告訴優化器選擇一些執行策略。目前一些優化提示主要針對批處理中的連線(join)。在批處理中共有三個跟連線有關的轉換函式:
-
join:預設為等值連線(Equi-join),即我們平時看到的inner join;
-
outerjoin:外連線,具體細分為left-outer join、righ-outer join、full-outer join;
-
cross:交叉連線,求兩個資料集的笛卡爾積。
1.演算法分析
常用來實現連線的演算法有:hash join、sort-merge join以及nested loop join,下面我們對這三種演算法進行簡單介紹。首先hash演算法實現連線時,通常分為兩個階段:
-
build:為參與連線的兩個資料集中較小的資料集準備好雜湊表,雜湊表中的記錄包含著連線的屬性以及它對應的行。因為雜湊表是通過對連線屬性應用一個雜湊函式來訪問的,因此通過它將比掃描初始資料集更快地發現給定的連線屬性對應的行;
-
probe:一旦雜湊表構建完成,會掃描更大的資料集並通過從更小的資料集匹配雜湊表以發現相關的行。
而使用sort-merge演算法實現連線時,通常也劃分為兩個階段:
-
sort:對兩個資料集在他們的連線屬性上進行排序;
-
merge:合併排過序的資料集。
nested loop實現連線相對更容易理解,它使用兩層巢狀迴圈分別作用於兩個參與連線的資料集。
2.連線策略
通過上面的介紹,我們得知當選擇hash演算法來實現連線時,需要確定以哪個輸入端作為build端,哪個輸入端作為probe端,這是影響其執行效率的因素之一(因為通常選擇資料量較小的資料集作為build端)。而當以sort-merge演算法來實現連線時,不會區分輸入端的特殊職責,也就不存在build階段和probe階段。
為了理清演算法跟參與連線的輸入端的關係,Flink將它們區分成兩種不同策略的:本地策略以及傳輸(ship)策略。其中傳輸策略表示如何移動兩個輸入端中的資料使得它們具備連線的條件;本地策略則指兩個已在本地的輸入端資料集所執行的連線演算法。
我們來解釋一下這兩種策略,假設有兩個待連線的資料集(R和S)。傳輸策略有如下兩種:
-
Broadcast-Forward strategy (BF):該策略會將一個完整的資料集,比如R,廣播到資料集S的每一個分割槽上,而資料集S的所有資料則一直處於本地,無需網路傳輸;
-
Repartition-Repartition strategy (RR):以相同的分割槽函式以及用於連線的鍵屬性分割槽兩個資料集R、S;
正如上面已經提及的,本地策略也即連線的實現演算法也有兩種:
-
Sort-Merge-Join strategy (SM):首先對兩個輸入端的資料集在它們的連線鍵屬性上進行排序(排序階段),然後合併排過序的資料集(合併階段);
-
Hybrid-Hash-Join strategy (HH):分為構建階段和探索階段;
在不指定“Hint”的情況下,Flink在進行批處理優化時會根據成本自動選擇傳輸策略以及本地策略。優化器的一個關鍵特徵是它會根據已經存在的資料屬性來進行推理。就連線運算而言,如果某一個輸入端的資料量遠小於另一輸入端,Flink會傾向於選擇BF傳輸策略,將較小的輸入端廣播給較大的輸入端的每一個分割槽,並在本地策略中選擇HH且以較小的輸入端作為HH的構建端;如果優化器得知某個(或兩個)輸入端已排好序,那麼生成的候選計劃將不再重分割槽該輸入端,此時它更傾向於選擇RR傳輸策略以及SM本地策略。
除了優化器的自動選擇,當用戶對資料集非常瞭解的情況下,Flink定義了JoinHint允許使用者為join(inner join)指定連線策略給予優化器提示。JoinHint提供了人為選擇連線策略的靈活性,其使用方式有兩種,一種是直接指定兩個輸入端的大小:

image
另一種是直接指定連線策略:

image