1. 程式人生 > >Spark Streaming實時處理應用

Spark Streaming實時處理應用

1 框架一覽

  事件處理的架構圖如下所示。

locality

2 優化總結

  當我們第一次部署整個方案時,kafkaflume元件都執行得非常好,但是spark streaming應用需要花費4-8分鐘來處理單個batch。這個延遲的原因有兩點,一是我們使用DataFrame來強化資料,而強化資料需要從hive中讀取大量的資料; 二是我們的引數配置不理想。

  為了優化我們的處理時間,我們從兩方面著手改進:第一,快取合適的資料和分割槽;第二,改變配置引數優化spark應用。執行spark應用的spark-submit命令如下所示。通過引數優化和程式碼改進,我們顯著減少了處理時間,處理時間從4-8分鐘降到了低於25秒。

/opt/app/dev/spark-1.5.2/bin/spark-submit \
 --jars  \
/opt/cloudera/parcels/CDH/jars/zkclient-0.3.jar,/opt/cloudera/parcels/CDH/jars/kafka_2.10-0.8.1.1.jar,\
/opt/app/dev/jars/datanucleus-core-3.2.2.jar,/opt/app/dev/jars/datanucleus-api-jdo-3.2.1.jar,/opt/app/dev/jars/datanucleus-rdbms-3.2.1.jar \
--files /opt/app/dev/spark-1.5.2/conf/hive-site.xml,/opt/app/dev/jars/log4j-eir.properties \
--queue spark_service_pool \
--master yarn \
--deploy-mode cluster \
--conf "spark.ui.showConsoleProgress=false" \
--conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=6G -XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties" \ --conf "spark.sql.tungsten.enabled=false" \ --conf "spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory" \ --conf "spark.eventLog.enabled=true" \ --conf "spark.sql.codegen=false" \ --conf "spark.sql.unsafe.enabled=false" \ --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties" \ --conf "spark.streaming.backpressure.enabled=true" \ --conf "spark.locality.wait=1s" \ --conf "spark.streaming.blockInterval=1500ms" \ --conf "spark.shuffle.consolidateFiles=true" \ --driver-memory 10G \ --executor-memory 8G \ --executor-cores 20 \ --num-executors 20 \ --class com.bigdata.streaming.OurApp \ /opt/app/dev/jars/OurStreamingApplication.jar external_props.conf

  下面我們將詳細介紹這些改變的引數。

2.1 driver選項

  這裡需要注意的是,driver執行在spark on yarn的叢集模式下。因為spark streaming應用是一個長期執行的任務,生成的日誌檔案會很大。為了解決這個問題,我們限制了寫入日誌的訊息的條數, 並且用RollingFileAppender限制了它們的大小。我們也關閉了spark.ui.showConsoleProgress選項來禁用控制檯日誌訊息。

  通過測試,我們的driver因為永久代空間填滿而頻繁發生記憶體耗盡(永久代空間是類、方法等儲存的地方,不會被重新分配)。將永久代空間的大小升高到6G可以解決這個問題。

spark.driver.extraJavaOptions=-XX:MaxPermSize=6G

2.2 垃圾回收

  因為我們的spark streaming應用程式是一個長期執行的程序,在處理一段時間之後,我們注意到GC暫停時間過長,我們想在後臺減少或者保持這個時間。調整UseConcMarkSweepGC引數是一個技巧。

--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties" \

2.3 禁用Tungsten

  Tungstenspark執行引擎主要的改進。但是它的第一個版本是有問題的,所以我們暫時禁用它。

spark.sql.tungsten.enabled=false
spark.sql.codegen=false
spark.sql.unsafe.enabled=false

2.4 啟用反壓

  Spark Streaming在批處理時間大於批間隔時間時會出現問題。換一句話說,就是spark讀取資料的速度慢於kafka資料到達的速度。如果按照這個吞吐量執行過長的時間,它會造成不穩定的情況。 即接收executor的記憶體溢位。設定下面的引數解決這個問題。

spark.streaming.backpressure.enabled=true

2.5 調整本地化和塊配置

  下面的兩個引數是互補的。一個決定了資料本地化到task或者executor等待的時間,另外一個被spark streaming receiver使用對資料進行組塊。塊越大越好,但是如果資料沒有本地化到executor,它將會通過網路移動到 任務執行的地方。我們必須在這兩個引數間找到一個好的平衡,因為我們不想資料塊太大,並且也不想等待本地化太長時間。我們希望所有的任務都在幾秒內完成。

  因此,我們改變本地化選項從3s到1s,我們也改變塊間隔為1.5s。

--conf "spark.locality.wait=1s" \
--conf "spark.streaming.blockInterval=1500ms" \

2.6 合併臨時檔案

  在ext4檔案系統中,推薦開啟這個功能。因為這會產生更少的臨時檔案。

--conf "spark.shuffle.consolidateFiles=true" \

2.7 開啟executor配置

  在你配置kafka Dstream時,你能夠指定併發消費執行緒的數量。然而,kafka Dstream的消費者會執行在相同的spark driver節點上面。因此,為了從多臺機器上面並行消費kafka topic, 我們必須例項化多個Dstream。雖然可以在處理之前合併相應的RDD,但是執行多個應用程式例項,把它們都作為相同kafka consumer group的一部分。

  為了達到這個目的,我們設定20個executor,並且每個executor有20個核。

--executor-memory 8G
--executor-cores 20
--num-executors 20

2.8 快取方法

  使用RDD之前快取RDD,但是記住在下次迭代之前從快取中刪除它。快取那些需要使用多次的資料非常有用。然而,不要使分割槽數目過大。保持分割槽數目較低可以減少,最小化排程延遲。下面的公式是我們使用的分割槽數的計算公式。

# of executors * # of cores = # of partitions