當 Structured Streaming 碰到 Kafka
本文中,我們來看下用 Structured Streaming 怎麼處理 kafka中的複雜json 資料流,Structured Streaming 強項我們都知道,可以使用 event-time 進行聚合,可以使用 watermark處理延遲資料, 保證 exactly-once, 可以輸出各種外部系統,spark 和 kafka 一起可以讓你放飛自我
我們先來了解下 kafka, 然後舉幾個 Structured Streaming 讀寫 kafka的例子,然後看一個 真實使用場景。
/ kafka 瞭解一下?/
kafka 是現在最流行的一個分散式的實時流訊息系統,給下游訂閱消費系統提供了並行處理和可靠容錯機制,現在大公司在流式資料的處理場景, kafka 基本是標配。
kafka 把生產者傳送的資料放在不同的分割槽裡面,這樣就可以並行進行消費了。每個分割槽裡面的資料都是遞增有序的,跟 structured commit log 類似,生產者和消費者使用 kafka 進行解耦,消費者不管你生產者傳送的速率如何,我只要按照我們的節奏進行消費就可以了。每條訊息在一個分割槽裡面都有一個唯一的序列號 offset, kafka 會對內部儲存的訊息設定一個過期時間,如果過期了,就會標記刪除,不管這條訊息有沒有被消費。
kafka 消費策略
kafka 可以被看成一個無限的流,裡面的流資料是短暫存在的,如果不消費,訊息就過期滾動沒了,這就涉及一個問題,就是你如果開始消費,就要定一下從什麼位置開始。
-
earliest 從最起始位置開始消費,當然不一定是從 0 開始,因為如果資料過期了,就清掉了,所以可以理解為從現存的資料裡最小位置開始消費。
-
latest 這個好理解,從最末位置開始消費。
-
per-partition assignment , 對每個分割槽都指定一個 offset, 然後從這組 offset 位置開始消費。
當你第一次開始消費一個 kafka 流的時候,上述策略任選其一,如果你之前已經消費了,而且做了 checkpoint ,比如你消費程式升級了, 這時候就會從你上次結束的位置開始繼續消費。
/ Structured streaming 對kafka的支援 /
Structured Streaming 對 批處理 和流處理 提供統一的 API, 這一點在我看來是很重要的,我們一套相同的處理邏輯,沒必要寫出來兩套程式碼,減少了維護成本。 Structured Streaming 從 kafka 拉取訊息,然後就可以把 流資料看做 一個 DataFrame , 一張無限增長的大表,在這個大表上做查詢,Structured Streaming 給你保證了端到端的 exactly-once,你只需要關心你的業務即可,不用費心去關心底層是怎麼做的。
從 kafka topics 中讀取訊息
首先需要你指定你的資料來源,kafka 叢集的連線地址,你需要消費的 topic, 指定topic 的時候,可以使用正則來指定,也可以指定一個 topic 的集合,是不是很靈活?我們這裡只消費一個 topic
這個例子裡面我們指定消費 topic1, 在 spark 內部 DataStreamReader 會根據你的 配置來 拉取指定的 topic, kafka.bootstrap.servers (i.e. host:port) 地址 和 topic 這兩個引數是必填的。startingOffsets 可以選填,預設是 從 latest 位置開始消費
df.printSchema() , 我們來列印一些 DataFrame 的 schema
我們可以看到,schema 裡面包含了 kafka record 裡面的key value 和 相關 元資訊欄位,到這時候,我們就可以使用 DataFrame 和 Dataset 的方式來進行轉換處理了,有人問,如果 kafka 訊息的 value 位元組數組裡麵包含的 是 json 資料怎麼辦,我們在這裡拿到的 key 和 value 都是 位元組陣列,需要解析一下格式,當然 Spark+SQL/">Spark SQL 內嵌了一些型別處理方法了。
Data Stored as a UTF8 String
把位元組資料轉為 strings 型別
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
Data Stored as JSON
如果你在kafka 裡面儲存的是 json 格式,我們可以使用內嵌的 from_json 去解析,你需要傳入一個 schema, 然後就變成了 Spark SQL 的型別了。
使用UDF註冊反序列化類
你也可以註冊一個 udf 進行反序列化,你可以實現一個 MyDeserialzer ,只要實現了 Kafka Deserializer 的介面即可
/ 輸出資料到 kafka /
往 kafka 裡面寫資料也類似,我們可以在 DataFrame 上呼叫 writeStream 來寫入 kafka, 指定value, key 是可選的,如果你不指定,就是null。如果 key 為null, 有時候可能導致 分割槽資料不均勻,需要注意一下。
需要打到哪個 topic, 可以在 D ataStreamWriter 上指定 option 配置, 也可以操作 DataFrame 的時候 在每條 record 上加一列 topic 欄位指定。
這個例子中,DataFrame 中是使用者資訊資料,我們把 序列化後的 userId 作為 key,然後把所有欄位 都轉成 json string 格式當做 value