Spark學習筆記(13)——Spark Streaming 案例
阿新 • • 發佈:2018-11-06
1 Spark Streaming 介紹
Spark Streaming類似於Apache Storm,用於流式資料的處理。根據其官方文件介紹,Spark Streaming有高吞吐量和容錯能力強等特點。Spark Streaming支援的資料輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。資料輸入後可以用Spark的高度抽象原語如:map、reduce、join、window等進行運算。而結果也能儲存在很多地方,如HDFS,資料庫等。另外Spark Streaming也能和MLlib(機器學習)以及Graphx完美融合。
1.1 特點
- 易用
- 容錯
- 方便整合到Spark 體系
1.2 Streaming 和 Storm 的對比
2 DStream
Discretized Stream是Spark Streaming的基礎抽象,代表持續性的資料流和經過各種Spark原語操作後的結果資料流。在內部實現上,DStream是一系列連續的RDD來表示。每個RDD含有一段時間間隔內的資料,如下圖:
對資料的操作也是按照RDD為單位來進行的
計算過程由Spark engine來完成
3 測試案例
pom 檔案新增
<dependency>
< groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.3</version>
</dependency>
3.1 原始碼
package mystreaming
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming. {Seconds, StreamingContext}
object StreamingWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
//接收資料
val ds = ssc.socketTextStream("node1", 8888)
//DStream 是一個特殊的 RDD
val result = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
3.2 在node1 啟動SocketServer 傳送資料
3.3 執行結果
3.4 過濾列印日誌
package mystreaming
import org.apache.log4j.{Logger, Level}
import org.apache.spark.Logging
object LoggerLevels extends Logging {
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}