1. 程式人生 > >四種優化 Apache Flink 應用程式的方法

四種優化 Apache Flink 應用程式的方法

如果下面文章格式或圖片不清晰,請參見原文:https://www.iteblog.com/archives/2303.html 或直接點選下面 閱讀原文 即可進入。

Flink 是一種非常複雜的框架,它提供了多種調整其執行的方法。本文將介紹四種不同的方法來提升你的 Flink 應用程式的效能。

使用 Flink Tuples

當你使用類似於 groupBy, join, 或者 keyBy 運算元時,Flink 提供了多種用於在你的資料集上選擇 key 的方法。你可以使用 key 選擇函式,如下:

640?wx_fmt=png&wxfrom=5&wx_lazy=1

你甚至可以指定 POJO 型別中一個 field 的名字:

640?wx_fmt=png&wxfrom=5&wx_lazy=1

但是如果你現在使用的是 Flink 元組型別(tuple types)的資料,你可以簡單地指定將要作為 key 的欄位在元組中的位置:

0?wx_fmt=png

這種方法在 Flink 中將會獲得最佳的效能,但是可讀性方面呢?這是不是意味著你的程式碼看起來像下面那樣:

0?wx_fmt=png

如果想及時瞭解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

在這種情況下,提高可讀性的常見方法是建立一個繼承自 TupleX 的類,並且實現其中的 getters 和 setters。下面是 Flink Gelly 類庫中 Edge 類的實現,其中有三個 fileds,所以它直接繼承了 Tuple3 類:

0?wx_fmt=png

重用 Flink 物件

另外一種可以提升 Flink 應用程式效能的方法是在使用者自定義函式返回資料時使用可變物件(mutable objects),請看看下面的例子:

0?wx_fmt=png

正如你所看到的,在我們每次呼叫 apply 函式的時候,我們都會建立一個 Tuple2 型別的例項,這將會給垃圾回收造成很大的壓力。解決這個問題的一種方法就是反覆使用相同的例項:

0?wx_fmt=png

上面的程式碼效能會好些。雖然我們在每次呼叫的時候只建立了一個 Tuple2 例項,但是我們還間接地建立了 Long 型別的例項。為了解決這個問題, Flink 內部提供了一系列 value classes,比如:IntValue, LongValue, StringValue, FloatValue 等。這些類的重點是為內建型別提供了可變版本,所以我們可以在使用者自定義函式中重用這些型別,下面就是如何使用的例子:

0?wx_fmt=png

上面這些使用習慣在 Flink 類庫中被普遍使用,比如 Flink Gelly。

使用函式註解

另一種優化 Flink 應用程式的方法是提供一些關於使用者自定義函式如何對輸入資料進行處理的資訊。由於 Flink 無法解析和理解你的程式碼,所以你提供一些關鍵的資訊將會幫助 Flink 建立一個更加高效的執行計劃。我們可以使用三種註解:

  • @ForwardedFields – 指定輸入資料中哪些欄位保持不變並且在輸出值中使用(specifies what fields in an input value were left unchanged and are used in an output value.)。

  • @NotForwardedFields – 指定在輸出中相同位置未保留的欄位(specifies fields which were not preserved in the same positions in the output.)。

  • @ReadFields – 指定哪些欄位在計算結果的時候用到。你只能指定那些在計算中使用的欄位,而不是僅僅將資料拷貝到輸出中的欄位。(specifies what fields were used to compute a result value. You should only specify fields that were used in computations and not merely copied to the output.)

我們來看看如何使用 ForwardedFields 註釋:

0?wx_fmt=png

上面的註釋意味著輸入元組的第一個元素將不會改變,而且在返回元組中同樣處在第一個位置。

如果你沒有改變一個元素,只不過簡單地將它移到不同的位置上,你同樣可以使用 ForwardedFields 註釋來實現。下面例子中,我們簡單地將輸入元組的位置互相交換,並且直接返回:

0?wx_fmt=png

上面例子中提到的註釋只能應用到只有一個輸入引數的函式中,比如 map 或者 flatMap。如果你有兩個輸入引數的函式,你可以分別使用 ForwardedFieldsFirst 和 ForwardedFieldsSecond 註釋來為第一和第二個引數指定一些資訊。

下面我們使用 ForwardedFieldsFirst 和 ForwardedFieldsSecond 註釋來為實現 JoinFunction 介面的類指定相關的資訊:

0?wx_fmt=png

Flink 同樣提供了 NotForwardedFieldsFirst, NotForwardedFieldsSecond, ReadFieldsFirst, 和 ReadFirldsSecond 註釋來實現相同的功能。

選擇 Join 型別

如果你為 Flink 提供了一些資訊,可以使你的 Join 操作更快,在討論這個是如何工作之前,讓我們先了解 Fliink 是如何執行 Join 操作的。

當 Flink 處理批量資料時,叢集中的每臺機器只儲存了部分的資料。為了執行 Join 操作, Apache Flink 需要找到兩個資料集所有 key 相同的資料。為了做到這一點,Flink 首先必須將兩個資料集擁有相同 key 的資料放在同一臺機器上。這裡有兩種實現策略:

  • Repartition-Repartition strategy:在這種場景下,Join 的兩個資料集分別對它們的 key 使用相同的分割槽函式進行分割槽,並經過網路傳送資料。這就意味著如果資料集非常大,這將花費相當一部分時間將資料分發出去。

  • Broadcast-Forward strategy:這種場景下,大的資料集R不做處理,另一個比較小的資料集S將全部複製到叢集中所有擁有R的一部分資料的機器上。

如果你使用一個比較小的資料集和一個比較大的資料集進行 join 操作,你可以使用 Broadcast-Forward 策略,這個很容易實現:

ds1.join(ds2, JoinHint.BROADCAST_HASH_FIRST)

這種寫法表示第一個資料集要比第二個資料集小的多。

Flink 支援的其他 join 提示有以下幾種:

  • BROADCAST_HASH_SECOND – 表示第二個資料集比較小

  • REPARTITION_HASH_FIRST – 表示第一個資料集比較小

  • REPARTITION_HASH_SECOND – 表示第二個資料集有點小

  • REPARTITION_SORT_MERGE – 表示重新分割槽兩個資料集並使用排序和合並策略(sorting and merging strategy)

  • OPTIMIZER_CHOOSES – Flink 優化器將決定如何連線資料集

本文翻譯自:《Four ways to optimize your Flink applications》:https://brewing.codes/2017/10/17/flink-optimize/

猜你喜歡

歡迎關注本公眾號:iteblog_hadoop:

0、回覆 電子書獲取 本站所有可下載的電子書

11、更多大資料文章歡迎訪問https://www.iteblog.com及本公眾號(iteblog_hadoop)12、Flink中文文件:http://flink.iteblog.com
640?wx_fmt=jpeg