Kafka: Exactly-once Semantics

https://www.confluent.io/blog/enabling-exactly-kafka-streams/

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

 

Exactly Once Delivery and Transactional Messaging in Kafka

https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#

 

Overview

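Kafka Streams essentially reuses the design of Samza: all nodes in the stream pipeline are decoupled, so every node's snapshot and recovery strategy is local.

Neither a global nor a local checkpoint strategy is inherently better. Whether to go global or local depends on where the data can be replayed.

If data can only be replayed at the source, a global snapshot is required; otherwise global consistency cannot be guaranteed.

If data can be replayed at every processing node, a global snapshot is unnecessary. Kafka is a natural infrastructure for replaying data: when every step of the pipeline is connected through Kafka, local snapshots are enough to guarantee consistency.

This is what Samza did, but it could only guarantee at-least-once, not exactly-once.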

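For example, the following scenarios lead to duplicated data:

In the first, the write to topic B succeeds but the returned ack is lost. In the second, the process receives the ack but crashes before it updates the offset.

So how can exactly-once be guaranteed?

This is where transactions come in. The key property of a transaction is atomicity, so the output records, the state update, and the input offset must all be updated atomically.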

 

The key to the state update here is recording a changelog, which is the same mechanism Samza uses.

Second of all, in Kafka Streams state updates can also be translated as a sequence of change capture messages.
Here is why: in the Kafka Streams library, all state stores capture their updates by default into some special Kafka topics called the changelog topics.
Each store keeps its updates in a separate changelog topic: whenever an update is applied to that store, a new record capturing this update will be sent to the corresponding changelog topic. A state store’s changelog topic is highly available through replication and is treated as the source-of-truth of the state store’s update history. This topic can hence be used to bootstrap a replica of the state store in another processor upon load balancing, fault recovery, etc.

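The state can be recovered from the changelog, so a state update can be viewed as writing the update to the changelog topic.

 

So the transaction above can be viewed as an atomic write to partitions of multiple topics.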

 

With the transactions API, we can enable producer clients to atomically send to multiple Kafka topic partitions. Messages written to multiple Kafka topics within the same transaction will be readable by consumers as a whole when the transaction is successfully committed, or none of them will be readable at all if the transaction is aborted. By using this mechanism, Kafka Streams can ensure that records are sent to the sink topics, the changelog topics, and the offset topics atomically.

In addition, until the transaction succeeds, downstream consumers cannot see any tuple written inside it, which guarantees that the data is read only once.

 

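To make deduplication possible, every message carries a unique identifier made up of the following fields:

  • PID => int64, identifies the producer the message came from

  • Epoch => int16, fences off old zombie producers

  • Sequence number => int32, maintained by each producer for each partition, so that messages are processed in order at the partition level

Consider the first failure scenario: the write to topic B has completed, but the returned ack was lost.

The process resends the message, but this time topic B can tell from the PID and sequence number that the data is a duplicate.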
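
The duplicate check described above can be sketched roughly as follows. This is a toy model, not broker code: the class PartitionDedupSketch, its Result values, and the rule that a new epoch restarts the sequence at 0 are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of one partition that rejects duplicates using (PID, epoch, sequence). */
final class PartitionDedupSketch {
    enum Result { APPENDED, DUPLICATE, FENCED, OUT_OF_ORDER }

    /** Last accepted epoch and sequence number for one producer. */
    private static final class ProducerState {
        short epoch;
        int lastSeq = -1; // nothing accepted yet
        ProducerState(short epoch) { this.epoch = epoch; }
    }

    private final Map<Long, ProducerState> producers = new HashMap<>();
    private int logSize = 0;

    Result append(long pid, short epoch, int seq) {
        ProducerState state = producers.computeIfAbsent(pid, k -> new ProducerState(epoch));
        if (epoch < state.epoch) {
            return Result.FENCED;            // an older (zombie) instance of the producer
        }
        if (epoch > state.epoch) {           // assumption: a new epoch restarts the sequence
            state.epoch = epoch;
            state.lastSeq = -1;
        }
        if (seq <= state.lastSeq) {
            return Result.DUPLICATE;         // a retry of a write that already succeeded
        }
        if (seq != state.lastSeq + 1) {
            return Result.OUT_OF_ORDER;      // a gap: an earlier message is missing
        }
        state.lastSeq = seq;
        logSize++;
        return Result.APPENDED;
    }

    int logSize() { return logSize; }

    public static void main(String[] args) {
        PartitionDedupSketch topicB = new PartitionDedupSketch();
        System.out.println(topicB.append(7L, (short) 0, 0)); // first write
        System.out.println(topicB.append(7L, (short) 0, 0)); // resend after a lost ack
    }
}
```

In this sketch the resend after a lost ack is recognized as a duplicate and the log does not grow, which is the behavior the first failure scenario relies on.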

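The second failure scenario is more involved. The process has crashed, so the offset update failed, which means the whole transaction failed, and the coordinator aborts it.

The abort rolls back every effect of the transaction: the data it wrote is marked as aborted, i.e. invisible.

Because the offset was never updated, replaying from the previous offset guarantees exactly-once.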

 

Dataflow

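1. First, the producer finds the transaction coordinator by asking any broker.

2. The producer needs to obtain a PID.

The producer obtains its PID through an InitPidRequest. There are two cases:

Case 1: the user has not configured a transactional id for the producer. A random PID can then be obtained from any broker, without going through the transaction coordinator. The producer gets exactly-once semantics within this session, but once it crashes it cannot recover after failover, because the random PID it obtains next time does not match the current one.
Case 2: the user has configured a transactional id. The TransactionalId and the transaction timeout are sent to the transaction coordinator in the InitPidRequest.
The coordinator first generates a PID and writes the TransactionalId-to-PID mapping to the transaction log, so that however often the producer restarts, the same TransactionalId always gets the same PID.
The coordinator then increments the epoch of this PID to fence off zombie producers.
Finally, it completes every unfinished transaction that belongs to this PID, rolling it forward or rolling it back.

The transaction log is what gives the transaction coordinator HA recovery: a new coordinator can replay the transaction log to restore the current state of every transaction.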

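3. The producer starts a new transaction.

This is done through beginTransaction(), which only marks the start of a new transaction locally in the producer.

4. The consume-transform-produce loop begins. This is the actual transaction, and it consists of several steps:

4.1 The first time the producer sends data to a TopicPartition, it sends an AddPartitionsToTxnRequest to the coordinator. The coordinator records the partition in the transaction log, because commit or abort markers must later be written to that partition to mark the transaction as complete. The coordinator considers the transaction started only when it receives the first AddPartitionsToTxnRequest, at which point it starts a transaction timer used to time the transaction out.

4.2 ProduceRequests: the producer starts producing messages to the partitions. Each message carries the PID, epoch, and sequence number used for deduplication.

4.3 The producer calls the sendOffsets API with the offsets to commit and the consumer group id. This sends an AddOffsetsToTxnRequest carrying the group id to the transaction coordinator, which records it in the transaction log.

4.4 Still inside sendOffsets, the producer then sends a TxnOffsetCommitRequest to the consumer coordinator to persist the offsets in the __consumer_offsets topic.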

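5. Completing the transaction
This is done by calling the producer's commitTransaction or abortTransaction method. Either one sends an EndTxnRequest to the transaction coordinator, with a field indicating whether to commit or abort.
On receiving this request, the coordinator does the following:

  1. Writes a PREPARE_COMMIT or PREPARE_ABORT message to the transaction log.

  2. Writes COMMIT (or ABORT) markers to all previously registered partitions through WriteTxnMarkerRequest.

    On receiving the request, each broker writes a COMMIT(PID) or ABORT(PID) control message, called a marker, to the corresponding partition. The marker indicates whether the data sent by the given PID should be delivered or dropped.
    The consumer buffers the messages sent by a PID until it reads the commit or abort marker, and only then decides whether to deliver or discard the buffered data.
    The same applies to __consumer_offsets: this internal topic also receives a commit or abort marker, and the consumer coordinator finally decides whether to commit or ignore the offset update.

  3. Writes a COMMITTED (or ABORTED) message to the transaction log.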

 

 

1. Finding a transaction coordinator -- the FindCoordinatorRequest

Since the transaction coordinator is at the center of assigning PIDs and managing transactions, the first thing a producer has to do is issue a FindCoordinatorRequest (previously known as GroupCoordinatorRequest, but renamed for general usage) to any broker to discover the location of its coordinator. Note that if no TransactionalId is specified in the configuration, this step can be skipped.

 

2. Getting a producer Id -- the InitPidRequest

The producer must send an InitPidRequest to get idempotent delivery or to use transactions. Which semantics are allowed depends on whether the transactional.id configuration is provided.


2.1 When a TransactionalId is specified

After discovering the location of its coordinator, the next step is to retrieve the producer’s PID. This is achieved by sending an InitPidRequest to the transaction coordinator.

The TransactionalId is passed in the InitPidRequest along with the transaction timeout, and the mapping to the corresponding PID is logged in the transaction log in step 2a. This enables us to return the same PID for the TransactionalId to future instances of the producer, and hence enables recovering or aborting previously incomplete transactions.


In addition to returning the PID, the InitPidRequest performs the following tasks:

  1. Bumps up the epoch of the PID, so that any previous zombie instance of the producer is fenced off and cannot move forward with its transaction.

  2. Recovers (rolls forward or rolls back) any transaction left incomplete by the previous instance of the producer.

 

The handling of the InitPidRequest is synchronous. Once it returns, the producer can send data and start new transactions.
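
As a rough illustration of the PID and epoch handling described in 2.1, here is a minimal in-memory sketch. The class InitPidSketch, its methods, and the use of a HashMap as a stand-in for the replicated transaction log are hypothetical, and the recovery of incomplete transactions is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy coordinator: a TransactionalId keeps its PID across restarts, and the epoch is bumped. */
final class InitPidSketch {
    /** The PID and epoch handed back to one producer instance. */
    static final class PidAndEpoch {
        final long pid;
        final short epoch;
        PidAndEpoch(long pid, short epoch) { this.pid = pid; this.epoch = epoch; }
    }

    // Stand-in for the transaction log: TransactionalId -> latest (PID, epoch).
    private final Map<String, PidAndEpoch> transactionLog = new HashMap<>();
    private long nextPid = 0;

    /** Handles an InitPidRequest that carries a TransactionalId. */
    PidAndEpoch initPid(String transactionalId) {
        PidAndEpoch previous = transactionLog.get(transactionalId);
        PidAndEpoch current = (previous == null)
                ? new PidAndEpoch(nextPid++, (short) 0)                         // first instance
                : new PidAndEpoch(previous.pid, (short) (previous.epoch + 1)); // restart: same PID
        transactionLog.put(transactionalId, current);
        return current;
    }

    /** A sender is accepted only if it holds the latest epoch for its TransactionalId. */
    boolean isCurrent(String transactionalId, PidAndEpoch sender) {
        PidAndEpoch latest = transactionLog.get(transactionalId);
        return latest != null && latest.pid == sender.pid && latest.epoch == sender.epoch;
    }

    public static void main(String[] args) {
        InitPidSketch coordinator = new InitPidSketch();
        PidAndEpoch oldInstance = coordinator.initPid("app-1");
        PidAndEpoch newInstance = coordinator.initPid("app-1");
        System.out.println("old instance fenced: " + !coordinator.isCurrent("app-1", oldInstance));
        System.out.println("same PID reused: " + (oldInstance.pid == newInstance.pid));
    }
}
```

Once a second instance with the same TransactionalId has initialized, the first one holds a stale epoch and fails the check, which is how the zombie is fenced off.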


2.2 When a TransactionalId is not specified

If no TransactionalId is specified in the configuration, the InitPidRequest can be sent to any broker. A fresh PID is assigned, and the producer only enjoys idempotent semantics and transactional semantics within a single session.
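
On the client side, the difference between cases 2.1 and 2.2 comes down to producer configuration. The sketch below only builds java.util.Properties objects using the producer config keys enable.idempotence and transactional.id. The class name, the broker address, and the id value are placeholders.

```java
import java.util.Properties;

/** Builds the two producer configurations discussed above (values are placeholders). */
final class ProducerConfigSketch {
    /** No transactional.id: idempotent delivery that lasts only for a single session. */
    static Properties idempotentOnly(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("enable.idempotence", "true");
        return props;
    }

    /** With transactional.id: the PID survives restarts and transactions can be used. */
    static Properties transactional(String bootstrapServers, String transactionalId) {
        Properties props = idempotentOnly(bootstrapServers);
        props.setProperty("transactional.id", transactionalId);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(idempotentOnly("localhost:9092"));
        System.out.println(transactional("localhost:9092", "example-transactional-id"));
    }
}
```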


3. Starting a Transaction -- the beginTransaction API

The new KafkaProducer will have a beginTransaction() method which has to be called to signal the start of a new transaction. The producer records local state indicating that the transaction has begun, but the transaction won’t begin from the coordinator’s perspective until the first record is sent.

4. The consume-transform-produce loop

In this stage, the producer begins to consume-transform-produce the messages that comprise the transaction. This is a long phase and is potentially comprised of multiple requests.


4.1 AddPartitionsToTxnRequest

The producer sends this request to the transaction coordinator the first time a new TopicPartition is written to as part of a transaction. The addition of this TopicPartition to the transaction is logged by the coordinator in step 4.1a. We need this information so that we can write the commit or abort markers to each TopicPartition (see section 5.2 for details). If this is the first partition added to the transaction, the coordinator will also start the transaction timer.


4.2 ProduceRequest

The producer writes a bunch of messages to the user’s TopicPartitions through one or more ProduceRequests (fired from the send method of the producer). These requests include the PID, epoch, and sequence number as denoted in 4.2a.


4.3 AddOffsetsToTxnRequest

The producer has a new sendOffsets API method, which enables the batching of consumed and produced messages. This method takes a map of the offsets to commit and a groupId argument, which corresponds to the name of the associated consumer group.

The sendOffsets method sends an AddOffsetsToTxnRequest with the groupId to the transaction coordinator, from which it can deduce the TopicPartition for this consumer group in the internal __consumer_offsets topic. The transaction coordinator logs the addition of this topic partition to the transaction log in step 4.3a.
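
To make "deduce the TopicPartition" concrete: as far as I know, the broker maps a group to a __consumer_offsets partition by hashing the group id modulo the partition count. The sketch below assumes exactly that formula and a default of 50 partitions (offsets.topic.num.partitions). Treat both as assumptions rather than a specification, and note that the class name is made up.

```java
/** Sketch of mapping a consumer group id to a partition of __consumer_offsets. */
final class OffsetsPartitionSketch {
    // Assumed default of the broker setting offsets.topic.num.partitions.
    static final int DEFAULT_OFFSETS_TOPIC_PARTITIONS = 50;

    static int partitionFor(String groupId, int numPartitions) {
        // Clearing the sign bit keeps the result non-negative for every hash value.
        return (groupId.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partition = partitionFor("example-group", DEFAULT_OFFSETS_TOPIC_PARTITIONS);
        System.out.println("__consumer_offsets-" + partition);
    }
}
```

Because the mapping depends only on the group id, the transaction coordinator can work out which offsets partition to register without contacting the consumer coordinator.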


4.4 TxnOffsetCommitRequest

Also as part of sendOffsets, the producer will send a TxnOffsetCommitRequest to the consumer coordinator to persist the offsets in the __consumer_offsets topic (step 4.4a). The consumer coordinator validates that the producer is allowed to make this request (and is not a zombie) by using the PID and producer epoch which are sent as part of this request.

The consumed offsets are not visible externally until the transaction is committed, the process for which we will discuss now.


5. Committing or Aborting a Transaction

Once the data has been written, the user must call the new commitTransaction or abortTransaction methods of the KafkaProducer. These methods will begin the process of committing or aborting the transaction respectively.


5.1 EndTxnRequest

When a producer is finished with a transaction, the newly introduced KafkaProducer.commitTransaction or KafkaProducer.abortTransaction must be called. The former makes the data produced in step 4 above available to downstream consumers. The latter effectively erases the produced data from the log: it will never be accessible to the user (at the READ_COMMITTED isolation level), i.e. downstream consumers will read and discard the aborted messages.

Regardless of which producer method is called, the producer issues an EndTxnRequest to the transaction coordinator, with a field indicating whether the transaction is to be committed or aborted. Upon receiving this request, the coordinator:

  1. Writes a PREPARE_COMMIT or PREPARE_ABORT message to the transaction log. (step 5.1a)

  2. Begins the process of writing the command messages known as COMMIT (or ABORT) markers to the user logs through the WriteTxnMarkerRequest. (see section 5.2 below).

  3. Finally writes the COMMITTED (or ABORTED) message to transaction log. (see 5.3 below).

 

5.2 WriteTxnMarkerRequest

This request is issued by the transaction coordinator to the leader of each TopicPartition which is part of the transaction. Upon receiving this request, each broker will write a COMMIT(PID) or ABORT(PID) control message to the log. (step 5.2a)

This message indicates to consumers whether messages with the given PID should be delivered or dropped. As such, the consumer will buffer messages which have a PID until it reads a corresponding COMMIT or ABORT message, at which point it will deliver or drop the messages respectively.

Note that, if the __consumer_offsets topic is one of the TopicPartitions in the transaction, the commit (or abort) marker is also written to the log, and the consumer coordinator is notified that it needs to materialize these offsets in the case of a commit or ignore them in the case of an abort (step 5.2a on the left).
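
The buffering behavior described in this section can be modeled in a few lines. This is only a sketch of the description above, not of the consumer's real implementation, and the class ReadCommittedSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy consumer that holds back messages per PID until a COMMIT or ABORT marker arrives. */
final class ReadCommittedSketch {
    private final Map<Long, List<String>> pending = new HashMap<>();
    private final List<String> delivered = new ArrayList<>();

    /** A data message from the given PID: buffer it, do not deliver yet. */
    void onData(long pid, String value) {
        pending.computeIfAbsent(pid, k -> new ArrayList<>()).add(value);
    }

    /** A control message for the given PID: deliver the buffer on commit, drop it on abort. */
    void onMarker(long pid, boolean commit) {
        List<String> buffered = pending.remove(pid);
        if (commit && buffered != null) {
            delivered.addAll(buffered);
        }
    }

    List<String> delivered() { return delivered; }

    public static void main(String[] args) {
        ReadCommittedSketch consumer = new ReadCommittedSketch();
        consumer.onData(1L, "a");
        consumer.onData(2L, "x");
        consumer.onMarker(2L, false); // ABORT(2): "x" is dropped
        consumer.onMarker(1L, true);  // COMMIT(1): "a" is delivered
        System.out.println(consumer.delivered());
    }
}
```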


5.3 Writing the final Commit or Abort Message

After all the commit or abort markers are written to the data logs, the transaction coordinator writes the final COMMITTED or ABORTED message to the transaction log, indicating that the transaction is complete (step 5.3 in the diagram). At this point, most of the messages pertaining to the transaction in the transaction log can be removed.

We only need to retain the PID of the completed transaction along with a timestamp, so we can eventually remove the TransactionalId->PID mapping for the producer. See the Expiring PIDs section below.
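
Putting sections 4.1 and 5.1 to 5.3 together, the coordinator's bookkeeping for a single transaction can be sketched as a small state machine. The class TxnCoordinatorSketch, the EMPTY and ONGOING state names, and the string-based log entries are assumptions for illustration. Timers, retries, and failure handling are omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy coordinator state for one transaction: register partitions, then two-phase end. */
final class TxnCoordinatorSketch {
    enum State { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMMITTED, ABORTED }

    private State state = State.EMPTY;
    private final Set<String> partitions = new LinkedHashSet<>();
    private final List<String> transactionLog = new ArrayList<>();

    /** AddPartitionsToTxnRequest: the first partition is what starts the transaction. */
    void addPartition(String topicPartition) {
        if (state == State.EMPTY) {
            state = State.ONGOING; // the real coordinator would also start the timer here
        }
        if (state != State.ONGOING) {
            throw new IllegalStateException("transaction is " + state);
        }
        if (partitions.add(topicPartition)) {
            transactionLog.add("ADD " + topicPartition);
        }
    }

    /** EndTxnRequest: log PREPARE_*, emit one marker per partition, then log the final state. */
    List<String> endTxn(boolean commit) {
        if (state != State.ONGOING) {
            throw new IllegalStateException("transaction is " + state);
        }
        state = commit ? State.PREPARE_COMMIT : State.PREPARE_ABORT;
        transactionLog.add(state.name());

        List<String> markers = new ArrayList<>();
        for (String topicPartition : partitions) { // stands in for WriteTxnMarkerRequest
            markers.add((commit ? "COMMIT" : "ABORT") + " -> " + topicPartition);
        }

        state = commit ? State.COMMITTED : State.ABORTED;
        transactionLog.add(state.name());
        return markers;
    }

    State state() { return state; }
    List<String> transactionLog() { return transactionLog; }

    public static void main(String[] args) {
        TxnCoordinatorSketch txn = new TxnCoordinatorSketch();
        txn.addPartition("output-0");
        txn.addPartition("__consumer_offsets-12");
        System.out.println(txn.endTxn(true));
        System.out.println(txn.transactionLog());
    }
}
```

Writing the PREPARE entry before any marker is what lets a new coordinator, replaying the log after a failover, finish writing markers for a transaction whose outcome was already decided.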

 

Application example

With the flow above in mind, the example is easy to follow.
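import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class KafkaTransactionsExample {
  private static final Duration CONSUMER_POLL_TIMEOUT = Duration.ofMillis(100);

  public static void main(String[] args) {
    // Placeholder settings: the broker address, group id, transactional.id and topic
    // names below are illustrative and must be adapted to the actual deployment.
    Properties consumerConfig = new Properties();
    consumerConfig.put("bootstrap.servers", "localhost:9092");
    consumerConfig.put("group.id", "example-group");
    consumerConfig.put("enable.auto.commit", "false"); // offsets are committed by the producer
    consumerConfig.put("isolation.level", "read_committed");
    consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
    consumer.subscribe(Collections.singletonList("input-topic"));

    // Note that the 'transactional.id' configuration _must_ be specified in the
    // producer config in order to use transactions.
    Properties producerConfig = new Properties();
    producerConfig.put("bootstrap.servers", "localhost:9092");
    producerConfig.put("transactional.id", "example-transactional-id");
    producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);

    // We need to initialize transactions once per producer instance. This method will
    // recover or abort transactions initiated by previous instances of a producer with
    // the same transactional.id. Any other transactional call will report an error if
    // initialization was not performed.
    producer.initTransactions();

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (records.isEmpty()) {
        continue;
      }
      try {
        // Start a new transaction. This will begin the process of batching the consumed
        // records as well as any records produced as a result of processing them.
        producer.beginTransaction();

        // Process the input records and send them to the output topic(s).
        for (ProducerRecord<String, String> outputRecord : processRecords(records)) {
          producer.send(outputRecord);
        }

        // To ensure that the consumed and produced messages are batched, we need to
        // commit the offsets through the producer and not the consumer. Clients older
        // than 2.5 take the consumer group id string here instead of
        // consumer.groupMetadata().
        producer.sendOffsetsToTransaction(getUncommittedOffsets(records), consumer.groupMetadata());

        // Now that we have consumed, processed, and produced a batch of messages, let's
        // commit the results.
        producer.commitTransaction();
      } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Irrecoverable failures: this producer instance cannot be used any more.
        producer.close();
        consumer.close();
        throw e;
      } catch (KafkaException e) {
        // Recoverable failure: roll the transaction back and rewind the consumer to the
        // start of this batch so that the same input is replayed.
        producer.abortTransaction();
        for (TopicPartition tp : records.partitions()) {
          consumer.seek(tp, records.records(tp).get(0).offset());
        }
      }
    }
  }

  // Hypothetical transformation: forwards every input record unchanged to "output-topic".
  private static List<ProducerRecord<String, String>> processRecords(ConsumerRecords<String, String> records) {
    List<ProducerRecord<String, String>> outputRecords = new ArrayList<>();
    for (ConsumerRecord<String, String> record : records) {
      outputRecords.add(new ProducerRecord<>("output-topic", record.key(), record.value()));
    }
    return outputRecords;
  }

  // The offset to commit is that of the next record to read: last consumed offset + 1.
  private static Map<TopicPartition, OffsetAndMetadata> getUncommittedOffsets(ConsumerRecords<String, String> records) {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (TopicPartition tp : records.partitions()) {
      List<ConsumerRecord<String, String>> batch = records.records(tp);
      offsets.put(tp, new OffsetAndMetadata(batch.get(batch.size() - 1).offset() + 1));
    }
    return offsets;
  }
}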