1. 程式人生 > >Spark修煉之道(進階篇)——Spark入門到精通:第十節 Spark Streaming(一)

Spark修煉之道(進階篇)——Spark入門到精通:第十節 Spark Streaming(一)

本節主要內容

  1. Spark流式計算簡介
  2. Spark Streaming相關核心類
  3. 入門案例

1. Spark流式計算簡介

Hadoop的MapReduce及Spark SQL等只能進行離線計算,無法滿足實時性要求較高的業務需求,例如實時推薦、實時網站效能分析等,流式計算可以解決這些問題。目前有三種比較常用的流式計算框架,它們分別是Storm,Spark Streaming和Samza,各個框架的比較及使用情況,可以參見:http://www.csdn.net/article/2015-03-09/2824135。本節對Spark Streaming進行重點介紹,Spark Streaming作為Spark的五大核心元件之一,其原生地支援多種資料來源的接入,而且可以與Spark MLLib、Graphx結合起來使用,輕鬆完成分散式環境下線上機器學習演算法的設計。Spark支援的輸入資料來源及輸出檔案如下圖所示:

這裡寫圖片描述

在後面的案例實戰當中,會涉及到這部分內容。中間的”Spark Streaming“會對輸入的資料來源進行處理,然後將結果輸出,其內部工作原理如下圖所示:

這裡寫圖片描述
Spark Streaming接受實時傳入的資料流,然後將資料按批次(batch)進行劃分,然後再將這部分資料交由Spark引擎進行處理,處理完成後將結果輸出到外部檔案。

先看下面一段基於Spark Streaming的word count程式碼,它可以很好地幫助初步理解流式計算

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object
StreamingWordCount {
def main(args: Array[String]) { if (args.length < 1) { System.err.println("Usage: StreamingWordCount <directory>") System.exit(1) } //建立SparkConf物件 val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]") // Create the context
//建立StreamingContext物件,與叢集進行互動 val ssc = new StreamingContext(sparkConf, Seconds(20)) // Create the FileInputDStream on the directory and use the // stream to count words in new files created //如果目錄中有新建立的檔案,則讀取 val lines = ssc.textFileStream(args(0)) //分割為單詞 val words = lines.flatMap(_.split(" ")) //統計單詞出現次數 val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) //列印結果 wordCounts.print() //啟動Spark Streaming ssc.start() //一直執行,除非人為干預再停止 ssc.awaitTermination() } }

執行上面的程式後,再通過命令列介面,將檔案拷貝到相應的檔案目錄,具體如下:
這裡寫圖片描述

程式在執行時,根據檔案建立時間對檔案進行處理,在上一次執行時間後建立的檔案都會被處理,輸出結果如下:
這裡寫圖片描述

2. Spark Streaming相關核心類

1. DStream(discretized stream)

Spark Streaming提供了對資料流的抽象,它就是DStream,它可以通過前述的 Kafka, Flume等資料來源建立,DStream本質上是由一系列的RDD構成。各個RDD中的資料為對應時間間隔( interval)中流入的資料,如下圖所示:
這裡寫圖片描述

對DStream的所有操作最終都要轉換為對RDD的操作,例如前面的StreamingWordCount程式,flatMap操作將作用於DStream中的所有RDD,如下圖所示:

這裡寫圖片描述

2.StreamingContext
在Spark Streaming當中,StreamingContext是整個程式的入口,其建立方式有多種,最常用的是通過SparkConf來建立:

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

建立StreamingContext物件時會根據SparkConf建立SparkContext

  /**
   * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
   * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
   * @param batchDuration the time interval at which streaming data will be divided into batches
   */
  def this(conf: SparkConf, batchDuration: Duration) = {
    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
  }

也就是說StreamingContext是對SparkContext的封裝,StreamingContext還有其它幾個構造方法,感興趣的可以瞭解,後期在原始碼解析時會對它進行詳細的講解,建立StreamingContext時會指定batchDuration,它用於設定批處理時間間隔,需要根據應用程式和叢集資源情況去設定。當建立完成StreamingContext之後,再按下列步驟進行:

  1. 通過輸入源建立InputDStreaim
  2. 對DStreaming進行transformation和output操作,這樣操作構成了後期流式計算的邏輯
  3. 通過StreamingContext.start()方法啟動接收和處理資料的流程
  4. 使用streamingContext.awaitTermination()方法等待程式處理結束(手動停止或出錯停止)
  5. 也可以呼叫streamingContext.stop()方法結束程式的執行

關於StreamingContext有幾個值得注意的地方:

1.StreamingContext啟動後,增加新的操作將不起作用。也就是說在StreamingContext啟動之前,要定義好所有的計算邏輯
2.StreamingContext停止後,不能重新啟動。也就是說要重新計算的話,需要重新執行整個程式。
3.在單個JVM中,一段時間內不能出現兩個active狀態的StreamingContext
4.呼叫StreamingContext的stop方法時,SparkContext也將被stop掉,如果希望StreamingContext關閉時,保留SparkContext,則需要在stop方法中傳入引數stopSparkContext=false
/**
* Stop the execution of the streams immediately (does not wait for all received data
* to be processed). By default, if stopSparkContext is not specified, the underlying
* SparkContext will also be stopped. This implicit behavior can be configured using the
* SparkConf configuration spark.streaming.stopSparkContextByDefault.
*
* @param stopSparkContext If true, stops the associated SparkContext. The underlying SparkContext
* will be stopped regardless of whether this StreamingContext has been
* started.
*/
def stop(
stopSparkContext: Boolean = conf.getBoolean(“spark.streaming.stopSparkContextByDefault”, true)
): Unit = synchronized {
stop(stopSparkContext, false)
}
5.SparkContext物件可以被多個StreamingContexts重複使用,但需要前一個StreamingContexts停止後再建立下一個StreamingContext物件。

3. InputDStreams及Receivers
InputDStream指的是從資料流的源頭接受的輸入資料流,在前面的StreamingWordCount程式當中,val lines = ssc.textFileStream(args(0)) 就是一種InputDStream。除檔案流外,每個input DStream都關聯一個Receiver物件,該Receiver物件接收資料來源傳來的資料並將其儲存在記憶體中以便後期Spark處理。

Spark Streaimg提供兩種原生支援的流資料來源:

  1. Basic sources(基礎流資料來源)。直接通過StreamingContext API建立,例如檔案系統(本地檔案系統及分散式檔案系統)、Socket連線及Akka的Actor。
    檔案流(File Streams)的建立方式:
    a. streamingContext.fileStreamKeyClass, ValueClass, InputFormatClass
    b. streamingContext.textFileStream(dataDirectory)
    實時上textFileStream方法最終呼叫的也是fileStream方法
    def textFileStream(directory: String): DStream[String] = withNamedScope(“text file stream”) {
    fileStreamLongWritable, Text, TextInputFormat.map(_._2.toString)
    }

    基於Akka Actor流資料的建立方式:
    streamingContext.actorStream(actorProps, actor-name)

    基於Socket流資料的建立方式:
    ssc.socketTextStream(hostname: String,port: Int,storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)

    基於RDD佇列的流資料建立方式:
    streamingContext.queueStream(queueOfRDDs)

  2. Advanced sources(高階流資料來源)。如Kafka, Flume, Kinesis, Twitter等,需要藉助外部工具類,在執行時需要外部依賴(下一節內容中介紹)

Spark Streaming還支援使用者
3. Custom Sources(自定義流資料來源),它需要使用者定義receiver,該部分內容也放在下一節介紹

最後有兩個需要注意的地方:

  1. 在本地執行Spark Streaming時,master URL不能使用“local” 或 “local[1]”,因為當input DStream與receiver(如sockets, Kafka, Flume等)關聯時,receiver自身就需要一個執行緒來執行,此時便沒有執行緒去處理接收到的資料。因此,在本地執行SparkStreaming程式時,要使用“local[n]”作為master URL,n要大於receiver的數量。
  2. 在叢集上執行Spark Streaming時,分配給Spark Streaming程式的CPU核數也必須大於receiver的數量,否則系統將只接受資料,無法處理資料。

3. 入門案例

為方便後期檢視執行結果,修改日誌級別為Level.WARN

import org.apache.spark.Logging

import org.apache.log4j.{Level, Logger}

/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {

  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
  1. NetworkWordCount
    基於Socket流資料
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    //修改日誌層次為Level.WARN
    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    //建立SocketInputDStream,接收來自ip:port傳送來的流資料
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

配置執行時引數
這裡寫圖片描述

使用

//啟動netcat server
[email protected]:~/streaming# nc -lk 9999

執行NetworkWordCount 程式,然後在netcat server執行的控制檯輸入任意字串

root@sparkmaster:~/streaming# nc -lk 9999
Hello WORLD
HELLO WORLD WORLD
TEWST
NIMA

這裡寫圖片描述

  1. QueueStream
    基於RDD佇列的流資料
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object QueueStream {

  def main(args: Array[String]) {

    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("QueueStream").setMaster("local[4]")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create the queue through which RDDs can be pushed to
    // a QueueInputDStream
    //建立RDD佇列
    val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()

    // Create the QueueInputDStream and use it do some processing
    // 建立QueueInputDStream 
    val inputStream = ssc.queueStream(rddQueue)

    //處理佇列中的RDD資料
    val mappedStream = inputStream.map(x => (x % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)

    //列印結果
    reducedStream.print()

    //啟動計算
    ssc.start()

    // Create and push some RDDs into
    for (i <- 1 to 30) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 3000, 10)
      Thread.sleep(1000)

    //通過程式停止StreamingContext的執行
    ssc.stop()
  }
}

這裡寫圖片描述