1. 程式人生 > >一篇文章學會spark-streaming

一篇文章學會spark-streaming

1.什麼是spark-streaming?

實際生產中會有許多應用到實時處理的場景,比如:實時監測頁面點選,實時監測系統異常,實時監測來自於外部的攻擊。針對這些場景,twitter研發了實時資料處理工具storm,並在後來開源。spark針對這些場景設計了spark-streaming實時計算模型,它允許使用者使用一系列批處理的API去處理實時資料,能做到程式碼邏輯的重複使用。
和spark中的rdd非常相似,spark-streaming中使用離散化流(discretized stream)作為抽象的表示,叫做DStream。它是隨時間推移而收集資料的序列,每個時間段收集到的資料在DStream內部以一個RDD的形式存在。DStream支援從kafka,flume,hdfs,s3等獲取輸入。DStream也支援兩種操作,即轉化操作和輸出操作(區別於RDD中的行動操作)。轉化操作又分為無狀態的轉化操作和有狀態的轉化操作,無狀態的轉化操作有map,filter,flatmap,repartition等,是針對單個時間區間內的操作。而有狀態的轉化操作可以針對不同的時間區間,後面詳述。

2.兩個簡單的例子

2.1 監聽socket獲取資料,程式碼如下:
這裡使用nc -lk 9999 在ip為10.121.33.44的機器上傳送訊息

scala    17行

object SocketStream {
  def main(args: Array[String]): Unit = {
    //本地測試,設定4核
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    //以10秒為一個批次
    val ssc = new StreamingContext(conf,Seconds(10))
    //接收訊息
    val dstream = ssc.socketTextStream("10.121.33.44",9999,StorageLevel.MEMORY_AND_DISK_SER)
    //監測關鍵字error,出現則print
    dstream.filter(_.contains("error")).foreachRDD(rdd=>{
      rdd.foreach(println(_))
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

2.2 從kafka讀取資料,比較常用

scala    31行

object KafkaStream {

  def main(args: Array[String]): Unit = {
    //本地測試,設定4核
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    //以10秒為一個批次
    val ssc = new StreamingContext(conf,Seconds(10))

    val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
    val group_id = "realtime_data"

    //kafka相關引數
    val kafka_param = Map[String,String](
      "zookeeper.connect" ->zkQuorum,
      "group.id" -> group_id,
      "zookeeper.connection.timeout.ms" -> "10000",
      "fetch.message.max.bytes" -> "10485760"
    )
    val topic = Map[String,Int]("test_topic" -> 16)
    //接收訊息
    val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    //監測關鍵字error,出現則print
    dstream.filter(_.contains("error")).foreachRDD(rdd=>{
      rdd.foreach(println(_))
    })
    
    ssc.start()
    ssc.awaitTermination()
  }
}
3.再來談架構

通過上面兩個例子,你可能對spark-streaming有了初步的瞭解,我們再來看一下它的架構。
Spark-streaming使用"微批次"的架構,把流式計算當做一系列微型的批處理操作來對待,每個時間段都產生一個RDD。如圖:
wpc
作用於一個DStream上的無狀態轉化操作會對它其中的每個RDD生效,如針對一個輸入為語句的DStream做flatMap操作的示意圖如下:
shiyitu

4.轉化操作

4.1 無狀態的轉化操作。
無狀態轉化操作就是簡單的將轉化作用於DStream的每個RDD上面。下面列舉了一些常見的轉化操作,其中最後一個transform表示可以試用自定義的轉化函式,儘管它前面已經提供了很多現成的API。
wzt
4.2有狀態的轉化操作。
有狀態的轉化操作是跨時間段的資料操作,一些先前的批次也被用來在新的批次中做計算。主要有滑動視窗和updateStateByKey。前者以一個時間段為滑動視窗進行操作,後者則用來跟蹤每個鍵的狀態變化。有狀態的轉化操作需要開啟檢查點機制來保證容錯性。即:給ssc.checkpoint()設定一個檢查點目錄。
(1)基於視窗的轉化操作會在一個比ssc設定的更長的時間段內,通過整合多個批次的,計算出整個大的時間視窗的結果。基於視窗的操作需要兩個引數,一個是視窗時長,一個是滑動步長。這兩個引數是ssc設定的時長的整數倍。下面的圖表示了一個時間視窗為3,滑動步長為2的視窗轉化操作。
window
前面提到的監測關鍵字error的例子,現在需要每隔20s就對前面30s有error的日誌記錄做計數,程式碼如下:

scala    34行

object KafkaStream {

  def main(args: Array[String]): Unit = {
    //本地測試,設定4核
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    //以10秒為一個批次
    val ssc = new StreamingContext(conf,Seconds(10))

    val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
    val group_id = "realtime_data"

    //kafka相關引數
    val kafka_param = Map[String,String](
      "zookeeper.connect" ->zkQuorum,
      "group.id" -> group_id,
      "zookeeper.connection.timeout.ms" -> "10000",
      "fetch.message.max.bytes" -> "10485760"
    )
    val topic = Map[String,Int]("test_topic" -> 16)
    //接收訊息
    val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER)
      .map(_._2)
    //每隔20s對前30s出現error的日誌做計數
    val errors = dstream.window(Seconds(30),Seconds(20))
        .filter(_.contains("error"))
        .count()
    errors.foreachRDD(rdd=>{
      rdd.foreach(println(_))
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

(2)updateStateByKey
updateStateByKey能對鍵值對的資料進行不同批次間的資料計算,使用updateStateByKey,需要傳入一個update函式,這個函式接收某個key最新批次對應的values,以及該key之前對應的value,按照自定義的邏輯返回一個新的value。如需要計算一個實時日誌中http響應碼的計數,程式碼如下:

scala    39行

object KafkaStream {

  def main(args: Array[String]): Unit = {
    //輸出目錄
    val output = args(0)
    //本地測試,設定4核
    val conf = new SparkConf().setMaster("local[4]").setAppName("streaming")
    //以10秒為一個批次
    val ssc = new StreamingContext(conf,Seconds(10))

    val zkQuorum = "10.22.33.44:6688,10.22.33.45:6688/kafka_cluster"
    val group_id = "realtime_data"

    //kafka相關引數
    val kafka_param = Map[String,String](
      "zookeeper.connect" ->zkQuorum,
      "group.id" -> group_id,
      "zookeeper.connection.timeout.ms" -> "10000",
      "fetch.message.max.bytes" -> "10485760"
    )
    val topic = Map[String,Int]("test_topic" -> 16)
    //接收訊息
    val dstream = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,kafka_param,topic,StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    val rdd = dstream.map(_.split("\001"))
      .map(x=>(x(0),x(1).toLong))
      .updateStateByKey(update)
    //輸出
    rdd.foreachRDD(_.saveAsTextFile(output))
    ssc.start()
    ssc.awaitTermination()
  }
  //update函式
  def update(new_values:Seq[Long],old_value:Option[Long]):Option[Long]={
    val current_num = new_values.size
    val result_num = current_num + old_value.getOrElse(0L)
    Some(result_num)
  }
}

(3)所有有狀態轉化操作
state

5.輸出操作

輸出操作比較簡單,有以下幾種:
save

6.作業穩定性

spark-streaming作業一般都要全天候不間斷執行,那麼作業的穩定性如何保證?主要有以下幾點:
6.1 檢查點機制。
其原理就是階段性的將作業執行的資料存放到儲存系統,如hdfs,s3等。當作業執行出現異常時可以從上述資料中恢復。
6.2 驅動器容錯。
在建立實時計算作業的上下文時使用getOrCreate函式。程式碼如下:

scala    7行

	val ssc = StreamingContext.getOrCreate(cp_dir,createContext )
    def createContext(): StreamingContext  ={
      val sc = new SparkContext(conf)
      val ssc = new StreamingContext(sc,Seconds(10))
      ssc.checkpoint(cp_dir)
    }

更多文章請關注微信公眾號:bigdataer
wx