1. 程式人生 > >Spark2.1.0文件:Spark Streaming 程式設計指南(上)

Spark2.1.0文件:Spark Streaming 程式設計指南(上)

本文翻譯自spark官方文件,僅翻譯了Scala API部分,目前版本為2.1.0,如有疏漏錯誤之處請多多指教。

原文地址:http://spark.apache.org/docs/latest/streaming-programming-guide.html

因文件篇幅較長故分為上下兩篇,本文為上篇,主要包括概述、入門示例、基本概念三部分

概述

Spark Streaming是核心Spark API的擴充套件,可實現可擴充套件、高吞吐量、可容錯的實時資料流處理。資料可以從諸如Kafka,Flume,Kinesis或TCP套接字等眾多來源獲取,並且可以使用由高階函式(如map,reduce,join和window)開發的複雜演算法進行流資料處理。最後,處理後的資料可以被推送到檔案系統,資料庫和實時儀表板。而且,您還可以在資料流上應用Spark提供的機器學習和圖處理演算法。


在內部,它的工作原理如下。Spark Streaming接收實時輸入資料流,並將資料切分成批,然後由Spark引擎對其進行處理,最後生成“批”形式的結果流。


Spark Streaming將連續的資料流抽象為discretizedstreamDStream。 可以從諸如Kafka,Flume和Kinesis等來源的輸入資料流中建立DStream,或者通過對其他DStream應用高階操作來建立。在內部,DStream 由一個RDD序列表示。

本指南介紹如何開始利用DStreams編寫Spark Streaming程式。您可以在Scala,Java或Python中編寫SparkStreaming程式(在Spark 1.2中引入),所有這些都在本指南中介紹。 您可以在本指南中找到標籤,讓您可以選擇不同語言的程式碼段(譯者注:本文內容僅翻譯了Scala部分,如果想學習其他語言介面,請參閱官網)。

一個小例子

在我們詳細介紹如何編寫自己的SparkStreaming程式之前,我們先看一下一個簡單的Spark Streaming程式是什麼樣子的。假設我們有一個數據伺服器正在對一個TCP套接字進行偵聽,然後需要統計接收的文字資料中的每個單詞的出現頻率。那麼你需要這樣做:

首先,我們將Spark Streaming相關的類和StreamingContext的一些隱式轉換匯入到我們的環境中,以便為我們需要的其他類(如DStream)新增有用的方法。StreamingContext是所有流功能的主要入口點。我們建立一個帶有兩個執行執行緒(譯者注:如果要執行本例,必須確保機器cpu核心大於2)的本地StreamingContext,並且設定流資料每批的間隔為1秒。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

使用此context,我們可以建立一個DStream,它表示來自特定主機名(例如localhost)和埠(例如9999)TCP源的流資料。

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

在這行程式碼中,DStream表示從資料伺服器接收的資料流。此DStream中的每個記錄都是一行文字。接下來,我們要將每行文字以空格符為分隔符切分成一個個單詞。

// Split each line into words
val words = lines.flatMap(_.split(" "))

flatMap是一個一對多的DStream操作,該操作通過從源DStream中的每個記錄生成多個新記錄來建立新的DStream。在這種情況下,每一行將被分割成多個單詞,並將單詞流表示為單詞DStream。接下來,我們對這些單詞進行計數。

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

單詞DStream進一步對映(一對一變換)到(word,1) 鍵值對的DStream,然後進行聚合以獲得每批資料中的單詞的頻率。最後,wordCounts.print()將列印每秒產生的計數結果中的若干條記錄。

請注意,當執行這些程式碼時,Spark Streaming僅是設定了預計算流程,目前為止這些計算還沒有真正的開始執行。在設定好所有計算操作後,要開始真正的執行過程,我們最終需要呼叫如下方法:
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

完整的程式碼可以在SparkStreaming示例NetworkWordCount中找到。

如果您已經下載並構建了Spark,則可以以下面的方式執行此示例。在執行spark程式之前您將首先需要執行Netcat(大多數類Unix系統中的一個小型實用程式)作為資料伺服器。

$ nc -lk 9999

然後,開啟另外一個終端,鍵入一下命令啟動示例

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

然後,在執行netcat伺服器的終端中輸入的任何行將每秒進行單詞計數並列印在螢幕上。 執行效果像下面這樣。


基本概念

接下來,我們將越過前面的簡單示例,闡述一些Spark Streaming的基礎知識。

連線

與Spark類似,Spark Streaming的相關依賴可通過Maven Central獲得。 要編寫自己的Spark Streaming程式,您必須將以下依賴項新增到SBT或Maven專案中。

Maven:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

SBT:

libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"

如果需要從SparkStreaming核心API中沒有包含的來源(如Kafka,Flume和Kinesis)採集資料,您必須將相應的artifact :spark-streaming-xyz_2.11新增到依賴關係中。例如,一些常用的artifact如下所示。

Source

Artifact

Kafka

spark-streaming-kafka-0-8_2.11

Flume

spark-streaming-flume_2.11

Kinesis

spark-streaming-kinesis-asl_2.11 [Amazon Software License]

有關最新列表,請參閱Mavenrepository,獲取支援的sources和artifacts的完整列表。

初始化StreamingContext

要初始化一個SparkStreaming程式,必須先建立一個StreamingContext物件,它是所有Spark Streaming方法的主要入口點。

StreamingContext物件可以從SparkConf物件中建立。

import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

appName引數是應用程式在叢集UI上顯示的名稱。 master是Spark,Mesos或YARN叢集的URL,或者一個特殊的“local [*]”字串來讓程式以本地模式執行。在具體的實踐中,當您在叢集上執行程式時,不需要在程式中硬編碼master引數,而是使用spark-submit提交應用程式並將master的URL以指令碼引數的形式傳入。但是,對於本地測試和單元測試,您可以通過“local[*]”來執行Spark Streaming程式(請確保本地系統中的cpu核心數夠用)。 需要注意的是,StreamingContext會內在的建立一個SparkContext的例項(所有Spark功能的起始點),你可以通過ssc.sparkContext訪問到這個例項。

批處理的時間視窗長度必須根據應用程式的延遲要求和可用的叢集資源進行設定。有關詳細資訊,請參閱“效能調優”部分。

StreamingContext物件還可以從一個現有的SparkContext例項中建立。

import org.apache.spark.streaming._
val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

定義context後,您必須執行以下操作。

1、通過建立input DStreams的形式來定義資料的輸入源。

2、通過將轉換和輸出操作應用於DStream來定義流式計算。

3、開始接收資料並呼叫streamingContext.start() 方法開始進行資料處理。

4、使用streamingContext.awaitTermination() 等待處理停止(手動停止或由錯誤引發)。

5、可以使用streamingContext.stop() 手動停止資料處理。

請記住以下幾點:

1、 一旦一個context開始運作,就不能設定或新增新的流計算。

2、 一旦一個上下文被停止,它將無法重新啟動。

3、 同一時刻,一個JVM中只能有一個StreamingContext處於活動狀態。

4、 StreamingContext上的stop() 方法也會停止SparkContext。 要僅停止StreamingContext(保持SparkContext活躍),請將stop() 方法的可選引數stopSparkContext設定為false。

5、 只要前一個StreamingContext在下一個StreamingContext被建立之前停止(不停止SparkContext),SparkContext就可以被重用來建立多個StreamingContext。

離散流(DStreams)

DiscretizedStreamDStream是Spark Streaming對流式資料的基本抽象。它表示連續的資料流,這些連續的資料流可以是從資料來源接收的輸入資料流,也可以是通過對輸入資料流執行轉換操作而生成的經處理的資料流。在內部,DStream由一系列連續的RDD表示(關於RDD的介紹參見“Spark程式設計指南”)。如下圖所示,DStream中的每個RDD都包含一定時間間隔內的資料。

任何定義於DStream之上的處理操作都將被轉換為對底層RDD的操作。例如,在之前的示例中,我們將一行行文字組成的流轉換為單詞流,具體做法為:將flatMap操作應用於名為lines的 DStream中的每個RDD上,以生成words DStream的RDD。如下圖所示。

這些底層的RDD轉換操作由Spark引擎完成計算。定義在DStream之上的操作可以隱藏大部分的底層細節,為開發人員提供更高級別的API以方便使用。 這些操作將在後面的章節中詳細討論。

輸入DStreams和接收器

輸入DStreams是表示從流媒體源接收的輸入資料流的DStream。在第一個示例中,lines是一個輸入DStream,因為它表示從netcat伺服器接收的資料流。 每個輸入DStream(除了檔案流,本節稍後討論這個特例)與一個名為ReceiverScala docJava doc)的物件相關聯,該物件從資料來源接收資料並將其儲存在Spark叢集的記憶體中等待處理。

Spark Streaming提供了兩類內建的流資料來源:

基本資料來源(Basicsources):StreamingContext API中直接提供的資料來源獲取方式。例如:檔案系統和套接字連線。

高階資料來源(Advancedsources):如Kafka,Flume,Kinesis等,可以通過額外的實用工具類來獲得。使用這些資料來源需要新增額外的依賴,可以參考之前的“基本概念”章節中的第一小節。

我們將稍後對每個類別中的一些資料來源進行探討。

請注意,如果要在流式資料處理程式中並行接收多個數據流,則可以建立多個輸入DStream(這將在“效能調優”一文中進一步討論)。這將建立多個接收器,同時接收多個數據流。但是請注意,Spark worker/executor是一個長期執行的任務,所以它會佔據分配給Spark Streaming應用程式的一個cpu核心。因此,一定要記住:你必須分配給Spark Streaming應用程式足夠的cpu核心(在local模式下執行時指“執行緒”)來對接收到的資料進行處理,以及執行資料接收器。

要記住兩點:

1、當以本地模式執行Spark Streaming程式時,不要將master URL設定為“local”或“local [1]”。這兩者意味著在本地只使用一個執行緒執行任務。如果您正在使用基於receiver(例如sockets,Kafka,Flume等)的輸入DStream,則這一個執行緒將用於執行receiver,就沒有執行緒來對接受到的資料進行處理了。因此,當在本地執行SparkStreaming程式時,請務必使用“local[n]”作為master URL,其中n大於要執行的接收器數量(有關如何設定master的資訊,請參閱Spark Properties)。

2、將邏輯擴充套件到在叢集上執行時,分配給Spark Streaming應用程式的核心數量必須大於接收器數量。否則系統雖然能夠收到資料,但無法對資料進行處理。

基本資料來源

在第一個小示例中,我們使用了ssc.socketTextStream(...)方法,該方法通過TCP套接字連線接手文字資料並基於這個資料流建立了一個DStream。除了套接字之外,StreamingContext 的API還提供了將檔案作為輸入源從而建立DStream的方法,下面介紹Spark Streaming支援的幾種基本資料來源。

1、 檔案流:可以從任何支援HDFS API的檔案系統(即:HDFS,S3,NFS等)中以流的方式讀取資料,讀取資料並建立DStream的方式如下所示:

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming將對目錄dataDirectory進行監視,並處理使用者在該目錄中建立的任何檔案(不支援讀取巢狀目錄中的檔案)。 請注意:

1)所有檔案必須具有相同的資料格式。

2)在dataDirectory目錄中,檔案只能通過原子性的移動或重新命名(atomically moving or renaming)的方式建立。

3)檔案被移動進來後,就不能再更改。因為,如果檔案被不斷追加內容,新追加的部分不會被讀取。

對於讀取簡單文字檔案,有一個更簡單的方法streamingContext.textFileStream(dataDirectory)。 所有的檔案流不需要執行接收器(receiver),因此不需要分配核心。

注意:fileStream在Python API中不可用,你只能使用textFileStream。

2、 基於自定義接收器的流:開發者可以自定義接收器並通過自定義接收器接收的資料流來建立DStream。有關詳細資訊,請參閱自定義接收器指南

3、 RDD佇列作為流: 如果需要利用測試資料來測試Spark Streaming應用程式,您還可以使用streamingContext.queueStream(queueOfRDDs)建立基於RDD佇列的DStream。推送到佇列中的每個RDD將被視為DStream中的一批資料,並像流一樣進行處理。

有關從套接字和檔案獲取資料流的更多詳細資訊,請參閱StreamingContext for Scala,JavaStreamingContext for Java和Python的StreamingContext中相關函式的API文件。

高階資料來源

使用這些高階的資料來源需要與外部非Spark庫進行連線,某些資料來源還具有複雜的依賴關係(例如Kafka和Flume)。因此,為了最小化由依賴引發的版本衝突相關的問題,從這些資料來源建立DStream的功能已被移動到單獨的庫中,你可以在需要的時候手動新增這些依賴(詳見本文“連線”小節)。

請注意,這些高階源在Sparkshell中不可用,因此在shell中無法測試基於這些高階源的應用程式。如果您真的想在Spark shell中使用它們,則必須下載相應的Maven artifact的JAR及其依賴項,並將其新增到類路徑中。

一些高階資料來源如下所示:

Kafka:Spark Streaming 2.1.0相容Kafka代理版本0.8.2.1或更高版本。有關詳細資訊,請參閱“Kafka整合指南”。

Flume:Spark Streaming 2.1.0相容Flume 1.6.0。有關詳細資訊,請參閱“Flume整合指南”。

Kinesis:Spark Streaming 2.1.0與Kinesis Client Library 1.2.1相容。有關詳細資訊,請參閱“Kinesis整合指南”。

自定義資料來源

也可以通過自定義資料來源建立輸入DStream。你需要做的是實現一個使用者定義(user-defined)的接收器(請參見下一部分詳細瞭解)可以從自定義源接收資料並將其推入到Spark中。有關詳細資訊,請參閱自定義接收器指南

Receiver的可靠性

基於“可靠性”我們可以將資料來源分為兩類。一些資料來源(如Kafka和Flume)允許對傳輸的資料進行確認。如果系統從這些可靠的資料來源獲取資料,並可以正確地確認接收到的資料,則可以確保任何故障都不會導致資料丟失。我們可以對接收器作如下兩種分類:

1、 可靠接收器(Reliable Receiver):當資料被接受並被以多副本的形式儲存在Spark中時,可靠接收器會正確地向可靠資料來源傳送確認資訊。

2、 非可靠接收器(UnreliableReceiver非可靠接收器不會向資料來源傳送任何確認資訊。 這可以用於不支援確認機制的資料來源,或者雖然是從可靠資料來源接受資料,但是不希望或不需要進行復雜的確認時,可以將該接收器用於對接可靠資料來源。

“自定義接收器指南”中討論瞭如何編寫可靠接收器的細節。

DStreams之上的Transformation操作

與RDD的運算元類似,transformation操作允許修改來自輸入DStream的資料。DStreams支援許多常規Spark RDD上的transformation操作。一些常見的如下。

Transformation

Meaning

map(func)

利用方法func對源DStream中的元素分別進行處理,並返回一個新的DStream。

flatMap(func)

和map類似,不過每個輸入元素可以被對映為0或多個輸出元素。

filter(func)

選取被func方法計算後返回true的元素,形成新的DSteeam並返回。

repartition(numPartitions)

通過增加或減少分割槽數改變DStream的並行度。

union(otherStream)

將源DStream和otherDStream中所有元素取並集,形成一個新的DStream並返回。

count()

計算DStream中的每個RDD中的元素個數,每個RDD返回一個“單元素RDD”,這些單元素RDD組成新的DStream並返回。

reduce(func)

對DStream中每個RDD中的所有元素分別進行聚合,每個RDD生成一個單元素RDD,這些單元素RDD組成新的DStream並返回,func函式接受兩個引數並有一個返回值,且func操作必須是associative commutative,這樣才能支援平行計算。

countByValue()

對元素型別為K的DStream呼叫該方法,將返回型別為(K,Long)鍵值對的新DStream。“鍵”對應的“值”是該“鍵”在源DStream中每個RDD中的出現頻率。

reduceByKey(func, [numTasks])

當對元素型別為(K, V)對的DStream呼叫該方法,返回(K,V)對型別的新DStream,其中使用給定的reduce函式聚合每個鍵的值。注意:預設情況下,它使用Spark的預設並行任務數(本地模式下為2,群集模式中的並行數由屬性spark.default.parallelism指定)進行分組。您可以傳遞一個可選的numTasks引數來設定task的數量。

join(otherStream, [numTasks])

當源DStream型別為(K, V),otherStream型別為(K, W)時,返回一個新的型別為(K, (V,W))的DStream。

cogroup(otherStream, [numTasks])

當源DStream型別為(K, V),otherStream型別為(K, W)時,返回一個新的型別為(K, Seq[V], Seq[W])的DStream。

transform(func)

通過對源DStream的每個RDD應用RDD-to-RDD函式來返回一個新的DStream。這可以用於對DStream進行任意RDD操作。

updateStateByKey(func)

返回一個新的“state”DStream,其中通過對key的先前狀態和新的values應用給定的方法func,將計算結果用來更新每個key的狀態。這可以用於維護每個key的任意的狀態資料。


下面將對上述的某些transformation運算元進行深入討論。

UpdateStateByKey運算元

updateStateByKey操作允許您在使用新的資訊持續更新時保持任意的狀態。要使用這個操作,你將需要操作兩個步驟。

1、 定義“狀態(state)”,狀態可以是任意的資料型別。

2、 定義狀態更新函式,使用更新函式來指定如何使用先前狀態和輸入流中的新值更新狀態。

在每個批處理中,Spark將對所有現有的key應用狀態更新功能,無論它們在新批次資料中是否有對應的新的value。如果更新函式返回None,則鍵值對將被消除。

我們來舉例說明一下。假設你想在文字資料流中對每個單詞的計數持續更新。在這裡,單詞計數是狀態,它是一個整數。我們將更新功能定義為:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

這適用於包含單詞的DStream(例如,在前面的示例中,包含(word, 1)鍵值對的名為pairs的DStream)。

valrunningCounts=pairs.updateStateByKey[Int](updateFunction_)

將為每個單詞呼叫更新函式,其中newValues具有1的順序(來自(word, 1)鍵值對)且runningCount持有先前的計數。

請注意,使用updateStateByKey需要配置檢查點目錄,這在檢查點部分將詳細討論。

Transform操作

transform操作(以及其變體如transformWith)允許將任意RDD-to-RDD函式應用於DStream。 利用transform你可以將任何DStream API中未直接提供的RDD操作應用於DStream。例如,在DStream API中,沒有提供直接將資料流中的每小批資料與其他資料集相join的功能。但是,您可以通過使用transform來執行此操作。這帶來了豐富的可能性。例如,可以通過將輸入資料流與預先計算出的垃圾郵件特徵資訊(這些垃圾郵件特徵資訊也可能是Spark計算出的)結合起來進行實時資料清理,然後基於這種操作實現垃圾資訊的過濾。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}

請注意,提供的函式在每個批次間隔中被呼叫。 這允許您設計隨時間變化的RDD操作,即RDD操作、分割槽數、廣播變數等可以在批次之間被更改。

視窗操作

Spark Streaming還提供了視窗計算功能,允許您在資料的滑動視窗上應用轉換操作。下圖說明了滑動視窗的工作方式。

如圖所示,每當視窗滑過originalDStream時,落在視窗內的源RDD被組合並被執行操作以產生windowed DStream的RDD。在上面的例子中,操作應用於最近3個時間單位的資料,並以2個時間單位滑動。這表明任何視窗操作都需要指定兩個引數。

視窗長度(windowlength) - 視窗的時間長度(上圖的示例中為:3)。

滑動間隔(slidinginterval) - 兩次相鄰的視窗操作的間隔(譯者注:即每次滑動的時間長度)(上圖示例中為:2)。

這兩個引數必須是源DStream的批間隔的倍數(上圖示例中為:1)。

我們以一個例子來說明視窗操作。 假設您希望對之前的單詞計數的示例進行擴充套件,每10秒鐘對過去30秒的資料進行wordcount。為此,我們必須在最近30秒的pairs DStream資料中對(word, 1)鍵值對應用reduceByKey操作。這是通過使用reduceByKeyAndWindow操作完成的。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

一些常見的視窗操作如下表所示。所有這些操作都用到了上述兩個引數 - windowLength和slideInterval。

Transformation

Meaning

window(windowLength

slideInterval)

返回基於源DStream的視窗批次計算而得到的新DStream。

countByWindow(wind

owLengthslideInterval)

返回基於滑動視窗的資料流中的元素個數。

reduceByWindow(fun

cwindowLengthslide

Interval)

使用func在滑動間隔中聚合資料流中的元素,生成一個新的“單元素”資料流並返回。該函式應該是associative and commutative,從而可以並行的執行計算。

reduceByKeyAndWin

dow(funcinvFuncwin

dowLengthslideInterv

al, [numTasks])

當包含(K,V)對的DStream進行呼叫時,返回包含(K,V)對的新DStream,其中每個鍵對應的所有值在滑動視窗的所有batch中使用給定的reduce函式func進行聚合。注意:預設情況下,它使用Spark的預設並行任務數(本地模式下為2,群集模式中的並行度由spark.default.parallelism屬性確指定)進行分組。 您可以傳遞一個可選的numTasks引數來設定不同並行度。

reduceByKeyAndWin

dow(funcinvFuncwin

dowLengthslideInterv

al, [numTasks])

上述reduceByKeyAndWindow() 的更高效的版本,其中使用前一視窗的reduce計算結果遞增地計算每個視窗的reduce值。這是通過對進入滑動視窗的新資料進行reduce操作,以及“逆減(inverse reducing)”離開視窗的舊資料來完成的。一個例子是當視窗滑動時對鍵對應的值進行“一加一減”操作。但是,它僅適用於“可逆減函式(invertible reduce functions)”,即具有相應“反減”功能的減函式(作為引數invFunc)。 像reduceByKeyAndWindow一樣,通過可選引數可以配置reduce任務的數量。 請注意,使用此操作必須啟用檢查點。

countByValueAndWin

dow(windowLength,sli

deInterval, [numTasks])

當對包含(K,V)對的DStream呼叫時,返回(K,Long)對的新DStream,其中每個鍵的值是其滑動視窗內的出現頻數。像reduceByKeyAndWindow一樣,通過可選引數可以配置reduce任務的數量。

Join操作

最後,值得強調的是,您可以輕鬆地在Spark Streaming中執行不同型別的join操作。

Stream-stream joins

一個流可以很方便地與其他流進行join操作。

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)

上述程式碼中,在每個批間隔中,由stream1生成的RDD將與stream2生成的RDD相join。 你也可以做leftOuterJoin,rightOuterJoin,fullOuterJoin。 此外,在流的視窗上進行聯接通常是非常有用的。這也很容易做到。

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)

Stream-dataset joins

這在種操作在前面解釋DStream.transform操作時已經進行了展示。下面的例子是另一個join視窗流與資料集的例子。

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

實際上,您也可以動態更改用於join操作的資料集。提供給transform的函式引數在每個批次間隔都會被evaluated,因此將使用當前dataset指向的資料集。

DStream轉換操作的完整列表可在API文件中找到。有關Scala API,請參閱DStream和PairDStreamFunction。 對於Java API,請參閱JavaDStream和JavaPairDStream。 對於Python API,請參閱DStream。

DStreams的輸出操作

輸出操作允許將DStream的資料推送到外部系統,如資料庫或檔案系統。由於輸出操作實際上允許外部系統消費變換後的資料,所以輸出操作會觸發所有DStream transformation操作的實際執行(類似於RDD的action運算元)。目前,定義了以下輸出操作:

Output Operation

Meaning

print()

在執行streaming應用程式的driver節點上列印DStream中每個batch的前十個元素。一般用於開發和除錯。

saveAsTextFiles(prefix, [suffix])

將此DStream的內容儲存到文字檔案中。每個批資料對應的檔名基於prefix和suffix生成:“prefix-TIME_IN_MS [.suffix]”。

saveAsObjectFiles(prefix, [suffix])

將此DStream的內容另存為SequenceFiles格式的序列化Java物件。 每個批資料對應的檔名基於prefix和suffix生成“prefix-TIME_IN_MS [.suffix]”。

saveAsHadoopFiles(prefix, [suffix])

將此DStream的內容另存為Hadoop檔案。 每個批資料對應的檔名基於prefix和suffix生成“prefix-TIME_IN_MS [.suffix]”。

foreachRDD(func)

這是流資料處理中最常用的輸出操作,它可以對資料流中的每個RDD應用func方法。此方法應將每個RDD中的資料推送到外部系統,例如將RDD儲存到檔案,或將其通過網路寫入資料庫。請注意,函式func在執行streaming應用程式的driver程序中執行,通常會包括RDD action運算元,從而強制流式RDD資料的計算執行。

foreachRDD的設計模式

dstream.foreachRDD是一個強大的原語,它可以將資料傳送到外部系統。但是,瞭解如何正確有效地使用這個原語很重要。 一些需要注意避免的常見錯誤如下。

通常向外部系統寫入資料時需要建立一個連線物件(例如與遠端伺服器的TCP連線)並使用這個連線將資料傳送到遠端系統。為此,開發人員可能會嘗試在Spark driver程式中建立連線物件,然後在Spark worker中使用該連線物件來儲存RDD中的記錄。 例如下面這種做法:(在Scala中)

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

這顯然是不正確的,因為該操作需要將連線物件序列化並從driver程式傳送到worker。這種連線物件很少有能夠跨機器傳輸的,此錯誤可能會顯示為序列化錯誤(連線物件不可序列化),初始化錯誤(連線物件需要在worker初始化)等。正確的解決方案是在worker中建立連線物件。

但是,這又可能會導致另一個常見的錯誤 - 為每個記錄建立了一個新的連線。 例如:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

通常情況下,建立連線物件會有一定的時間和資源開銷。因此,為每個記錄建立和銷燬連線物件可能會引起不必要的高額開銷,並且會顯著降低系統的總體吞吐量。一個更好的解決方案是使用rdd.foreachPartition – 為每個分割槽建立一個連線物件,並使用該連線在RDD分割槽中傳送所有記錄。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

這樣可以在多個記錄上分攤建立連線物件的開銷。

最後,可以通過跨多個RDD 或者batch重用連線物件來進一步優化應用程式。可以維護連線物件的靜態連線池,連線池中的連線物件可以在多個batch資料的RDD中得到重用,並將這些資料推送到外部系統,從而進一步減少開銷。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

請注意,連線池中的連線應根據需要懶惰建立(lazily created),如果一段時間不使用,則會超時。這是實現將資料傳送到外部系統最高效的方式。

其它注意點:

1、 就像普通RDD的延遲操作由action運算元觸發,DStreams通過輸出操作觸發延遲計算。具體來說,其實是DStream輸出操作呼叫了RDD action運算元強制立即處理接收到的資料。因此,如果您的應用程式中沒有任何輸出操作,或者雖然具有dstream.foreachRDD()這樣的輸出操作,但是在其中沒有定義任何RDD action操作,則不會有任何操作被執行。系統將簡單地接收資料並將其丟棄。

2、 預設情況下,輸出操作同一時刻只能執行一個。它們按照它們在應用程式中定義的順序執行。

DataFrame和SQL操作

您可以很方便地使用DataFrames和SQL操作來處理流資料。您必須使用當前的StreamingContext對應的SparkContext建立一個SparkSession。此外,必須這樣做的另一個原因是使得應用可以在driver程式故障時得以重新啟動,這是通過建立一個可以延遲例項化的單例SparkSession來實現的。在下面的示例中,我們使用DataFrames和SQL來修改之前的wordcount示例並對單詞進行計數。我們將每個RDD轉換為DataFrame,並註冊為臨時表,然後在這張表上執行SQL查詢。

/** DataFrame operations inside your streaming program */
val words: DStream[String] = ...
words.foreachRDD { rdd =>
  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")
  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")
  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame = 
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

您可以在原始碼的example目錄下找到完整程式碼或點選這裡

您還可以在來自不同執行緒的流資料(即非同步執行的StreamingContext)上定義表並執行SQL查詢。為使查詢操作可以順利執行,您需要確保將StreamingContext設定為記錄(remember)足夠數量的流資料。否則,由於StreamingContext不知道非同步SQL查詢的存在,可能會在查詢完成之前刪除舊的流資料。舉個例子,如果要對最後一個batch執行查詢操作,但是該查詢可能執行5分鐘之久,則可以呼叫streamingContext.remember(Minutes(5))讓streamingContext留存這些資料。

有關DataFrames的更多資訊,請參閱DataFrame和SQL指南。

MLlib操作

您還可以很方便的結合使用MLlib提供的機器學習演算法。首先,演算法包中提供了流式機器學習演算法(例如流式線性迴歸,流式KMeans等),其可以同時從流資料中學習,並將該模型應用於流資料。除此之外,對於大型的機器學習演算法,您可以離線訓練模型(即使用歷史資料),然後將該模型應用於實時的流資料。有關詳細資訊,請參閱MLlib指南。

快取/持久化

與RDD類似,DStreams還允許開發人員將流資料保留在記憶體中。也就是說,在DStream上呼叫persist() 方法會自動將該DStream的每個RDD保留在記憶體中。如果DStream中的資料將被多次計算(例如,相同資料上執行多個操作),這個操作就會很有用。對於基於視窗的操作,如reduceByWindow和reduceByKeyAndWindow以及基於狀態的操作,如updateStateByKey,資料會預設進行持久化。 因此,基於視窗的操作生成的DStream會自動儲存在記憶體中,而不需要開發人員呼叫persist()。

對於通過網路接收資料(例如Kafka,Flume,sockets等)的輸入流,預設持久化級別被設定為將資料複製到兩個節點進行容錯。

請注意,與RDD不同,DStreams的預設持久化級別將資料序列化儲存在記憶體中。這在“效能調優”部分有進一步的討論。有關不同持久化級別的更多資訊,請參見“Spark程式設計指南”。

檢查點支援

流資料處理程式通常都是全天候執行,因此必須對應用中邏輯無關的故障(例如,系統故障,JVM崩潰等)具有彈性。為了實現這一特性,Spark Streaming需要checkpoint足夠的資訊到容錯儲存系統,以便可以從故障中恢復。一般會對兩種型別的資料使用檢查點。

元資料檢查點(Metadatacheckpointing) - 將定義流計算的資訊儲存到容錯儲存中(如HDFS)。這用於從執行streaming程式的driver程式的節點的故障中恢復(稍後詳細討論)。元資料包括以下幾種:

1. 配置(Configuration) - 用於建立streaming應用程式的配置資訊。

2. DStream操作(DStream operations) - 定義streaming應用程式的DStream操作集合。

3. 不完整的batch(Incomplete batches) - jobs還在佇列中但尚未完成的batch。

資料檢查點(Datacheckpointing) - 將生成的RDD儲存到可靠的儲存層。對於一些需要將多個批次之間的資料進行組合的stateful變換操作,設定資料檢查點是必需的。在這些轉換操作中,當前生成的RDD依賴於先前批次的RDD,這導致依賴鏈的長度隨時間而不斷增加,由此也會導致基於血統機制的恢復時間無限增加。為了避免這種情況,stateful轉換的中間RDD將定期設定檢查點並儲存到到可靠的儲存層(例如HDFS)以切斷依賴關係鏈。

總而言之,元資料檢查點主要用於從driver程式故障中恢復,而資料或RDD檢查點在任何使用stateful轉換時是必須要有的。

何時啟用檢查點

對於具有以下任一要求的應用程式,必須啟用檢查點:

使用狀態轉換 - 如果在應用程式中使用updateStateByKey或reduceByKeyAndWindow(具有逆函式),則必須提供檢查點目錄以允許定期儲存RDD檢查點。

從執行應用程式的driver程式的故障中恢復 - 元資料檢查點用於使用進度資訊進行恢復。

請注意,不包含上述的“有狀態轉換操作”的簡單streaming應用程式無需啟用檢查點即可執行。當然,如果不啟用檢查點,driver端如果出現故障也只能進行部分恢復(一些接收但未處理的資料可能會丟失)。不過這通常是可以接受的,而且許多Spark Streaming應用程式就是以這種方式執行的。在未來,預計對非Hadoop環境的支援會有所改善。

如何配置檢查點

可以通過在一些可容錯、高可靠的檔案系統(例如,HDFS,S3等)中設定儲存檢查點資訊的目錄來啟用檢查點。這是通過使用streamingContext.checkpoint(checkpointDirectory)完成的。設定檢查點後,您就可以使用上述的有狀態轉換操作。此外,如果要使應用程式從驅動程式故障中恢復,您應該重寫streaming應用程式以使程式具有以下行為。

1、當程式第一次啟動時,它將建立一個新的StreamingContext,設定好所有流資料來源,然後呼叫start()方法。

2、當程式在失敗後重新啟動時,它將從checkpoint目錄中的檢查點資料重新建立一個StreamingContext。

使用StreamingContext.getOrCreate可以簡化此行為。 使用方式如下。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

如果checkpointDirectory存在,則將從檢查點資料重建上下文。如果目錄不存在(即第一次執行),則將呼叫函式functionToCreateContext來建立新的上下文並設定DStream。請參閱Scala示例RecoverableNetworkWordCount。 此示例將網路資料的單詞計數追加到一個檔案中。

除了使用getOrCreate之外,還需要確保在失敗時能夠自動重新啟動驅動程式程序。這隻能由用於執行應用程式的部署基礎架構來完成。這在“部署”部分進一步討論。

請注意,設定RDD的檢查點會帶來持久化過程的開銷。這可能會導致被設定RDD檢查點的批資料的處理時間增加。因此,需要仔細設定檢查點的時間間隔。當batchsize比較小的時候(例如1秒),為每個批次都設定檢查點可能會顯著降低操作吞吐量。相反,檢查點太少會導致血統和任務大小增長,這也可能會產生不利的影響。對於需要RDD檢查點的狀態轉換,預設間隔批間隔的倍數中第一個大於10秒的值(譯者注:即如果批間隔為3s,因為12是大於10且為3的倍數的最小值,所以此時預設的檢查點間隔為12秒)。它可以通過使用dstream.checkpoint(checkpointInterval)進行設定。通常,DStream的5到10個滑動間隔是設定檢查點間隔的一個比較合適的值。

累加器,廣播變數和檢查點

在SparkStreaming中,無法從檢查點恢復累加器和廣播變數。 如果啟用檢查點並使用累加器或廣播變數,則必須為累加器和廣播變數建立延遲例項化的單例例項,以便在驅動程式從故障中重新啟動後重新進行例項化。這下面的示例中進行了說明。

object WordBlacklist {
  @volatile private var instance: Broadcast[Seq[String]] = null
  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }
}
object DroppedWordsCounter {
  @volatile private var instance: LongAccumulator = null
  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = "Counts at time " + time + " " + counts
})

原始碼中可以看到完整程式碼。

部署應用程式

本節討論部署SparkStreaming應用程式的步驟。

要求

要執行SparkStreaming應用程式,您需要具備以下幾個條件。

叢集管理器叢集 - 這是任何Spark應用程式的一般要求,並在部署指南中有詳細討論。

應用程式打包成JAR - 您必須將流應用程式編譯為JAR包。如果您是使用spark-submit啟動應用程式,則不需要在JAR中提供Spark和Spark Streaming的依賴jar。但是,如果您的應用程式使用高階資料來源(例如Kafka,Flume),那麼您將必須將他們連線的額外artifact及其依賴項打包在用於部署應用程式的JAR中。例如,使用KafkaUtils的應用程式必須在應用程式JAR中包含spark-streaming-kafka-0-8_2.11及其所有次級依賴項。

為executor配置足夠的記憶體 - 由於接收到的資料必須儲存在記憶體中,所以執行程式必須配置足夠的記憶體來儲存接收到的資料。請注意,如果您正在進行10分鐘的視窗操作,系統必須至少能夠將最近10分鐘的資料儲存到記憶體中。因此,應用程式的記憶體要求取決於其中使用的操作。

配置檢查點 - 如果流應用程式需要設定檢查點,則必須在Hadoop API相容的可容錯的儲存層(例如HDFS,S3等)中配置檢查點目錄,並且streaming應用程式的書寫編寫方式也必須允許將檢查點資訊用於故障恢復。有關詳細資訊,請參閱檢查點部分。

配置應用程式driver程式的自動重新啟動 - 要實現從驅動程式故障中自動恢復,則用於執行streaming應用程式部署的基礎架構必須能夠監視驅動程式程序,並在驅動程式發生故障時重新啟動驅動程式。不同的叢集管理器有不同的工具來實現這一點。

Spark standalone- 可以用SparkStandalone模式將Spark應用提交到叢集中執行(請參閱“叢集部署模式”),即應用程式的驅動程式本身在其中一個工作節點上執行。然後,可以利用Standalone群集管理器來監控驅動程式,如果由於非零退出程式碼或者執行驅動程式的節點發生故障而導致驅動程式發生出現問題,則可以重新啟動它。有關詳細資訊,請參閱“SparkStandalone”指南中的有關“群集模式”和“監督”的部分。

YARN – yarn也支援類似的機制來實現應用程式的自動重啟。有關詳細資訊,請參閱YARN文件。

Mesos – 一個名Marathon的專案為已被用來在Mesos上實現這一點。

配置預寫日誌 – 從Spark 1.2開始,我們引入了預寫日誌來實現強大的容錯保證。如果啟用該功能,則從receiver接收的所有資料都將被寫入檢查點目錄中的預寫日誌中。這可以防止驅動程式恢復時的資料丟失,從而確保資料零丟失(在容錯語義部分中詳細討論)。可以通過將配置引數spark.streaming.receiver.writeAheadLog.enable設定為true來啟用此功能。然而,這些更強的語義可能以單個receiver的吞吐量為代價。通過並行執行更多的receiver可以改善這一點,增加總吞吐量。另外,建議在啟用預寫日誌時,如果在日誌已經儲存在支援複製容錯的儲存系統中時,禁用Spark接收到的資料的複製。這可以通過將輸入流的儲存級別設定為StorageLevel.MEMORY_AND_DISK_SER來完成。使用S3(或任何不支援重新整理的檔案系統)寫入日誌時,請記住啟用spark.streaming.driver.writeAheadLog.closeFileAfterWrite和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。有關詳細資訊,請參閱SparkStreaming配置

設定最大接收速率 - 如果叢集資源不足以使得流資料的處理速度快於流資料的接受速度,則可以通過設定 “記錄/秒”的最大速率來對接收方進行速率限制。請參閱關於receiver的spark.streaming.receiver.maxRate配置引數和用於Direct Kafka方法的配置引數:spark.streaming.kafka.maxRatePerPartition。在Spark 1.5中,我們引入了一個稱為“反向施壓”(backpressure)的功能,該功能可以無需設定此速率限制,因為SparkStreaming會自動計算速率限制,並在處理條件發生變化時動態調整速率限制。可以通過將配置引數spark.streaming.backpressure.enabled設定為true來啟用此功能。

升級應用程式程式碼

如果正在執行的SparkStreaming應用程式需要新增新的應用程式程式碼進行升級,則有兩種可能的實現機制。

1、升級後的Spark Streaming應用程式與現有應用程式並行啟動並執行。一旦新的程式(接收與舊的應用程式相同的資料)已經做好熱身並準備好接手,舊的應用就可以被關掉。請注意,這要求資料來源支援可以將資料傳送到兩個處理端(即較早和已升級的應用程式)。

2、現有應用程式正常關閉(請參閱StreamingContext.stop(...)或JavaStreamingContext.stop(...)以獲取正常關閉選項),以確保已關閉的資料在關閉前已完全處理。然後可以啟動升級的應用程式,這將從舊的應用程式停止的同一資料點開始處理。請注意,只有對支援源端緩衝的輸入源(如Kafka和Flume)才可以進行此操作,因為資料需要在先前的應用程式關閉並且升級的應用程式尚未啟動時進行緩衝。並且,你無法從舊的應用程式設定的檢查點資訊中重新啟動,因為檢查點資訊基本上包含序列化的Scala / Java / Python物件,如果嘗試使用新的修改的類反序列化舊類物件可能會導致錯誤。在這種情況下,你可以使用不同的檢查點目錄啟動升級的應用程式,也可以刪除舊的檢查點目錄。

監控應用程式

除了Spark的監控功能,Spark Streaming還有其他特有的功能。當使用StreamingContext時,Spark Web UI會顯示一個附加的Streaming選項卡,顯示有關正在執行的receiver(接收器是否活動,接收到的記錄數量,接收器錯誤等)以及完成的批次(批處理時間,排隊延遲等)的統計資訊)。這可以用於監視流應用程式的進度。

Web UI中的以下兩個指標特別重要:

1、 處理時間 - 處理每批資料的時間。

2、計劃延遲 - 批處理在佇列中等待先前批次處理完成的時間。

如果批量處理時間始終超過批次間隔或者排隊延遲不斷增加或者兩種情況同時存在,則表示系統處理批次的速度跟不上批資料生成的速度。在這種情況下,請考慮減少批處理時間(譯者注:例如簡化批處理操作)。

Spark Streaming程式的進展也可以使用StreamingListener介面進行監控,這樣可以讓您獲得接收者狀態和處理時間。請注意,這是一個開發人員API,將來可能會有所改進。