1. 程式人生 > >streaming中partition裡用執行緒池非同步優化

streaming中partition裡用執行緒池非同步優化

點選hadoop123關注我喲

最知名的Hadoop/Spark大資料技術分享基地,分享hadoop/spark技術內幕hadoop/spark最新技術進展hadoop/spark行業技術應用釋出hadoop/spark相關職位和求職資訊hadoop/spark技術交流聚會講座以及會議等。

一、Spark Streaming概述

Spark是美國加州伯克利大學AMP實驗室推出的新一代分散式計算框架,其核心概念是RDD,一個只讀的、有容錯機制的分散式資料集,RDD可以全部快取在記憶體中,並在多次計算中重複使用。相比於MapReduce程式設計模型,Spark具有以下幾個優點:

  1. 更大的靈活性和更高的抽象層次,使得使用者用更少的程式碼即可實現同樣的功能;

  2. 適合迭代演算法,在MapReduce程式設計模型中,每一輪迭代都需要讀寫一次HDFS,磁碟IO負載很大,相比之下,Spark中的RDD可以快取在記憶體中,只需第一次讀入HDFS檔案,之後迭代的資料全都儲存在記憶體中,這使得程式的計算速度可提升約10-100倍。

SparkStreamingSpark生態系統中的重要組成部分,在實現上覆用Spark計算引擎。如圖1所示,Spark Streaming支援資料來源有很多,如KafkaFlumeTCP等。SparkStreaming內部的資料表示形式為DStream

DiscretizedStream離散的資料流介面設計與RDD非常相似,這使得它Spark使用者非常友好。SparkStreaming的核心思想是把流式處理轉化為“微批處理”,即以時間為單位切分資料流,每個切片內的資料對應一個RDD,進而可以採用Spark引擎進行快速計算。由於SparkStreaming採用了微批處理方式,是近實時的處理系統,而不是嚴格意義上的流式處理系統。

1Spark Streaming資料流

        另一個著名的開源流式計算引擎是Storm,這是一個真正的流式處理系統,它每次從資料來源讀一條資料,然後單獨處理。相比於SparkStreaming

Storm有更快速的響應時間(小於一秒),更適合低延遲的應用場景,比如信用卡欺詐系統,廣告系統等。相比之下,SparkStreaming的優勢是吞吐量大,響應時間也可以接受(秒級),並且相容Spark系統中的其他工具庫如MLlibGraphX。對於時間不敏感且流量很大的系統,Spark Streaming是更優的選擇。

二、Spark StreamingHulu應用

Hulu是美國的專業線上視訊網站,每天會有大量使用者線上觀看視訊,進而產生大量使用者觀看行為資料,這些資料通過收集系統進入Hulu的大資料平臺,從而進行儲存和進一步處理。在大資料平臺之上,各個團隊會根據需要設計相應的演算法對資料進行分析和挖掘以便產生商業價值:推薦團隊從這些資料裡挖掘出使用者感興趣的內容並做精準推薦,廣告團隊根據使用者的歷史行為推送最合適的廣告,資料團隊從資料的各個維度進行分析從而為公司的策略制定提供可靠依據。

Hulu大資料平臺的實現依循Lambda架構。Lambda架構是一個通用的大資料處理框架,包含離線的批處理層、線上的加速層和服務層三部分,具體如圖2所示。服務層一般使用HTTP服務或自定製的客戶端對外提供資料訪問,離線的批處理層一般使用批處理計算框架SparkMapReduce進行資料分析,線上的加速層一般使用流式實時計算框架SparkStreamingStorm進行資料分析


2lambda架構原理圖

對於實時計算部分,Hulu內部使用了KafkaCodisSparkStreaming面按照資料流的過程,介紹我們的專案。

1.收集資料 -從伺服器日誌中收集資料,流程如圖3所示:

  1. 來自網頁、手機App、機頂盒等裝置的使用者產生視訊觀看、廣告點選等行為,這些行為資料記錄在各自的Nginx服務的日誌中;

  2. 使用Flume將使用者行為資料同時匯入HDFSKafka,其中HDFS中的資料用於離線分析,而Kafka中資料則用於流式實時分析。

3Hulu資料收集流程

2. 儲存標籤資料 - Hulu使用Hbase儲存使用者標籤資料,包括基本資訊如性別、年齡、是否付費,以及其他模型推測出來的偏好屬性。這些屬性需要作為計算模型的輸入,同時HBase隨機讀取的速度比較慢,所以需要將資料同步到快取伺服器中以加快資料讀取速度。Redis是一個應用廣泛的開源快取伺服器,一個免費開源的高效能Key-Value資料庫,但其本身是個單機系統,不能很好地支援大量資料的快取。為解決Redis擴充套件性差的問題,豌豆莢開源了Codis,一個分散式Redis解決方案。HuluCodis打成Docker映象,並實現一鍵式構建快取系統,附帶自動監控和修復功能。為了更精細的監控,我們構建了多個Codis快取,分別是:

  1. codis-profile,同步HBase中的使用者屬性;

  2. codis-action,快取來自Kafka的使用者行為;

  3. codis-result,記錄計算結果

3.  實時處理資料 -準備就緒,啟動Spark Streaming程式

1)SparkStreaming啟動Kafka Receiver,持續Kafka伺服器拉資料;

2)每隔兩秒,Kafka的資料被整理成一個RDD,交給Spark引擎處理;

3)對一條使用者行為,Spark會從codis-action快取中拿到該使用者的行為記錄,然後把新的行為追加進去;

4)Sparkcodis-actioncodis-profile中獲得該使用者的所有相關屬性,然後執行廣告推薦的計算模型,最後把結果寫入codis-result,進而供服務層實時讀取這些結果

三、Spark Streaming優化經驗

        實踐中,業務邏輯首先保證完成,使得在Kafka輸入資料量較小的情況下系統穩定執行,且輸入輸出滿足專案需求。然後開始調優,修改SparkStreaming的引數,比如Executor的數量,Core的數量,Receiver的流量等。最後發現僅調引數無法完全滿足本專案的業務場景,所以有更進一步的優化方案,總結如下:

1.Executor初始化

很多機器學習的模型在第一次執行時,需要執行初始化方法,還會連線外部的資料庫,常常需要5-10分鐘,這會成為潛在的不穩定因素。在Spark Streaming應用中,當Receiver完成初始化,它就開始源源不斷地接收資料,並且由Driver定期排程任務消耗這些資料。如果剛啟動時Executor需要幾分鐘做準備,會導致第一個作業一直沒有完成,這段時間內Driver不會排程新的作業。這時候在Kafka Receiver端會有資料積壓,隨著積壓的資料量越來越大,大部分資料會撐過新生代進入老年代,進而給Java GC帶來嚴重的壓力,容易引發應用程式崩潰。

本專案的解決方案是,修改Spark核心,在每個Executor接收任務之前先執行一個使用者自定義的初始化函式,初始化函式中可以執行一些獨立的使用者邏輯。示例程式碼如下:

  // sc:SparkContext, setupEnvironmentHulu擴充套件的API

  sc.setupEnvironment(() => {

    application.initialize() // 使用者應用程式初始化,需執行幾分鐘

    println(“Invoke executor  setup method successfully.”)

  })

該方案需要更改Spark的任務排程器,首先將每個Executor設定為未初始化狀態。此時,排程器只會給未初始化狀態的Executor分配初始化任務(執行前面提到的初始化函式)。等初始化任務完畢,排程器更新Executor的狀態為已初始化,這樣的Executor才可以分配正常的計算任務。

2.非同步處理Task中的業務邏輯

本專案中,模型的輸入引數均來自Codis,甚至模型內部也可能訪問外部儲存,直接導致模型計算時長不穩定,很多時間消耗在網路等待上。

為提高系統吞吐量,增大並行度是比較通用的優化方案,但在本專案的場景中並不適用。因為Spark作業的排程策略是,等待上一個作業的所有Task執行完畢,然後排程下一個作業。如果單個Task的執行時間不穩定,易發生個別Task拖慢整個作業的情況,以至於資源利用率不高,系統吞吐量上不去;甚至並行度越大,該問題越嚴重。一種常用的解決Task不穩定的方案是增大Spark Streamingmicro batch的時間間隔,該方案會使整個實時系統的延遲變長,並不推薦。

該問題的解決方案是非同步處理Task中的業務邏輯。如下文的程式碼所示,同步方案中,Task內執行業務邏輯,處理時間不定;非同步方案中,Task把業務邏輯嵌入執行緒,交給執行緒池執行,Task立刻結束,ExecutorDriver報告執行完畢,非同步處理的時間非常短,在100ms以內。另外,當執行緒池中積壓的執行緒數量太大時(程式碼中qsize>100的情況),會暫時使用同步處理,配合反壓機制(見下文的引數spark.streaming.backpressure.enabled),可以保證不會因為資料積壓過多而導致系統崩潰。為設定合適的執行緒池大小,我們藉助JVisualVM工具監控ExecutorCPU使用率,通過調整引數找到最優併發執行緒數。經實驗驗證,該方案大大提高了系統的吞吐量。

  // 同步處理

  // 函式runBusinessLogic Task中的業務邏輯,執行時間不定

  rdd.foreachPartition(partition  => runBusinessLogic (partition))

  // 非同步處理,threadPool是執行緒池

   rdd.foreachPartition(partition => {

    val  qsize = threadPool.getQueue.size // 執行緒池中積壓的執行緒數

    if  (qsize > 100) {

      runBusinessLogic(partition)  // 暫時同步處理

    }

     threadPool.execute(new Runnable {

       override def run() = runBusinessLogic(partition)

    })

  })

非同步化Task也存在缺點:如果Executor發生異常,存放線上程池中的業務邏輯無法重新計算,會導致部分資料丟失。經實驗驗證,僅當Executor異常崩潰時有資料丟失,且不常見,在本專案的場景中可以接受。

3.Kafka Receiver的穩定性

本專案使用SparkStreaming中的Kafka Receiver本質上呼叫Kafka官方的客戶端ZookeeperConsumerConnector。其策略是每個客戶端在Zookeeper的固定路徑下把自己註冊為臨時節點,於是所有客戶端都知道其他客戶端的存在,然後自動協調和分配Kafka的資料資源。該策略存在一個弊端,當一個客戶端與Zookeeper的連線狀態發生改變(斷開或者連上),所有的客戶端都會通過Zookeeper協調,重新分配Kafka的資料資源;在此期間所有客戶端都斷開與Kafka的連線,系統接收不到Kafka的資料,直到重新分配成功。如果網路質量不佳,並且Receiver的個數較多,這種策略會造成資料輸入不穩定,很多SparkStreaming使用者遇到這樣的問題。在我們的系統中,該策略並沒有產生明顯的負面影響。值得注意的是,Kafka客戶端與Zookeeper有個預設的引數zookeeper.session.timeout.ms=6000,表示客戶端與Zookeeper連線的session有效時間為6秒,我們的客戶端多次出現因為Full GC超過6秒而與Zookeeper斷開連線,之後再次連線上,期間所有客戶端都受到影響,系統表現不穩定。所以專案設定引數zookeeper.session.timeout.ms=30000

4.YARN資源搶佔問題

       Hulu內部,Spark Streaming這樣的長時服務與MapRedueSparkHive等批處理應用共享YARN叢集資源。在共享環境中,經常因一個批處理應用佔用大量網路資源或者CPU資源,導致Spark Streaming服務不穩定(儘管我們採用了CGroup進行資源隔離,但效果不佳)。更嚴重的問題是,如果個別Container崩潰Driver需要向YARN申請新的Container,或者如果整個應用崩潰需要重啟,SparkStreaming不能保證很快申請到足夠的資源,也就無法保證線上服務的質量。為解決該問題,Hulu使用label-based scheduling的排程策略,從YARN叢集中隔離出若干節點專門執行SparkStreaming和其他長時服務,避免與批處理程式競爭資源。

5.完善監控資訊

監控反映系統執行的效能狀態,也是一切優化的基礎。SparkStreaming Web介面提供了比較豐富的監控資訊,同時本專案依據業務邏輯的特點增加了更多監控。Hulu使用GraphiteGrafana作為第三方監控系統,本專案把系統中關鍵的效能引數(如計算時長和次數)傳送給Graphite伺服器,就能夠在Grafana網頁上看到直觀的統計圖。


4Graphite監控資訊,展示了Kafka中日誌的剩餘數量,一條線對應於一個partition的歷史餘量

4是統計Kafka中日誌的剩餘數量,一條線對應於一個partition的歷史餘量,大部分情況下餘量接近零,符合預期。圖中09:55左右日誌餘量開始出現很明顯的尖峰,之後又迅速逼近零。事後經過多種資料核對,證實Kafka的資料一直穩定,而當時Spark Streaming執行作業突然變慢,反壓機制生效,於是Kafka Receiver減小讀取日誌的速率,造成Kafka資料積壓;一段時間之後SparkStreaming又恢復正常,快速消耗了Kafka中的資料餘量。

直觀的監控系統能有效地暴露問題,進而理解和強化系統。對於不同的業務邏輯,需要監控的資訊也不相同。在我們的實踐中,主要的監控指標有:

  1. Kafka的剩餘資料量

  2. Spark的作業執行時間和排程時間

  3. 每個Task的計算時間

  4. Codis的訪問次數、時間、命中率

另外,有指令碼定期分析這些統計資料,出現異常則發郵件報警。比如圖4 Kafka的日誌餘量過大時,會有連續的報警郵件。我們的經驗是,監控越細緻,之後的優化工作越輕鬆。同時,優秀的監控也需要對系統深刻的理解。

6.引數優化

下表列出本專案中比較關鍵的幾個引數:

spark.yarn.max.executor.failures

Executor允許的失敗上限如果超過該上限,整個Spark Streaming會失敗,需要設定比較大

spark.yarn.executor.memoryOverhead

ExecutorJVM的開銷,與堆記憶體不一樣,設定太小會導致記憶體溢位異常

spark.receivers.num

Kafka Receiver的個數

spark.streaming.receiver.maxRate

每個Receiver能夠接受資料的最大速率;這個值超過峰值約50%

spark.streaming.backpressure.enabled

反壓機制;如果目前系統的延遲較長,Receiver端會自動減小接受資料的速率,避免系統因資料積壓過多而崩潰

spark.locality.wait

系統排程Task會盡量考慮資料的區域性性,如果超過spark.locality.wait設定時間的上限,就放棄區域性性;該引數直接影響Task的排程時間

spark.cleaner.ttl

Spark系統內部的元資訊的超時時間;Streaming長期執行,元資訊累積太多會影響效能

四、總結

        Spark Streaming的產品上線執行一年多,期間進行了多次Spark版本升級,從最早期的0.8版本到最近1.5.x版本總體上Spark Streaming是一款優秀的實時計算框架,可以在線上使用。但仍然存在一些不足,包括:

1.Spark同時使用堆內和堆外的記憶體,缺乏一些有效的監控資訊,遇到OOM時分析和除錯比較困難;

2.缺少Executor初始化介面;

3.Spark採用函數語言程式設計方式,抽象層次高,好處是使用方便,壞處是理解和優化困難;

4.新版本的Spark有一些異常,如Shuffle過程中Block丟失、記憶體溢位。