1. 程式人生 > >Spark Streaming效能調優詳解(轉)

Spark Streaming效能調優詳解(轉)

原文連結:Spark Streaming效能調優詳解

 Spark Streaming提供了高效便捷的流式處理模式,但是在有些場景下,使用預設的配置達不到最優,甚至無法實時處理來自外部的資料,這時候我們就需要對預設的配置進行相關的修改。由於現實中場景和資料量不一樣,所以我們無法設定一些通用的配置(要不然Spark Streaming開發者就不會弄那麼多引數,直接寫死不得了),我們需要根據資料量,場景的不同設定不一樣的配置,這裡只是給出建議,這些調優不一定試用於你的程式,一個好的配置是需要慢慢地嘗試。

  1、設定合理的批處理時間(batchDuration)。

  在構建StreamingContext的時候,需要我們傳進一個引數,用於設定

Spark Streaming批處理的時間間隔。Spark會每隔batchDuration時間去提交一次Job,如果你的Job處理的時間超過了batchDuration的設定,那麼會導致後面的作業無法按時提交,隨著時間的推移,越來越多的作業被拖延,最後導致整個Streaming作業被阻塞,這就間接地導致無法實時處理資料,這肯定不是我們想要的。

  另外,雖然batchDuration的單位可以達到毫秒級別的,但是經驗告訴我們,如果這個值過小將會導致因頻繁提交作業從而給整個Streaming帶來負擔,所以請儘量不要將這個值設定為小於500ms。在很多情況下,設定為500ms效能就很不錯了。

  那麼,如何設定一個好的值呢?我們可以先將這個值位置為比較大的值(比如10S),如果我們發現作業很快被提交完成,我們可以進一步減小這個值,知道Streaming作業剛好能夠及時處理完上一個批處理的資料,那麼這個值就是我們要的最優值。

  2、增加Job並行度

  我們需要充分地利用叢集的資源,儘可能的將Task分配到不同的節點,一方面可以充分利用叢集資源;另一方面還可以及時的處理資料。比如我們使用Streaming接收來自Kafka的資料,我們可以對每個Kafka分割槽設定一個接收器,這樣可以達到負載均衡,及時處理資料(關於如何使用Streaming讀取Kafka中的資料,可以參見

《Spark Streaming和Kafka整合開發指南(一)》《Spark Streaming和Kafka整合開發指南(二)》)。

  再如類似reduceByKey()和Join函式都可以設定並行度引數。

  3、使用Kryo系列化。

  Spark預設的是使用Java內建的系列化類,雖然可以處理所有自繼承java.io.Serializable的類系列化的類,但是其效能不佳,如果這個成為效能瓶頸,可以使用Kryo系列化類,關於如何在Spark中使用Kroy,請參見《在Spark中自定義Kryo序列化輸入輸出API》。使用系列化資料可以很好地改善GC行為。

  4、快取需要經常使用的資料

  對一些經常使用到的資料,我們可以顯式地呼叫rdd.cache()來快取資料,這樣也可以加快資料的處理,但是我們需要更多的記憶體資源。

  5、清除不需要的資料

  隨著時間的推移,有一些資料是不需要的,但是這些資料是快取在記憶體中,會消耗我們寶貴的記憶體資源,我們可以通過配置spark.cleaner.ttl為一個合理的值;但是這個值不能過小,因為如果後面計算需要用的資料被清除會帶來不必要的麻煩。而且,我們還可以配置選項spark.streaming.unpersist為true(預設就是true)來更智慧地去持久化(unpersist)RDD。這個配置使系統找出那些不需要經常保有的RDD,然後去持久化它們。這可以減少Spark RDD的記憶體使用,也可能改善垃圾回收的行為。

  6、設定合理的GC

  GC是程式中最難調的一塊,不合理的GC行為會給程式帶來很大的影響。在叢集環境下,我們可以使用並行Mark-Sweep垃圾回收機制,雖然這個消耗更多的資源,但是我們還是建議開啟。可以如下配置:

1 spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC

  更多的關於GC行為的配置,請參考Java垃圾回收相關文章。這裡就不詳細介紹了。

  7、設定合理的CPU資源數

  很多情況下Streaming程式需要的記憶體不是很多,但是需要的CPU要很多。在Streaming程式中,CPU資源的使用可以分為兩大類:(1)、用於接收資料;(2)、用於處理資料。我們需要設定足夠的CPU資源,使得有足夠的CPU資源用於接收和處理資料,這樣才能及時高效地處理資料。