1. 程式人生 > >作為一個程式設計新手,我再也不怕Flink迷了我的眼!

作為一個程式設計新手,我再也不怕Flink迷了我的眼!

歡迎大家前往騰訊雲+社群,獲取更多騰訊海量技術實踐乾貨哦~

使用 Flink 編寫處理邏輯時,新手總是容易被林林總總的概念所混淆:

為什麼 Flink 有那麼多的型別宣告方式?

BasicTypeInfo.STRING_TYPE_INFO、Types.STRING 、Types.STRING() 有何區別?

TypeInfoFactory 又是什麼?

TypeInformation.of 和 TypeHint 是如何使用的呢?

接下來本文將逐步解密 Flink 的型別和序列化機制。

img圖 1:Flink 型別分類

Flink 的型別系統原始碼位於 org.apache.flink.api.common.typeinfo 包,讓我們對圖 1 深入追蹤,看一下類的繼承關係圖:

img圖 2:TypeInformation 類繼承關係圖

可以看到,圖 1 和 圖 2 是一一對應的,TypeInformation 類是描述一切型別的公共基類,它和它的所有子類必須可序列化(Serializable),因為型別資訊將會伴隨 Flink 的作業提交,被傳遞給每個執行節點。

由於 Flink 自己管理記憶體,採用了一種非常緊湊的儲存格式(見官方博文),因而型別資訊在整個資料處理流程中屬於至關重要的元資料。

TypeExtractror 型別提取

Flink 內部實現了名為 TypeExtractror 的類,可以利用方法簽名、子類資訊等蛛絲馬跡,自動提取和恢復型別資訊(當然也可以顯式宣告,即本文所介紹的內容)。

然而由於 Java 的型別擦除,自動提取並不是總是有效。因而一些情況下(例如通過 URLClassLoader 動態載入的類),仍需手動處理;例如下圖中對 DataSet 變換時,使用 .returns() 方法宣告返回型別。

這裡需要說明一下,returns() 接受三種類型的引數:字串描述的類名(例如 "String")、TypeHint(接下來會講到,用於泛型型別引數)、Java 原生 Class(例如 String.class) 等;不過字串形式的用法即將廢棄,如果確實有必要,請使用 Class.forName() 等方法來解決。

img圖 3:使用 .returns 方法宣告返回型別

下面是 ExecutionEnvironment 類的 registerType 方法,它可以向 Flink 註冊子類資訊(Flink 認識父類,但不一定認識子類的一些獨特特性,因而需要註冊),下面是 Flink-ML 機器學習庫程式碼的例子:

img圖 4:Flink-ML 註冊子類型別資訊

從下圖可以看到,如果通過 TypeExtractor.createTypeInfo(type) 方法獲取到的型別資訊屬於 PojoTypeInfo 及其子類,那麼將其註冊到一起;否則統一交給 Kryo 去處理,Flink 並不過問(這種情況下效能會變差)。

img圖 5:Flink 允許註冊自定義型別

宣告型別資訊的常見手段

通過 TypeInformation.of() 方法,可以簡單地建立型別資訊物件。

1. 對於非泛型的類,直接傳入 Class 物件即可

img圖 6:class 物件作為引數

2. 對於泛型類,需要藉助 TypeHint 來儲存泛型型別資訊

TypeHint 的原理是建立匿名子類,執行時 TypeExtractor 可以通過 getGenericSuperclass(). getActualTypeArguments() 方法獲取儲存的實際型別。

img圖 7:TypeHint 作為引數,儲存泛型資訊

3. 預定義的快捷方式

例如 BasicTypeInfo,這個類定義了一系列常用型別的快捷方式,對於 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本型別的型別宣告,可以直接使用。

img圖 8:BasicTypeInfo 快捷方式

例如下面是對 Row 型別各欄位的型別宣告,使用方法非常簡明,不再需要 new XxxTypeInfo<>(很多很多引數)

img圖 9:使用 BasicTypeInfo 快捷方式來宣告一行(Row)每個欄位的型別資訊

當然,如果覺得 BasicTypeInfo 還是太長,Flink 還提供了完全等價的 Types 類(org.apache.flink.api.common.typeinfo.Types):

img圖 10:Types 類

特別需要注意的是,flink-table 模組也有一個 Types 類(org.apache.flink.table.api.Types),用於 table 模組內部的型別定義資訊,用法稍有不同。使用 IDE 的自動 import 時一定要小心:

img圖 11:flink-table 模組的 Types 類

4. 自定義 TypeInfo 和 TypeInfoFactory

通過自定義 TypeInfo 為任意類提供 Flink 原生記憶體管理(而非 Kryo),可令儲存更緊湊,執行時也更高效。

開發者在自定義類上使用 @TypeInfo 註解,隨後建立相應的 TypeInfoFactory 並覆蓋 createTypeInfo 方法。

注意需要繼承 TypeInformation 類,為每個欄位定義型別,並覆蓋元資料方法,例如是否是基本型別(isBasicType)、是否是 Tuple(isTupleType)、元數(對於一維的 Row 型別,等於欄位的個數)等等,從而為 TypeExtractor 提供決策依據。

img圖 12:為自定義類提供型別支援(圖片未展示全部欄位)

TypeSerializer

Flink 自帶了很多 TypeSerializer 子類,大多數情況下各種自定義型別都是常用型別的排列組合,因而可以直接複用:

img圖 13:Flink 自帶的 TypeSerializer 子類概覽

如果不能滿足,那麼可以繼承 TypeSerializer 及其子類以實現自己的序列化器。

Kryo 序列化

對於 Flink 無法序列化的型別(例如使用者自定義型別,沒有 registerType,也沒有自定義 TypeInfo 和 TypeInfoFactory),預設會交給 Kryo 處理。

如果 Kryo 仍然無法處理(例如 Guava、Thrift、Protobuf 等第三方庫的一些類),有以下兩種解決方案:

\1. 可以強制使用 Avro 來替代 Kryo:

env.getConfig().enableForceAvro();   // env 代表 ExecutionEnvironment 物件, 下同

\2. 為 Kryo 增加自定義的 Serializer 以增強 Kryo 的功能:

 env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass

img圖 14:為 Kryo 增加自定義的 Serializer

以及

env.getConfig().registerTypeWithKryoSerializer(Class<?> type, T serializer)

img圖 15:為 Kryo 增加自定義的 Serializer

如果希望完全禁用 Kryo(100% 使用 Flink 的序列化機制),則可以使用以下設定,但注意一切無法處理的類都將導致異常:

env.getConfig().disableGenericTypes();

型別機制的陷阱與缺陷

金無足赤,人無完人。Flink 內建的型別系統雖然強大而靈活,但仍然有一些需要注意的點:

1. Lambda 函式的型別提取

由於 Flink 型別提取依賴於繼承等機制,而 lambda 函式比較特殊,它是匿名的,也沒有與之相關的類,所以其型別資訊較難獲取。

Eclipse 的 JDT 編譯器會把 lambda 函式的泛型簽名等資訊寫入編譯後的位元組碼中,而對於 javac 等常見的其他編譯器,則不會這樣做,因而 Flink 就無法獲取具體型別資訊了。

2. Kryo 的 JavaSerializer 在 Flink 下存在 Bug

推薦使用 org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer 而非 com.esotericsoftware.kryo.serializers.JavaSerializer 以防止與 Flink 不相容。

型別機制與記憶體管理

img圖 16:型別資訊到記憶體塊

下面以 StringSerializer 為例,來看下 Flink 是如何緊湊管理記憶體的:

img圖 17:StringSerializer 類的 serialize() 方法

下面是具體的序列化過程:

img圖 18:String 物件的序列化過程

可以看到,Flink 對於記憶體管理是非常細緻的,層次分明,程式碼也容易理解。

參考閱讀

此文已由作者授權騰訊雲+社群釋出,更多原文請點選

搜尋關注公眾號「雲加社群」,第一時間獲取技術乾貨,關注後回覆1024 送你一份技術課程大禮包!

海量技術實踐經驗,盡在雲加社群