1. 程式人生 > >淺談Flink批處理優化器之Join優化

淺談Flink批處理優化器之Join優化

跟傳統的關係型資料庫類似,Flink提供了優化器“hint”(提示)以告訴優化器選擇一些執行策略。目前優化提示主要針對批處理中的連線(join)。在批處理中共有三個跟連線有關的轉換函式:

  • outerJoin:外連線,具體細分為left-outer join、right-outer join、full-outer join;
  • cross:交叉連線,求兩個資料集的笛卡爾積;

完全展開之後共有五種,這也符合ANSI-standard SQL對連接種類的劃分。

下文當我們提及“join”時,主要指equi-join,而當我們想表達outer-join時,我們會直接使用“外連線”,當我們想泛指時,我們將使用“連線”這個詞。

常用來實現連線的演算法有:hash join、sort-merge join以及nested loop join,下面我們對這三種演算法進行簡單介紹。首先,當基於hash演算法實現連線時,通常劃分為兩個階段:

  1. build:為參與連線的兩個資料集中較小的資料集準備好雜湊表,雜湊表中的記錄包含著連線的屬性以及它對應的行。因為雜湊表是通過對連線屬性應用一個雜湊函式來訪問的,因此通過它將比掃描初始的資料集更快地發現給定的連線屬性對應的行;
  2. probe:一旦雜湊表構建完成,會掃描更大的資料集並通過從更小的資料集匹配雜湊表以發現相關的行;

而使用sort-merge演算法實現連線時,通常也劃分為兩個階段:

  1. sort:對兩個資料集在它們的連線鍵屬性上進行排序;
  2. merge:合併排過序的資料集;

nested loop實現連線相對更容易理解,它使用兩層巢狀迴圈分別作用於兩個參與連線的資料集。

在Flink的DataSet API中,hash和sort-merge演算法都可被選擇用於實現join和outerJoin,而nested loop只用於實現cross join。

通過上面的介紹,我們得知當選擇hash演算法來實現連線時,需要確定以哪個輸入端作為build端,哪個輸入端作為probe端,這是影響其執行效率的因素之一(因為通常選擇資料量較小的資料集作為build端)。因此,以hash演算法來實現連線時,而不同的選擇顯然對應著不同的運算子描述器,列舉如下:

  • HashJoinBuildFirstProperties
  • HashJoinBuildSecondProperties
  • HashLeftOuterJoinBuildFirstDescriptor
  • HashLeftOuterJoinBuildSecondDescriptor
  • HashRightOuterJoinBuildFirstDescriptor
  • HashRightOuterJoinBuildSecondDescriptor
  • HashFullOuterJoinBuildFirstDescriptor
  • HashFullOuterJoinBuildSecondDescriptor

而當以sort-merge演算法來實現連線時,不會區分輸入端的特殊職責,也就不存在build階段和probe階段,因此運算子描述器只有如下四種:

  • SortMergeInnerJoinDescriptor:
  • SortMergeLeftOuterJoinDescriptor:
  • SortMergeRightOuterJoinDescriptor:
  • SortMergeFullOuterJoinDescriptor:

以上這麼多運算子描述器,主要是為它們設定不同的執行策略(DriverStrategy),不同的執行策略直接導致了不同的執行成本。

為了理清演算法跟參與連線的輸入端的關係,Flink將它們區分成兩種不同策略的:本地策略以及傳輸(ship)策略。其中傳輸策略表示如何移動兩個輸入端中的資料使得它們具備連線的條件;本地策略則指兩個已在本地的輸入端資料集所執行的連線演算法。

我們來解釋一下這兩種策略,假設有兩個待連線的資料集(R和S)。傳輸策略有如下兩種:

  • Broadcast-Forward strategy (BF):該策略會將一個完整的資料集,比如R,廣播到資料集S的每一個分割槽上,而資料集S的所有資料則一直處於本地,無需網路傳輸;
  • Repartition-Repartition strategy (RR):以相同的分割槽函式以及用於連線的鍵屬性分割槽兩個資料集R、S;

正如上面已經提及的,本地策略也即連線的實現演算法也有兩種:

  • Sort-Merge-Join strategy (SM):首先對兩個輸入端的資料集在它們的連線鍵屬性上進行排序(排序階段),然後合併排過序的資料集(合併階段);
  • Hybrid-Hash-Join strategy (HH):分為構建階段和探索階段;

在不指定“Hint”的情況下,Flink在進行批處理優化時會根據成本自動選擇傳輸策略以及本地策略。優化器的一個關鍵特徵是它會根據已經存在的資料屬性來進行推理。就連線運算而言,如果某一個輸入端的資料量遠小於另一輸入端,Flink會傾向於選擇BF傳輸策略,將較小的輸入端廣播給較大的輸入端的每一個分割槽,並在本地策略中選擇HH且以較小的輸入端作為HH的構建端;如果優化器得知某個(或兩個)輸入端已排好序,那麼生成的候選計劃將不再重分割槽該輸入端,此時它更傾向於選擇RR傳輸策略以及SM本地策略。

除了優化器的自動選擇,當用戶對資料集非常瞭解的情況下,Flink定義了JoinHint允許使用者為join(inner join)指定連線策略給予優化器提示。JoinHint提供了人為選擇連線策略的靈活性,其使用方式有兩種,一種是直接指定兩個輸入端的大小:

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
    result1 = input1.joinWithTiny(input2)    //提示優化器第二個資料集比第一個資料集小得多
        .where(0)
        .equalTo(0);

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
    result2 = input1.joinWithHuge(input2)    //提示優化器第二個資料集比第一個資料集大得多
        .where(0)
        .equalTo(0);

另一種是直接指定連線策略:

DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]

DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
            .where("id").equalTo("key");

當前有如下的這些策略可供選擇:

  • OPTIMIZER_CHOOSES:將選擇權交予Flink優化器,相當於沒有給提示;
  • BROADCAST_HASH_FIRST:廣播第一個輸入端,同時基於它構建一個雜湊表,而第二個輸入端作為探索端,選擇這種策略的場景是第一個輸入端規模很小;
  • BROADCAST_HASH_SECOND:廣播第二個輸入端並基於它構建雜湊表,第一個輸入端作為探索端,選擇這種策略的場景是第二個輸入端的規模很小;
  • REPARTITION_HASH_FIRST:該策略會導致兩個輸入端都會被重分割槽,但會基於第一個輸入端構建雜湊表。該策略適用於第一個輸入端資料量小於第二個輸入端的資料量,但這兩個輸入端的規模仍然很大,優化器也是當沒有辦法估算大小,沒有已存在的分割槽以及排序順序可被使用時系統預設採用的策略;
  • REPARTITION_HASH_SECOND:該策略會導致兩個輸入端都會被重分割槽,但會基於第二個輸入端構建雜湊表。該策略適用於兩個輸入端的規模都很大,但第二個輸入端的資料量小於第一個輸入端的情況;
  • REPARTITION_SORT_MERGE:輸入端被以流的形式進行連線併合併成排過序的輸入。該策略適用於一個或兩個輸入端都已排過序的情況;

對應到優化器中,JoinHint被用來指定建立何種運算子描述器,由於JoinHint只適應於join,所以它只對應如下這些運算子描述器:

  • HashJoinBuildFirstProperties
  • HashJoinBuildSecondProperties
  • SortMergeInnerJoinDescriptor

因此,如果使用者給出了JoinHint,則資料屬性(其實這裡主要是DriverStrategy)會通過以上三種運算子描述器來提供:

joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;

switch (joinHint) {
    case BROADCAST_HASH_FIRST:
        list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
        break;
    case BROADCAST_HASH_SECOND:
        list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
        break;
    case REPARTITION_HASH_FIRST:
        list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
        break;
    case REPARTITION_HASH_SECOND:
        list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
        break;
    case REPARTITION_SORT_MERGE:
        list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2, false, false, true));
        break;
    case OPTIMIZER_CHOOSES:
        list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2));
        list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
        list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
        break;
    default:
        throw new CompilerException("Unrecognized join hint: " + joinHint);
}

由程式碼段可見,當將選擇權交給優化器時,它會將三種運算子描述器都作為資料屬性,供後續生成候選計劃時再剔除。

除了針對join的提示外,Flink還提供了針對求交叉連線的提示CrossHint,該提示主要是針對輸入端的資料量大小。使用示例如下:

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple4<Integer, String, Integer, String>>
    udfResult = input1.crossWithTiny(input2)        //提示第二個資料集非常小
    // apply any Cross function (or projection)
    .with(new MyCrosser());

DataSet<Tuple3<Integer, Integer, String>>
    projectResult = input1.crossWithHuge(input2)    //提示第二個資料集非常大
    // apply a projection (or any Cross function)
    .projectFirst(0,1).projectSecond(1);

不同於Join提示,Cross提示被表述為不同的API。從程式碼層面上來看,CrossHint有三個列舉值:

  • OPTIMIZER_CHOOSES:將選擇權交給Flink優化器;
  • FIRST_IS_SMALL:第一個輸入端小於第二個輸入端;
  • SECOND_IS_SMALL:第二個輸入端資料量小於第一個輸入端;

在建立相關運算子描述器CrossHint被用來指定特定的構造引數,比如是允許第一個輸入端廣播還是第二個輸入端廣播。交叉連線的實現演算法為nested-loop,關於運算子描述器,考慮到以哪個資料集作為內、外層迴圈以及以阻塞模型還是流模型來處理這兩個因素,有四種實現:

  • CrossBlockOuterFirstDescriptor:以第二個輸入端作為內迴圈,第一個輸入端作為外迴圈且以阻塞形式處理;
  • CrossBlockOuterSecondDescriptor:以第一個輸入端作為內迴圈,第二個輸入端作為外迴圈且以阻塞形式處理;
  • CrossStreamOuterFirstDescriptor:以第二個輸入端作為內迴圈,第一個輸入端作為外迴圈且以流模型處理;
  • CrossStreamOuterSecondDescriptor:以第一個輸入端作為內迴圈,第二個輸入端作為外迴圈且以流模型處理;

且需要注意的是,不同的處理模型,哪個輸入端作為內外迴圈是相反的:

else if (hint == CrossHint.SECOND_IS_SMALL) {
    ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
    list.add(new CrossBlockOuterSecondDescriptor(false, true));
    list.add(new CrossStreamOuterFirstDescriptor(false, true));
    this.dataProperties = list;
}
else if (hint == CrossHint.FIRST_IS_SMALL) {
    ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
    list.add(new CrossBlockOuterFirstDescriptor(true, false));
    list.add(new CrossStreamOuterSecondDescriptor(true, false));
    this.dataProperties = list;
}

但廣播哪個輸入端是一致的。

微信掃碼關注公眾號:Apache_Flink

apache_flink_weichat

QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)

qrcode_for_apache_flink_qq_group