1. 程式人生 > >spark中stream程式設計指導(一)

spark中stream程式設計指導(一)

概述

spark stream是對spark核心api的擴充套件,其有著很好的擴充套件性,很高的吞吐量以及容錯性的動態資料的流式處理過程。資料可以來自不同的資料來源,例如Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets,一些具有高階功能的複雜的演算法,例如map,reduce,join andwindow可以使用這些演算法來進行資料的處理。最終,將資料推送到檔案系統,資料庫和儀表盤上。實際上,在資料流的處理過程中,可以使用機器學習演算法和“圖處理"(graph processing)演算法。


從內部來看,其工作流程如下。spark流式處理接收動態輸入資料流並且將資料切分成塊(batch),隨後spark引擎將會處理這些batch,以batch的形式來生成最後的結果流。



spark 流式處理提供了一種高階抽象的概念,叫作離散化流或是DStream,其表現為一種持續不斷的資料流, DStreams可以以多種形式被建立,可以從輸入資料流,例如Kafka, Flume, and Kinesis這樣的資料來源來進行建立,也可以將其他的DStream應用一些高階的操作進行DStream之間的轉換。系統內部可以理解為,一個DStream就是一連串的RDD。

注意:spark流式處理的Python API在spark1.2之後已經有了。對於java和scala的api一直都是支援的,但是,需要注意的是對於流式處理的資料來源的形式,現在spark只是支援基本的資料來源型別

,例如text file 或是text  data,對於一些工具的資料來源,例如Kafka and Flume,在今後的版本中會進行新增。

快速事例

下面是一個事例程式,該程式主要功能是計算文字資料的字數,這些資料來自於監聽的一個數據伺服器的TCP套接字埠。StreamingContext是所有流式處理程式的主要入口(類似於spark其他程式中的SparkContext),其中兩個引數第一個為SparkConf設定的內容,第二個是設定每個幾秒產生一個batch。

import org.apache.spark._
import org.apache.spark.streaming._
import
org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent from a starvation scenario. val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) 利用上面的StreamingContext我們可以創建出DStream,這個DStream代表了來自TCP資料來源的流式資料,方法引數 為主機名和埠(如下);  
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

上面的lines DStream表示著從資料伺服器中接收來的流式資料。在DStream中的每一條記錄就是文字中的一行,接下
來我們將要把每一行資料按空格來切分成單詞。
 
// Split each line into words
val words = lines.flatMap(_.split(" "))

flatmap函式是一個一對多的DStream操作,它在DStream資料來源中從每條記錄通過生成多條新記錄可以建立一個新的
DStream。在這個例子中,每一行將被分解成多個單詞並且單詞流用words DStream來進行表示。隨後我們將計算單
詞數量。

// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

words DStream被進一步的map成為(word,1)對式的DStream(mapped的過程是transformation的過程,一對一

的),隨後這些pairs將會按照上面(key,value)的形式來統計每個資料batch中單詞出現的頻次,wordCounts.print()

函式可以將每秒產生的次數列印輸出。需要注意的是,當這些程式碼執行的時候,spark流式處理僅僅設定計算,當程式開始

的時候,程式碼會執行,但是正真的處理過程還沒有開始,在所有的transformation都被設定好了以後,要開始處理過程,

我們可以呼叫如下函式

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

這段程式碼是spark當中的一個例子,可以在example下面找到,接下來開啟埠在Linux下面:
$ nc -lk 9999
接下來在spark控制檯每隔一秒就會有對”顯示,如下圖

spark流式處理基本流程

Linking

與spark相似,spark流式處理也是用maven工具或是sbt工具來構建我們的工程,下面是流式程式設計的核心依賴

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.4.1</version>
</dependency>
對於從源中獲取資料例如 Kafka, Flume, and Kinesis,在SparkStreaming的核心api中並沒有現成的介面,那麼我們
必須從下面選擇相應的依賴來進行新增,如下舉出了部分依賴
Source Artifact
Kafka spark-streaming-kafka_2.10
Flume spark-streaming-flume_2.10
Kinesis spark-streaming-kinesis-asl_2.10 [Amazon Software License]
Twitter spark-streaming-twitter_2.10
ZeroMQ spark-streaming-zeromq_2.10
MQTT spark-streaming-mqtt_2.10

Initializing StreamingContext

要初始化Spark Streaming program,一個StreamingContext物件必須在所有spark流式處理的主函式入口被顯示建立

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
注意:在內部建立了SparkContext,他可以被訪問按如下形式ssc.sparkContext.
此外,StreamingContext也可以由已經存在的SparkContext來進行建立
import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

在context被定義以後,你必須做一下幾件事情:
  1. 定義建立 input DStreams的輸入源.
  2. 通過在DStreams上應用transformation和output operation來定義流式計算。
  3. 使用streamingContext.start()來開始接收資料並且處理它
  4. 使用streamingContext.awaitTermination()等待處理或者是直接通過手動或是錯誤來停止 .
  5. 處理過程可以被自動停止使用streamingContext.stop().
要點:
  • context一旦開始,新的流式計算將不能被設定或者加入。
  • 流式處理一旦被停止,將不能被自動重啟。
  • 在一個JVM中,同一時刻,只能有一個StreamingContext處理活躍狀態.
  • StreamingContext  中的stop()也可以是SparkContext停止執行,如果只想停止StreamingContext ,可以將stop() 設定可選引數叫做stopSparkContext,將其設定為 false即可。
  • 一個SparkContext 可以建立多個StreamingContext反覆使用,前提是在下一個StreamingContext被建立之前,前一個StreamingContext要先被停止

離散化流(Discretized Streams:DStreams)

Discretized Stream or DStream是spark Streaming提供的基本的抽象概念。它表現為一個持續不斷的資料流,輸入的資料流既可以接收資料來源的資料,也可以處理由轉換的輸入流生成的資料流。就係統內部而言,DStream  可以被認為是一系列連續的RDD,RDD為一個不可變的、分散式資料集,為spark的抽象概念。在DStream 中的每一個RDD都有一個來自內部特定的資料,如下圖所示

Spark Streaming

應用在DStream上面的任何操作都會轉化成在DStream裡邊RDD的操作。例如上面例子中提到的將一個lines DStream 轉換為words DStream 的過程中,flatmap操作就會被應用到每一個lines 下的RDD,進而生成words下的RDD,見下圖。

Spark Streaming

這些裡邊的RDD的transformation被Spark Engine來進行計算。DStream隱藏了大部分的細節並且提供給開發者一些很高階,方便的api可以使用。這些操作將會在後面部分詳細的討論。

Input DStreams and Receivers

Input DStreams是表示著那些從資料流源接收的輸入資料流的DStreams。在上面的例子當中,lines是一個Input DStreams,它表示著從netcat伺服器上接收的最初的資料流。每一個Input DStream都可以被Receiver聯絡起來,Receiver可以從資料來源接收資料並且把資料儲存到記憶體當中,等待處理。

Spark Streaming提供了兩種策略來建立流式處理的源:

  • 基本源(Basic sources): 這一類資料來源在StreamingContext API中可以直接使用,例如 file systems, socket connections, and Akka actors.
  • 高階源(Advanced sources):這類源比如 Kafka, Flume, Kinesis, Twitter, etc. ,使用必須通過其他的通過類。這個主要通過新增其他的依賴來實現,具體見linking

注意,如果你想要在你的流式應用中並行地接收多重資料流,你可以建立多個Input DStreams(詳細介紹見)。這將建立多個Receiver,這些Receiver將同時接收多個數據流。但是需要注意的是,一個Spark worker或是executor是一個long-running的任務,因此,它會長時間佔用分配給spark流式處理應用的核數(cores)。因此記住一個應用需要分配多少核數來處理接收的資料,和需要執行多少個Receiver來並行接收資料是十分重要的。

要點
  • 當在本地執行一個流式處理應用程式時,不要使用local或者是local[1]來作為master URL。這意味著只會有一個執行緒在處理執行著的任務時被用到。如果你正在使用一個基於Receiver(e.g. sockets, Kafka, Flume, etc.) input DStream接收資料,然後單一的執行緒將會被使用來執行receiver,從而沒有執行緒來處理已經接收的資料。因此,在本地執行時,local[n]中n的數量一定要大於receiver的數量。

  • 在一個叢集當中,擴充套件執行邏輯的時候,被分配給應用的核的數量一定要多於receiver的數量,否則系統將只是接收資料,並不會進行資料處理。