1. 程式人生 > >Spark 定製版:010~Spark Streaming原始碼解讀之流資料不斷接收全生命週期徹底研究和思考

Spark 定製版:010~Spark Streaming原始碼解讀之流資料不斷接收全生命週期徹底研究和思考

本講內容:

a. 資料接收架構設計模式
b. 資料接收原始碼徹底研究

注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。

上節回顧

上一講中,我們給大傢俱體分析了Receiver啟動的方式及其啟動設計帶來的多個問題:

a. 如果有多個InputDStream,那就要啟動多個Receiver,每個Receiver也就相當於分片partition,那我啟動Receiver的時候理想的情況下是在不同的機器上啟動Receiver,但是Spark Core的角度來看就是應用程式,感覺不到Receiver的特殊性,所以就會按照正常的Job啟動的方式來處理,極有可能在一個Executor上啟動多個Receiver;這樣的話就可能導致負載不均衡

b. 有可能啟動Receiver失敗,只要叢集存在,Receiver就不應該啟動失敗

c. 從執行過程中看,一個Reveiver就是一個partition的話,Reveiver的啟動伴隨一個Task啟動,如果Task啟動失敗,以Task啟動的Receiver也會失敗

由此,我們通過原始碼分析,徹底解析了Spark Streaming是如何解決這些問題的:

a. Spark使用一個Job啟動一個Receiver.最大程度的保證了負載均衡

b. Spark Streaming已經指定每個Receiver執行在那些Executor上,在Receiver執行之前就指定了執行的地方

c. 如果Receiver啟動失敗,此時並不是Job失敗,在內部會重新啟動Receiver

開講

本講我們主要給大家介紹Spark Streaming在接收資料的全生命週期貫通;

a. 當有Spark Streaming有應用程式的時候Spark Streaming會持續不斷的接收資料

b. 一般Receiver和Driver不在一個程序中的,所以接收到資料之後要不斷的彙報給Driver

c. Spark Streaming要接收資料肯定要使用訊息迴圈器,迴圈器不斷的接收到資料之後,然後將資料儲存起來,再將儲存完的資料彙報給Driver

d. Sparkstreaming接收資料的整個流程類似於MVC模式,M就是Receiver,V就是Driver,C就是ReceiverSupervisor

e. ReceiverSupervisor是控制器,Receiver的啟動是靠ReceiverTracker啟動的,Receiver接收到資料之後是靠ReceiverSupervisor儲存資料的。然後Driver就獲得元資料也就是介面,通過介面來操作底層的資料,這個元資料就相當於指標

Spark Streaming接收資料流程如下:

這裡寫圖片描述

接收資料的時候肯定有一個迴圈器不斷的接收資料,接收到資料肯定也有儲存器,儲存過之後向Driver彙報。接收資料和儲存資料當然要分為兩個不同的模組。

這裡寫圖片描述

ReceiverSupervisorImpl是receiver的監控器,同時負責receiver的寫操作 這個方法需要傳入一個Iterator,實時上裡邊就只有一個Receiver

獲得receiver,這個receiver是根據資料輸入來源InputDstream獲得的receiver。以SocketInputDstream為例,它的receiver就是SocketReceiver.這裡的receiver只是一個引用,並沒有被例項化。作為一個引數傳入ReceiverSupervisorImpl

這裡寫圖片描述

為了啟動Receiver啟動了一個spark作業,每一個Receiver的啟動都會有一個作業來負責,Receiver是一個一個的啟動的如果是將所有的Receiver作為一個作業的不同task來啟動會有很多弱點

a. Reciver啟動可能失敗進而導致應用程式失敗
b. 執行的過程中會有任務傾斜的問題,將所有的Receiver作為一個作業的不同task來執行是採用的spark core的排程方式,在很不幸的情況下會出現所有Receiver執行在一個節點上,Receiver要不斷的接收資料,需要消耗很多資源,就會導致這個節點負載特別大。

將每個Receiver都作為一個job來執行就會最大可能的負載均衡,不過這樣也有可能失敗,失敗之後不會重試job,而是從新schedule提交一個新的job來執行

Receiver,並且不會在之前執行的executor上啟動,只要sparkstreaming程式不停止,假如Receiver出故障就會不休止的進行重新echedule並啟動,確保Receiver一定會啟動還有很重要的一點是,當重新啟動一個Receiver時,是用一個執行緒池在新的執行緒中啟動的

這裡寫圖片描述

ReceiverSupervisorImpl負責處理Receiver接收到的資料,處理之後彙報給ReceiverTracker,所以ReceiverSupervisorImpl內部有和ReceiverTracker進行通訊的endpoint。這個負責向ReceiverTracker傳送訊息。

private val trackerEndpoint = RpcUtils.makeDriverRef(“ReceiverTracker”, env.conf,env.rpcEnv)

這個負責接收ReceiverTracker傳送的訊息,CleanupOldBlocks是用來清除執行完的每個batch的Blocks,UpdateRateLimit是用來隨時調整限流(限流其實是限的資料儲存的速度)

這裡寫圖片描述

ReceiverSupervisor的start方法

這裡寫圖片描述

在onStart中啟動的是BlockGenerator,BlockGenerator是把接收到的一條一條的資料生成block儲存起來,一個BlockGenerator只服務於一個Receiver。所以BlockGenerator要在Receiver啟動之前啟動

這裡寫圖片描述

BlockGenerator種有一個定時器。這個定時器每隔一定(預設是200ms,和設定的batchduration無關)的時間就執行如下方法。這個方法就是把接收到的資料一條一條的放入到這個buffer快取中,再把這個buffer按照一定的時間或者尺寸合併成block。除了定時器以外還有一條執行緒不停的把生成的block交給blockmanager儲存起來。

這裡寫圖片描述

下面來看startReceiver方法

這裡寫圖片描述

在啟動Receiver之前還要向ReceiverTracker請求是否可以啟動Receiver。當返回是true才會啟動。ReceiverTracker接收到彙報的資訊就把註冊Receiver的資訊。

這裡寫圖片描述

Receiver的啟動只是呼叫receiver.onStart(),Receiver就在work節點上運行了

以SocketReceiver為例我看看它的onStart方法

這裡寫圖片描述