Kafka Source Code In-Depth Analysis - Series 9 - Consumer - Internal Structure of SubscriptionState

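As discussed earlier, one important component of KafkaConsumer is SubscriptionState, which maintains the consumer's consumption state. This article analyzes its internal structure.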

Two subscription strategies

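As covered in Part 1, a consumer can specify for itself which partitions to consume instead of letting the consumer leader assign them automatically. This corresponds to calling
KafkaConsumer::assign(List<TopicPartition> partitions).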

The other strategy is to call subscribe, naming only the topics to consume, and let the coordinator protocol described earlier assign partitions automatically.

The structure of SubscriptionState below reflects these two strategies:

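public class SubscriptionState {
    // all topics this consumer subscribes to
    private final Set<String> subscription;

    // all topics subscribed to by any consumer in this consumer's group; only used by the consumer leader
    private final Set<String> groupSubscription;

    // Strategy 1: the consumer assigns partitions manually -> this field is non-empty
    // Strategy 2: the consumer leader assigns partitions automatically -> this field is empty
    private final Set<TopicPartition> userAssignment;

    // once partitions are assigned, this field records the consumption state of each partition (needed by both strategies)
    private final Map<TopicPartition, TopicPartitionState> assignment;
    ...
}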
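To make the two strategies concrete, here is a minimal sketch of how the fields above could be populated. It is a hypothetical model, not Kafka's code: MiniSubscriptionState, its methods, and the use of plain strings such as "orders-0" in place of TopicPartition are assumptions, and groupSubscription is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified model of the SubscriptionState fields (not Kafka's class).
// Assumption: a TopicPartition is represented as a String such as "orders-0".
class MiniSubscriptionState {
    final Set<String> subscription = new HashSet<>();     // strategy 2: subscribed topics
    final Set<String> userAssignment = new HashSet<>();   // strategy 1: partitions named by the user
    final Map<String, Long> assignment = new HashMap<>(); // both strategies: partition -> position (null = not known yet)

    // Strategy 1: the user names the partitions, so userAssignment and assignment are both filled.
    void assignManually(Set<String> partitions) {
        userAssignment.clear();
        userAssignment.addAll(partitions);
        resetAssignment(partitions);
    }

    // Strategy 2: only topics are recorded; partitions arrive later from the group protocol.
    void subscribeTopics(Set<String> topics) {
        subscription.clear();
        subscription.addAll(topics);
    }

    // Strategy 2: the assignment delivered through the coordinator fills assignment only.
    void onGroupAssignment(Set<String> partitions) {
        resetAssignment(partitions);
    }

    private void resetAssignment(Set<String> partitions) {
        assignment.clear();
        for (String tp : partitions) {
            assignment.put(tp, null); // position is initialized later (seek or committed offset)
        }
    }

    public static void main(String[] args) {
        MiniSubscriptionState manual = new MiniSubscriptionState();
        manual.assignManually(Set.of("orders-0", "orders-1"));
        System.out.println(manual.userAssignment.size() + " " + manual.assignment.size()); // 2 2

        MiniSubscriptionState auto = new MiniSubscriptionState();
        auto.subscribeTopics(Set.of("orders"));
        auto.onGroupAssignment(Set.of("orders-0"));
        System.out.println(auto.userAssignment.isEmpty() + " " + auto.assignment.size()); // true 1
    }
}
```

With manual assignment both userAssignment and assignment are filled; with subscription only assignment is filled, and only once the group's assignment result arrives.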

A key point here: strategies 1 and 2 are mutually exclusive. If you call assign and then call subscribe, an exception is thrown immediately:

public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
    if (listener == null)
        throw new IllegalArgumentException("RebalanceListener cannot be null");

    // key point: reject the call if partitions were assigned manually or a pattern subscription is active
    if (!this.userAssignment.isEmpty() || this.subscribedPattern != null)
        throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);

    this.listener = listener;
    changeSubscription(topics);
}
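The mutual-exclusion rule can be illustrated with a small standalone sketch. ExclusiveSubscription is a hypothetical class: it checks both directions (assign after subscribe and subscribe after assign), ignores the subscribedPattern case that the real check also covers, and uses an illustrative exception message rather than Kafka's SUBSCRIPTION_EXCEPTION_MESSAGE.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the assign/subscribe mutual-exclusion check; names and message are illustrative.
class ExclusiveSubscription {
    private static final String MESSAGE = "manual assignment and topic subscription are mutually exclusive";

    private final Set<String> userAssignment = new HashSet<>(); // strategy 1 state
    private final Set<String> subscription = new HashSet<>();   // strategy 2 state

    void assign(List<String> partitions) {
        if (!subscription.isEmpty()) throw new IllegalStateException(MESSAGE);
        userAssignment.clear();
        userAssignment.addAll(partitions);
    }

    void subscribe(List<String> topics) {
        if (!userAssignment.isEmpty()) throw new IllegalStateException(MESSAGE);
        subscription.clear();
        subscription.addAll(topics);
    }

    // Clears both kinds of state, so switching strategies becomes legal again.
    void unsubscribe() {
        userAssignment.clear();
        subscription.clear();
    }

    public static void main(String[] args) {
        ExclusiveSubscription s = new ExclusiveSubscription();
        s.assign(List.of("orders-0"));
        try {
            s.subscribe(List.of("orders"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        s.unsubscribe();
        s.subscribe(List.of("orders")); // allowed once the manual assignment is cleared
        System.out.println("subscribed after unsubscribe");
    }
}
```

The unsubscribe() method here mirrors the documented behavior of KafkaConsumer.unsubscribe(), which drops topic subscriptions and also clears partitions assigned through assign, so a consumer can switch strategies afterwards.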

Two offsets

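As discussed earlier, each TopicPartition actually has two offsets: the offset to be consumed next (used when polling) and the offset whose consumption has been acknowledged (committed).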

Accordingly, TopicPartitionState (the value type of the assignment map shown above) has two fields:

// field in SubscriptionState
private final Map<TopicPartition, TopicPartitionState> assignment;

// internal structure of TopicPartitionState
private static class TopicPartitionState {
    private Long position;               // field 1: the next offset to consume
    private OffsetAndMetadata committed; // field 2: the last committed offset
    ...
}

public class OffsetAndMetadata implements Serializable {
    private final long offset;
    private final String metadata; // optional extra information, e.g. which client made this commit and when
    ...
}
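A toy version of the two fields shows why both are needed: position moves as soon as poll returns records, while committed moves only on a commit. The class names below are hypothetical stand-ins for TopicPartitionState and OffsetAndMetadata, and the subtraction in uncommittedSpan() assumes the offsets are contiguous.

```java
// Hypothetical mini versions of TopicPartitionState and OffsetAndMetadata; field names mirror the excerpt.
class PartitionOffsets {
    // Like OffsetAndMetadata: a committed offset plus optional free-form metadata.
    static final class MiniOffsetAndMetadata {
        final long offset;
        final String metadata;

        MiniOffsetAndMetadata(long offset, String metadata) {
            this.offset = offset;
            this.metadata = metadata;
        }
    }

    static final class MiniTopicPartitionState {
        long position;                   // field 1: next offset to be returned by poll
        MiniOffsetAndMetadata committed; // field 2: last successfully committed offset

        MiniTopicPartitionState(long start) {
            this.position = start;
            this.committed = new MiniOffsetAndMetadata(start, "");
        }

        // Width of the offset range already returned by poll but not yet committed
        // (equals a record count only if the offsets are contiguous).
        long uncommittedSpan() {
            return position - committed.offset;
        }
    }

    public static void main(String[] args) {
        MiniTopicPartitionState s = new MiniTopicPartitionState(100);
        s.position = 105; // a poll returned offsets 100..104
        System.out.println(s.uncommittedSpan()); // 5
        s.committed = new MiniOffsetAndMetadata(s.position, "client-1");
        System.out.println(s.uncommittedSpan()); // 0
    }
}
```

If the consumer crashed between that poll and the commit, a consumer resuming from the committed offset would start at 100 again and re-read offsets 100 to 104, which is why the two values are tracked separately.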

Field 1 is updated in fetchedRecords, step 3 of the Fetcher flow covered earlier:

    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        if (this.subscriptions.partitionAssignmentNeeded()) {
            return Collections.emptyMap();
        } else {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
            throwIfOffsetOutOfRange();
            throwIfUnauthorizedTopics();
            throwIfRecordTooLarge();

            for (PartitionRecords<K, V> part : this.records) {
                if (!subscriptions.isAssigned(part.partition)) {
                    log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition);
                    continue;
                }

                long position = subscriptions.position(part.partition);
                if (!subscriptions.isFetchable(part.partition)) {
                    log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition);
                } else if (part.fetchOffset == position) {
                    // key step: compute the next offset to consume
                    long nextOffset = part.records.get(part.records.size() - 1).offset() + 1;
                    ...

                    // update field 1 (position) in SubscriptionState
                    subscriptions.position(part.partition, nextOffset);
                } else {
                    log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
                            part.partition, part.fetchOffset, position);
                }
            }
            this.records.clear();
            return drained;
        }
    }
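The position rule in fetchedRecords boils down to two checks, sketched below in isolation. PositionAdvance and Batch are hypothetical: a batch records the offset it was fetched at plus its record offsets, and a batch fetched at any offset other than the current position is dropped, like the "Ignoring fetched records" branch above. The empty-batch guard is an addition of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the position update in fetchedRecords(); not Kafka's Fetcher.
class PositionAdvance {
    // A fetched batch: the offset the fetch was issued at, plus the offsets of the records it returned.
    static final class Batch {
        final long fetchOffset;
        final List<Long> recordOffsets;

        Batch(long fetchOffset, List<Long> recordOffsets) {
            this.fetchOffset = fetchOffset;
            this.recordOffsets = recordOffsets;
        }
    }

    private long position; // plays the role of field 1

    PositionAdvance(long position) {
        this.position = position;
    }

    long position() {
        return position;
    }

    // Returns the offsets handed to the caller and advances the position; returns nothing for a stale batch.
    List<Long> drain(Batch batch) {
        List<Long> drained = new ArrayList<>();
        if (batch.fetchOffset != position || batch.recordOffsets.isEmpty()) {
            return drained; // stale (e.g. fetched before a seek) or empty: position stays unchanged
        }
        drained.addAll(batch.recordOffsets);
        long last = batch.recordOffsets.get(batch.recordOffsets.size() - 1);
        position = last + 1; // same rule as nextOffset in fetchedRecords
        return drained;
    }

    public static void main(String[] args) {
        PositionAdvance p = new PositionAdvance(10);
        System.out.println(p.drain(new Batch(10, List.of(10L, 11L, 12L)))); // [10, 11, 12]
        System.out.println(p.position());                                   // 13
        System.out.println(p.drain(new Batch(10, List.of(10L, 11L))));      // []
        System.out.println(p.position());                                   // 13
        System.out.println(p.drain(new Batch(13, List.of(15L, 20L))));      // [15, 20]
        System.out.println(p.position());                                   // 21
    }
}
```

Deriving the next position from the last record's offset, rather than from fetchOffset plus the record count, stays correct when offsets have gaps, as can happen in compacted topics.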

Field 2 is updated after offsets are committed, either manually or automatically (both commit strategies were covered earlier).
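A minimal sketch of how field 2 could follow a commit, assuming the commit sends the current positions and the committed values are recorded only once the commit is acknowledged. CommitFlow, offsetsToCommit and onCommitResponse are hypothetical names; in the real client this bookkeeping happens when the commit response comes back from the coordinator, and AutoCommitTask triggers the same kind of commit on a timer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: field 2 (committed) changes only after a commit is acknowledged.
class CommitFlow {
    final Map<String, Long> position = new LinkedHashMap<>();  // field 1 per partition
    final Map<String, Long> committed = new LinkedHashMap<>(); // field 2 per partition

    // Snapshot of what a commit would send: the current position of every assigned partition.
    Map<String, Long> offsetsToCommit() {
        return new LinkedHashMap<>(position);
    }

    // Called with the result of the commit; a failed commit leaves field 2 untouched.
    void onCommitResponse(Map<String, Long> sent, boolean success) {
        if (success) {
            committed.putAll(sent);
        }
    }

    public static void main(String[] args) {
        CommitFlow c = new CommitFlow();
        c.position.put("orders-0", 105L);
        c.position.put("orders-1", 42L);

        Map<String, Long> sent = c.offsetsToCommit();
        c.onCommitResponse(sent, false);
        System.out.println(c.committed); // {}
        c.onCommitResponse(sent, true);
        System.out.println(c.committed); // {orders-0=105, orders-1=42}
    }
}
```

Because offsetsToCommit() takes a snapshot, records polled after the commit was sent are not treated as committed even when that commit succeeds.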

Summary

Together with Series 8, the consumer's strategy choices can be summarized as follows:

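(1) assign vs. subscribe (manually specifying partitions vs. having partitions assigned automatically)

(2) manually specifying the initial offset (seek) vs. obtaining it automatically (by sending an OffsetFetchRequest)

(3) manual vs. automatic commit of consumed offsets (AutoCommitTask)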
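Point (2) can be read as a lookup order, sketched below. The sketch assumes that a seek() value overrides any committed offset and that, when nothing has been committed, the client falls back to a reset policy (the real consumer takes this from its auto.offset.reset setting; only the earliest and latest values are modeled here). InitialPosition and resolve are hypothetical names.

```java
import java.util.OptionalLong;

// Hypothetical sketch of choosing a partition's starting position; not Kafka's code.
class InitialPosition {
    enum ResetPolicy { EARLIEST, LATEST }

    // seekOffset: set by a manual seek(); committed: result of an OffsetFetchRequest (empty if none).
    static long resolve(OptionalLong seekOffset, OptionalLong committed,
                        ResetPolicy policy, long logStart, long logEnd) {
        if (seekOffset.isPresent()) {
            return seekOffset.getAsLong(); // manual: an explicit seek wins
        }
        if (committed.isPresent()) {
            return committed.getAsLong();  // automatic: resume from the committed offset
        }
        // nothing committed yet: fall back to the reset policy
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        System.out.println(resolve(OptionalLong.of(7), OptionalLong.of(50), ResetPolicy.LATEST, 0, 100));     // 7
        System.out.println(resolve(OptionalLong.empty(), OptionalLong.of(50), ResetPolicy.LATEST, 0, 100));   // 50
        System.out.println(resolve(OptionalLong.empty(), OptionalLong.empty(), ResetPolicy.EARLIEST, 0, 100)); // 0
        System.out.println(resolve(OptionalLong.empty(), OptionalLong.empty(), ResetPolicy.LATEST, 0, 100));   // 100
    }
}
```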