1. 程式人生 > >Spark系列4- Spark Streaming

Spark系列4- Spark Streaming

1 流計算

靜態資料和流資料

靜態資料類似儲存在水庫中的水,是相對靜止不動的,如資料倉庫中儲存的資料、關係型資料庫中儲存的資料等。流資料是指在時間分佈和數量上無限的一系列動態資料合體,資料記錄是流資料的最小組成單元。

靜態資料和流資料的處理,分別對應兩種不同的計算模式:批量計算實時計算。資料的兩種處理模型如下圖所示。

靜態資料和動態資料的處理模型

2 Spark Streaming

Spark Streaming簡介

Spark Streaming是構建在spark上的實時計算框架,它擴充套件了Spark處理大規模流式資料的能力。Spark Streaming可結合批處理和互動式查詢,適用需要將歷史資料和實時資料聯合分析的應用場景。

針對流資料的實時計算稱為流計算。總體來說,流計算秉承一個基本理念,即資料的價值隨著時間的流逝而降低。對於一個流計算系統來說,它應達到高效能、海量式、實時性、分散式、易用性和可靠性等要求。流計算系統一般包括:資料實時採集、資料實時計算和資料實時查詢服務三大部分,Spark Streaming的實時計算系統的三大組成部分如下圖所示。

Spark Streaming流計算系統的三大組成部分

Spark Streaming最主要的抽象是離散化資料流(Discretized Stream, DStream),表示連續不斷的資料流。Streaming內部按照時間片將資料分成一段一段,每一段資料轉換成一個RDD,並且對DStream的操作最終都會被轉化成對RDD的操作。其執行流程如下圖所示,

Spark Streaming執行流程

**思考:**DStream是由多個具有時間屬性的RDD組成。

Spark Streaming 和Hadoop Storm對比

Spark Streaming只能實現秒級的計算,而Hadoop Storm可以實現毫秒級的響應,因此Spark Streaming無法滿足實時性要求非常高的場景,只能勝任其他流式準實時計算場景。

相比於Storm,Spark Streaming是基於RDD的,更容易做高效的容錯處理。Spark Streaming的離散化工作機制,使其可以同時相容批量和實時資料處理的邏輯和演算法,適用需要將歷史資料和實時資料聯合分析的應用場景。

Hadoop+Storm的Lambda架構涉及的元件較多,部署比較繁瑣,而Spark Streaming同時集成了流式計算和批量計算的功能,涉及元件少,部署簡單。

Spark Streaming 工作機制

Spark Streaming有一個Reciver元件,作為一個守護程序執行在Executor上,每個Receiver負責一個DStream輸入流。Reciver元件收到資料之後會提交給Spark Streaming程式進行處理。處理的結果可以傳遞給視覺化顯示或者儲存到HBase、HDFS中。如下圖所示,

Spark Streaming 工作機制

3 DStream程式設計

Spark Streaming程式設計的基本步驟

  • 通過建立輸入DStream來定義輸入源;
  • 通過對DStream應用轉換和輸出操作來定義計算流;
  • 呼叫StreamContext物件的start()方法,啟動接收和處理流程;
  • 呼叫StreamContext物件的awaitTermination()方法,等待流計算結束或者呼叫stop()方法,手動結束流計算;

建立Streaming Context

import org.apache.spark.streaming._
val ssc =  new StreamingContext(sc, Seconds(1))

編寫獨立應用程式時

import org.apache.spark.streaming._
val sc = new SparkConf().setAppName("AppName")
val ssc =  new StreamingContext(sc, Seconds(1))

從檔案中不停的獲取資料流

package sparkstudy.sparkstreaming

import org.apache.spark._
import org.apache.spark.streaming._

object StreamingFromFile {
  
    def main(args: Array[String]) = {
       val sparkConf = new SparkConf().setAppName("StreamingFromFile")
       val ssc = new StreamingContext(sparkConf, Seconds(2))
       
       val lines = ssc.textFileStream("file:///root/data/streaming")
       val words = lines.flatMap(line => line.split(","))
       val wordsCount = words.map(x => (x, 1)).reduceByKey(_ + _)
       wordsCount.print()
       ssc.start()
       ssc.awaitTermination()
    }
}
  • 只會對/root/data/streaming目錄下新增的檔案進行操作,即使修改歷史檔案,streaming也不會再操作。
  • ssc.stop()之後,只有退出會話,不能再通過同一個會話啟動DStreaming;

輔助指令碼,用於不停的生成新的檔案

mkdir /root/data/streaming
rm -f streaming/*
for v in `seq 1 100`;do tail -n $(($v+10)) user_login.txt > streaming/$v.txt; sleep 1; done

從socket中讀取資料流

使用Linux中的nc命令模擬服務端

nc -lk 9999

Streaming客戶端

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

import org.apache.spark.internal.Logging
import org.apache.log4j.{Level,Logger}

object StreamingExamples extends Logging {
    def setStreamingLogLevels() {
        val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements()
        if (!log4jInitialized) {
            logInfo("Setting log level to [WARN] for Streaming examples.")
            Logger.getRootLogger.setLevel(Level.WARN)
        }
    }
} 

object StreamingFromSocket {
  
    def main(args: Array[String]) = {
        
        if (args.length < 2) {
            System.err.println("StreamingFromSocket <hostname> <port>")
            System.exit(1)
        }
        
        StreamingExamples.setStreamingLogLevels()
        
        val sparkConf = new SparkConf().setAppName("StreamingFromSocket")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(line => line.split(","))
        val wordsCount = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordsCount.print()
        ssc.start()
        ssc.awaitTermination()
    }
}

從RDD佇列中讀取資料流

可以呼叫StreamingContext物件的queueStream()方法建立基於RDD佇列的DStream。下面的例子中,每個1秒建立一個RDD放入佇列,Spark Streaming每隔2秒就從佇列中取出資料進行處理

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

object StreamingFromRddQueue {
    def main(args: Array[String]) = {
        val sparkConf = new SparkConf().setAppName("StreamingFromRddQueue")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        
        val rddQueue = new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
        val queueStream = ssc.queueStream(rddQueue)
        val mappedStream = queueStream.map(r => (r % 10,1))
        val reduceStream = mappedStream.reduceByKey(_ + _)
        reduceStream.print()
        ssc.start()
        for( i <- 1 to 10) {
            rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
            Thread.sleep(1000)
        }
        ssc.stop()
    }
}

從kafka中讀取資料流

kafka的安裝和部署方法以及topic建立和測試方法請參考:Kafka叢集的安裝和部署

建立topic

kafka-topics.sh --create \
--zookeeper master:2181,slave2:2181,slave3:12181 \
--partitions 3 --replication-factor 3 --topic streaming 
  • 下載並拷貝spark-streaming-kafka*.jar包到$SPARK_HOME/jars/kafka目錄下
  • 拷貝$KAFKA_HOME/lib下的所有jar包到$SPARK_HOME/jars/kafka
mkdir $SPARK_HOME/jars/kafka
cp spark-streaming-kafka-0-8_2.11-2.1.0.jar $SPARK_HOME/jars/kafka/
cp $KAFKA_HOME/libs/*.jar $SPARK_HOME/jars/kafka/

啟動spark

sh $SPARK_HOME/sbin/start-all.sh

生產者程式

package sparkstudy.sparkstreaming
import java.util.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object MyKafkaProducer {
  
    def main(args: Array[String]) = {
        
        if (args.length < 4) {
            System.err.println("StreamingFromSocket <BrokerList><hostname><messagePerSec><wordsPerMessage>")
            System.exit(1)
        }
        val Array(brokers, topic, messagePerSec, wordsPerMessage) = args
        val props = new HashMap[String, Object]
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        
        val producer = new KafkaProducer[String, String](props)
        
        // send some messages
        while(true) {
            (1 to messagePerSec.toInt).foreach {
                messageNum => val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
                print(str)
                println()
                val message = new ProducerRecord[String, String](topic, null, str)
                producer.send(message)
            }
            Thread.sleep(1000)
        }
    }
}

消費者程式

package sparkstudy.sparkstreaming
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import common.StreamingExamples
object MyKafkaConsumer {
    def main(args: Array[String]) = {
        StreamingExamples.setStreamingLogLevels()
        val sc =  new SparkConf().setAppName("MyKafkaConsumer")
        val ssc = new StreamingContext(sc, Seconds(10))
        ssc.checkpoint("/data/kafka/checkpoint")
        val zkQuorum = "localhost:2181"
        val groupId = "1"
        val topics = "streaming"
        val numThread = 1
        val topicMap = topics.split(",").map((_, numThread.toInt)).toMap
        
        val lineMap = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)
        val lines = lineMap.map(_._2)
        
        val words = lines.flatMap(_.split(" "))
        val pair = words.map(x => (x,1))
        val wordCounts = pair.reduceByKeyAndWindow(_ + _, _ - _,  Minutes(2), Seconds(10), 2)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
    }
}

執行生產者和消費者程式

使用maven編譯上述程式碼,生產的jar包為SparkStreaming-0.0.1-SNAPSHOT.jar。 執行生產者程式

spark-submit --jars /root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/kafka-clients-0.10.2.0.jar \
--class sparkstudy.sparkstreaming.MyKafkaProducer SparkStreaming-0.0.1-SNAPSHOT.jar master:9092 streaming 10 4

執行消費者程式

spark-submit --jars /root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/kafka-clients-0.10.2.0.jar,\
/root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/spark-streaming-kafka-0-8_2.11-2.2.0.jar,\
/root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/metrics-core-2.2.0.jar,\
/root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/kafka_2.11-0.10.2.0.jar,\
/root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/zkclient-0.10.jar \
--class sparkstudy.sparkstreaming.MyKafkaConsumer SparkStreaming-0.0.1-SNAPSHOT.jar

spark streaming去取kafka資料時踩到的坑

**錯誤:**Caused by: java.lang.ClassNotFoundException: org.I0Itec.zkclient.IZkStateListener **解決方案:**是因為未指定zk的依賴,通過jars指定zkclient庫,–jars /root/software/spark-2.2.0-bin-hadoop2.6/jars/kafka/zkclient-0.10.jar

**錯誤:**java.lang.NoClassDefFoundError: org/apache/spark/Logging **解決方案:**org.apache.spark.Logging is available in Spark version 1.5.2 or lower version. It is not in the 2.0.0. 因此我把spark-streaming-kafka替換成spark-streaming-kafka-0-8。因為我的spark版本使用的是2.2,scala版本使用的是2.11.8,所以使用了spark-streaming-kafka-0-8_2.11-2.2.0.jar,pom的依賴配置為:

<!-- <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.1</version>
</dependency>-->

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

從flume讀取資料流

參考