1. 程式人生 > >在Spark結構化流readStream、writeStream 輸入輸出,及過程ETL

在Spark結構化流readStream、writeStream 輸入輸出,及過程ETL

https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html

本文翻譯自DataBricks官方部落格,主要描述了Apache Spark 2.0中推出的新功能Structured Streaming(結構化流處理)從Kafka中讀取訊息,實時處理後再寫入不同的下游系統的使用示例。

結構化流處理API使得以一種兼具一致性和容錯性的方法開發被稱為連續應用的端到端流處理應用成為可能。它讓開發者不用再去深究流處理本身的細節,而且允許開發者使用類似Spark SQL中的熟悉概念,比如DataFrames和DataSets。由於上述原因,很多人有興趣仔細研究一些使用案例。從

入門,到ETL,再到複雜的資料格式,都已經有了很多材料涉及了。結構化流處理API同樣也可以和一些第三方的元件整合,比如Kafka,HDFS,S3,RDBMS等等。

在這篇文章中,我會講解與Kafka的端到端整合,從中處理訊息,進行簡單到複雜的基於window的ETL,以及將輸出放到不同的接收系統中,諸如記憶體,控臺,檔案,資料庫以及回到Kafka中。對於將輸出寫到檔案的情況,本文也會討論如何將新資料寫到分割槽表中。

Connecting to a Kafka Topic

與Kafka Topic連線

假設你有個可以連線的Kafka叢集,你想用Spark的結構化流處理功能來接收並處理一個topic來的訊息。Databricks平臺已經包含了Apache Kafka 0.10的結構化流處理功能聯結器,所以建立一個資訊流讀取訊息就變得很容易了:

import org.apache.spark.sql.functions.{get_json_object, json_tuple}

var streamingInputDF =  
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip")
    .option("subscribe", "topic1")     
    .option("startingOffsets", "latest")  
    .option("minPartitions", "10")  
    .option("failOnDataLoss", "true")
    .load()
import org.apache.spark.sql.functions.{get_json_object, json_tuple}  
streamingInputDF: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]  

讀取流資料的時候有一些可以設定的選項,這些選項的細節可以在這裡找到

我們可以快速的看一下我們剛剛建立的streamingInputDF這個DataFrame的schema

Schema中包含了keyvaluetopicpartitionoffsettimestamptimestampType這些域。我們可以從中選擇我們需要處理的域。value域中包含了我們真正的資料,timestamp是訊息接受的時間戳。在基於window處理的情況下,我們不要把這個timestamp域和訊息中真正含有的時間戳搞混了,後者大部分情況下才是我們關心的。

流處理ETL

在我們將流處理設定好了之後,我們就可以對其做需要的ETL來產生有意義的結論。注意streamingInputDF是一個DataFrame。因為Dataframe本質上說是無型別的行資料集,所以我們也可以對其做類似的操作。

假設一些ISP訪問的JSON資料被推送到上述的Kafka <topic>。比如一個數據點可能是這樣的:

val value = """{"city": "<CITY>",  
  "country": "United States", 
  "countryCode": "US", 
  "isp": "<ISP>", 
  "lat": 0.00, 
  "lon": 0.00, 
  "region": "CA", 
  "regionName": "California", 
  "status": "success", 
  "hittime": "<TIMPSTAMP>", 
  "zip": "<ZIP>" 
}"""

接下來我們就可以快速的做一些有意思的分析了,比如多少使用者是從某一個郵編地區來的,使用者通過哪個ISP進入等等。我們可以進一步建立一些資料儀表盤來跟我們的公司分享,下面讓我們深度分析一下:

import org.apache.spark.sql.functions._

var streamingSelectDF =  
  streamingInputDF
   .select(get_json_object(($"value").cast("string"), "$.zip").alias("zip"))
    .groupBy($"zip") 
    .count()

display(streamingSelectDF)  

注意在上述的命令中,我們可以把郵編從JSON訊息中提取出來,把他們group起來再計數,這些步驟全部是我們一邊從kafka的topic讀取資料一邊實時處理的。在我們得到計數結果後我們把結果顯示出來,這個過程會在後端開始一個流處理程式處理新進來的訊息並且不斷的顯示更新的結果。這張自動更新的圖表就可以在Databricks的平臺上作為一個訪問許可權可控的資料儀表盤和公司其他人分享了。

基於Window的處理

我們現在已經讓parse,select,groupBy和count這些查詢持續的在運行了,接下來如果我們想知道每個郵編的在10分鐘內的總流量,並且從每個小時的第2分鐘開始每5分鐘跟新一次該怎麼辦呢?

在這個例子中,進入的JSON資料包含一個表示時間戳的域'hittime', 我們可以用這個域來查詢每10分鐘的總流量。

注意在結構化流處理中,基於window的處理被認為是一種groupBy操作。下面的餅狀圖代表了每10分鐘視窗的流量。

import org.apache.spark.sql.functions._

var streamingSelectDF =  
  streamingInputDF
   .select(get_json_object(($"value").cast("string"), "$.zip").alias("zip"), get_json_object(($"value").cast("string"), "$.hittime").alias("hittime"))
   .groupBy($"zip", window($"hittime".cast("timestamp"), "10 minute", "5 minute", "2 minute"))
   .count()

輸出選項

至此,我們已經看到最終結果自動被顯示出來了。如果我們對於輸出選項需要更多的控制,有多種輸出模式可以供使用。比如,如果我們需要debug,你可能選擇控制檯輸出。如果我們希望資料一邊被處理我們能一邊實時查詢資料,記憶體輸出則是合理的選擇。類似的,輸出也可以被寫到檔案,外部資料庫,甚至可以重新流入Kafka。

我們來詳細過一遍這些選項。

記憶體

這種情況下,資料被作為記憶體中的資料表儲存起來。從記憶體中,使用者可以對資料集用SQL進行查詢。資料表的名字可以通過queryName選項來制定。注意我們繼續使用上述基於window處理例子中的streamingSelectDF

import org.apache.spark.sql.streaming.ProcessingTime

val query =  
  streamingSelectDF
    .writeStream
    .format("memory")        
    .queryName("isphits")     
    .outputMode("complete") 
    .trigger(ProcessingTime("25 seconds"))
    .start()

基於此你可以做更多有意思的分析,就像你對普通的資料表的做法一樣,而同時資料會自動被更新。

Console
控制檯

這種情況下,輸出被直接列印到控臺或者stdout日誌

In this scenario, output is printed to console/stdout log.

import org.apache.spark.sql.streaming.ProcessingTime

val query =  
  streamingSelectDF
    .writeStream
    .format("console")        
    .outputMode("complete") 
    .trigger(ProcessingTime("25 seconds"))
    .start()

檔案

這種情景是將輸出長期儲存的最佳方法。不像記憶體或者控臺這樣的接收系統,檔案和目錄都是具有容錯性的。所以,這個選項還要求一個“檢查點”目錄來存放一些為了容錯性需要的狀態.

import org.apache.spark.sql.streaming.ProcessingTime

val query =  
  streamingSelectDF
    .writeStream
    .format("parquet")
    .option("path", "/mnt/sample/data")
    .option("checkpointLocation", "/mnt/sample/check"))
    .trigger(ProcessingTime("25 seconds"))
    .start()

資料被儲存下來之後就可以像其他資料集一樣在Spark中被查詢了。

val streamData = spark.read.parquet("/mnt/sample/data")  
streamData.filter($"zip" === "38908").count()  

另一個輸出到檔案系統的好處是你可以動態地基於任何列對接受的訊息進行分割槽。在上述例子中,我們可以基於‘zipcode’或者‘day’進行分割槽。這可以讓查詢變得更快,因為通過引用一個個分割槽,一大部分資料都可以被跳過。

import org.apache.spark.sql.functions._

var streamingSelectDF =  
  streamingInputDF
   .select(get_json_object(($"value").cast("string"), "$.zip").alias("zip"),    get_json_object(($"value").cast("string"), "$.hittime").alias("hittime"), date_format(get_json_object(($"value").cast("string"), "$.hittime"), "dd.MM.yyyy").alias("day"))
    .groupBy($"zip") 
    .count()
    .as[(String, String)]

接下來我們可以把輸入資料按照‘zip’和‘day’分割槽

import org.apache.spark.sql.streaming.ProcessingTime

val query =  
  streamingSelectDF
    .writeStream
    .format("parquet")
    .option("path", "/mnt/sample/test-data")
    .option("checkpointLocation", "/mnt/sample/check")
    .partitionBy("zip", "day")
    .trigger(ProcessingTime("25 seconds"))
    .start()

我們來看看輸出資料夾是什麼樣的

現在,分割槽的資料可以直接在資料集和DataFrames被使用,如果一個數據表建立的時候指向了這些檔案被寫入的資料夾,Spark SQL可以用來查詢這些資料。

%sql CREATE EXTERNAL TABLE  test_par
    (hittime string)
    PARTITIONED BY (zip string, day string)
    STORED AS PARQUET
    LOCATION '/mnt/sample/test-data'

這種方法需要注意的一個細節是資料表需要被加入一個新的分割槽,資料表中的資料集才能被訪問到

%sql ALTER TABLE test_par ADD IF NOT EXISTS
    PARTITION (zip='38907', day='08.02.2017') LOCATION '/mnt/sample/test-data/zip=38907/day=08.02.2017'

分割槽引用也可以被預先填滿,這樣隨時檔案在其中被建立,他們可以立即被訪問。

%sql select * from test_par

現在你就可以對這個自動更新的資料表作分析了,與此同時資料在正確的分割槽中被儲存下來。

資料庫
Databases

我們經常想要把流處理輸出寫到像MySQL這樣的外部資料庫中。目前,結構化流處理API還不支援寫入外部資料庫。但是,在支援加入後,API的選項會像.format("jdbc").start("jdbc:mysql/..")這麼簡單。同時,我們可以用‘foreach’輸出來寫入資料庫。讓我們來寫一個自定義的JDBCSink來繼承ForeachWriter來實現集中的方法。

import java.sql._

class  JDBCSink(url:String, user:String, pwd:String) extends ForeachWriter[(String, String)] {  
      val driver = "com.mysql.jdbc.Driver"
      var connection:Connection = _
      var statement:Statement = _

    def open(partitionId: Long,version: Long): Boolean = {
        Class.forName(driver)
        connection = DriverManager.getConnection(url, user, pwd)
        statement = connection.createStatement
        true
      }

      def process(value: (String, String)): Unit = {
        statement.executeUpdate("INSERT INTO zip_test " + 
                "VALUES (" + value._1 + "," + value._2 + ")")
      }

      def close(errorOrNull: Throwable): Unit = {
        connection.close
      }
   }

我們現在就可以使用我們的JDBCSink了:

val url="jdbc:mysql://<mysqlserver>:3306/test"  
val user ="user"  
val pwd = "pwd"

val writer = new JDBCSink(url,user, pwd)  
val query =  
  streamingSelectDF
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()

批處理完成後,每個郵編的總數就會被插入/更新到我們的MySQL資料庫中了

As batches are complete, counts by zip could be INSERTed/UPSERTed into MySQL as needed.

Kafka

跟寫入資料庫類似,現在的結構化流處理API還不原生支援"kafka"輸出格式,但是下一版中這個功能會被加上。與此同時,我們可以建立自定義的類KafkaSink來繼承ForeachWriter,我們來看看程式碼是怎麼樣的:

import java.util.Properties
import kafkashaded.org.apache.kafka.clients.producer._
import org.apache.spark.sql.ForeachWriter


 class  KafkaSink(topic:String, servers:String) extends ForeachWriter[(String, String)] {
      val kafkaProperties = new Properties()
      kafkaProperties.put("bootstrap.servers", servers)
      kafkaProperties.put("key.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
      kafkaProperties.put("value.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")
      val results = new scala.collection.mutable.HashMap[String, String]
      var producer: KafkaProducer[String, String] = _

      def open(partitionId: Long,version: Long): Boolean = {
        producer = new KafkaProducer(kafkaProperties)
        true
      }

      def process(value: (String, String)): Unit = {
          producer.send(new ProducerRecord(topic, value._1 + ":" + value._2))
      }

      def close(errorOrNull: Throwable): Unit = {
        producer.close()
      }
   }

下面我們就可以使用這個writer:

val topic = "<topic2>"
val brokers = "<server:ip>"

val writer = new KafkaSink(topic, brokers)

val query =
  streamingSelectDF
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()

你現在就能看到我們在將訊息流入Kafka的topic2.我們每個批處理後會把更新後的zipcode:count傳回Kafka。另一件需要注意的事情是流處理Dashboard會提供流入資料量和處理速率的對比,批處理時間和用來產生Dashboard的原始資料。這些在debug問題和監控系統的時候會很有用。

在Kafka的consumer元件裡面,我們可以看到:

這裡,我們把輸出執行在“更新”模式中。隨著訊息被處理,在一次批處理中被更新的郵編會被送回Kafka,沒被更新的郵編則不會被髮送。你也可以“完全”模式執行,類似我們在上面資料庫的例子裡那樣,這樣所有的郵編的最近的計數都會被髮送,即使有些郵編的總數與上次批處理比並沒有變化。

結論

本文概述了結構化流處理API和Kafka的整合,描述瞭如果用這套API和不同的資料輸入和輸出系統一起使用。這裡用到的一些概念對於其他流處理系統也同樣相關,比如埠,目錄等等。比如你想從埠源中讀取資料並且將處理好的訊息傳送到MySQL,將文中的例子修改一下輸入輸出流就能做到。另外,像例子裡使用的ForeachWriter也可以用來把輸出資料同時寫到多個下游系統中。我會在後續的文章中詳細描述寫入多個下游系統的方法。

我們在本部落格中使用的示例程式碼可用作Databricks Notebook您可以通過註冊免費的Databricks社群版帳戶,開始實驗結構化流式處理如果您有疑問或想要開始使用Databricks,請與我們聯絡

最後,我鼓勵您閱讀我們的一系列關於結構化流媒體的部落格:

資料庫部落格