spark streaming 踩過的那些坑
- 系統背景
--driver-memory 50G
- spark streaming + Kafka高階API receiver
- 目前資源分配(現在系統比較穩定的資源分配),獨立叢集
--executor-memory 8G
--num-executors 11
--executor-cores 5
- 廣播變數
1. 廣播變數的初始化
1.1.executor端,存放廣播變數的物件使用非靜態,因為靜態變數是屬於類的,不能使用建構函式來初始化。在executor端使用靜態的時候,它只是定義的時候的一個狀態,而在初始化時設定的值取不到。而使用非靜態的物件,其建構函式的初始化在driver端執行,故在叢集可以取到廣播變數的值。
2. 廣播變數的釋放
2.1.當filter增量為指定大小時,進行廣播,雖然廣播的是同一個物件,但是,廣播的ID是不一樣的,而且ID號越來越大,這說明對於廣播來說,它並不是一個物件,而只是名字一樣的不同物件,如果不對廣播變數進行釋放,將會導致executor端記憶體佔用越來越大,而一直沒有使用的廣播變數,被進行GC,會導致GC開銷超過使用上線,導致程式失敗。
2.2.解決方案:這廣播之前,先呼叫unpersist()方法,釋放不用的廣播變數
- 使用Kafka 的高階API receiver
1. 在使用receiver高階API時,由於receiver、partition、executor的分配關係,經常會導致某個executor任務比較繁重,進而影響整體處理速度
1.1.最好是一個receiver對應一個executor
2. 由於前段時間資料延遲比較嚴重,就想,能不能讓所有executor的cores都去處理資料?所以調整receiver為原來的四倍,結果系統啟動時,就一下衝上來非常大的資料量,導致系統崩潰,可見,receiver不僅跟partition的分配有關,還跟資料接收量有關
3. 在實際處理資料中,由於訊息延遲,可以看到,有的topic處理速度快有的慢,原因分析如下:
3.1.跟訊息的格式有關,有的是序列化檔案,有的事json格式,而json的解析相對於比較慢
3.2.有時候拖累整個叢集處理速度的,除了大量資料,還跟單條資料的大小有關
以下是程式跑掛的一些異常,和原因分析
問題矯正:
第一張圖片的,解決方案的倒數第二個, spark.memory.storageFraction(動態記憶體的百分比設定),應該為spark.storage.memoryFraction(靜態記憶體分配的設定) (由於原文件丟失,導致無法修改文件。)
如果有什麼問題,歡迎大家指出,共同探討,共同進步