1. 程式人生 > >Spark 定製版:004~Spark Streaming事務處理徹底掌握

Spark 定製版:004~Spark Streaming事務處理徹底掌握

本講內容:

a. Exactly Once
b. 輸出不重複

注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。

上節回顧:

上節課通過案例透視了Spark Streaming Job架構和執行機,並結合原始碼進行了詳細解說;同時也瞭解了Spark Streaming Job的容錯機制,包括 Executor 與 Driver兩方面的容錯機制。

也就是說Job的事務處理,主要是在Executor 與 Driver兩個應用中展開

開講

首先,我們必須知道什麼是事務及其一致性?

事務應該具有4個屬性:原子性、一致性、隔離性、永續性。這四個屬性通常稱為ACID特性。

原子性(atomicity)。一個事務是一個不可分割的工作單位,事務中包括的諸操作要麼都做,要麼都不做。

一致性(consistency)。事務必須是使資料庫從一個一致性狀態變到另一個一致性狀態。一致性與原子性是密切相關的。

隔離性(isolation)。一個事務的執行不能被其他事務干擾。即一個事務內部的操作及使用的資料對併發的其他事務是隔離的,併發執行的各個事務之間不能互相干擾。

永續性(durability)。永續性也稱永久性(permanence),指一個事務一旦提交,它對資料庫中資料的改變就應該是永久性的。接下來的其他操作或故障不應該對其有任何影響。

以銀行轉帳為例,A客戶向B客戶轉賬一次(假如此次轉賬1萬元),正常情況下A客戶的賬戶裡只會被扣除一次且金額為一萬元,B客戶的賬戶也只會收到A客戶轉給的一次錢且金額同樣是一萬元,這就是事務及其一致性的具體體現,也就是說資料會被處理且會被正確的處理一次。

然而, Spark Streaming的事務處理和上述事例中講的事務及其一致性有所不同;Spark Streaming的事務關注的是某個Job執行的一致性。

本講將從事務視角為大家探索Spark Streaming架構機制

Spark Streaming應用程式啟動,會分配資源,除非整個叢集硬體資源奔潰,一般情況下都不會有問題。Spark Streaming程式分成而部分,一部分是Driver,另外一部分是Executor。Receiver接收到資料後不斷髮送元資料給Driver,Driver接收到元資料資訊後進行CheckPoint處理。其中CheckPoint包括:Configuration(含有Spark Conf、Spark Streaming等配置資訊)、Block MetaData、DStreamGraph、未處理完和等待中的Job。當然Receiver可以在多個Executor節點的上執行Job,Job的執行完全基於SparkCore的排程模式進行的。

這裡寫圖片描述

 Executor只有函式處理邏輯和資料,外部InputStream流入到Receiver中通過BlockManager寫入磁碟、記憶體、WAL進行容錯。WAL先寫入磁碟然後寫入Executor中,失敗可能性不大。如果1G資料要處理,Executor一條一條接收,Receiver接收資料是積累到一定記錄後才會寫入WAL,如果Receiver執行緒失敗時,資料有可能會丟失。
 
  Driver處理元資料前會進行CheckPoint,Spark Streaming獲取資料、產生作業,但沒有解決執行的問題,執行一定要經過SparkContext。Driver級別的資料修復從Driver CheckPoint中需要把資料讀入,在其內部會重新構建SparkContext、StreamingContext、SparkJob,再提交Spark叢集執行。Receiver的重新恢復時會通過磁碟的WAL從磁碟恢復過來。

 Spark Streaming和Kafka結合不會出現WAL資料丟失的問題,Spark Streaming必須考慮外部流水線的方式處理。

這裡寫圖片描述

這裡寫圖片描述

 上面的圖例很好的解釋了怎麼能完成完整的語義、事務的一致性,保證資料的零丟失,Exactly Once的事務處理?
 
  a、怎麼保證資料零丟失?
  
  必須要有可靠的資料來源和可靠的Receiver、整個應用程式的MetaData必須進行CheckPoint、通過WAL來保證資料安全(生產環境下Receiver接收Kafka的資料,預設情況下會在Executor中存在二份資料,且預設情況下必須二份資料備份後才進行計算;如果Receiver接收資料時崩潰,沒有拷貝副本,此時會重新從Kafka中進行拷貝,拷貝的依據是zookeeper元資料)。
  
  大家可以將Kafka看作是一個簡單的檔案儲存系統,在Executor中Receiver確定受到Kafka的每一條記錄後進行Replication到其他Executor成功後會通過ack向Kafka傳送確認收到的資訊並繼續從Kafka中讀取下一條資訊。
  
  b、Driver容錯如下圖所示:

這裡寫圖片描述

再次思考資料在哪些地方可能丟失?

  資料丟失的主要場景如下:

  在Receiver收到資料且通過Driver的排程,Executor開始計算資料的時候如果Driver突然崩潰(導致Executor會被Kill掉),此時Executor會被Kill掉,那麼Executor中的資料就會丟失,此時就必須通過例如WAL機制讓所有的資料通過類似HDFS的方式進行安全性容錯處理,從而解決Executor被Kill掉後導致資料丟失可以通過WAL機制恢復回來。

下面需要考慮二個很重要的場景:

資料的處理怎麼保證有且僅有被處理一次?

資料零丟失並不能保證Exactly Once,如果Receiver接收且儲存起來後沒來得及更新updateOffsets時,就會導致資料被重複處理。

更詳細的說明資料重複讀取的場景:

  在Receiver收到資料且儲存到了hdfs時Receiver奔潰,此時持久化引擎沒有來得及進行updateOffset,Receiver重新啟動後就會從管理Kafka的ZooKeeper中再次讀取元資料從而導致重複讀取元資料;從SparkStreaming來看是成功的,但是Kafka認為是失敗的(因為Receiver奔潰時沒有及時更新offsets到ZooKeeper中)重新恢復時會重新消費一次,此時會導致資料重新消費的情況。

這裡寫圖片描述

效能補充:

a、通過WAL方式保證資料不丟失,但弊端是通過WAL方式會極大的損傷SparkStreaming中的Receiver接收資料的效能(現網生產環境通常會Kafka direct api直接處理)。

b、需要注意到是:如果通過Kafka作為資料來源的話,Kafka中有資料,然後Receiver接受資料的時候又會有資料副本,這個時候其實是儲存資源的浪費。(重複讀取資料解決辦法,讀取資料時可以將元資料資訊放入記憶體資料庫中,再次計算時檢查元資料是否被計算過)。

  Spark1.3的時候為了避免WAL的效能損失和實現Exactly Once而提供了Kafka direct api,把Kafka作為檔案儲存系統!!!此時Kafka兼具有流的優勢和檔案系統的優勢,至此,Spark Streaming+Kafka就構建了完美的流處理世界!!!

  資料不需要copy副本,不需要WAL效能損耗,不需要Receiver,而直接通過kafka direct api直接消費資料,所有的Executors通過kafka api直接消費資料,直接管理offset,所以也不會重複消費資料;事務實現啦!!!

關於Spark Streaming資料輸出多次重寫及其解決方案

a、為什麼會有這個問題,因為Spark Streaming在計算的時候基於Spark Core,Spark Core天生會做以下事情導致Spark Streaming的結果(部分)重複輸出:

Task重試;

慢任務推測

Stage重複;

Job重試;

b、具體解決方案:

設定spark.task.maxFailures次數為1;最大允許失敗的次數,設為1就沒有task、stage、job等的重試;

設定spark.speculation為關閉狀態(因為慢任務推測其實非常消耗效能,所以關閉後可以顯著提高Spark Streaming處理效能)

Spark Streaming on Kafka的話,Job失敗會導致任務失敗,Job失敗後可以設定auto.offset.reset為“largest”的方式;

這裡寫圖片描述

最後再次強調

  可以通過transform和foreachRDD基於業務邏輯程式碼進行邏輯控制來實現資料不重複消費和輸出不重複!這二個方法類似於spark的後門,可以做任意想象的控制操作!

相關推薦

Spark 製版004~Spark Streaming事務處理徹底掌握

本講內容: a. Exactly Once b. 輸出不重複 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧: 上節課通過案例透視了Spark Streaming Job架構和執行機,並結合原

Spark 製版015~Spark Streaming原始碼解讀之No Receivers徹底思考

本講內容: a. Direct Acess b. Kafka 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧 上一講中,我們講Spark Streaming中一個非常重要的內容:State狀態管理

Spark 製版013~Spark Streaming原始碼解讀之Driver容錯安全性

本講內容: a. ReceiverBlockTracker容錯安全性 b. DStreamGraph和JobGenerator容錯安全性 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧 上一講中,

Spark 製版010~Spark Streaming原始碼解讀之流資料不斷接收全生命週期徹底研究和思考

本講內容: a. 資料接收架構設計模式 b. 資料接收原始碼徹底研究 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧 上一講中,我們給大傢俱體分析了Receiver啟動的方式及其啟動設計帶來的多個

Spark版本定製4-Spark Streaming事務處理徹底理解

本講內容: a. Exactly Once  b. 輸出不重複 注:本講內容基於Spark 1.6.1版本(在2016年5月來說是Spark最新版本)講解。 上節回顧: 上節課通過案例透視了Spark Streaming Job架構和執行機,並結合原始碼進行了詳細

Spark製版2通過案例對SparkStreaming透徹理解三板斧之二

本節課主要從以下二個方面來解密SparkStreaming: 一、解密SparkStreaming執行機制 二、解密SparkStreaming架構 SparkStreaming執行時更像SparkCore上的應用程式,SparkStreaming程式啟動後會啟動很

<spark> error啟動spark後查看進程,進程中master和worker進程沖突

告訴 若有 master 沖突 存在 查看進程 spark 但是 文件 啟動hadoop再啟動spark後jps,發現master進程和worker進程同時存在,調試了半天配置文件。 測試發現,當我關閉hadoop後 worker進程還是存在, 但是,當我再關閉spar

Spark On Yarn提交Spark應用程式到Yarn

Spark On Yarn模式配置非常簡單,只需要下載編譯好的Spark安裝包,在一臺帶有Hadoop Yarn客戶端的機器上解壓,簡單配置之後即可使用。 要把Spark應用程式提交到Yarn執行,首先需要配置HADOOP_CONF_DIR或者YARN_C

Spark學習筆記初識Spark

=。= // 將users中的vertex屬性新增到graph中,生成graph2 // 使用joinVertices操作,用user中的屬性替換圖中對應Id的屬性 // 先將圖中的頂點屬

深入解析分散式系統的事務處理經典問題及模型(轉載分享)

編者按:資料服務的高可用是所有企業都想擁有的,但是要想讓資料有高可用性,就需要冗餘資料寫多份。寫多份的問題會帶來一致性的問題,而一致性的問題又會帶來效能問題,這就會陷入一個無解的死迴圈!這裡所謂資料一致性,就是當多個使用者試圖同時訪問一個數據庫時,如果它們的事務同時使用相同的資料,可能會發生以下四種情況:

第4課Spark Streaming的Exactly Once的事務處理

本期內容: Exactly once 輸出不重複 Exactly once 1,事務一定會被處理,且只被處理一次; 2,輸出能夠輸出且只會被輸出。 Receiver:資料通過BlockManager寫入記憶體+磁碟或者通過WAL來保證資料的安全性。WAL機制:寫資料

spark制之五使用說明

建表 pan schemardd 特性 -s map data div popu 背景 spark-shell是一個scala編程解釋運行環境,能夠通過編程的方式處理邏輯復雜的計算,但對於簡單的類似sql的數據處理,比方分組求和,sql為”selec

kafka(六)spark streaming對接,spark streaming接收kafka資料來源

1.功能實現 spark streaming從kafka接收資料,有兩種方式,receiver和direct兩種方式。 2.pom依賴 針對kafka_2.10-0.8.2.1版本         <!-- https

Spark修煉之道(進階篇)——Spark入門到精通第十三節 Spark Streaming—— Spark SQL、DataFrame與Spark Streaming

主要內容 Spark SQL、DataFrame與Spark Streaming 1. Spark SQL、DataFrame與Spark Streaming import org.apache.spark.SparkConf import org

實時流Streaming大資料Storm,Spark和Samza

當前有許多分散式計算系統能夠實時處理大資料,這篇文章是對Apache的三個框架進行比較,試圖提供一個快速的高屋建瓴地異同性總結。 Apache Storm   在Storm中,你設計的實時計算圖稱為toplogy,將其以叢集方式執行,其主節點會在工作節點之間分發程式碼

Spark Streaming大規模流式資料處理

提到Spark Streaming,我們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關於資料分析的軟體棧。從它的視角來看,目前的大資料處理可以分為如以下三個型別。  複雜的批量資料處理(batch data processing),通常的時間跨度

Spark定製班第1課通過案例對Spark Streaming透徹理解三板斧之一解密Spark Streaming另類實驗及Spark Streaming本質解析

package com.dt.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.Seco

Spark定製班第29課深入理解Spark 2.x中的Structured Streaming內幕

本期內容: 1. 新型的Spark Streaming思維 2. Structured Streaming內幕 Spark 2.0 仍有bug,不適合於生成環境。只用於測試。 Spark 2.X提出了continuous application(連續的應用程式)的概念,非

Spark學習——Spark Streaming大規模流式資料處理

提到Spark Streaming,我們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關於資料分析的軟體棧。從它的視角來看,目前的大資料處理可以分為如以下三個型別。  複雜的批量資料處理(batch data processing),通常的時間跨度

第124課Spark Streaming效能優化通過Spark Streaming進行裝置日誌監控報警及效能優化

通過Spark Streaming進行裝置日誌監控報警及效能優化 1、Spark Streaming進行裝置監控及報警 2、Spark Streaming進行裝置監控效能優化 ELK Stack:一整套開源的日誌處理平臺解決方案,可以集日誌的採集、檢索、視