1. 程式人生 > >Spark Streaming 流計算優化記錄(4)-時間都去哪兒了,關於排程與空轉

Spark Streaming 流計算優化記錄(4)-時間都去哪兒了,關於排程與空轉

6. 時間都去where了,青春不能等,排程也是
除了上述優化, 我們還注意到一個奇怪的現象:
 
怎麼回事, 即使接收不到訊息都要花掉5秒?!! 雖然Spark Streaming空轉依然會產生空task, 這些空task依然會消耗序列化, 壓縮, 排程等時間, 但也不至於那麼多吧!!!
我們拿一個Stage看看, 就拿處理Kafka訊息的那個Stage作例子吧:
 
Kafka沒有任何訊息進來的情況下, 這個Stage竟然耗費我3秒青春, 有無搞錯! 時間都去where了? 
接著我們看了一下task的時間分佈圖:

 
從圖中, 我們可以看到Spark總共排程分發了兩批次task set, 每個task set的處理(含序列化和壓縮之類的工作)都不超過100毫秒, 那麼該Stage何來消耗3秒呢? 慢著, 貌似這兩批次的task set分發的時間相隔得有點長啊, 隔了2秒多. 為什麼會隔這麼就才排程一次呢?
此處要引入一個配置項” spark.locality.wait”, 它配置了本地化排程降級所需要的時間. 這裡概要補充下Spark本地化排程的知識, Spark的task一般都會分發到它所需資料的那個節點, 這稱之為”NODE_LOCAL”, 但在資源不足的情況下, 資料所在節點未必有資源處理task, 因此Spark在等待了” spark.locality.wait”所配置的時間長度後, 會退而求其次, 分發到資料所在節點的同一個機架的其它節點上, 這是”RACK_LOCAL”, 當然, 也有更慘的, 就是再等了一段” spark.locality.wait”的時間長度後, 乾脆隨便找一臺機器去跑task, 這就是”ANY”策略了. 

而從上例看到, 即使用最差的”ANY”策略進行排程, task set的處理也只是花了100毫秒, 因此, 沒必要非得為了”NODE_LOCAL”策略的生效而去等待那麼長的時間, 特別是在流計算這種場景上. 所以把” spark.locality.wait”果斷調小, 從1秒到500毫秒, 最後乾脆調到100毫秒算了.
調了之後的處理時間是醬紫的:
 
原來兩個Stage空轉需要5秒, 現在變成1秒了. 排程不能等啊.


7. 進一步減少空轉耗時
上一節以處理Kafka訊息的那個Stage作為例子, 講了如何發現時間消耗, 如何減少等待時間, 這裡再講下在沒資料處理的情況下如何非侵入式地減少不必要的空轉. (呵呵,所謂非侵入式就是不修改Spark原始碼啦,否則後期維護很煩人的)

這一節, 我們以進行資料join的Stage作為例子.
 
該Stage所做的事情就是從HDFS中載入資料, 進行轉換處理後, 快取在記憶體中, 然後與Kafka過來的資料在本機記憶體中進行join操作. 空轉時的耗時是1秒, 時間分佈如下:
 
排程等待和序列化的耗時還算正常, 但為毛在task set中啥都沒有的情況下對task set的處理都需要1秒呢?
通過研究可知, 即使join的雙方有一方沒資料的情況下, Spark依然會迴圈另一方的資料, 以按key對value進行彙總.
 
額, 就是這個迴圈耗了我們近1秒青春. 而其實在這個場景下, 當Kafka方面沒資料輸入時, 就根本不要進這個迴圈, 直接返回空就是了. 因此我們引入了新的SkipableCoGroupedRDD.

 

該RDD負責兩個不同RDD的join操作, 但與一般的join操作不同的是, 它會把第一個RDD作為是否能夠跳過join操作的參照, 若第一個RDD中根本沒有資料, 那麼整個join操作會被跳過.

使用了SkipableCoGroupedRDD的處理結果如下:
 
在空轉的情況下, 整個join的Stage的處理時間只需要0.2秒. 空轉作業的處理時間進一步降低到0.2~0.3秒.