1. 程式人生 > >Spark Streaming原始碼解讀之Receiver在Driver的精妙實現全生命週期徹底研究和思考

Spark Streaming原始碼解讀之Receiver在Driver的精妙實現全生命週期徹底研究和思考

在Spark Streaming中對於ReceiverInputDStream來說,都是現實一個Receiver,用來接收資料。而Receiver可以有很多個,並且執行在不同的worker節點上。這些Receiver都是由ReceiverTracker來管理的。

在ReceiverTracker的start方法中,會建立一個訊息通訊體ReceiverTrackerEndpoint:

/** Start the endpoint and receiver execution thread. */ def start(): Unit = synchronized { if
 (isTrackerStarted) { throw new SparkException("ReceiverTracker already started") } if (!receiverInputStreams.isEmpty) { endpoint = ssc.env.rpcEnv.setupEndpoint( "ReceiverTracker"new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) if (!skipReceiverLaunch) launchReceivers() logInfo(
"ReceiverTracker started") trackerState = Started } }

然後再呼叫launchReceivers()方法:

private def launchReceivers(): Unit = { val receivers = receiverInputStreams.map(nis => { val rcvr = nis.getReceiver() rcvr.setReceiverId(nis.id) rcvr }) runDummySparkJob() logInfo(
"Starting " + receivers.length + " receivers") endpoint.send(StartAllReceivers(receivers)) }

在上面的程式碼中,首先會從InputDStream中獲取Receiver。每個InputDStream對於一個Receiver。而對Spark Streaming程式來說,可以有多個InputDStream 。

這裡需要說明的是,它會執行一個方法runDummySparkJob(),從命名上可以看出,這是一個虛擬的Job。該Job的主要作用是讓receivers儘量的分散到不同的worker上執行。

也行你會想Master不是知道系統中有哪些worker嗎?直接用這些worker上的Executor不就可以了嗎?這裡會有一個問題,可能worker上的Executor宕機了,但是master並不知道。這樣就會導致receiver被分配到一個不能執行的Executor上。使用了runDummySparkJob()方法後,在通過BlockManager獲取到的Executor肯定當前是“活著”的。

怎麼實現的呢?

private def runDummySparkJob(): Unit = { if (!ssc.sparkContext.isLocal) { ssc.sparkContext.makeRDD(1 to 5050).map(x => (x, 1)).reduceByKey(_ _

相關推薦

Spark Streaming原始碼解讀Receiver在Driver的精妙實現生命週期徹底研究思考

在Spark Streaming中對於ReceiverInputDStream來說,都是現實一個Receiver,用來接收資料。而Receiver可以有很多個,並且執行在不同的worker節點上。這些Receiver都是由ReceiverTracker來管理的。

Spark定製班第9課:Spark Streaming原始碼解讀Receiver在Driver的精妙實現生命週期徹底研究思考

本期內容: 1. Receiver啟動的方式設想 2. Receiver啟動原始碼徹底分析 1. Receiver啟動的方式設想   Spark Streaming是個執行在Spark Core上的應用程式。這個應用程式既要接收資料,還要處理資料,這些都是在分散式的

Spark 定製版:010~Spark Streaming原始碼解讀流資料不斷接收生命週期徹底研究思考

本講內容: a. 資料接收架構設計模式 b. 資料接收原始碼徹底研究 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧 上一講中,我們給大傢俱體分析了Receiver啟動的方式及其啟動設計帶來的多個

Spark 定製版:015~Spark Streaming原始碼解讀No Receivers徹底思考

本講內容: a. Direct Acess b. Kafka 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧 上一講中,我們講Spark Streaming中一個非常重要的內容:State狀態管理

Spark Streaming原始碼解讀資料清理內幕徹底解密

本篇部落格的主要目的是: 1. 理清楚Spark Streaming中資料清理的流程 組織思路如下: a) 背景 b) 如何研究Spark Streaming資料清理? c) 原始碼解析

Spark 定製版:013~Spark Streaming原始碼解讀Driver容錯安全性

本講內容: a. ReceiverBlockTracker容錯安全性 b. DStreamGraph和JobGenerator容錯安全性 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧 上一講中,

第15課:Spark Streaming原始碼解讀No Receivers徹底思考

背景:      目前No Receivers在企業中使用的越來越多。No Receivers具有更強的控制度,語義一致性。No Receivers是我們操作資料來源自然方式,操作資料來源使用一個封裝器,且是RDD型別的。所以Spark Streaming就產生了自定義R

Spark Streaming原始碼解讀No Receivers詳解

背景: 目前No Receivers在企業中使用的越來越多。No Receivers具有更強的控制度,語義一致性。No Receivers是我們操作資料來源自然方式,操作資料來源使用一個封裝器,且是RDD型別的。所以Spark Streaming就產生了自定義

Spark Streaming原始碼解讀Driver中的ReceiverTracker詳解

本篇博文的目標是: Driver的ReceiverTracker接收到資料之後,下一步對資料是如何進行管理 一:ReceiverTracker的架構設計 1. Driver在Executor啟動Receiver方式,每個Receiver都封裝成一個Tas

Spark Streaming原始碼解讀State管理updateStateByKeymapWithState解密

源地址:http://blog.csdn.net/snail_gesture/article/details/5151058 背景:  整個Spark Streaming是按照Batch Duractions劃分Job的。但是很多時候我們需要算過去的一天甚

Spark——Streaming原始碼解析容錯

此文是從思維導圖中匯出稍作調整後生成的,思維腦圖對程式碼瀏覽支援不是很好,為了更好閱讀體驗,文中涉及到的原始碼都是刪除掉不必要的程式碼後的虛擬碼,如需獲取更好閱讀體驗可下載腦圖配合閱讀: 此博文共分為四個部分: DAG定義 Job動態生成 資料的產生與匯入 容錯 ​ 策略 優點 缺點 (1) 熱備

Spark——Streaming原始碼解析資料的產生與匯入

此文是從思維導圖中匯出稍作調整後生成的,思維腦圖對程式碼瀏覽支援不是很好,為了更好閱讀體驗,文中涉及到的原始碼都是刪除掉不必要的程式碼後的虛擬碼,如需獲取更好閱讀體驗可下載腦圖配合閱讀: 此博文共分為四個部分: DAG定義 Job動態生成 資料的產生與匯入 容錯 資料的產生與匯入主要分為以下五個部分

Spark——Streaming原始碼解析DAG定義

此文是從思維導圖中匯出稍作調整後生成的,思維腦圖對程式碼瀏覽支援不是很好,為了更好閱讀體驗,文中涉及到的原始碼都是刪除掉不必要的程式碼後的虛擬碼,如需獲取更好閱讀體驗可下載腦圖配合閱讀: 此博文共分為四個部分: DAG定義 Job動態生成 資料的產生與匯入 容錯 1. DStream 1.1. RD

Spark MLlib原始碼解讀樸素貝葉斯分類器,NaiveBayes

Spark MLlib 樸素貝葉斯NaiveBayes 原始碼分析 基本原理介紹 首先是基本的條件概率求解的公式。 P(A|B)=P(AB)P(B) 在現實生活中,我們經常會碰到已知一個條件概率,求得兩個時間交換後的概率的問題。也就是在已知P(A

Hikaricp原始碼解讀(5)——物理連線生命週期介紹

5、物理連線生命週期介紹 HikariCP中的連線取用流程如下: 其中HikariPool負責對資源連線進行管理,而ConcurrentBag則是作為物理連線的共享資源站,PoolEntry則是對物理連線的1-1封裝。 PoolEntry通過borro

Spark SQL 原始碼分析Physical Plan 到 RDD的具體實現

  我們都知道一段sql,真正的執行是當你呼叫它的collect()方法才會執行Spark Job,最後計算得到RDD。 lazy val toRdd: RDD[Row] = executedPlan.execute()  Spark Plan基本包含4種操作型別,即Bas

Spark原始碼解讀RDD構建轉換過程

上一節講了Spark原始碼解讀之Context的初始化過程,發現其實一行簡單的new SparkContext(sparkConf)程式碼,spark內部會去做很多事情。這節主要講RDD的構建和轉換過

Hybrid----優秀開原始碼解讀JS與iOS Native Code互調的優雅實現方案

轉載自:http://blog.csdn.net/yanghua_kobe/article/details/8209751 簡介 它優雅地實現了在使用UIWebView時JS與ios 的ObjC nativecode之間的互調,支援訊息傳送、接收、訊息

JVM原始碼分析Attach機制實現完全解讀

Attach是什麼 在講這個之前,我們先來點大家都知道的東西,當我們感覺執行緒一直卡在某個地方,想知道卡在哪裡,首先想到的是進行執行緒dump,而常用的命令是jstack ,我們就可以看到如下執行緒棧了 2014-06-18 12:56:14 Full thread dump Java HotSpot(

【React原始碼解讀】- 元件的實現

前言 react使用也有一段時間了,大家對這個框架褒獎有加,但是它究竟好在哪裡呢? 讓我們結合它的原始碼,探究一二!(當前原始碼為react16,讀者要對react有一定的瞭解) 回到最初 根據react官網上的例子,快速構建react專案 npx create-react-app