1. 程式人生 > >sparkStreaming的編程步驟

sparkStreaming的編程步驟

不起作用 active 分配 shadow 必須 接收數據 obj 註意點 狀態

(1)StreamingContext

  與spark core的編程類似,在編寫SparkStreaming的程序時,也需要一個通用的編程入口----StreamingContext。
StreamingContext的創建

object StreamingContextTest {
 def main(args: Array[String]): Unit = {
 val sparkConf = new SparkConf().setAppName("SCTest").setMaster("local[4]")
 val streamingContext = new StreamingContext(sparkConf, Seconds(2))
 }
}

註意
技術分享圖片
如果在計算的時候,指定--master時 使用的是local 並且只指定了一個線程,那麽只有receiver線程工作,計算的線程不會工作,所以在指定線程數的時候,最少指定2個。

(2)通過輸入源創建InputDStream:

在構建好StreamingContext之後,首先我們要讀取數據源的數據進行實時處理:
  InputDStreams指的是從數據流的源頭接收的輸入數據流,每個 InputDStream 都關聯一個 Receiver 對象,該 Receiver 對象接收數據源傳來的數據並將其保存在內存中以便後期 Spark 處理。
  Spark Streaming 提供兩種原生支持的流數據源和自定義的數據源:

    - 直接通過 StreamingContext API 創建,例如文件系統(本地文件系統及分布式文件系統)、 Socket 連接及 Akka 的 Actor。
    - Kafka, Flume, Kinesis, Twitter 等,需要借助外部工具類,在運行時需要外部依賴
    -Spark Streaming 還支持用戶自定義數據源,它需要用戶定義 receiver
註意
  - 在本地運行 Spark Streaming 時,master URL 不能使用”local”或”local[1] ”,因為當 Input DStream 與 Receiver(如 sockets, Kafka, Flume 等)關聯時,Receiver 自身就需要一個線程 來運行,此時便沒有線程去處理接收到的數據。因此,在本地運行 SparkStreaming 程序時,要使用”local[n]”作為 master URL,n 要大於 receiver 的數量。
  - 在集群上運行 Spark Streaming 時,分配給 Spark Streaming 程序的 CPU 核數也必須大於 receiver 的數量,否則系統將只接受數據,無法處理數據。

(3)對DStream進行transformation 和 output 操作,這樣操作構成了後期流式計算的邏輯

(4)通過streamingContext.start()方法啟動接收和處理數據的流程

(5)使用streamingContext.awaitTermination()方法等待程序結束(手動停止或出錯停止)

(6)調用streamingContext.stop()方法來結束程序的運行。


在編寫sparkStreaming時的註意點
  - streamingContext啟動後,增加新的操作將不起作用,一定要在啟動之前定義好邏輯,也就是說在調用start方法之後,在對sparkStreaming程序進行邏輯操作是不被允許的
  - StreamingContext 是單例對象停止後,不能重新啟動,除非重新啟動任務,重新執行計算
  - 在單個jvm中,一段時間內不能出現兩個active狀態的StreamingContext
  - 當在調用 StreamingContext 的 stop 方法時,默認情況下 SparkContext 也將被 stop 掉, 如果希望 StreamingContext 關閉時,能夠保留 SparkContext,則需要在 stop 方法中傳入參 數 stop SparkContext=false
  - 一個 SparkContext 可以用來創建多個 StreamingContext,只要前一個 StreamingContext 已經停止了。

sparkStreaming的編程步驟