1. 程式人生 > >Spark學習筆記(13)——Spark Streaming 案例

Spark學習筆記(13)——Spark Streaming 案例

1 Spark Streaming 介紹

Spark Streaming類似於Apache Storm,用於流式資料的處理。根據其官方文件介紹,Spark Streaming有高吞吐量和容錯能力強等特點。Spark Streaming支援的資料輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。資料輸入後可以用Spark的高度抽象原語如:map、reduce、join、window等進行運算。而結果也能儲存在很多地方,如HDFS,資料庫等。另外Spark Streaming也能和MLlib(機器學習)以及Graphx完美融合。
在這裡插入圖片描述

1.1 特點

  • 易用
    在這裡插入圖片描述
  • 容錯
    在這裡插入圖片描述
  • 方便整合到Spark 體系
    在這裡插入圖片描述

1.2 Streaming 和 Storm 的對比

在這裡插入圖片描述

2 DStream

Discretized Stream是Spark Streaming的基礎抽象,代表持續性的資料流和經過各種Spark原語操作後的結果資料流。在內部實現上,DStream是一系列連續的RDD來表示。每個RDD含有一段時間間隔內的資料,如下圖:
在這裡插入圖片描述
對資料的操作也是按照RDD為單位來進行的
在這裡插入圖片描述
計算過程由Spark engine來完成
在這裡插入圖片描述

3 測試案例

pom 檔案新增

<dependency>
            <
groupId
>
org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.6.3</version> </dependency>

3.1 原始碼

package mystreaming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.
{Seconds, StreamingContext} object StreamingWordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(5)) //接收資料 val ds = ssc.socketTextStream("node1", 8888) //DStream 是一個特殊的 RDD val result = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) result.print() ssc.start() ssc.awaitTermination() } }

3.2 在node1 啟動SocketServer 傳送資料

在這裡插入圖片描述

3.3 執行結果

在這裡插入圖片描述

3.4 過濾列印日誌

package mystreaming

import org.apache.log4j.{Logger, Level}
import org.apache.spark.Logging

object LoggerLevels extends Logging {

  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

在這裡插入圖片描述