Spark修煉之道(進階篇)——Spark入門到精通:第十節 Spark Streaming(一)
本節主要內容
- Spark流式計算簡介
- Spark Streaming相關核心類
- 入門案例
1. Spark流式計算簡介
Hadoop的MapReduce及Spark SQL等只能進行離線計算,無法滿足實時性要求較高的業務需求,例如實時推薦、實時網站效能分析等,流式計算可以解決這些問題。目前有三種比較常用的流式計算框架,它們分別是Storm,Spark Streaming和Samza,各個框架的比較及使用情況,可以參見:http://www.csdn.net/article/2015-03-09/2824135。本節對Spark Streaming進行重點介紹,Spark Streaming作為Spark的五大核心元件之一,其原生地支援多種資料來源的接入,而且可以與Spark MLLib、Graphx結合起來使用,輕鬆完成分散式環境下線上機器學習演算法的設計。Spark支援的輸入資料來源及輸出檔案如下圖所示:
在後面的案例實戰當中,會涉及到這部分內容。中間的”Spark Streaming“會對輸入的資料來源進行處理,然後將結果輸出,其內部工作原理如下圖所示:
Spark Streaming接受實時傳入的資料流,然後將資料按批次(batch)進行劃分,然後再將這部分資料交由Spark引擎進行處理,處理完成後將結果輸出到外部檔案。
先看下面一段基於Spark Streaming的word count程式碼,它可以很好地幫助初步理解流式計算
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingWordCount {
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: StreamingWordCount <directory>")
System.exit(1)
}
//建立SparkConf物件
val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")
// Create the context
//建立StreamingContext物件,與叢集進行互動
val ssc = new StreamingContext(sparkConf, Seconds(20))
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
//如果目錄中有新建立的檔案,則讀取
val lines = ssc.textFileStream(args(0))
//分割為單詞
val words = lines.flatMap(_.split(" "))
//統計單詞出現次數
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
//列印結果
wordCounts.print()
//啟動Spark Streaming
ssc.start()
//一直執行,除非人為干預再停止
ssc.awaitTermination()
}
}
執行上面的程式後,再通過命令列介面,將檔案拷貝到相應的檔案目錄,具體如下:
程式在執行時,根據檔案建立時間對檔案進行處理,在上一次執行時間後建立的檔案都會被處理,輸出結果如下:
2. Spark Streaming相關核心類
1. DStream(discretized stream)
Spark Streaming提供了對資料流的抽象,它就是DStream,它可以通過前述的 Kafka, Flume等資料來源建立,DStream本質上是由一系列的RDD構成。各個RDD中的資料為對應時間間隔( interval)中流入的資料,如下圖所示:
對DStream的所有操作最終都要轉換為對RDD的操作,例如前面的StreamingWordCount程式,flatMap操作將作用於DStream中的所有RDD,如下圖所示:
2.StreamingContext
在Spark Streaming當中,StreamingContext是整個程式的入口,其建立方式有多種,最常用的是通過SparkConf來建立:
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
建立StreamingContext物件時會根據SparkConf建立SparkContext
/**
* Create a StreamingContext by providing the configuration necessary for a new SparkContext.
* @param conf a org.apache.spark.SparkConf object specifying Spark parameters
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
也就是說StreamingContext是對SparkContext的封裝,StreamingContext還有其它幾個構造方法,感興趣的可以瞭解,後期在原始碼解析時會對它進行詳細的講解,建立StreamingContext時會指定batchDuration,它用於設定批處理時間間隔,需要根據應用程式和叢集資源情況去設定。當建立完成StreamingContext之後,再按下列步驟進行:
- 通過輸入源建立InputDStreaim
- 對DStreaming進行transformation和output操作,這樣操作構成了後期流式計算的邏輯
- 通過StreamingContext.start()方法啟動接收和處理資料的流程
- 使用streamingContext.awaitTermination()方法等待程式處理結束(手動停止或出錯停止)
- 也可以呼叫streamingContext.stop()方法結束程式的執行
關於StreamingContext有幾個值得注意的地方:
1.StreamingContext啟動後,增加新的操作將不起作用。也就是說在StreamingContext啟動之前,要定義好所有的計算邏輯
2.StreamingContext停止後,不能重新啟動。也就是說要重新計算的話,需要重新執行整個程式。
3.在單個JVM中,一段時間內不能出現兩個active狀態的StreamingContext
4.呼叫StreamingContext的stop方法時,SparkContext也將被stop掉,如果希望StreamingContext關閉時,保留SparkContext,則需要在stop方法中傳入引數stopSparkContext=false
/**
* Stop the execution of the streams immediately (does not wait for all received data
* to be processed). By default, if stopSparkContext
is not specified, the underlying
* SparkContext will also be stopped. This implicit behavior can be configured using the
* SparkConf configuration spark.streaming.stopSparkContextByDefault.
*
* @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext
* will be stopped regardless of whether this StreamingContext has been
* started.
*/
def stop(
stopSparkContext: Boolean = conf.getBoolean(“spark.streaming.stopSparkContextByDefault”, true)
): Unit = synchronized {
stop(stopSparkContext, false)
}
5.SparkContext物件可以被多個StreamingContexts重複使用,但需要前一個StreamingContexts停止後再建立下一個StreamingContext物件。
3. InputDStreams及Receivers
InputDStream指的是從資料流的源頭接受的輸入資料流,在前面的StreamingWordCount程式當中,val lines = ssc.textFileStream(args(0)) 就是一種InputDStream。除檔案流外,每個input DStream都關聯一個Receiver物件,該Receiver物件接收資料來源傳來的資料並將其儲存在記憶體中以便後期Spark處理。
Spark Streaimg提供兩種原生支援的流資料來源:
Basic sources(基礎流資料來源)。直接通過StreamingContext API建立,例如檔案系統(本地檔案系統及分散式檔案系統)、Socket連線及Akka的Actor。
檔案流(File Streams)的建立方式:
a. streamingContext.fileStreamKeyClass, ValueClass, InputFormatClass
b. streamingContext.textFileStream(dataDirectory)
實時上textFileStream方法最終呼叫的也是fileStream方法
def textFileStream(directory: String): DStream[String] = withNamedScope(“text file stream”) {
fileStreamLongWritable, Text, TextInputFormat.map(_._2.toString)
}基於Akka Actor流資料的建立方式:
streamingContext.actorStream(actorProps, actor-name)基於Socket流資料的建立方式:
ssc.socketTextStream(hostname: String,port: Int,storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)基於RDD佇列的流資料建立方式:
streamingContext.queueStream(queueOfRDDs)Advanced sources(高階流資料來源)。如Kafka, Flume, Kinesis, Twitter等,需要藉助外部工具類,在執行時需要外部依賴(下一節內容中介紹)
Spark Streaming還支援使用者
3. Custom Sources(自定義流資料來源),它需要使用者定義receiver,該部分內容也放在下一節介紹
最後有兩個需要注意的地方:
- 在本地執行Spark Streaming時,master URL不能使用“local” 或 “local[1]”,因為當input DStream與receiver(如sockets, Kafka, Flume等)關聯時,receiver自身就需要一個執行緒來執行,此時便沒有執行緒去處理接收到的資料。因此,在本地執行SparkStreaming程式時,要使用“local[n]”作為master URL,n要大於receiver的數量。
- 在叢集上執行Spark Streaming時,分配給Spark Streaming程式的CPU核數也必須大於receiver的數量,否則系統將只接受資料,無法處理資料。
3. 入門案例
為方便後期檢視執行結果,修改日誌級別為Level.WARN
import org.apache.spark.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
/** Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// We first log something to initialize Spark's default logging, then we override the
// logging level.
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
- NetworkWordCount
基於Socket流資料
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}
//修改日誌層次為Level.WARN
StreamingExamples.setStreamingLogLevels()
// Create the context with a 1 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
//建立SocketInputDStream,接收來自ip:port傳送來的流資料
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
配置執行時引數
使用
//啟動netcat server
[email protected]:~/streaming# nc -lk 9999
執行NetworkWordCount 程式,然後在netcat server執行的控制檯輸入任意字串
root@sparkmaster:~/streaming# nc -lk 9999
Hello WORLD
HELLO WORLD WORLD
TEWST
NIMA
- QueueStream
基於RDD佇列的流資料
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object QueueStream {
def main(args: Array[String]) {
StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("QueueStream").setMaster("local[4]")
// Create the context
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
//建立RDD佇列
val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()
// Create the QueueInputDStream and use it do some processing
// 建立QueueInputDStream
val inputStream = ssc.queueStream(rddQueue)
//處理佇列中的RDD資料
val mappedStream = inputStream.map(x => (x % 10, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
//列印結果
reducedStream.print()
//啟動計算
ssc.start()
// Create and push some RDDs into
for (i <- 1 to 30) {
rddQueue += ssc.sparkContext.makeRDD(1 to 3000, 10)
Thread.sleep(1000)
//通過程式停止StreamingContext的執行
ssc.stop()
}
}