1. 程式人生 > >Spark Streaming程式設計指南(三)

Spark Streaming程式設計指南(三)

DStreams轉換(Transformation)

和RDD類似,轉換中允許輸入DStream中的資料被修改。DStream支援很多Spark RDD上的轉換。常用的轉換如下。

轉換 含義
map(func) 將源DStream中的每個元素傳給函式func,返回新的DStream。
flatMap(func) 和map類似,但是每個輸入條目可以對映到0或多個輸出條目。
filter(func) 選擇源DStream中經過func處理後返回true的元素,返回新的DStream。
repartition(numPartitions) 改變DStream的並行級別,可建立更多或者更少的分割槽。
union(otherStream) 返回新的DStream,包含源DStream和otherDStream元素的union。
count() 通過對每個RDD的元素計數返回單元素RDDs的DStream。
reduce(func) 使用函式func(兩個引數和一個返回值),通過對每個RDD元素進行聚合返回單元素RDDs的DStream。函式應該是可結合和可交換的,以便進行平行計算。
countByValue() 在元素型別為K的DStream上呼叫時,返回一個(K, Long)對的DStream,每個key的value是每個RDD中這個key出現的頻率。
reduceByKey(func, [numTasks]) 在(K, V)對的DStream上呼叫時,返回一個新(K, V)對的DStream,每個key的value是使用給定reduce函式進行聚合的結果。注意:預設情況,使用Spark的預設並行任務數量(本地模式為2,叢集模式數量由spark.default.parallelism配置項決定)進行分組。可以傳遞可選引數numTasks任務數量。
join(otherStream, [numTasks]) 當在(K, V)對的DStream和(K, W)對的DStream上呼叫時,返回(K, (V, W))對的DStream。
cogroup(otherStream, [numTasks]) 當在(K, V)對的DStream和(K, W)對的DStream上呼叫時,返回(K, Seq[V], Seq[W])元組的DStream。
transform(func) 對源DStream的每個RDD應用一個RDD-to-RDD的函式,返回一個新DStream。可用於在DStream上進行任意RDD操作。
updateStateByKey(func) 返回新的”state” DStream,通過對key的前一個state和新值應用給定方法更新每個key的state。可用於維護每個key的任意state資料。

其中幾個轉換需要詳細說明。

UpdateStateByKey操作
當連續使用新資訊更新state時,updateStateByKey操作允許使用者維護任意狀態。使用這個操作,必須包含以下兩個步驟。

  1. 定義state - state可以是任意資料型別。
  2. 定義state更新函式 - 用一個函式指定如何使用前一個state和來自輸入流的新值更新state。

在每個批次,Spark會對所有存在的key應用state更新函式,不管這些key是否有新資料。如果更新函式返回None,則key-value對會消除。

用一個例子說明。維護每個單詞的執行計數,執行計數是一個state並且是個整數。定義更新函式如下:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

應用在包含單詞對(pair DStream包含(word, 1),參見示例)的DStream上。

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

更新函式會在每個單詞上進行呼叫,newValues是1’s的序列(來自(word, 1)對),runningCount是之前的計數。

注意,使用updateStateByKey要求配置檢查點目錄,之後詳細討論。

Transform操作
transform操作(以及其變化transformWith)可以在DStream上應用任意RDD-to-RDD函式。可用於使用任意沒有暴露給DStream API的RDD操作。例如,對資料流中每個批次的資料和另一個數據集進行join的功能沒有直接暴露給DStream API。但是,可以簡單地使用transform完成。這增加了很多可能性。例如,通過對輸入資料流和預先計算的垃圾資訊(也可能是Spark生成的)進行join,完成實時資料清理,然後基於結果進行篩選。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}

注意,在每個批次時間間隔中,提供的函式都會被呼叫。可用於完成隨時間變化的RDD操作,也就是說,RDD操作,分割槽數量和廣播變數等可以在批次之間改變。

Window操作
Spark Streaming也提供了windowed計算,可在一個滑動window資料上應用轉換。下圖進行說明。

image.png
如上圖所示,每次在源DStream滑動window,落在window中的源RDDs會被合併和操作用於產生windowed DStream的RDDs。在這個例子中,操作應用在後面3個時間單位的資料上,以2個時間單位進行滑動。任何window操作都需要指定兩個引數。

  • window長度 - window持續時間(上圖是3)。
  • 滑動間隔 - window操作執行的間隔(上圖是2)。

這兩個引數必須是源DStream批時間間隔的倍數。

下面用一個示例說明window操作。擴充套件之前的示例,生成過去30s的單詞計數,每10s一次。為實現這個功能,必須對pair DStream過去30s的資料應用reduceByKey。這裡使用reduceByKeyAndWindow操作。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

一些通用的window操作如下。所有操作都需要兩個引數,windowLength和slideInterval。

轉換 含義
window(windowLength, slideInterval) 返回新的DStream,基於源DStream的windowed批次進行計算。
countByWindow(windowLength, slideInterval) 返回資料流中元素的滑動視窗計數。
reduceByWindow(func, windowLength, slideInterval) 返回單元素資料流,使用func函式,聚合滑動時間間隔的資料流元素進行建立。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 當在(K, V)對DStream上呼叫時,返回一個新的(K, V)對DStream,在滑動window的批次上,使用給定reduce函式func通過聚合得到每個key的value。注意:預設情況下,使用Spark的預設並行任務數量(本地模式是2,叢集模式根據配置屬性spark.default.parallelism確定)。可傳遞可選引數numTasks設定不同的任務數量。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 上面方法的另一個版本,每個window的reduce value使用前一個window的reduce values增量計算。通過reduce進入滑動window的資料完成這個操作,然後逆向reduce離開window的舊資料。舉個例子,加減keys的數量作為window slides。但是,只適用於”可逆向reduce函式”,也就是說,有對應”逆向reduce”函式的那些reduce函式(即invFunc引數)。和上面的方法類似,reduce任務的數量可以通過可選引數進行配置。注意,使用這個操作必須啟用檢查點。
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 當在(K, V)對DStream上呼叫時,返回一個新的(K, Long)對DStream,每個key的value是這個key在滑動window中的頻次。和reduceByKeyAndWindow類似,reduce任務的數量可以通過可選引數進行配置。

Join操作
最後,值得強調的是,可以很容易地在Spark Streaming中執行不同型別的join。

Stream-stream joins

資料流可以很容易地與其它資料流進行join。

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)

這裡,在每個批時間間隔中,stream1生成的RDD都會和stream2生成deRDD進行join。也可以使用leftOuterJoinrightOuterJoinfullOuterJoin。另外,在流的window之間進行join非常有用。

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)

Stream-dataset joins

之前解釋DStream.transform操作時進行了說明。這裡是另外一個windowed資料流和資料集之間進行join的示例。

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

實際上,可以動態修改要進行join的資料集。提供給transform的函式會在每個批時間間隔進行評估,然後dataset指向的當前資料集。

DStreams輸出操作

輸出操作允許DStream的資料寫入到外部系統中,如資料庫或者檔案系統。由於輸出操作實際上允許轉換資料被外部系統消費,所以輸出操作出發了所有DStream轉換的額實際執行(類似RDDs的action)。目前,定義瞭如下輸出操作:

輸出操作 含義
print() 在執行streaming應用程式的驅動節點上輸出DStream中每個批次的前10個元素。 對於開發和除錯非常有用。Python API要呼叫pprint()。
saveAsTextFiles(prefix, [suffix]) 將DStream的內容儲存為檔案。每個批次時間間隔的檔名字基於prefix和suffix生成:”prefix-TIME_IN_MS[.suffix]”。
saveAsObjectFiles(prefix, [suffix]) 將DStream的內容儲存為Java物件序列化的SequenceFiles。每個批次時間間隔的檔名字基於prefix和suffix生成:”prefix-TIME_IN_MS[.suffix]”。Python API不支援。
saveAsHadoopFiles(prefix, [suffix]) 將DStream的內容儲存為Hadoop檔案。每個批次時間間隔的檔名字基於prefix和suffix生成:”prefix-TIME_IN_MS[.suffix]”。Python API不支援。
foreachRDD(func) 最通用的輸出操作符,在stream生成的每個RDD上應用函式func。這個函式應該講每個RDD的資料傳送到外部系統,如儲存RDD到檔案或者通過網路寫入到資料庫。注意,函式func在執行streaming應用程式的驅動程序中執行,並且函式中通常會有RDD actions出發streaming RDDs的計算。

使用foreachRDD的設計模式

dstream.foreachRDD是一個強大的原語,允許資料傳送到外部系統。但是,理解如何正確高效地使用這個原語非常重要。避免一些常見錯誤的方式如下。

通常寫資料到外部系統要求建立連線物件(如TCP連線到遠端服務)並使用連線傳送資料到遠端系統。為達到這個目的,開發者可能大一地在Spark driver建立連線物件,然後嘗試在Spark worker中使用連線來儲存RDDs中的記錄。例如:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

這種做法是錯誤,因為這要求連線物件要序列化並且傳送到worker中去。這樣的連線物件很少跨機器進行傳遞。這個錯誤可能會顯示為序列化錯誤(連線物件未進行序列化),初始化錯誤(連線物件需要在worker節點初始化)等等。正確的方法是在worker節點建立連線物件。

但是,這會導致另外一個常見錯誤 - 為每條記錄建立一個新的連線。例如,

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

建立連線物件需要時間和資源開銷。因此,為每條記錄建立和銷燬連線物件會引發不必要的高開銷並會顯著降低系統的吞吐量。好的解決防範是使用rdd.foreachPartition - 建立一個連線物件,然後使用它傳送一個RDD分割槽中的所有記錄。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

這種方式將連線建立的開銷平攤到多條記錄當中了。

最後,可以通過在多個RDDs/批次之間重用連線物件進一步優化。可以維護一個連線物件的靜態池,重用連線物件將RDDs的多個批次傳送到外部系統,進一步減少開銷。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

注意,池子中的連線應該是按照需要懶惰建立的並且如果一定時間不用會超時。這實現了向外部系統傳送資料最有效的方式。

其它需要記住的點:

  • DStream通過輸出操作懶惰執行,就想RDDs通過RDD action懶惰執行。DStream輸出操作中的RDD actions會強制接收資料的處理。因此,如果應用程式沒有任何輸出操作或者有dstream.foreachRDD()這種不帶任何RDD action的輸出操作,什麼也不會執行。系統會簡單地接收資料並丟棄資料。
  • 預設地,輸出操作只會執行一次。會按照在應用程式的定義順序執行。

DataFrame和SQL操作

可對流資料使用DataFrames and SQL操作。需要使用SparkContext建立一個SparkSession。必須這樣做,才能從驅動程式錯誤中恢復重啟。通過建立SparkSession的懶例項化單例完成。下面是一個示例。修改了之前的示例,使用DataFrames和SQL生成單詞計數。每個RDD會轉換為DataFrame,作為臨時表註冊,然後使用SQL查詢。

/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame = 
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

也可以從另外一個執行緒在表上執行查詢(非同步執行StreamingContext)。只要保證設定StreamingContext記住查詢需要的足量資料即可。否則StreamingContext,無法識別非同步查詢,會在查詢完成前刪除舊的流資料。例如,如果想查詢上一個批次的資料,但是你的查詢可能需要執行5分鐘,則呼叫streamingContext.remember(Minutes(5))

MLib操作

可以使用MLlib提供的機器學習演算法。首先,streaming機器學習演算法(如Streaming Linear RegressionStreaming KMeans等),可以同時從流資料中學習並在流資料上應用模型。除了這些,對於更大一類的機器學習演算法,可以離線學習模型(如使用歷史資料),然後將模型應用於線上流資料。具體參見MLlib