1. 程式人生 > >[spark-streaming,kafka] Exactly-once 語義實現設計文件

[spark-streaming,kafka] Exactly-once 語義實現設計文件

kafka 版本 0.8.x

spark 版本 1.3

文章連結址:

翻譯原因: 0.8 的 kafka 版本中, 所有 topic partition 的 offset 消費記錄集中儲存在 zookeeper 上,而 spark-streaming 中資料記錄到自己的 WAL 中,造成了 spark-streaming driver 資料傳輸記錄和 kafka 資料傳輸記錄分開儲存在兩個地方,對 exactly-once 的實現帶來了很大難度,這篇設計文件就是在這個背景下提出來的解決方法,其中的方法很值得學習與借鑑,另外作者在社群發完文件之後自己就就這當前的環境把程式碼實現了一遍,所以就這這篇文件 + 原始碼一起學習能加深對這裡問題的理解分析和解決。

------

Exactly-once + WAL-free Kafka Support in Spark Streaming 

Kafka 中 Exactly-once + WAL(write ahead log) 在 Spark Streaming 中的實現

Problem
問題描述
        Currently the best way to use Kafka with Spark Streaming is to use the exsiting KafkaUtils, and enable Write Ahead Logs in Spark Streaming.

        目前解決 Kafka Spark Streaming 二者之間資料傳輸的最佳方法均被封裝在 KafkaUtils 類中對外提供 API,且 Spark Streaming 中支援資料以  WAL 的方式寫入。

         There are two ways by which this Kafka integration of Spark Streaming can be improved -exactly-once semantics and not using the write ahread log in Spark Streaming.

           但 Kafka 和 Spark Streaming 二者資料傳輸而言仍有兩個地方待改進:訊息只傳輸一次(exaclty once)語義的實現方式和 在 Spark Streaming 中放棄資料通過 WAL 的方式寫入。 

           This design doc tries to solve both issues by implementing a new integration . 

           本篇設計文件試圖通過整合出一種新的用於 Spark Streaming 和 Kafka之間資料傳輸的模組來解決上述的兩點不足。

Eaxctly-once Semantics
Exactly-once 語義介紹
       Kafka Stream with the WAL enabled can avoid data loss and provide zero data loss of  all data records. 

       Kafka 在處理資料流時藉助 WAL 的資料寫入方式來防止資料丟失,

       However, it cannot still provide exactly-once semantics. 

       然而即便如此,Kafka 卻無法藉助 WAL 來實現 exactly-once 的語義

      The fundamental reason any "transfer" of records cannot be acknowledged by both Spark Streaming and Kafka in a signle transactional update. 

        exactly-once 語義很難實現本質上是因為構成資料流的 record 在資料傳輸過程中的進度記錄對於資料收發雙方 Spark Streaming 和 Kafka 而言無法通過單獨事務更新(這就造成了資料收發雙方記錄資料傳輸進度的不一致)

        When the driver / Kafka receiver fails, it may so happen that some data records have been saved in Spark Streaming(written to WAL, etc.) but the Kafka offsets have not been updated accrodingly due to the failure.

        例如(開始介紹上面所說的不一致)當 Spark Streaming driver 或是 Kafka 作為資料接收者掛掉時會有這樣一種情況:就是資料流的 record 在 Spark Streaming 被接收的已經寫到  WAL 中,但是 Kafka 這邊的 offset 卻因為程序掛掉沒有及時的更新(沒落盤持久化)

     In that case, Kafka will send the data again to the recovered Spark Streaming process. 

     接下來, Kafka 會將上次已經發送但沒記錄 offset 的資料再次傳送給恢復重啟的 Spark Streaming 程序中。

     And the key reason why this can happen is that the metadata information about the what has happen successfully received is stored in two places -- WAL in Spark Streaming and Zookeeper offsets in Kafka. 

    造成上述情況發生的關鍵因素便是記錄發生了什麼的 metadata 元資料被接收和記錄(持久化)在兩個不同的地方 -- Spark Streaming 的 WAL 檔案中 和 Kafka 的 Zookeeper 的 offset 中(也會持久化到 zk 的元資料檔案中) 

     These two places cannot be transactionally updated, leanding to inconsistencies and therefore duplicates.

     而這兩個地方檔案中的資訊無法通過事務的方式來同步更新,更新的不同步便造成了資料上的不一致,最終導致了相同的資料 record 因記錄的消費進度不一致而被重複傳送,這就破壞了 exactly-once 語義。

     The logical way to resolve this is to save the necessary metadata in only one place. Furthermore, for failure recovery, it needs to be, in a way such that the batches can be exactly once. 

     從上述 metadata 分隔兩地的問題符合邏輯的解決方法便是將 metadata 指定一個地點進行存放。 進一步說,對於失敗之後資料和任務的恢復而言,也得這麼做(metadata 存放在一個地方)只有這麼做才能保證批處理的資料被執行且僅被執行一次。

     In order to ensure that the Spark Streaming receives every record exactly-once, 

     為了實現 exactly once 語義我需要做的是保證 Spark Streaming 在接收資料時就僅接收一次才行。  

No use of WAL 
廢棄使用 WAL 的方式來實現 Exactly-once 語義的思路介紹
       Using the WAL for Kafka is not ideal because the data is getting replicated twice -- once for Kafka , and once for again for Spark Streaming WAL . 

       上述提到的 WAL 是在 Spark Streaming 中實現的,所以在 Kafka 中使用 WAL 來保證 exactly once 語義顯然是不行的,因為這樣的話還是會導致 metadata 的重複儲存,一份在 Kafka 的 WAL 日誌中,另一份在 Spark Streaming 的 WAL 日誌中。

    In fact Kafka iteself is a replicated log, we should ideally use Kafka to recover all the data in case of failures.

    事實上 Kafka 從其本身的設計而言它實現了日誌的冗餘存放,所以當計算出錯時 Kafka 是理想的用於資料恢復的中介軟體。 

Requirements 
需求(在分析了問題及問題出現原因之後,接下來對要做些什麼,達到什麼效果進行了詳細描述)
(需求一)Allow streaming applications to be created when every record from Kafka is effectively received exactly-once .
讓每個流計算應用 app 其中和重啟時每個 Kafka 資料流中的 record 被 app 有效接收一次。
Note the effectively. Each record may be read from Kafka multiple times but from the point of view of the computation, the record is received exactly once. 
注於上面提到的一詞‘有效地’,指的是 Kafka 中的每條記錄/record都是被多次讀取的,但站在計算的角度我們只要保證每個 record 下游僅接收到一次(即不會出現資料重複計算,保證資料結果正確)即可。 
Example: Suppose the series of records is 1,2,3 ... , 10, and the streaming application has to add them up. The expected answer is 55 despite any failures.  The system may read any of the records from Kafka multiple times ( to recover from failures), but the final answer still should be 55 as it they have been effectively read exactly once.
比如說:設想資料流中的 records 從 1 - 10 10個數字,實時計算運算元想要把這些資料依次相加得到計算和。理想情況下是無論發生什麼錯誤,計算結果都能得到 55 這個數值。在計算過程中為了恢復計算中途有可能出現的錯誤允許系統執行從 Kafka 中讀取任意多次,任意多個數據記錄 record,只要保證計算任務恢復之後得到的計算結果仍舊是 55 這個數值,這種仍叫做資料的有效只傳輸一次(exactly once)
Also note that we are not ensuring the exactly-once output of data(using output operations like foreachRDD) to external storage systems. This problem is specific to the application requirements, the type of update to the storage system (idempotent or not) and the abilities of the storage system ( transaction suport or not). Ensuring out-of-the-box end-to-end exactly-once semantics(i.e. including data store output operation) is not in the scope of this design. 
仍要注意的是,我們雖然能夠保證資料傳輸和計算過程中資料的 ‘exactly-once’ 的語義,但我們無法保證資料從記憶體寫入到外存的時候資料順序仍然是有序的,例如像是使用 foreachRDD 這種操作寫檔案這種。 這種記憶體到外存的情況需要分析計算運算元具體的環境及其業務場景而定,以及儲存系統是否支援冪等操作,以及是否具備支援事務操作的能力。 確保落盤的時候端到端,exactly-once 語義(包括資料落盤的方式)不在本設計文件討論和實現範圍內。


(需求二) Recover from driver failures without using Write Ahead Logs to save the data .
當 Spark Streaming 的 driver 計算失敗重啟的時候不會載入之前寫入的 WAL 檔案來恢復其資料
To recover from driver failures, the lastest batches must be exactly reconstructed after recovery. Those batches must have the exact same data in the same order as it was before failure. 
為了從上次 driver 計算的失敗中恢復過來,最後的批處理計算的資料必須恢復到上次計算的狀態。這些批處理計算中必須保證資料以及資料的順序和當時出錯時是完全一致的。
Proposed Solution 
提出的解決方法
      The basic idea is to treat Kafka like a durable filesystem source, rather than an ephemeral network source .

       最根本的解決方法就是將 Kafka 當作持久的檔案系統資料來源,而並非是提供短暫服務的網路資料來源。(即,將 Kafka = HDFS )

     After each batch interval, the input DStream generates a RDD which is defined by the range of Kafka offsets(i.e. offsets for each Kafka topic + partition) that define the set of records in that batch . 

     在批處理的中的每次間隔,資料流從 DStream 轉換成 RDD ,而這個 RDD 則是由 Kafka 中的 offset 所構成的,(offset 就是 Kafka 中的 topic + partition) 而 RDD 中的 offset 則是用來定義了每個批處理中處理的資料記錄 record 集合。

     Each partition in the RDD is defined by a Kafka topic+partition and a range of offsets. 

     RDD 中的每一個分割槽(partition)都是由 Kafka 中的 topic + partition 和 一個範圍內的 offset 所構成的。

     So there is a one-to-one correspondence between RDD partitions and Kafka topic + partitions.

    所以,在這裡不就是剛好的 RDD 中的分割槽和 Kafka 中的 topic + 分割槽的一一對應的銜接點嗎? 

   When the RDD is executed, for each partition, a Kafka consumers is created, that reads the Kafka data corresponding to the offset range.

   當 RDD 在計算時,會逐一地遍歷 partition 進行計算,每遍歷到一個 partition 時便會建立一個對應的 kafka consumer,在計算該 partition 的時候所建立的 kafka consumer 被用來從特定的 topic,對應範圍的 offset 中讀取資料(該資料參與該 partition 的計算)。

Failure Recovery 
失敗恢復
       These offset ranges are also saved as part of DStream checkpoints. 

      這些 offset 範圍在計算失敗時會被一併記錄到 DStream 的檢查點記錄檔案中

       This allows the RDDs containing Kafka data to be recreated( from the checkpoint offsets) after the driver recovers form failures, and the regenerated RDDs will have the same data as before.

        這就讓包含者 Kafka 中資料的 RDD 在 Spark Streaming driver 得以根據檢查點記錄檔案中的 offsets 數值得以重新被構建, 並且重建的 RDD 有著和 driver 失敗之前同樣的資料(及資料順序)

      And since all the transformed RDD generated from these RDDs are deterministic, the final transformed RDD will be deterministically generated and have the same value as it would have been without any failure. 

          並且,由於最後一步中的 RDD 所轉換而來的 RDD 中的數值都是確定的,最後通過轉換得到的 RDD 也必定是確定的,所以這就保證了恢復的 RDD 中的數值和原先的 RDD 是完全一樣的。

Performance 
效能
    Compared to the exsiting receiver based Kafka stream, this method can have higher throughput as records from different kafka topic + partitions are pulled through the cluster. 

    對比現有的 Spark Streaming 中處理 Kafka 資料流的 receiver 而言,這種解決方法在處理 Kafka 中不同的 topic + partition 的吞吐量會更高,因為資料是由 cluster 通過 pull 的方式獲取的。

   However , there can also be some loss in throughput as every partition needs to create a consumer and connect to Kafka. 

   然而仍有拉低吞吐率的地方:Spark Streaming 中的 RDD 是按照每個 partition 都會建立其自己的 consumer 並與 kafka 建立連線,連線過多會一定造成效能的損耗。 

   This may be an acceptable tradeoff for achieving exactly-once and no WAL usage .

   即便如此(效能開銷上有些損耗),但可以實現 exactly-once 語義並且不用 Spark-Streaming 的 WAL 這點效能損耗也是可以忍受的。  

Other Advantages 
其它優點
       Few other advantages in the long term/ 從 Kafka 發展長遠角度來看體現的優勢

      No receiver and no data stored in memory, so easy to dynamically scale resources.
      這種處理方法會讓資料接收者接收到的中間資料和傳輸資料及時落盤而不會常駐記憶體,使得動態擴充資源成為了可能。
Natural flow control/back pressure. With receivers, if data comes in faster than the system can process, the data keeps buffering in memory an can finally cause OOMs. With this method, data is pull only when the job using the data is executed. So no chance of OOMing because of temporary glitches in the flow.x .
這種處理方法提供了一種自然優雅的控流緩解壓力的方法。 對於現有的 receiver 而言是被動的接收資料,如果資料到來的速率高於系統能處理的速率,系統便會將無法顧及的資料存放到緩衝區記憶體中,資料不斷增多最終會因耗光記憶體緩衝區而引發 OOM 。 如果使用了這種方法通過主動的方式來從資料來源拉取資料,不會對系統及記憶體緩衝區造成過大的負擔,也不會引起 OOM 的發生
Other Concerns 
其它的一些考慮
Not using the Receiver API - This prevents this design from taking advantage of existing infrastructure of rate limiting, record counting, etc. This isn't such a problem in practivce, the existing PR uses the value of spark.streaming.receiver.maxRate to implement a per-partition maximum batch size for rate limiting. 
並沒有複用現有的 Receiver API, 而是提出瞭解決方法的新思路,這麼做無法複用現有設計架構中限流,record 計數這些方法。 但是在實際使用中並不構成問題,我們可以使用配置資訊中的 spark.streaming.receiver.maxRage 的這個引數來作為限制每個 RDD 中的 parition 每次最多處理的 batch 的大小以達到區域性限流的作用。 
Not updating the Kafka's zookeeper - Since the zookeeper is not used to store the offset, any monitoring tools that monitor Kafka by checking the offsets in zookeeper will not work . However, this can be fixed by the application developer committing offsets maintained in Spark Streaming to Zookeeper.
這種解決方法下,kafka 的消費進度 offset 數值將不會被寫入到 zookeeper 中進行存放,任何通過監控 zookeeper 中記錄的 offset 數值來監控 kafka 上資料消費進度的監控工具都不能用了。然而,這個問題可以通過在 spark streaming 計算運算元中由開發者自行實現將當前條的 offset 資料寫入到 Zookeeper 中來實現。
Multiple transformation on the input stream will unnecessarily pull the data multiple times from Kafka. This is not good. Word around is to call persist() on the input stream . Maybe that can be enabled autocatically for this new DStream.
這種設計方案下會導致在計算過程中,RDD 多次轉換過程中,每一次轉換都會不必要的從 Kafka 上游拉去一次資料(而不是從上次的 RDD 緩衝中獲取)。這樣做很不好,不過看了一圈唯一能起到作用就是在從上游 kafka 讀取資料的之後,通過呼叫 persist() 把資料緩衝到磁碟上,這樣每次就是從磁碟上讀取而不是通過連線藉助 kafka consumer 來讀取資料。 或許在新的 DStream 中可以自動支援這種呼叫 persist() 的方法而不是人為顯示的呼叫來實現會好些。
Programming Interfaces 
程式設計 API 描述
        Here are the public API that we expose to the application developer. 

        接下來我們介紹基於本篇設計文件中的功能將會給開發者開放出去的 APi。

        The existing kafka functionality is expressed through the object org.apache.spark.streaming.kafka.KafkaUtils. 

        Spark Streaming 中現有的處理 kafka 的方法都封裝在 KafkaUtils 中對外提供 API 訪問介面。

       We can add a new set of methods in the same class. Here are the proposed methods.

       我們也可以在相同類中加入一系列新方法,新加入的方法描述如下

1. createExactlyOnceStream(context, kafkaParams, topicSet):
DStream[String, String]
@param context: StreamingContext 
@param kafkaParams: Map[Stirng,Stirng] of Kafka key value properties 
@param topicSet: a set of Kafka topics to consume (all partitions will be consumed) 
@return DStream[Stirng,Stirng]


2. createExactlyOnceStream[K,V, KD<:Decoder[_], VD<: Decoder[_], R](context, kafkaParams, topicPartitionOffsetMap, messagehandler):DStream[R]
@param context: StreamingContext
@param kafkaParams: Map[Stirng,String] of Kafka key-value properties 
@param topicPartitionOffsetMap- Map[TopicAndPartition, Long] of per-topic per-partition offset to start from 
@param messagehandler : MessageAndMetadata[K,V]=> R intercepting function 
@return DStream[R]

      Other points to consier 

      需要考慮的其它一些因素

           1. Starting point - Whether to strart from the earliest or latest Kafka offset should be configurable in the same as it was before -- through kafka params. In our case, we have to write explicit code to read it and accordingly determine our own starting offsets. 

              1. 對於開始消費資料的起點,無論是從 kafka offset 最開始或是最新的資料開始消費這些都必須寫道配置檔案--並通過建立kafka  consumer 例項時所使用的引數來控制。 在我們的這種應用場景下,我們定製化的實現了每次讀取資料是從 kafka offset 最開始進行讀取。

              2. Along with methods for creating streams, we also want to expose utility methods to create RDDs for reading messages from Kafka 

            2. 和建立流資料 API 一併,我們也希望能暴露用於從 kafka 讀取資料構建 RDD 的 API 出來。


createRDD[K,V, KD<: Decoder[_], VD<:Decoder[_], R] ( context, kafkaParams, batch, messageHandler)
@param context: SparkContext 
@param kafkaParams: Map[String, String] of Kafka key-value properties
@param batch: Array[OffsetRange] see below
@param messagehandler: MessageAndMetadata[K,V] => R intercepting function 

   To expose kafka offsets without exposing too much implementation detail, one possibility is a public interface, OffsetRange. 

    為了將 kafka offset 暴露出來的同時又不暴露過多的實現細節,可通過建立公共介面 OffsetRange 的方法來解決。

    This has basically the same accessors as KafkaRDDPartition, for defining the range of offsets for a given topic/partition, but does not actually extend rdd Partition. 

     這個 OffsetRange 有著與 KafkaRDDPartition 相同的父類,目的是為了給指定了 topic 和 partition 的kafka 資料來源設定 offset 的訪問範圍,但是 OffsetRange 卻並未繼承 rdd 的 Partition 這個父類。

     There's public static constructor of a trivial implementation of that interface so that people can use the createRDD method .

      又一個提供了靜態構造方法的介面,通過實現該介面開發者便可呼叫其中的 createRDD 這個方法了。

Another interface, HasOffsetRanges, defines objects that have an accessor for a collection of offset ranges. 

 另一個介面, HasOffsetRanges, 這個介面中提供了一個用來通過 collect 方式獲取 offset  ranges 的父類方法

  KafkaRDD implements that interfaces. 而 KafkaRDD 則實現了該介面。

   That way, people that care about offsets should be able to cast an RDD that happens to be KafkaRDD to something they can get offset from .

     通過這種方法,關心 offset 數值的使用者可以將剛好是 KafkaRDD 例項的 RDD 轉換成某種他們能獲取 offset 數值的 RDD 型別。

Implementations Details 
實現細節
     There are 4 classes invoked: KafkaRDDPartition, KafkaRDD, DeterministicKafkaInputDStream(this needs a better name, perhaps ExplicitkafkaInputDStream), and KafkaCluster. Note that all these classes are private to Spark .

       針對本篇設計文件,我們主要提供了 4 個類他們分別是 KafkaRDDPartition, KafkaRDD,DeterministircKafkaInputDStream(顯然這個類需要一個更貼切的類名,例如 ExlicitkafkaInputDStream)以及 KafkaCluster。 在這裡需要注意的是這些 class 對於 Spark 而言都是私有型別

 org.apache.spark.streaming.kafka.KafkaRDD
    Constructor takes kafka configuration parameters , an array of KafkaRDDPartition, and a function to convert Kafka MessageAndMetadata objects to the desired item type. 

         KafkaRDD 中的構造器用來接收配置引數,該配置引數又 KafkaRDDPartition 型別的陣列構成,和傳入對應的將 KafkaMessageAndMetadata 轉換成 item 型別的方法引數構成

        The compute implementation for each partition uses a Kafka simple consumer to request only the message in the specified range of offsets. 

       對於每個 partition 中所執行的計算便是通過 kafka consumer 讀取 offset 在特定範圍內的訊息。

       Worker or Kafka leader failure is handled by normal spark task retires. On the first attempt, the specified preferred host will be used on future attempts, the leader is looked up .

      計算中的 Worker 和 Kafka leader 中的失敗會被通過普通的 spark task  失敗重試來處理。 在第一次失敗重新計算嘗試中, 會傾向選取特定的 host 用作重試計算的執行機器,並且在後續的多次嘗試中都是通過這種方式來獲取 host 的,而 Kafka 的 leader 則是通過查詢來獲取的。

    This class is public because it is useful on its own, for non-streaming batch jobs that consume from Kafka. 

    KafkaRDD 類是公共類,因為無論是對於 streaming 還是 non-streaming 只要是從 Kafka 消費資料構建 RDD 時都會用到它。

org.apache.spark.streaming.kafka.KafkaRDDPartition

Simple class storing the kafka topic id partition id, inclusive starting offset, exclusive ending offset, and preferred host/port.

KafkaRDDPartition 類的實現很簡單,它主要用來存放 topic id partition id 對於 offset 處理的範圍是 [begin offset, end offset ), 並且可以指定RDD 連線遠端的 Kafka 傳輸資料的主機及埠號碼


org.apache.spark.streaming.kafka.DeterministicKafkaInputDStream

Constructor is similar to KafkaRDD, but instead of taking an array of KafkaRDDPartition describing a single batch, it takes the starting per-TopicAndPartitions that the stream of batches will begin at .

DeterministicKafkaInputDStream 的構造方法和 KafkaRDD 類的類似,但是 DeterministicKafkaInputDStream 並沒有基於用來描述一個單獨批處理計算的任務也就是由 KafkaRDDPartition 所構成的陣列來構建資料流,而是基於每個批處理計算作業中的開頭項(通過 TopicAndPartition來描述)來構建資料流。

The compute implementation for each valid time looks up the latest leader offsets for each kafka partition. 

在執行計算的時候會在有效是假中查出 kafka 中各個 leader 在每個 kafka 分割槽上的最新 offset 的數值。 

It constructs a KafkaRDD spanning from the prior batch's ending offsets(or the user specified starting point, for the first batch) to the lastest leader offsets. 

DeterministicKafkaInputStream 從一個跨度從使用者指定的最開始的一點,就是首次計算批處理的 offset 到 kafka leader 中最新的 offset 對應的 kafka 上的 partition 的資料構建了對應的 RDD 結構。

Leader failure is handled by a number of retries specified in the constructor ( I'm not aware of an existing spark or kafka configuration that makes sense to use for this number instead)

當 Leader 處理出現錯誤時,會通過在構造期中實現重試(我不確定已有的 spark 或是 kafka 的配置項中已有功能能實現相同的功能)

Worker failure is handled as per KafkaRDD. Driver failures for jobs that do not require windowing can be most safely handled by client code restarting the job, specifying their last committed offsets as the starting point. 

Worker 上的計算失敗是以 KafkaRDD 為粒度進行處理恢復的。 對於 Driver 中失敗且無需視窗的 job 而言完全可以由 client 端的程式碼重啟並安全執行,特別是已知這些 job 所處理的起始位移數值的時候。

For jobs that do require windowing, a best effort is made to save generatedRDD partition definitions t the checkpoint.

對於那些需要視窗計算的任務而言,系統會盡最大努力構建出 generatedRDD 分割槽用來將資訊記錄到檢查點鐘

This does mean that windowing info will be lost if the checkpoin is not recoverable.

這就說明對與視窗資訊而言如果記錄它的檢查點檔案損壞不可恢復的話,window 中的資訊也會丟失。

This class is public to allow client code to subclass it to provide different batch generation policies.

這個 DeterministicKafkaInputDStream對外是公共的,這就允許了 client 程式碼繼承該類並根據自己實際的情況來實現不同的計算策略。 

For instance, I have a use case that needs to generate batches for offsets representing some delayed point in the past. 

例如,有這樣一個應用場景: 我需要獲取過於延遲計算批量資料的 offset 數值
org.apache.spark.streaming.kafka.KafkaCluster

Constructor takes kafka configuration parameters

 KafkaCluster 建構函式中存放的是 kafka 的配置引數

This classis convenience wrapper around the Kafka api for metadata .

KafkaCluster 類提供的遍歷之處便是它將 Kafka 中對元資料操作的方法封裝成 API 方便呼叫。 

It is used internally by KafkaRDD and DeterministicKafkaInputDStream. 

KafkaCluster 類被 KafkaRDD 和 DeterministicKafkaInputDStream 在內部呼叫

This is not exposed to the public API in this PR, but may be exposed in the future as a Developer API.

所以說上述提到的 API 並非是對外提供的公共方法,而是私有方法,或許在不就得將來會作為開發者 API 對外提供

It provides methods for 

KafkaCluster 方法中實現瞭如下的功能

- connecting a SimpleConsumer

-  建立到遠端 kafka consumer 的連線

- getting partition metadata, including finding the leaders for partitions(s)

- 讀取分割槽的元資料,包括查詢 leader 的分割槽資料

- getting the leader's offsets 

- 獲取 leader 的 offset 

For clients that are using idempotent storage to implement exactly-once ( for whom lesser guarantees are ok) 

Kafka's api for zookeeper backed client offset storage may be useful .

對於通過冪等儲存來實現 exactly-once 語義的客戶端而言, 通過呼叫 Kafka 的 API 來將消費的資料的 offset 存放到 zookeeper 上作為備份是很有用的。

Kafka Cluster provides methods for 

對於這個方面 Kafka Cluster 提供瞭如下兩種方法

-getting consumer offsets 

- 讀取 consumer 的 offset

-setting consumer offsets

-設定 consumer 的 offset  

Alternate Design (not proposed)

互動設計(考慮欠妥)

To implement the same exactly once semantics using the Receiver API ( but not using WAL), one can potentially extend the 

current ReliableKafkaReceiver. 

如果想通過已有的 Receiver API (沒有使用 WAL 的 API)來實現 exactly once 的語義也是可行的,使用者可以繼承現有的 ReliableKafkaReceiver 來實現。

However there is a more complicated design, as shown below.

然而這樣做也會尤其複雜的地方,如下所示

For the purpose of discussing this alternate design, here is how the exsiting ReliablekafkaReceiver works with WAL .

以討論互動設計為初衷,接下來我們來介紹下 ReliablekafkaReceiver 是如何實現 WAL 的

1. Kafka Reciver receives data from Kafka from multiple topics and partitions

1. Kafka Receiver 從 kafka 中的多個 toipic 和分割槽中接收資料

2. The data from multiple Kafka partitions are interleaved together into blocks of data

2. 來自於 kafka 多個分割槽中的資料通過交叉讀取回去到一起成為了資料塊

3. After a block's data + metadata is committed to WAL, the corresponding Kafka offsets are committed to Kafka/Zookeeper.

   Note that there are two independent places that has to commit information about the progress of the data stream.

    在將塊資料及其元資料記錄條記錄至 WAL 中之後, 對應的 Kafka offset 數值被提交記錄至 Kafka 或是 Zookeeper 上的檔案中進行記錄。 需要記住的是,一共有兩個獨立的地方用來提交記錄資料流消費的進度。

4. On failure rcovery

4. 關於計算失敗後的恢復

  -- Past, reliably receive data is recovered from WAL using the metadata 

  -- 計算失敗之前,已經被確認接收的資料藉助於元資料從 WAL 中恢復

 -- The restarted reciever starts receiving data from the offset saved in kafka/Zookeeper, which re-receives data that was reliably saved before failure. This ensures that no data is lost .

 -- 重新啟動之後的資料接收端根據記錄在 kafka 自身或是 zookeeper 上的資料 offset 來繼續接收資料,在這裡記錄到 Kafka 或是 Zookeeper 上的用來恢復資料傳輸的 offset 資料在上次失敗之前被記錄,目的是確保計算失敗不會造成傳輸資料的丟失。

The updated receiver implementation will work as follows:

升級之後的資料接收端會按照如下的流程來執行:

New Kafka Receiver will receive data from Kafka multiple topics and partitions.

新的資料接收端會從 Kafka 中的多個 topic 中的多個分割槽中讀取消費資料

However this data is inserted into a block in a deterministic interleaving so that the interleaving can be used to recreate the block (upon recovery).

從多個 topic 和分割槽中所讀取的資料被按照特定的順序交叉寫入到一個檔案塊中,之所以按照這種方式來寫入資料目的是為了在恢復截斷能夠通過重新構建 block 的方式來恢復期間丟失的資料

This interleaving informaion can be like - Array (( topic1, partition1, offset-range1), (topic2, partition2, offset-range2) , ...)

這種交叉寫入的資料格式類似於一種陣列,陣列中的每個元素分別有 kafka 中的 topic, 分割槽,以及 offset 對應的範圍 來描述。

The data of the block is NOT saved to WAL. But the block metadata includes the interleaving information and is checkpointed reliably.

上述介紹的資料塊並不會被通過 WAL 的方式來進行記錄。 且描述了交錯寫入資訊的格式被記錄到 metadata 構成的檔案塊中通過檢查點的方式被可靠地記錄下來。

On  failure recovery:

有關錯誤恢復

Past data is recovered from kafka using offset interleaving informtion is saved metadata.

從 kafka 所傳輸的資料資料會通過記錄了 offset 偏移量及交錯記錄資訊的元資料內容來恢復資料。

The restarted receiver starts resets kafka offsets to the lastest  offset in the metadata and the restarted receiver starts receiving from those offsets.

被重啟後的 kafka 資料接收端通過將記錄在 metadata 中的 offset 作為開始讀取資料的最新偏移量來讀取 kafka 的資料

This ensures that there is no duplication ( as there is only one authoritative location whether progress information is committed).

如此以來就不會造成資料的重複傳輸(因為對於資料來源和資料下游而言無論處理的資料資訊 offset 是否被條都只有一個地方用來記錄 offset 的數值)

The implementation of this solution will involve the following changes. 

上述的這種解決方法會涉及到以下幾個地方的變動。

Each of this step is non-trival complexity and therefore chances of bugs. 

上述的每一個步驟所牽涉的功能點變動都很複雜,所以有可能會帶來一些未知的 bug

Update the way the Receiver interleaves data from different topic + partitions, and keep track of offsets.

通過更新來自 kafka 不同 topic 和不同分割槽中資料交錯的方式,並追蹤記錄 offset 的數值變化

Add a communication channel between Receiver and driver to send the offset interleaving information for every block of received data.

通過在資料接收端和 driver(spark-streaming)二者之間建立一個用來通訊的通道為傳輸每個成功接收到的資料塊中的資料偏移  量 

This is so that the driver can save that metadata. 這也就是說 driver 通過通訊通道獲取 metadata 之後會將其保留下來。

Simple to add, probably will definitely be added in the future. But again chances of errors.

雖然上述的這種功能容易理解且方便實現,並且未來也有可能合併到主分支中,但是稍稍有些變化的話這種解決方案就會暴露出一系列問題。

On failure recovery 

關於失敗恢復

Recreate RDDs with the recovered interleaving information and pull the data from different topics and partitions.

關於如何重構 RDD -- 可以通過記錄下來的元資料中獲取資料交錯資訊,根據這些快取資訊(topic,partition, offset-range)來從遠端 kafka 指定的 topic 分割槽和 offset 的指定範圍中讀取對應的資料

Restart receiver from last recovered offset for each partition.

從最新恢復的每個分割槽的 offset 來重啟資料接收端 接收資料。

Both of these are pretty complicated and brittle.

上述提出的兩種解決方法都是相當複雜且解決手段並不可靠。

Overall, this design , although possible, is significantly more complicated and brittle.

綜上所述,這種設計方案,雖然可行,但是實現起來相當複雜且有可能存在一些未知的坑。