1. 程式人生 > >Spark Streaming狀態管理函式(三)——MapWithState的使用(scala版)

Spark Streaming狀態管理函式(三)——MapWithState的使用(scala版)

MapWithState

  關於mapWithState

  注意事項

  示例程式碼

  執行

  結論

  

關於mapWithState

  需要自己寫一個匿名函式func來實現自己想要的功能。如果有初始化的值得需要,可以使用initialState(RDD)來初始化key的值。 另外,還可以指定timeout函式,該函式的作用是,如果一個key超過timeout設定的時間沒有更新值,那麼這個key將會失效。這個控制需要在func中實現,必須使用state.isTimingOut()來判斷失效的key值。如果在失效時間之後,這個key又有新的值了,則會重新計算。如果沒有使用isTimingOut,則會報錯。

注意事項

  下面程式是使用idea編寫的,使用的是scala語言,在程式中master(“local[2]”)設定為本地模式([]中的數指定的是執行緒數,不能少於2,否則看不到結果。主要是因為spark需要啟動一個執行緒receiver來迴圈接收資料,一個Executor來接收資料,如果少於2執行緒不夠將不能打印出結果。),在window上執行的。使用的spark版本是2.3.0,在2.x以後的版本,基本採用SparkSession來進行操作。同時,想要執行程式你的伺服器上還必須要安裝netcat這個軟體,使用yum install nc進行安裝(注意安全配置好yum源,DNS才能下載安裝),使用命令nc -lk 6666開啟服務傳送資料。最後在執行程式前還需要匯入spark、scala相應的依賴包。

示例程式碼
package spark2x

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

/**
  * 類名  MapWithState
  * 作者   彭三青
  * 建立時間  2018-12-01 14:08
  * 版本  1.0
  * 描述: $
  */

object MapWithState {
  // 設定本地執行模式
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("MapWithState")
      .getOrCreate()

    // 建立一個context,批次間隔為2秒鐘,
    val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(3))

    // 設定checkpoint目錄
    ssc.checkpoint("hdfs://SC01:8020/user/tmp/cp-20181201-2")

    // 建立一個ReceiverInputDStream,從伺服器端的netcat接收資料。
    // 伺服器主機名SC01(SC01已在Window上的hosts檔案中做了對映,沒做對映的則寫ip就OK了),監聽埠為6666
    val line: ReceiverInputDStream[String] = ssc.socketTextStream("SC01", 6666)

    // 對接收到的資料進行處理,進行切割,分組形式為(day, 1) (word 1)
    val wordsStream: DStream[(String, Int)] = line.flatMap(_.split(" ")).map((_, 1))

    val wordCount: MapWithStateDStream[String, Int, Int, Any] = wordsStream.mapWithState(StateSpec.function(func).timeout(Seconds(30)))

	// 列印
    wordCount.print()
	// 提交
    ssc.start()
	// 
    ssc.awaitTermination()
  }

  /**
    * 定義一個函式,該函式有三個型別word: String, option: Option[Int], state: State[Int]
    * 其中word代表統計的單詞,option代表的是歷史資料,state代表的是返回的狀態
    */
  val func = (word: String, option: Option[Int], state: State[Int]) => {
    if(state.isTimingOut()){
      println(word + "is timeout")
    }else{
      // 獲取歷史資料,當前值加上上一個批次的該狀態的值
      val sum = option.getOrElse(0) + state.getOption().getOrElse(0)
      // 單詞和該單詞出現的頻率
      val wordFreq = (word, sum)
      // 更新狀態
      state.update(sum)
      wordFreq
    }
  }
}
執行

  伺服器執行nc
在這裡插入圖片描述
  idea端執行編寫好的程式
  伺服器傳送資料
在這裡插入圖片描述
  控制檯顯示結果
在這裡插入圖片描述

結論

  mapWithState它會按照時間線在每一個批次間隔返回之前的發生改變的或者新的key的狀態,不發生變化的不返回。同時mapWithState可以不用設定checkpoint,返回的資料量少,效能和效率都比mapWithState好。




第一篇:Spark Streaming狀態管理函式(一)——updateStateByKey和mapWithState
第二篇:Spark Streaming狀態管理函式(二)——updateStateByKey的使用(scala版)