1. 程式人生 > >Flink中的序列化失敗問題 和transent宣告

Flink中的序列化失敗問題 和transent宣告

最近在Flink的的map運算元中使用了自義定類(實現richMapFunction)來序列化中存在的問題?

一、背景介紹

在編寫Spark程式中,由於在map等運算元內部使用了外部定義的變數和函式,從而引發Task未序列化問題。然而,Spark運算元在計算過程中使用外部變數在許多情形下確實在所難免,比如在filter運算元根據外部指定的條件進行過濾,map根據相應的配置進行變換等。為了解決上述Task未序列化問題,這裡對其進行了研究和總結。
(其實在Flink中也存在該問題)

二、原因分析

連結:http://www.sjsjw.com/103/005844MYM031811/
其實看看這個原因就差不多了

三、解決方案

3.1.先說一下網上已有的解決辦法:

(這是對Spark而言的,其實對Flink也是適用的,沒試過)
  承上所述,這個問題主要是引用了某類的成員變數或函式,並且相應的類沒有做好序列化處理導致的。因此解決這個問題無非以下兩種方法:
不在(或不直接在)map等閉包內部直接引用某類(通常是當前類)的成員函式或成員變數
如果引用了某類的成員函式或變數,則需對相應的類做好序列化處理

(一)不在(或不直接在)map等閉包內部直接引用某類成員函式或成員變數

  • (1)對於依賴某類成員變數的情形
    如果程式依賴的值相對固定,可取固定的值,或定義在map、filter等操作內部,或定義在scala
    object物件中(類似於Java中的static變數)
    如果依賴值需要程式呼叫時動態指定(以函式引數形式),則在map、filter等操作時,可不直接引用該成員變數,而是在類似上面例子的getResult函式中根據成員變數的值重新定義一個區域性變數,這樣map等運算元就無需引用類的成員變數。

  • (2)對於依賴某類成員函式的情形
    如果函式功能獨立,可定義在scala object物件中(類似於Java中的static方法),這樣就無需一來特定的類。

(二)如果引用了某類的成員函式或變數,則需對相應的類做好序列化處理

對於這種情況,則需對該類做好序列化處理,首先該類繼承序列化類,然後對於不能序列化的成員變數使用“@transent”標註,告訴編譯器不需要序列化。
此外如果可以,可將依賴的變數獨立放到一個小的class中,讓這個class支援序列化,這樣做可以減少網路傳輸量,提高效率。

3.2我的解決方法:

【首先宣告:我的程式是Scala+Java混合方式實現的Flink程式(Storm遷移至Flink)】

方法1:宣告為@transent

在map()運算元中繼承了的RichMapFunction的類,其中的由於物件都需要序列化,但是對於不能夠實現的序列化介面的物件比如資料庫連線物件SqlConnection(mySQL、Postgresql等)和GeomentyFactory物件等,自身不可以序列化的物件,則宣告為@transent
再分為兩點
(1)物件在需要在map()中不停呼叫的時候,這時候需要在open()中序列化,這個跟設定的併發執行緒數有關係。比如設定併發parallel=10,則該物件被初始化10次,生成是個物件,分別被10個執行緒呼叫;
(2)假如物件只需要在在後期實用一次,比如資料庫連線SqlConnection在載入一次快取(一般是不變的快取),則最好是放在該類的建構函式中初始化,執行1次;

方法2:宣告為靜態

對於像資料庫連線物件SqlConnection等不可以序列化的物件,由於該物件是長連線不可變物件,可以宣告為靜態物件(類物件),JVM會將該物件放入永久代Perminate 中,實現Flink多執行緒共享併發訪問;
同理,所有不可變的小物件(不可變快取(如hashMap)是指小的快取物件)都可以宣告為靜態物件,因此,此時不用對該物件宣告為@transent而表明不可以序列化;

(推薦使用 :方法2:宣告為靜態物件,簡單易用!)


自身在Flink使用中遇到的問題,後續會繼續跟蹤該類問題,並更新部落格!