1. 程式人生 > >【慕課網實戰】Spark Streaming實時流處理項目實戰筆記九之銘文升級版

【慕課網實戰】Spark Streaming實時流處理項目實戰筆記九之銘文升級版

file sin ssi 右上角 result map tap 核心 內容

銘文一級:

核心概念:
StreamingContext

def this(sparkContext: SparkContext, batchDuration: Duration) = {
this(sparkContext, null, batchDuration)
}

def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

batch interval可以根據你的應用程序需求的延遲要求以及集群可用的資源情況來設置


一旦StreamingContext定義好之後,就可以做一些事情

Discretized Streams (DStreams)
Internally, a DStream is represented by a continuous series of RDDs
Each RDD in a DStream contains data from a certain interval

對DStream操作算子,比如map/flatMap,其實底層會被翻譯為對DStream中的每個RDD都做相同的操作;
因為一個DStream是由不同批次的RDD所構成的。


Input DStreams and Receivers

Every input DStream (except file stream, discussed later in this section)
is associated with a Receiver object which
receives the data from a source and stores it
in Spark’s memory for processing.

銘文二級:

第六章:Spark Streaming核心概念與編程

DStream、Transfornations、Output operation

IDEA右上角的放大鏡可以搜索類,查看源碼

this為附屬構造方法

Context開始後無法設置或者添加

停止Streaming Context也可以通過停Spark Context來實現:

stop()

stopSparkContext()

DStream->其實是一系列的RDDs

來源:1.流進來  2.其他DStream轉化過來

實戰之處理Socket數據:

創建類NetworkWordCount

val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")  //雙引號勿忘,val定義!!!

val ssc = new StreamingContext(sparkConf,Seconds(5))  //Seconds

val lines = ssc.socketTextStream("localhost",6789)     //lines此時就是DStream

val result = lines.flatMap(_.split(" ")).map((_,1)).reduceBykey(_+_)

result.print

ssc.start()

ssc.awaitTermination()

啟動:nc -lk 6789

不能使用local[1]或者local,因為receiver自己operation也要使用一個,否則沒有輸出內容

運行會報錯,提示缺少依賴,可以打開maven project按要求導入相對應的依賴

還可能會提示缺少LZ4 And XxHash的依賴,去maven repository網址引入即可

【慕課網實戰】Spark Streaming實時流處理項目實戰筆記九之銘文升級版