1. 程式人生 > >Spark版本定製4-Spark Streaming事務處理徹底理解

Spark版本定製4-Spark Streaming事務處理徹底理解

本講內容:

a. Exactly Once 
b. 輸出不重複

注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。

上節回顧:

上節課通過案例透視了Spark Streaming Job架構和執行機,並結合原始碼進行了詳細解說;同時也瞭解了Spark Streaming Job的容錯機制,包括 Executor 與 Driver兩方面的容錯機制。

也就是說Job的事務處理,主要是在Executor 與 Driver兩個應用中展開

開講

首先,我們必須知道什麼是事務及其一致性?

事務應該具有4個屬性:原子性、一致性、隔離性、永續性。這四個屬性通常稱為ACID特性。

原子性(atomicity)。一個事務是一個不可分割的工作單位,事務中包括的諸操作要麼都做,要麼都不做。

一致性(consistency)。事務必須是使資料庫從一個一致性狀態變到另一個一致性狀態。一致性與原子性是密切相關的。

隔離性(isolation)。一個事務的執行不能被其他事務干擾。即一個事務內部的操作及使用的資料對併發的其他事務是隔離的,併發執行的各個事務之間不能互相干擾。

永續性(durability)。永續性也稱永久性(permanence),指一個事務一旦提交,它對資料庫中資料的改變就應該是永久性的。接下來的其他操作或故障不應該對其有任何影響。

以銀行轉帳為例,A客戶向B客戶轉賬一次(假如此次轉賬1萬元),正常情況下A客戶的賬戶裡只會被扣除一次且金額為一萬元,B客戶的賬戶也只會收到A客戶轉給的一次錢且金額同樣是一萬元,這就是事務及其一致性的具體體現,也就是說資料會被處理且會被正確的處理一次。

然而, Spark Streaming的事務處理和上述事例中講的事務及其一致性有所不同;Spark Streaming的事務關注的是某個Job執行的一致性。

本講將從事務視角為大家探索Spark Streaming架構機制

Spark Streaming應用程式啟動,會分配資源,除非整個叢集硬體資源奔潰,一般情況下都不會有問題。Spark Streaming程式分成而部分,一部分是Driver,另外一部分是Executor。Receiver接收到資料後不斷髮送元資料給Driver,Driver接收到元資料資訊後進行CheckPoint處理。其中CheckPoint包括:Configuration(含有Spark Conf、Spark Streaming等配置資訊)、Block MetaData、DStreamGraph、未處理完和等待中的Job。當然Receiver可以在多個Executor節點的上執行Job,Job的執行完全基於SparkCore的排程模式進行的。

這裡寫圖片描述

 Executor只有函式處理邏輯和資料,外部InputStream流入到Receiver中通過BlockManager寫入磁碟、記憶體、WAL進行容錯。WAL先寫入磁碟然後寫入Executor中,失敗可能性不大。如果1G資料要處理,Executor一條一條接收,Receiver接收資料是積累到一定記錄後才會寫入WAL,如果Receiver執行緒失敗時,資料有可能會丟失。 
  
  Driver處理元資料前會進行CheckPoint,Spark Streaming獲取資料、產生作業,但沒有解決執行的問題,執行一定要經過SparkContext。Driver級別的資料修復從Driver CheckPoint中需要把資料讀入,在其內部會重新構建SparkContext、StreamingContext、SparkJob,再提交Spark叢集執行。Receiver的重新恢復時會通過磁碟的WAL從磁碟恢復過來。

 Spark Streaming和Kafka結合不會出現WAL資料丟失的問題,Spark Streaming必須考慮外部流水線的方式處理。

這裡寫圖片描述

這裡寫圖片描述

 上面的圖例很好的解釋了怎麼能完成完整的語義、事務的一致性,保證資料的零丟失,Exactly Once的事務處理? 
  
  a、怎麼保證資料零丟失? 
   
  必須要有可靠的資料來源和可靠的Receiver、整個應用程式的MetaData必須進行CheckPoint、通過WAL來保證資料安全(生產環境下Receiver接收Kafka的資料,預設情況下會在Executor中存在二份資料,且預設情況下必須二份資料備份後才進行計算;如果Receiver接收資料時崩潰,沒有拷貝副本,此時會重新從Kafka中進行拷貝,拷貝的依據是zookeeper元資料)。 
   
  大家可以將Kafka看作是一個簡單的檔案儲存系統,在Executor中Receiver確定受到Kafka的每一條記錄後進行Replication到其他Executor成功後會通過ack向Kafka傳送確認收到的資訊並繼續從Kafka中讀取下一條資訊。 
   
  b、Driver容錯如下圖所示:

這裡寫圖片描述

再次思考資料在哪些地方可能丟失?

  資料丟失的主要場景如下:

  在Receiver收到資料且通過Driver的排程,Executor開始計算資料的時候如果Driver突然崩潰(導致Executor會被Kill掉),此時Executor會被Kill掉,那麼Executor中的資料就會丟失,此時就必須通過例如WAL機制讓所有的資料通過類似HDFS的方式進行安全性容錯處理,從而解決Executor被Kill掉後導致資料丟失可以通過WAL機制恢復回來。

下面需要考慮二個很重要的場景:

資料的處理怎麼保證有且僅有被處理一次?

資料零丟失並不能保證Exactly Once,如果Receiver接收且儲存起來後沒來得及更新updateOffsets時,就會導致資料被重複處理。

更詳細的說明資料重複讀取的場景:

  在Receiver收到資料且儲存到了hdfs時Receiver奔潰,此時持久化引擎沒有來得及進行updateOffset,Receiver重新啟動後就會從管理Kafka的ZooKeeper中再次讀取元資料從而導致重複讀取元資料;從SparkStreaming來看是成功的,但是Kafka認為是失敗的(因為Receiver奔潰時沒有及時更新offsets到ZooKeeper中)重新恢復時會重新消費一次,此時會導致資料重新消費的情況。

這裡寫圖片描述

效能補充:

a、通過WAL方式保證資料不丟失,但弊端是通過WAL方式會極大的損傷SparkStreaming中的Receiver接收資料的效能(現網生產環境通常會Kafka direct api直接處理)。

b、需要注意到是:如果通過Kafka作為資料來源的話,Kafka中有資料,然後Receiver接受資料的時候又會有資料副本,這個時候其實是儲存資源的浪費。(重複讀取資料解決辦法,讀取資料時可以將元資料資訊放入記憶體資料庫中,再次計算時檢查元資料是否被計算過)。

  Spark1.3的時候為了避免WAL的效能損失和實現Exactly Once而提供了Kafka direct api,把Kafka作為檔案儲存系統!!!此時Kafka兼具有流的優勢和檔案系統的優勢,至此,Spark Streaming+Kafka就構建了完美的流處理世界!!!

  資料不需要copy副本,不需要WAL效能損耗,不需要Receiver,而直接通過kafka direct api直接消費資料,所有的Executors通過kafka api直接消費資料,直接管理offset,所以也不會重複消費資料;事務實現啦!!!

關於Spark Streaming資料輸出多次重寫及其解決方案

a、為什麼會有這個問題,因為Spark Streaming在計算的時候基於Spark Core,Spark Core天生會做以下事情導致Spark Streaming的結果(部分)重複輸出:

Task重試;

慢任務推測

Stage重複;

Job重試;

b、具體解決方案:

設定spark.task.maxFailures次數為1;最大允許失敗的次數,設為1就沒有task、stage、job等的重試;

設定spark.speculation為關閉狀態(因為慢任務推測其實非常消耗效能,所以關閉後可以顯著提高Spark Streaming處理效能)

Spark Streaming on Kafka的話,Job失敗會導致任務失敗,Job失敗後可以設定auto.offset.reset為“largest”的方式;

這裡寫圖片描述

最後再次強調

  可以通過transform和foreachRDD基於業務邏輯程式碼進行邏輯控制來實現資料不重複消費和輸出不重複!這二個方法類似於spark的後門,可以做任意想象的控制操作!

有興趣想學習國內頂級整套Spark+Spark Streaming+Machine learning課程的,歡迎加我qq  471186150。共享視訊,價效比超高!