1. 程式人生 > >Spark入門實戰系列--7.Spark Streaming(上)--實時流計算Spark Streaming原理介紹

Spark入門實戰系列--7.Spark Streaming(上)--實時流計算Spark Streaming原理介紹

【注】該系列文章以及使用到安裝包/測試資料 可以在《》獲取

1、Spark Streaming簡介

1.1 概述

Spark Streaming 是Spark核心API的一個擴充套件,可以實現高吞吐量的、具備容錯機制的實時流資料的處理。支援從多種資料來源獲取資料,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,從資料來源獲取資料之後,可以使用諸如map、reduce、join和window等高階函式進行復雜演算法的處理。最後還可以將處理結果儲存到檔案系統,資料庫和現場儀表盤。在“One Stack rule them all”的基礎上,還可以使用Spark的其他子框架,如叢集學習、圖計算等,對流資料進行處理。

Spark Streaming處理的資料流圖:

clip_image002

Spark的各個子框架,都是基於核心Spark的,Spark Streaming在內部的處理機制是,接收實時流的資料,並根據一定的時間間隔拆分成一批批的資料,然後通過Spark Engine處理這些批資料,最終得到處理後的一批批結果資料。

對應的批資料,在Spark核心對應一個RDD例項,因此,對應流資料的DStream可以看成是一組RDDs,即RDD的一個序列。通俗點理解的話,在流資料分成一批一批後,通過一個先進先出的佇列,然後 Spark Engine從該佇列中依次取出一個個批資料,把批資料封裝成一個RDD,然後進行處理,這是一個典型的生產者消費者模型,對應的就有生產者消費者模型的問題,即如何協調生產速率和消費速率。

1.2 術語定義

l離散流(discretized stream)或DStream:這是Spark Streaming對內部持續的實時資料流的抽象描述,即我們處理的一個實時資料流,在Spark Streaming中對應於一個DStream 例項。

l批資料(batch data):這是化整為零的第一步,將實時流資料以時間片為單位進行分批,將流處理轉化為時間片資料的批處理。隨著持續時間的推移,這些處理結果就形成了對應的結果資料流了。

l時間片或批處理時間間隔( batch interval):這是人為地對流資料進行定量的標準,以時間片作為我們拆分流資料的依據。一個時間片的資料對應一個RDD例項。

l視窗長度(window length):一個視窗覆蓋的流資料的時間長度。必須是批處理時間間隔的倍數,

l滑動時間間隔:前一個視窗到後一個視窗所經過的時間長度。必須是批處理時間間隔的倍數

lInput DStream :一個input DStream是一個特殊的DStream,將Spark Streaming連線到一個外部資料來源來讀取資料。

1.3 Storm與Spark Streming比較

l處理模型以及延遲

雖然兩框架都提供了可擴充套件性(scalability)和可容錯性(fault tolerance),但是它們的處理模型從根本上說是不一樣的。Storm可以實現亞秒級時延的處理,而每次只處理一條event,而Spark Streaming可以在一個短暫的時間窗口裡面處理多條(batches)Event。所以說Storm可以實現亞秒級時延的處理,而Spark Streaming則有一定的時延。

l容錯和資料保證

然而兩者的代價都是容錯時候的資料保證,Spark Streaming的容錯為有狀態的計算提供了更好的支援。在Storm中,每條記錄在系統的移動過程中都需要被標記跟蹤,所以Storm只能保證每條記錄最少被處理一次,但是允許從錯誤狀態恢復時被處理多次。這就意味著可變更的狀態可能被更新兩次從而導致結果不正確。

任一方面,Spark Streaming僅僅需要在批處理級別對記錄進行追蹤,所以他能保證每個批處理記錄僅僅被處理一次,即使是node節點掛掉。雖然說Storm的Trident library可以保證一條記錄被處理一次,但是它依賴於事務更新狀態,而這個過程是很慢的,並且需要由使用者去實現。

l實現和程式設計API

Storm主要是由Clojure語言實現,Spark Streaming是由Scala實現。如果你想看看這兩個框架是如何實現的或者你想自定義一些東西你就得記住這一點。Storm是由BackType和 Twitter開發,而Spark Streaming是在UC Berkeley開發的。

Storm提供了Java API,同時也支援其他語言的API。 Spark Streaming支援Scala和Java語言(其實也支援Python)。

l批處理框架整合

Spark Streaming的一個很棒的特性就是它是在Spark框架上執行的。這樣你就可以想使用其他批處理程式碼一樣來寫Spark Streaming程式,或者是在Spark中互動查詢。這就減少了單獨編寫流批量處理程式和歷史資料處理程式。

l生產支援

Storm已經出現好多年了,而且自從2011年開始就在Twitter內部生產環境中使用,還有其他一些公司。而Spark Streaming是一個新的專案,並且在2013年僅僅被Sharethrough使用(據作者瞭解)。

Storm是 Hortonworks Hadoop資料平臺中流處理的解決方案,而Spark Streaming出現在 MapR的分散式平臺和Cloudera的企業資料平臺中。除此之外,Databricks是為Spark提供技術支援的公司,包括了Spark Streaming。

雖然說兩者都可以在各自的叢集框架中執行,但是Storm可以在Mesos上執行, 而Spark Streaming可以在YARN和Mesos上執行。

2、執行原理

2.1 Streaming架構

SparkStreaming是一個對實時資料流進行高通量、容錯處理的流式處理系統,可以對多種資料來源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)進行類似Map、Reduce和Join等複雜操作,並將結果儲存到外部檔案系統、資料庫或應用到實時儀表盤。

l計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark Core,也就是把Spark Streaming的輸入資料按照batch size(如1秒)分成一段一段的資料(Discretized Stream),每一段資料都轉換成Spark中的RDD(Resilient Distributed Dataset),然後將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果儲存在記憶體中。整個流式計算根據業務的需求可以對中間的結果進行疊加或者儲存到外部裝置。下圖顯示了Spark Streaming的整個流程。

clip_image004

圖Spark Streaming構架

l容錯性:對於流式計算來說,容錯性至關重要。首先我們要明確一下Spark中RDD的容錯機制。每一個RDD都是一個不可變的分散式可重算的資料集,其記錄著確定性的操作繼承關係(lineage),所以只要輸入資料是可容錯的,那麼任意一個RDD的分割槽(Partition)出錯或不可用,都是可以利用原始輸入資料通過轉換操作而重新算出的。  

對於Spark Streaming來說,其RDD的傳承關係如下圖所示,圖中的每一個橢圓形表示一個RDD,橢圓形中的每個圓形代表一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每一個Batch Size所產生的中間結果RDD。我們可以看到圖中的每一個RDD都是通過lineage相連線的,由於Spark Streaming輸入資料可以來自於磁碟,例如HDFS(多份拷貝)或是來自於網路的資料流(Spark Streaming會將網路輸入資料的每一個數據流拷貝兩份到其他的機器)都能保證容錯性,所以RDD中任意的Partition出錯,都可以並行地在其他機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。

clip_image006

Spark Streaming中RDD的lineage關係圖

l實時性:對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段資料的處理都會經過Spark DAG圖分解以及Spark的任務集的排程過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),所以Spark Streaming能夠滿足除對實時性要求非常高(如高頻實時交易)之外的所有流式準實時計算場景。

l擴充套件性與吞吐量:Spark目前在EC2上已能夠線性擴充套件到100個節點(每個節點4Core),可以以數秒的延遲處理6GB/s的資料量(60M records/s),其吞吐量也比流行的Storm高2~5倍,圖4是Berkeley利用WordCount和Grep兩個用例所做的測試,在Grep這個測試中,Spark Streaming中的每個節點的吞吐量是670k records/s,而Storm是115k records/s。

clip_image008

Spark Streaming與Storm吞吐量比較圖

2.2 程式設計模型

DStream(Discretized Stream)作為Spark Streaming的基礎抽象,它代表持續性的資料流。這些資料流既可以通過外部輸入源賴獲取,也可以通過現有的Dstream的transformation操作來獲得。在內部實現上,DStream由一組時間序列上連續的RDD來表示。每個RDD都包含了自己特定時間間隔內的資料流。如圖7-3所示。

clip_image010

圖7-3   DStream中在時間軸下生成離散的RDD序列

clip_image012

對DStream中資料的各種操作也是對映到內部的RDD上來進行的,如圖7-4所示,對Dtream的操作可以通過RDD的transformation生成新的DStream。這裡的執行引擎是Spark。

2.2.1 如何使用Spark Streaming

作為構建於Spark之上的應用框架,Spark Streaming承襲了Spark的程式設計風格,對於已經瞭解Spark的使用者來說能夠快速地上手。接下來以Spark Streaming官方提供的WordCount程式碼為例來介紹Spark Streaming的使用方式。

import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.streaming.StreamingContext._

// Create a local StreamingContext with two working thread and batch interval of 1 second.

// The master requires 2 cores to prevent from a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1))

// Create a DStream that will connect to hostname:port, like localhost:9999

val lines = ssc.socketTextStream("localhost", 9999)

// Split each line into words

val words = lines.flatMap(_.split(" "))

import org.apache.spark.streaming.StreamingContext._

// Count each word in each batch

val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console

wordCounts.print()

ssc.start()              // Start the computation

ssc.awaitTermination()  // Wait for the computation to terminate

1.建立StreamingContext物件 同Spark初始化需要建立SparkContext物件一樣,使用Spark Streaming就需要建立StreamingContext物件。建立StreamingContext物件所需的引數與SparkContext基本一致,包括指明Master,設定名稱(如NetworkWordCount)。需要注意的是引數Seconds(1),Spark Streaming需要指定處理資料的時間間隔,如上例所示的1s,那麼Spark Streaming會以1s為時間視窗進行資料處理。此引數需要根據使用者的需求和叢集的處理能力進行適當的設定;

2.建立InputDStream如同Storm的Spout,Spark Streaming需要指明資料來源。如上例所示的socketTextStream,Spark Streaming以socket連線作為資料來源讀取資料。當然Spark Streaming支援多種不同的資料來源,包括Kafka、 Flume、HDFS/S3、Kinesis和Twitter等資料來源;

3.操作DStream對於從資料來源得到的DStream,使用者可以在其基礎上進行各種操作,如上例所示的操作就是一個典型的WordCount執行流程:對於當前時間視窗內從資料來源得到的資料首先進行分割,然後利用Map和ReduceByKey方法進行計算,當然最後還有使用print()方法輸出結果;

4.啟動Spark Streaming之前所作的所有步驟只是建立了執行流程,程式沒有真正連線上資料來源,也沒有對資料進行任何操作,只是設定好了所有的執行計劃,當ssc.start()啟動後程序才真正進行所有預期的操作。

至此對於Spark Streaming的如何使用有了一個大概的印象,在後面的章節我們會通過原始碼深入探究一下Spark Streaming的執行流程。

2.2.2 DStream的輸入源

在Spark Streaming中所有的操作都是基於流的,而輸入源是這一系列操作的起點。輸入 DStreams 和 DStreams 接收的流都代表輸入資料流的來源,在Spark Streaming 提供兩種內建資料流來源:

l  基礎來源 在 StreamingContext API 中直接可用的來源。例如:檔案系統、Socket(套接字)連線和 Akka actors;

l  高階來源 如 Kafka、Flume、Kinesis、Twitter 等,可以通過額外的實用工具類建立。

2.2.2.1 基礎來源

在前面分析怎樣使用Spark Streaming的例子中我們已看到ssc.socketTextStream()方法,可以通過 TCP 套接字連線,從從文字資料中建立了一個 DStream。除了套接字,StreamingContext 的API還提供了方法從檔案和 Akka actors 中建立 DStreams作為輸入源。

Spark Streaming提供了streamingContext.fileStream(dataDirectory)方法可以從任何檔案系統(如:HDFS、S3、NFS 等)的檔案中讀取資料,然後建立一個DStream。Spark Streaming 監控 dataDirectory 目錄和在該目錄下任何檔案被建立處理(不支援在巢狀目錄下寫檔案)。需要注意的是:讀取的必須是具有相同的資料格式的檔案;建立的檔案必須在 dataDirectory 目錄下,並通過自動移動或重新命名成資料目錄;檔案一旦移動就不能被改變,如果檔案被不斷追加,新的資料將不會被閱讀。對於簡單的文字文,可以使用一個簡單的方法streamingContext.textFileStream(dataDirectory)來讀取資料。

Spark Streaming也可以基於自定義 Actors 的流建立DStream ,通過 Akka actors 接受資料流,使用方法streamingContext.actorStream(actorProps, actor-name)。Spark Streaming使用 streamingContext.queueStream(queueOfRDDs)方法可以建立基於 RDD 佇列的DStream,每個RDD 佇列將被視為 DStream 中一塊資料流進行加工處理。

2.2.2.2 高階來源

這一類的來源需要外部 non-Spark 庫的介面,其中一些有複雜的依賴關係(如 Kafka、Flume)。因此通過這些來源建立 DStreams 需要明確其依賴。例如,如果想建立一個使用 Twitter tweets 的資料的DStream 流,必須按以下步驟來做:

1)在 SBT 或 Maven工程裡新增 spark-streaming-twitter_2.10 依賴。

2)開發:匯入 TwitterUtils 包,通過 TwitterUtils.createStream 方法建立一個DStream。

3)部署:新增所有依賴的 jar 包(包括依賴的spark-streaming-twitter_2.10 及其依賴),然後部署應用程式。

需要注意的是,這些高階的來源一般在Spark Shell中不可用,因此基於這些高階來源的應用不能在Spark Shell中進行測試。如果你必須在Spark shell中使用它們,你需要下載相應的Maven工程的Jar依賴並新增到類路徑中。

其中一些高階來源如下:

lTwitter Spark Streaming的TwitterUtils工具類使用Twitter4j,Twitter4J 庫支援通過任何方法提供身份驗證資訊,你可以得到公眾的流,或得到基於關鍵詞過濾流。

lFlume Spark Streaming可以從Flume中接受資料。

lKafka Spark Streaming可以從Kafka中接受資料。

lKinesis Spark Streaming可以從Kinesis中接受資料。

需要重申的一點是在開始編寫自己的 SparkStreaming 程式之前,一定要將高階來源依賴的Jar新增到SBT 或 Maven 專案相應的artifact中。常見的輸入源和其對應的Jar包如下圖所示。

clip_image014

另外,輸入DStream也可以建立自定義的資料來源,需要做的就是實現一個使用者定義的接收器。

2.2.3 DStream的操作

與RDD類似,DStream也提供了自己的一系列操作方法,這些操作可以分成三類:普通的轉換操作、視窗轉換操作和輸出操作。

2.2.3.1 普通的轉換操作

普通的轉換操作如下表所示:

轉換

描述

map(func)

源 DStream的每個元素通過函式func返回一個新的DStream。

flatMap(func)

類似與map操作,不同的是每個輸入元素可以被映射出0或者更多的輸出元素。

filter(func)

在源DSTREAM上選擇Func函式返回僅為true的元素,最終返回一個新的DSTREAM 。

repartition(numPartitions)

通過輸入的引數numPartitions的值來改變DStream的分割槽大小。

union(otherStream)

返回一個包含源DStream與其他 DStream的元素合併後的新DSTREAM。

count()

對源DStream內部的所含有的RDD的元素數量進行計數,返回一個內部的RDD只包含一個元素的DStreaam。

reduce(func)

使用函式func(有兩個引數並返回一個結果)將源DStream 中每個RDD的元素進行聚 合操作,返回一個內部所包含的RDD只有一個元素的新DStream。

countByValue()

計算DStream中每個RDD內的元素出現的頻次並返回新的DStream[(K,Long)],其中K是RDD中元素的型別,Long是元素出現的頻次。

reduceByKey(func, [numTasks])

當一個型別為(K,V)鍵值對的DStream被呼叫的時候,返回型別為型別為(K,V)鍵值對的新 DStream,其中每個鍵的值V都是使用聚合函式func彙總。注意:預設情況下,使用 Spark的預設並行度提交任務(本地模式下並行度為2,叢集模式下位8),可以通過配置numTasks設定不同的並行任務數。

join(otherStream, [numTasks])

當被呼叫型別分別為(K,V)和(K,W)鍵值對的2個DStream 時,返回型別為(K,(V,W))鍵值對的一個新DSTREAM。

cogroup(otherStream, [numTasks])

當被呼叫的兩個DStream分別含有(K, V) 和(K, W)鍵值對時,返回一個(K, Seq[V], Seq[W])型別的新的DStream。

transform(func)

通過對源DStream的每RDD應用RDD-to-RDD函式返回一個新的DStream,這可以用來在DStream做任意RDD操作。

updateStateByKey(func)

返回一個新狀態的DStream,其中每個鍵的狀態是根據鍵的前一個狀態和鍵的新值應用給定函式func後的更新。這個方法可以被用來維持每個鍵的任何狀態資料。

在上面列出的這些操作中,transform()方法和updateStateByKey()方法值得我們深入的探討一下:

l  transform(func)操作

該transform操作(轉換操作)連同其其類似的 transformWith操作允許DStream 上應用任意RDD-to-RDD函式。它可以被應用於未在 DStream API 中暴露任何的RDD操作。例如,在每批次的資料流與另一資料集的連線功能不直接暴露在DStream API 中,但可以輕鬆地使用transform操作來做到這一點,這使得DStream的功能非常強大。例如,你可以通過連線預先計算的垃圾郵件資訊的輸入資料流(可能也有Spark生成的),然後基於此做實時資料清理的篩選,如下面官方提供的虛擬碼所示。事實上,也可以在transform方法中使用機器學習和圖形計算的演算法。

l  updateStateByKey操作

該 updateStateByKey 操作可以讓你保持任意狀態,同時不斷有新的資訊進行更新。要使用此功能,必須進行兩個步驟 :

(1)  定義狀態 - 狀態可以是任意的資料型別。

(2)  定義狀態更新函式 - 用一個函式指定如何使用先前的狀態和從輸入流中獲取的新值 更新狀態。

讓我們用一個例子來說明,假設你要進行文字資料流中單詞計數。在這裡,正在執行的計數是狀態而且它是一個整數。我們定義了更新功能如下:

clip_image016

此函式應用於含有鍵值對的DStream中(如前面的示例中,在DStream中含有(word,1)鍵值對)。它會針對裡面的每個元素(如wordCount中的word)呼叫一下更新函式,newValues是最新的值,runningCount是之前的值。

clip_image018

2.2.3.2 視窗轉換操作

Spark Streaming 還提供了視窗的計算,它允許你通過滑動視窗對資料進行轉換,視窗轉換操作如下:

轉換

描述

window(windowLengthslideInterval)

返回一個基於源DStream的視窗批次計算後得到新的DStream。

countByWindow(windowLength,slideInterval)

返回基於滑動視窗的DStream中的元素的數量。

reduceByWindow(funcwindowLength,slideInterval)

基於滑動視窗對源DStream中的元素進行聚合操作,得到一個新的DStream。

reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks])

基於滑動視窗對(K,V)鍵值對型別的DStream中的值按K使用聚合函式func進行聚合操作,得到一個新的DStream。

reduceByKeyAndWindow(func,invFunc,windowLengthslideInterval, [numTasks])

一個更高效的reduceByKkeyAndWindow()的實現版本,先對滑動視窗中新的時間間隔內資料增量聚合並移去最早的與新增資料量的時間間隔內的資料統計量。例如,計算t+4秒這個時刻過去5秒視窗的WordCount,那麼我們可以將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量,這種方法可以複用中間三秒的統計量,提高統計的效率。

countByValueAndWindow(windowLength,slideInterval, [numTasks])

基於滑動視窗計算源DStream中每個RDD內每個元素出現的頻次並返回DStream[(K,Long)],其中K是RDD中元素的型別,Long是元素頻次。與countByValue一樣,reduce任務的數量可以通過一個可選引數進行配置。

clip_image020

批處理間隔示意圖

在Spark Streaming中,資料處理是按批進行的,而資料採集是逐條進行的,因此在Spark Streaming中會先設定好批處理間隔(batch duration),當超過批處理間隔的時候就會把採集到的資料彙總起來成為一批資料交給系統去處理。

對於視窗操作而言,在其視窗內部會有N個批處理資料,批處理資料的大小由視窗間隔(window duration)決定,而視窗間隔指的就是視窗的持續時間,在視窗操作中,只有視窗的長度滿足了才會觸發批資料的處理。除了視窗的長度,視窗操作還有另一個重要的引數就是滑動間隔(slide duration),它指的是經過多長時間視窗滑動一次形成新的視窗,滑動視窗預設情況下和批次間隔的相同,而視窗間隔一般設定的要比它們兩個大。在這裡必須注意的一點是滑動間隔和視窗間隔的大小一定得設定為批處理間隔的整數倍。

如批處理間隔示意圖所示,批處理間隔是1個時間單位,視窗間隔是3個時間單位,滑動間隔是2個時間單位。對於初始的視窗time 1-time 3,只有視窗間隔滿足了才觸發資料的處理。這裡需要注意的一點是,初始的視窗有可能流入的資料沒有撐滿,但是隨著時間的推進,視窗最終會被撐滿。當每個2個時間單位,視窗滑動一次後,會有新的資料流入視窗,這時視窗會移去最早的兩個時間單位的資料,而與最新的兩個時間單位的資料進行彙總形成新的視窗(time3-time5)。

對於視窗操作,批處理間隔、視窗間隔和滑動間隔是非常重要的三個時間概念,是理解視窗操作的關鍵所在。

2.2.3.3 輸出操作

Spark Streaming允許DStream的資料被輸出到外部系統,如資料庫或檔案系統。由於輸出操作實際上使transformation操作後的資料可以通過外部系統被使用,同時輸出操作觸發所有DStream的transformation操作的實際執行(類似於RDD操作)。以下表列出了目前主要的輸出操作:

轉換

描述

print()

在Driver中打印出DStream中資料的前10個元素。

saveAsTextFiles(prefix, [suffix])

將DStream中的內容以文字的形式儲存為文字檔案,其中每次批處理間隔內產生的檔案以prefix-TIME_IN_MS[.suffix]的方式命名。

saveAsObjectFiles(prefix, [suffix])

將DStream中的內容按物件序列化並且以SequenceFile的格式儲存。其中每次批處理間隔內產生的檔案以prefix-TIME_IN_MS[.suffix]的方式命名。

saveAsHadoopFiles(prefix, [suffix])

將DStream中的內容以文字的形式儲存為Hadoop檔案,其中每次批處理間隔內產生的檔案以prefix-TIME_IN_MS[.suffix]的方式命名。

foreachRDD(func)

最基本的輸出操作,將func函式應用於DStream中的RDD上,這個操作會輸出資料到外部系統,比如儲存RDD到檔案或者網路資料庫等。需要注意的是func函式是在執行該streaming應用的Driver程序裡執行的。

dstream.foreachRDD是一個非常強大的輸出操作,它允將許資料輸出到外部系統。但是 ,如何正確高效地使用這個操作是很重要的,下面展示瞭如何去避免一些常見的錯誤。

通常將資料寫入到外部系統需要建立一個連線物件(如 TCP連線到遠端伺服器),並用它來發送資料到遠端系統。出於這個目的,開發者可能在不經意間在Spark driver端建立了連線物件,並嘗試使用它儲存RDD中的記錄到Spark worker上,如下面程式碼:

clip_image022

這是不正確的,這需要連線物件進行序列化並從Driver端傳送到Worker上。連線物件很少在不同機器間進行這種操作,此錯誤可能表現為序列化錯誤(連線對不可序列化),初始化錯誤(連線物件在需要在Worker 上進行需要初始化) 等等,正確的解決辦法是在 worker上建立的連線物件。

通常情況下,建立一個連線物件有時間和資源開銷。因此,建立和銷燬的每條記錄的連線物件可能招致不必要的資源開銷,並顯著降低系統整體的吞吐量 。一個更好的解決方案是使用rdd.foreachPartition方法建立一個單獨的連線物件,然後使用該連線物件輸出的所有RDD分割槽中的資料到外部系統。

這緩解了建立多條記錄連線的開銷。最後,還可以進一步通過在多個RDDs/ batches上重用連線物件進行優化。一個保持連線物件的靜態池可以重用在多個批處理的RDD上將其輸出到外部系統,從而進一步降低了開銷。

需要注意的是,在靜態池中的連線應該按需延遲建立,這樣可以更有效地把資料傳送到外部系統。另外需要要注意的是:DStreams延遲執行的,就像RDD的操作是由actions觸發一樣。預設情況下,輸出操作會按照它們在Streaming應用程式中定義的順序一個個執行。

2.3  容錯、持久化和效能調優

2.3.1 容錯

DStream基於RDD組成,RDD的容錯性依舊有效,我們首先回憶一下SparkRDD的基本特性。

lRDD是一個不可變的、確定性的可重複計算的分散式資料集。RDD的某些partition丟失了,可以通過血統(lineage)資訊重新計算恢復;

l如果RDD任何分割槽因worker節點故障而丟失,那麼這個分割槽可以從原來依賴的容錯資料集中恢復;

l由於Spark中所有的資料的轉換操作都是基於RDD的,即使叢集出現故障,只要輸入資料集存在,所有的中間結果都是可以被計算的。

Spark Streaming是可以從HDFS和S3這樣的檔案系統讀取資料的,這種情況下所有的資料都可以被重新計算,不用擔心資料的丟失。但是在大多數情況下,Spark Streaming是基於網路來接受資料的,此時為了實現相同的容錯處理,在接受網路的資料時會在叢集的多個Worker節點間進行資料的複製(預設的複製數是2),這導致產生在出現故障時被處理的兩種型別的資料:

1)Data received and replicated :一旦一個Worker節點失效,系統會從另一份還存在的資料中重新計算。

2)Data received but buffered for replication :一旦資料丟失,可以通過RDD之間的依賴關係,從HDFS這樣的外部檔案系統讀取資料。

此外,有兩種故障,我們應該關心:

(1)Worker節點失效:通過上面的講解我們知道,這時系統會根據出現故障的資料的型別,選擇是從另一個有複製過資料的工作節點上重新計算,還是直接從從外部檔案系統讀取資料。

(2)Driver(驅動節點)失效 :如果執行 Spark Streaming應用時驅動節點出現故障,那麼很明顯的StreamingContext已經丟失,同時在記憶體中的資料全部丟失。對於這種情況,Spark Streaming應用程式在計算上有一個內在的結構——在每段micro-batch資料週期性地執行同樣的Spark計算。這種結構允許把應用的狀態(亦稱checkpoint)週期性地儲存到可靠的儲存空間中,並在driver重新啟動時恢復該狀態。具體做法是在ssc.checkpoint(<checkpoint directory>)函式中進行設定,Spark Streaming就會定期把DStream的元資訊寫入到HDFS中,一旦驅動節點失效,丟失的StreamingContext會通過已經儲存的檢查點資訊進行恢復。

最後我們談一下Spark Stream的容錯在Spark 1.2版本的一些改進:

實時流處理系統必須要能在24/7時間內工作,因此它需要具備從各種系統故障中恢復過來的能力。最開始,SparkStreaming就支援從driver和worker故障恢復的能力。然而有些資料來源的輸入可能在故障恢復以後丟失資料。在Spark1.2版本中,Spark已經在SparkStreaming中對預寫日誌(也被稱為journaling)作了初步支援,改進了恢復機制,並使更多資料來源的零資料丟失有了可靠。

對於檔案這樣的源資料,driver恢復機制足以做到零資料丟失,因為所有的資料都儲存在了像HDFS或S3這樣的容錯檔案系統中了。但對於像Kafka和Flume等其它資料來源,有些接收到的資料還只快取在記憶體中,尚未被處理,它們就有可能會丟失。這是由於Spark應用的分佈操作方式引起的。當driver程序失敗時,所有在standalone/yarn/mesos叢集執行的executor,連同它們在記憶體中的所有資料,也同時被終止。對於Spark Streaming來說,從諸如Kafka和Flume的資料來源接收到的所有資料,在它們處理完成之前,一直都快取在executor的記憶體中。縱然driver重新啟動,這些快取的資料也不能被恢復。為了避免這種資料損失,在Spark1.2釋出版本中引進了預寫日誌(WriteAheadLogs)功能。

預寫日誌功能的流程是:1)一個SparkStreaming應用開始時(也就是driver開始時),相關的StreamingContext使用SparkContext啟動接收器成為長駐執行任務。這些接收器接收並儲存流資料到Spark記憶體中以供處理。2)接收器通知driver。3)接收塊中的元資料(metadata)被髮送到driver的StreamingContext。這個元資料包括:(a)定位其在executor記憶體中資料的塊referenceid,(b)塊資料在日誌中的偏移資訊(如果啟用了)。

使用者傳送資料的生命週期如下圖所示。

clip_image024

類似Kafka這樣的系統可以通過複製資料保持可靠性。允許預寫日誌兩次高效地複製同樣的資料:一次由Kafka,而另一次由SparkStreaming。Spark未來版本將包含Kafka容錯機制的原生支援,從而避免第二個日誌。

2.3.2 持久化

與RDD一樣,DStream同樣也能通過persist()方法將資料流存放在記憶體中,預設的持久化方式是MEMORY_ONLY_SER,也就是在記憶體中存放資料同時序列化的方式,這樣做的好處是遇到需要多次迭代計算的程式時,速度優勢十分的明顯。而對於一些基於視窗的操作,如reduceByWindow、reduceByKeyAndWindow,以及基於狀態的操作,如updateStateBykey,其預設的持久化策略就是儲存在記憶體中。

對於來自網路的資料來源(Kafka、Flume、sockets等),預設的持久化策略是將資料儲存在兩臺機器上,這也是為了容錯性而設計的。

另外,對於視窗和有狀態的操作必須checkpoint,通過StreamingContext的checkpoint來指定目錄,通過 Dtream的checkpoint指定間隔時間,間隔必須是滑動間隔(slide interval)的倍數。

2.3.3 效能調優

1.  優化執行時間

l 增加並行度 確保使用整個叢集的資源,而不是把任務集中在幾個特定的節點上。對於包含shuffle的操作,增加其並行度以確保更為充分地使用叢集資源;

l 減少資料序列化,反序列化的負擔 Spark Streaming預設將接受到的資料序列化後儲存,以減少記憶體的使用。但是序列化和反序列話需要更多的CPU時間,因此更加高效的序列化方式(Kryo)和自定義的系列化介面可以更高效地使用CPU;

l 設定合理的batch duration(批處理時間間) 在Spark Streaming中,Job之間有可能存在依賴關係,後面的Job必須確保前面的作業執行結束後才能提交。若前面的Job執行的時間超出了批處理時間間隔,那麼後面的Job就無法按時提交,這樣就會進一步拖延接下來的Job,造成後續Job的阻塞。因此設定一個合理的批處理間隔以確保作業能夠在這個批處理間隔內結束時必須的;

l  減少因任務提交和分發所帶來的負擔 通常情況下,Akka框架能夠高效地確保任務及時分發,但是當批處理間隔非常小(500ms)時,提交和分發任務的延遲就變得不可接受了。使用Standalone和Coarse-grained Mesos模式通常會比使用Fine-grained Mesos模式有更小的延遲。

2.  優化記憶體使用

l控制batch size(批處理間隔內的資料量) Spark Streaming會把批處理間隔內接收到的所有資料存放在Spark內部的可用記憶體區域中,因此必須確保當前節點Spark的可用記憶體中少能容納這個批處理時間間隔內的所有資料,否則必須增加新的資源以提高叢集的處理能力;

l及時清理不再使用的資料 前面講到Spark Streaming會將接受的資料全部儲存到內部可用記憶體區域中,因此對於處理過的不再需要的資料應及時清理,以確保Spark Streaming有富餘的可用記憶體空間。通過設定合理的spark.cleaner.ttl時長來及時清理超時的無用資料,這個引數需要小心設定以免後續操作中所需要的資料被超時錯誤處理;

l觀察及適當調整GC策略 GC會影響Job的正常執行,可能延長Job的執行時間,引起一系列不可預料的問題。觀察GC的執行情況,採用不同的GC策略以進一步減小記憶體回收對Job執行的影響。

參考資料:

(1)《Spark Streaming》 http://blog.debugo.com/spark-streaming/

1、例項演示

1.1 流資料模擬器

1.1.1 流資料說明

在例項演示中模擬實際情況,需要源源不斷地接入流資料,為了在演示過程中更接近真實環境將定義流資料模擬器。該模擬器主要功能:通過Socket方式監聽指定的埠號,當外部程式通過該埠連線並請求資料時,模擬器將定時將指定的檔案資料隨機獲取傳送給外部程式。

1.1.2 模擬器程式碼

import java.io.{PrintWriter}

import java.net.ServerSocket

import scala.io.Source

object StreamingSimulation {

// 定義隨機獲取整數的方法

def index(length: Int) = {

import java.util.Random

val rdm = new Random

rdm.nextInt(length)

}

def main(args: Array[String]) {

// 呼叫該模擬器需要三個引數,分為為檔案路徑、埠號和間隔時間(單位:毫秒)

if (args.length != 3) {

System.err.println("Usage: <filename> <port> <millisecond>")

System.exit(1)

}

// 獲取指定檔案總的行數

val filename = args(0)

val lines = Source.fromFile(filename).getLines.toList

val filerow = lines.length

// 指定監聽某埠,當外部程式請求時建立連線

val listener = new ServerSocket(args(1).toInt)

while (true) {

val socket = listener.accept()

new Thread() {

override def run = {

println("Got client connected from: " + socket.getInetAddress)

val out = new PrintWriter(socket.getOutputStream(), true)

while (true) {

Thread.sleep(args(2).toLong)

// 當該埠接受請求時,隨機獲取某行資料傳送給對方

val content = lines(index(filerow))

println(content)

out.write(content + '\n')

out.flush()

}

socket.close()

}

}.start()

}

}

}

clip_image002

1.1.3 生成打包檔案

【注】可以參見第3課《Spark程式設計模型(下)--IDEA搭建及實戰》進行打包

clip_image004

在打包配置介面中,需要在Class Path加入:/app/scala-2.10.4/lib/scala-swing.jar /app/scala-2.10.4/lib/scala-library.jar /app/scala-2.10.4/lib/scala-actors.jar,各個jar包之間用空格分開,

點選選單Build->Build Artifacts,彈出選擇動作,選擇Build或者Rebuild動作,使用如下命令複製打包檔案到Spark根目錄下

cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar

cp LearnSpark.jar /app/hadoop/spark-1.1.0/

ll /app/hadoop/spark-1.1.0/

clip_image006

1.2 例項1:讀取檔案演示

1.2.1 演示說明

在該例項中Spark Streaming將監控某目錄中的檔案,獲取在間隔時間段內變化的資料,然後通過Spark Streaming計算出改時間段內單詞統計數。

1.2.2 演示程式碼

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext