1. 程式人生 > >Spark學習(玖)- Spark Streaming核心概念與程式設計

Spark學習(玖)- Spark Streaming核心概念與程式設計

文章目錄

核心概念之StreamingContext

要初始化一個Spark流程式,必須建立一個StreamingContext物件,它是所有Spark流功能的主要入口點。

可以從SparkConf物件建立StreamingContext物件。

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

batch interval可以根據你的應用程式需求的延遲要求以及叢集可用的資源情況來設定;具體可以參考官網
http://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#initializing-streamingcontext

定義上下文之後,您必須執行以下操作

1、通過建立輸入DStreams定義輸入源。
2、通過對DStreams應用轉換和輸出操作來定義流計算。
3、開始接收資料並使用streamingContext.start()進行處理。
4、使用streamingContext.awaitTermination()等待程序停止(手動或由於任何錯誤)。
5、可以使用streamingContext.stop()手動停止處理。

注意

  • 一旦Context啟動,就不能設定或新增新的流計算。
  • Context一旦停止,就不能重新啟動。【start()不能再stop()之後】
  • 在同一時段一個StreamingContext只能存活在一個JVM。
  • StreamingContext上的stop()也會停止SparkContext。若要僅停止StreamingContext,請將名為stopSparkContext的stop()的可選引數設定為false。
  • 可以重用SparkContext來建立多個StreamingContext,只要在建立下一個StreamingContext之前停止前一個StreamingContext(不停止SparkContext)。

核心概念之DStream

http://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#discretized-streams-dstreams
Discretized Stream或DStream是Sparkstreaming提供的基本抽象。它表示連續的資料流,要麼是從源接收到的輸入資料流,要麼是通過轉換輸入流生成的經過處理的資料流。在內部,DStream由一系列連續的rdd表示,rdd是Spark對不可變的分散式資料集的抽象(有關更多細節,請參閱Spark程式設計指南)。DStream中的每個RDD都包含來自某個時間間隔的資料,如下圖所示。
在這裡插入圖片描述
應用於DStream的任何操作都轉換為底層rdd上的操作。例如,在前面將行流轉換為單詞的示例中,flatMap對映操作應用於行DStream中的每個RDD,以生成words DStream的RDD。如下圖所示。
在這裡插入圖片描述
這些底層的RDD轉換由Spark引擎計算。DStream操作隱藏了大部分細節,併為開發人員提供了一個更高階的API,以便於使用。這些操作將在後面幾節中詳細討論。

個人總結:對DStream操作運算元,比如map/flatMap,其實底層會被翻譯為對DStream中的每個RDD都做相同的操作;
因為一個DStream是由不同批次的RDD所構成的。

核心概念之Input DStreams和Receivers

Input DStreams是表示從流源頭接收的輸入資料流的DStreams。

在這個上個spark學習筆記八的例子中,lines是一個輸入DStream,因為它表示從netcat伺服器接收到的資料流。

每個輸入DStream(除了檔案系統;檔案系統已經存在直接就可以處理不需要接受)都與一個Receiver(Scala doc、Java doc)物件相關聯,該物件從源接收資料並將其儲存在Spark記憶體中進行處理。

Spark流提供了兩類內建流源
基本資源:StreamingContext API中直接可用的資源。示例:file systems, socket 連線。
高階資源:可以通過額外的實用程式類獲得Kafka、Flume、Kinesis等資源。如連結一節中所討論的,這些需要針對額外依賴項進行連結。

注意
1、在本地執行Spark流程式時,不要使用“local”或“local[1]”作為主URL。這兩種方法都意味著只使用一個執行緒在本地執行任務。如果您正在使用基於接收器的輸入DStream(例如socket、Kafka、Flume等),那麼將使用單個執行緒來執行接收器,沒有執行緒來處理接收到的資料。因此,在本地執行時,始終使用“local[n]”作為主URL,其中要執行的接收方數量為n >(有關如何設定主URL的資訊,請參閱Spark屬性)。
2、將邏輯擴充套件到在叢集上執行時,分配給Spark流應用程式的核心數量必須大於接收器的數量。否則系統將接收資料,但無法處理它。

基本資源

我們已經在快速示例中查看了ssl . sockettextstream(…),它從通過TCP套接字連線接收到的文字資料建立DStream。除了套接字,StreamingContext API還提供了從檔案建立DStreams作為輸入源的方法

File Streams:對於從與HDFS API(即HDFS、S3、NFS等)相容的任何檔案系統上的檔案讀取資料,可以將DStream建立為:

 streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark流將監視目錄dataDirectory並處理在該目錄中建立的任何檔案(不支援在巢狀目錄中編寫的檔案)。請注意:

  • 檔案必須具有相同的資料格式。
  • 必須在dataDirectory中建立檔案,方法是將檔案原子地移動或重新命名到資料目錄中。
  • 一旦移動,檔案就不能更改。因此,如果檔案是連續追加的,則不會讀取新資料。

對於簡單的文字檔案,有一個更簡單的方法streamingContext.textFileStream(dataDirectory)。檔案流不需要執行接收器,因此不需要分配核心。
Python API檔案流不可用,只有textfilestream可用。

高階資源

http://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#advanced-sources

核心概念之Transformation和Output Operations

與RDD類似,轉換允許修改來自輸入DStream的資料。DStreams支援普通Spark RDD上可用的許多轉換。一些常見的如下:

Transformation Meaning
map(func) 通過將源DStream的每個元素傳遞給函式func來返回一個新的DStream 。
flatMap(func) 與map類似,但每個輸入項可以對映到0個或更多輸出項。
filter(func) 通過只選擇func返回true的源DStream的記錄來返回一個新的DStream。
repartition(numPartitions) 通過建立更多或更少的分割槽來更改此DStream中的並行度級別。
union(otherStream) 返回一個新的DStream,它包含源DStream和otherDStream中元素的並集。
count() 通過計算源DStream的每個RDD中的元素數量,返回單元素RDD的新DStream。
reduce(func) 通過使用函式func(它接受兩個引數並返回一個)聚合源DStream的每個RDD中的元素,返回單元素RDD的新DStream 。該函式應該是關聯的和可交換的,以便可以平行計算。
countByValue() 當在型別K的元素的DStream上呼叫時,返回(K,Long)對的新DStream,其中每個鍵的值是其在源DStream的每個RDD中的頻率。
reduceByKey(func, [numTasks]) 當在(K,V)對的DStream上呼叫時,返回(K,V)對的新DStream,其中使用給定的reduce函式聚合每個鍵的值。注意:預設情況下,這使用Spark的預設並行任務數(本地模式為2,在群集模式下,數量由config屬性確定spark.default.parallelism)進行分組。您可以傳遞可選numTasks引數來設定不同數量的任務。
join(otherStream, [numTasks]) 當在(K,V)和(K,W)對的兩個DStream上呼叫時,返回(K,(V,W))對的新DStream與每個鍵的所有元素對。
cogroup(otherStream, [numTasks]) 當在(K,V)和(K,W)對的DStream上呼叫時,返回(K,Seq [V],Seq [W])元組的新DStream。
transform(func) 通過將RDD-to-RDD函式應用於源DStream的每個RDD來返回新的DStream。這可以用於在DStream上執行任意RDD操作。
updateStateByKey(func) 返回一個新的“狀態”DStream,其中通過在鍵的先前狀態和鍵的新值上應用給定函式來更新每個鍵的狀態。這可用於維護每個金鑰的任意狀態資料。

輸出操作允許將DStream的資料推送到外部系統,如資料庫或檔案系統。由於輸出操作實際上允許外部系統使用轉換後的資料,因此它們會觸發所有DStream轉換的實際執行(類似於RDD的操作)。目前,定義了以下輸出操作:

Output Operation Meaning
print() 在執行流應用程式的驅動程式節點上列印DStream中每批資料的前十個元素。這對開發和除錯很有用。 Python API這在Python API中稱為 pprint()。
saveAsTextFiles(prefix, [suffix]) 將此DStream的內容儲存為文字檔案。每個批處理間隔的檔名是基於字首和字尾生成的:“prefix-TIME_IN_MS [.suffix]”。
saveAsObjectFiles(prefix, [suffix]) 將此DStream的內容儲存為SequenceFiles序列化Java物件。每個批處理間隔的檔名是基於字首和 字尾生成的:“prefix-TIME_IN_MS [.suffix]”。Python API這在Python API中不可用。
saveAsHadoopFiles(prefix, [suffix]) 將此DStream的內容儲存為Hadoop檔案。每個批處理間隔的檔名是基於字首和字尾生成的:“prefix-TIME_IN_MS [.suffix]”。 Python API這在Python API中不可用。
foreachRDD(func) 最通用的輸出運算子,它將函式func應用於從流生成的每個RDD。此函式應將每個RDD中的資料推送到外部系統,例如將RDD儲存到檔案,或通過網路將其寫入資料庫。請注意,函式func在執行流應用程式的驅動程式程序中執行,並且通常會在其中執行RDD操作,這將強制計算流式RDD。

案例實戰之Spark Streaming處理socket資料

新增依賴

        <!-- Spark Streaming 依賴-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        
        <!-- 沒有此依賴會報找不到相關類的錯誤-->
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.11</artifactId>
            <version>2.6.5</version>
        </dependency>
        
        <!-- 沒有此依賴會報找不到相關類的錯誤-->
        <dependency>
            <groupId>net.jpountz.lz4</groupId>
            <artifactId>lz4</artifactId>
            <version>1.3.0</version>
        </dependency>

編寫NetworkWordcount

package com.imooc.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming處理Socket資料
  *
  * 本地測試: nc -lk 6789
  */
object NetworkWordCount {


  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    /**
      * 建立StreamingContext需要兩個引數:SparkConf和batch interval
      */
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 6789)

    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    result.print()
    
    ssc.start()
    ssc.awaitTermination()
  }
}

在這裡插入圖片描述

案例實戰之Spark Streaming處理檔案系統資料

package com.imooc.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 使用Spark Streaming處理檔案系統(local/hdfs)的資料
  */
object FileWordCount {

  def main(args: Array[String]): Unit = {
    //local;檔案系統是本地的不用receive接受;本地測試的時候不需要2個core
    val sparkConf = new SparkConf().setMaster("local").setAppName("FileWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.textFileStream("file:///Users/rocky/data/imooc/ss/")

    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}