1. 程式人生 > >Spark Streaming介紹以及案例

Spark Streaming介紹以及案例

概觀

Spark Streaming是核心Spark API的擴充套件,可實現實時資料流的可擴充套件,高吞吐量,容錯流處理。

資料來源:Kafka,Flume,Kinesis或TCP套接字等,

可以使用高階函式進行復雜演算法進行處理map,例如reducejoinwindow

處理後的資料可以推送到檔案系統,資料庫等

 

Spark Streaming

它的工作原理:

Spark Streaming接收實時輸入資料流並將資料分成批處理,然後由Spark引擎處理以批量生成最終結果流

Spark Streaming

Spark Streaming提供稱為離散流DStream的高階抽象,表示連續的資料流。DStream可以從來自Kafka,Flume和Kinesis等源的輸入資料流建立,也可以通過在其他DStream上應用高階操作來建立。在內部,DStream表示為一系列 RDD。

案例介紹

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("MyTest")
    val ssc = new StreamingContext(conf,Seconds(1))
//建立一個DStream來表示來自TCP源的流資料,指定為主機名(例如localhost)和埠(例如9999)。
//此linesDStream表示將從資料伺服器接收的資料流。DStream中的每條記錄都是一行文字
    val lines = ssc.socketTextStream("localhost",9999)
//flatMap是一對多DStream操作,它通過從源DStream中的每個記錄生成多個新記錄來建立新的DStream。在這種情況下,每行將被分成多個單詞,單詞流表示為wordsDStream。
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word,1))
    val wordCount = pairs.reduceByKey(_+_)
    wordCount.print()
//Spark Streaming僅設定啟動時將執行的計算,並且尚未啟動實際處理。要在設定完所有轉換後開始處理
    ssc.start()
    ssc.awaitTermination()
  }
}

使用埠傳送資料:

nc -lk 9999

檢視埠使用情況:

lsof -i:9999