1. 程式人生 > >spark streaming效能優化

spark streaming效能優化

一 資料接收並行度調優

通過網路接收資料的時候,比如kafka或者flume,會將資料反序列化,並存儲在在Spark記憶體中。如果資料接收成為系統的瓶頸,那麼可以考慮並行化接收資料。

1.1除了建立更多輸入DStream和Receiver

每一個InputDStream都會在某個Worker上的Executor上啟動一個Receiver,該Receiver接收一個數據流。因此可以通過建立多個InputDStream,並且配置他們接收資料來源不同的分割槽資料,達到接收多個數據流的效果。比如說,一個接收兩個kafka topic的InputDStream,可以拆分成2個InputDStream,每一個分別接收一個topic的資料。這樣就會建立2個Receiver,從而並行的接收資料,進而提升吞吐量。多個DStream可以使用union運算元進行聯合,從而形成一個新的DStream,後續的操作都可以基於聯合的DStream.

intnumStreams = 5;
List<JavaPairDStream<String,String>> kafkaStreams= new ArrayList<JavaPairDStream<String,String>>(numStreams);
for (int i= 0; i < numStreams; i++) {
    kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String,String> unifiedStream= streamingContext.union(kafkaStreams

.get(0),kafkaStreams.subList(1,kafkaStreams.size()));
unifiedStream.print();

1.2 調整block 時間間隔

我們可以通過調整spark.streaming.blockInterval引數來設定產生一個block的時間間隔,預設是200ms。對於大多數Receiver來說,在將接收到的資料儲存到Spark的BlockManager之前,都會將資料根據設定的時間間隔分成構造成不同的block,然後推送給BlockManager儲存。每一個批次中block的數量決定了該批次對應的RDD的partition數量,以及針對該RDD執行轉換操作的時候建立的task數量,每一個batch對應的task數量大約是 = (batch 時間間隔)/(block時間間隔) ,即batch 時間間隔為1s,block時間間隔為200ms,相當於一個batch會包括5個block,即batch對應的RDD就有5個分割槽,也就決定task的數量是5.

如何認定batch對應的task數量太少呢?

如果每個batch的task數量低於每臺機器的CPU Core數量,那麼就說明batch的task數量是不夠的,因為所有的CPU資源無法完全被利用起來。

要為batch增加block的數量,那麼就減小block interval。然而,推薦的block interval最小值是50ms,如果低於這個數值,那麼大量task的啟動時間,可能會變成一個性能開銷點。

1.3 調大task數量

如果並行的task數量不是很多,那麼叢集資源無法充分利用,舉例來說對於分散式的reduce操作,比如reduceByKeyAndWindow或者reduceByKey等操作,預設的並行度是由spark.default. parallelism引數決定的,你可以在reduceByKey操作中傳入第二個引數,手動指定並行度,也可以調整全域性的spark.default.parallelism引數

如果輸入的資料流,如果你覺得分割槽數目太少,也可以對輸入的資料流重新分割槽,顯示對輸入資料流執行repartition操作,這樣調大分割槽數目,那麼task的數量也就大了

二 任務啟動優化

如果每一秒鐘啟動的task數量太多,假設50個,即1s的batch時間間隔和50ms的block時間間隔。如果傳送這些task去worker上的executor,那麼效能開銷會比較大,這是很難到到毫秒級的延遲了。

2.1task序列化

使用Kryo序列化類庫來序列化task,可以減小task的大小從而減少driver傳送這些task到各個executor的傳送時間,即節省網路資源

2.2 執行模式

在standalone模式下執行spark,可以達到更少的task啟動時間

三 資料序列化優化

資料序列化造成的系統開銷可以由序列化格式的優化來減小。在流式場景下,有兩種型別的資料需要序列化:

1 輸入資料:預設情況下,接收到的輸入資料,是儲存在Executor記憶體中,使用的持久化級別是MEMORY_AND_DISK_2.這就意味著資料被序列化位元組從而減小GC開銷,並且還會複製到其他節點已進行容錯。因此Receiver必須反序列化接收到的資料,然後在使用Spark的序列化格式序列化資料

2 流式計算操作生成持久化的RDD: 流式計算生成持久化RDD,可能會持久化到記憶體,這裡預設持久化級別就是MEMORY_ONLY_SER,預設就會減小GC開銷。

四 batch時間間隔優化

batch應該在生成之後就儘可能塊的處理掉,對於一個應用來說,可以通過觀察Spark UI上batch的處理時間來定。batch的處理時間必須小於batch 時間間隔,假設batch 時間間隔1s, 那麼這個批次的處理時間不應超過1s

為應用計算正確batch比較好的辦法:

給定一個很保守的batch interval,比如5s-10s,以很慢的資料接受速率進行測試,要檢查應用是否你跟的上這個資料接收速率,可以檢查每一個batch的處理時間的延遲,如果處理時間與batch interval基本吻合,那麼應用就是穩定的,否則如果batch排程延遲持續增長,那麼就意味著應用無法跟得上這個速率,也就是不穩定的。記住,由於臨時性的資料增長導致的暫時的延遲增長,可以合理的,只要延遲情況可以在短時間內恢復即可。

五 記憶體調優

如果想要使用一個視窗長度為10分鐘的window操作,那麼叢集就必須有足夠的記憶體來儲存10分鐘內的資料。如果想要使用updateStateByKey來維護許多key的state,那麼你的記憶體資源就必須足夠大。反過來說,如果想要做一個簡單的map-filter-store操作,那麼需要使用的記憶體就很少。

通常來說,通過Receiver接收到的資料,會使用MEMORY_AND_DISK_SER_2持久化級別來進行儲存,因此無法儲存在記憶體中的資料會溢寫到磁碟上。而溢寫到磁碟上,是會降低應用的效能的。因此,通常是建議為應用提供它需要的足夠的記憶體資源。建議在一個小規模的場景下測試記憶體的使用量,並進行評估。

記憶體調優的另外一個方面是垃圾回收。對於流式應用來說,如果要獲得低延遲,肯定不想要有因為JVM垃圾回收導致的長時間延遲。有很多引數可以幫助降低記憶體使用和GC開銷:

5.1DStream的持久化

預設持久化的時候會序列化為位元組,與非序列化的方式相比,這會降低記憶體和GC開銷。使用Kryo序列化機制可以進一步減少記憶體使用和GC開銷。如果還要進一步降低內庫存是用了,可以進行資料壓縮,spark.rdd.compress引數控制(預設false)。

但是CPU資源的消耗可能就大了。

5.2 清理舊資料

預設情況下,所有輸入資料和通過DStreamtransformation操作生成的持久化RDD,會自動被清理。Spark Streaming會決定何時清理這些資料,取決於transformation操作型別。

例如,你在使用視窗長度為10分鐘內的window操作,Spark會保持10分鐘以內的資料,時間過了以後就會清理舊資料。但是在某些特殊場景下,比如Spark SQL和Spark Streaming整合使用時,在非同步開啟的執行緒中,使用Spark SQL針對batch RDD進行執行查詢。那麼就需要讓Spark儲存更長時間的資料,直到Spark SQL查詢結束。可以使用streamingContext.remember()方法來實現。

5.3CMS垃圾回收器

使用並行的mark-sweep垃圾回收機制,被推薦使用,用來保持GC低開銷。雖然並行的GC會降低吞吐量,但是還是建議使用它,來減少batch的處理時間(降低處理過程中的GC開銷)。如果要使用,那麼要在driver端和executor端都開啟。

在spark-submit中使用--driver-java-options設定使spark.executor.extra

JavaOptions引數設定-XX:+UseConcMarkSweepGC。