1. 程式人生 > >Kafka—Storm之KafkaSpout和KafkaBolt原始碼解釋

Kafka—Storm之KafkaSpout和KafkaBolt原始碼解釋

轉載來自:http://blog.csdn.net/ransom0512/article/details/50497261

另一個比較詳細的KafkaSpout詳解見:http://www.cnblogs.com/cruze/p/4241181.html

Storm-Kafka原始碼解析

說明:本文所有程式碼基於Storm 0.10版本,本文描述內容只涉及KafkaSpout和KafkaBolt相關,不包含trident特性。

Kafka Spout

KafkaSpout的建構函式如下:

public KafkaSpout(SpoutConfig spoutConf) {
    _spoutConfig = spoutConf;
}
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

其構造引數來自於SpoutConfig物件,Spout中用到的所有引數都來自於該物件。該物件引數說明如下:

SpoutConfig

SpoutConfig繼承自KafkaConfig。兩個類內部所有引數及說明如下:

/**
 * Kafka地址和分割槽關係對應資訊
 * 在kafka的分割槽資訊和地址資訊都很清楚的情況下,可以以直接使用StaticHosts
 * 但是該物件引數很難構建,需要的資訊很多,所以我們一般情況下並不使用它。
 * 我們主要用的是ZKHosts的例項。可以在其中設定Zookeeper地址等資訊,然後動態獲取kafka元資料
 * ZKHost的引數資訊見下面一段。
 * 必選引數
 **/
public final BrokerHosts hosts; /** * 要從kafka中讀取的topic佇列名稱 * 必選引數 **/ public final String topic; /** * Kafka的客戶端id引數,該引數一般不需要設定 * 預設值為kafka.api.OffsetRequest.DefaultClientId() * 空字串 **/ public final String clientId; /** * Kafka Consumer每次請求獲取的資料量大小 * 每次獲取的資料消費完畢之後,才會再獲取資料 * 預設1MB **/ public int
fetchSizeBytes = 1024 * 1024; /** * Kafka SimpleConsumer 客戶端和服務端連線的超時時間 * 單位:毫秒 **/ public int socketTimeoutMs = 10000; /** * Consumer每次獲取資料的超時時間 * 單位:毫秒 **/ public int fetchMaxWait = 10000; /** * Consumer通過網路IO獲取資料的socket buffet大小, * 預設1MB **/ public int bufferSizeBytes = 1024 * 1024; /** * 該引數有兩個作用: * 1:申明輸出的資料欄位 declareoutputFileds * 2:對從kafka中讀到的資料進行反序列化,即將byte位元組陣列轉為tuple物件。 * 對kafka存入資料的key和message都比較關心的,可以使用KeyValueSchemeAsMultiScheme, * 如果不關心,可以使用SchemeAsMultiScheme * 預設介面實現一般都只會輸出一個欄位或者兩個欄位,很多時候,我們需要直接從kafka中讀取到資料之後,就將每個欄位解析了,然後進行簡單處理再emit * 這個時候,建議自己實現MultiScheme介面 * 必選引數 **/ public MultiScheme scheme = new RawMultiScheme(); /** * 在拓撲提交之後,KafkaSpout會從zookeeper中讀取以前的offset值,以便沿著上次位置繼續讀取資料。 * KafkaSpout會檢查拓撲ID和zookeeper中儲存的拓撲id是否相同。 * 如果不同,並且ignoreZkOffsets=true,那麼就會從startOffsetTime引數位置讀取資料 * 否則,沿著zookeeper中儲存的offset位置繼續讀取資料。 * 也就是說,當ignoreZkOffsets=true的時候,kafkaspout只能保證在拓撲不殺掉的情況下,當worker程序異常退出的時候,會沿著上次讀取位置繼續讀取資料,當拓撲重新提交的時候,就會從佇列最早位置開始讀取資料。 * 這樣就會存在重複讀取資料的問題,所以正式場景,該引數還是應該設定為false。以保證任何場景資料的只被讀取一次。 **/ public boolean ignoreZkOffsets = false; /** * 拓撲第一次提交,zookeeper中沒有儲存對應offset的情況下,預設從kafka中讀取的offset位置。預設從佇列最早位置開始讀取資料,即從佇列最開始位置讀取資料。 **/ public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); /** * * 如果當前的(offset值-failed offsets中最小值) < maxOffsetBehind * 那麼就會清理failed列表中所有大於maxOffsetBehind的offset值。 * 這是為了防止failed過多,重發太多導致記憶體溢位 * 不過預設為了保證資料不丟失,所以maxOffsetBehind設定的最大 **/ public long maxOffsetBehind = Long.MAX_VALUE; /** * 當KafkaSpout初始化之後,使用從zookeeper中讀取的上次記錄的offset * 從kafka中獲取資料失敗,返回offsetOutofRange錯誤之後, * 是否使用startOffset從佇列最早位置重新獲取資料。 * offsetOutofrange一般發生在topic被重建,分片被刪除的場景。 **/ public boolean useStartOffsetTimeIfOffsetOutOfRange = true; /** * metric監控資訊採集間隔 **/ public int metricsTimeBucketSizeInSecs = 60; /** * KafkaSpout儲存offset的zookeeper所在地址 * 獨立出來這個屬性是為了防止offset儲存位置不在kafka叢集中 * 如果kafka和storm在一個叢集,該屬性可以忽略 **/ public List<String> zkServers = null; /** * KafkaSpout儲存offset的zookeeper埠 * 如果kafka和storm在一個叢集,該屬性可以忽略 **/ public Integer zkPort = null; /** * offset在zookeeper中儲存的路徑 * 路徑計算方式為:${zkRoot}/${id}/${partitionId} * 必選引數 **/ public String zkRoot = null; /** * kafkaSpout儲存offset的不同客戶端區分標誌 * 建議每個拓撲使用固定的,不同的引數,以保證拓撲重新提交之後,可以從上次位置繼續讀取資料 * 如果兩個拓撲公用同一個id,那麼可能會被重複讀取 * 如果在拓撲中使用了動態生成的uuid來作為id,那麼每次提交的拓撲,都會從佇列最開始位置讀取資料 * 必選引數 **/ public String id = null; /** * offset重新整理到zookeeper中的時間間隔 * 單位:毫秒 **/ public long stateUpdateIntervalMs = 2000; /** * 資料傳送失敗之後重試策略相關引數 **/ public long retryInitialDelayMs = 0; /** * 資料傳送失敗之後重試策略相關引數 **/ public double retryDelayMultiplier = 1.0; /** * 資料傳送失敗之後重試策略相關引數 **/ public long retryDelayMaxMs = 60 * 1000;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126

ZKHost中儲存了kafka叢集所在的zookeeper地址等資訊

ZKHost

/**
 * kafka叢集zookeeper地址,允許包含chroot
 * 比如:192.168.0.10:2181,192.168.0.11:2181,192.168.0.12:2181/kafka
 **/
public String brokerZkStr = null;
/**
 * kafka叢集中broker元資料所在地址
 * 預設為/brokers
 * 如果配置了chroot,那麼就是/kafka/brokers
 * 這個和kakfa服務端配置預設是一樣的,如果服務端採用預設配置,該屬性也可以使用預設值
 **/
public String brokerZkPath = null; // e.g., /kafka/brokers
/**
 * kafka broker分割槽資訊重新整理時間間隔,
 * 單位:秒
 * 當kafka有broker節點重啟或者分割槽資訊發生變化而導致資料讀取失敗的時候,
 * 都會重新觸發一次分割槽資訊重新整理
 **/
public int refreshFreqSecs = 60;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

KafkaSpout初始化

public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
        _collector = collector;

        Map stateConf = new HashMap(conf);
        /*
         * offset儲存位置的zookeeper地址
         * 如果該地址為空,則預設使用Storm叢集的zookeeper
         */
        List<String> zkServers = _spoutConfig.zkServers;
        if (zkServers == null) {
            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
        }
        Integer zkPort = _spoutConfig.zkPort;
        if (zkPort == null) {
            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
        }
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
        //儲存offset資訊到zookeeper
        _state = new ZkState(stateConf);

        //kafka叢集的聯結器
        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));

        // using TransactionalState like this is a hack
        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
        if (_spoutConfig.hosts instanceof StaticHosts) {
            _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        } else {
        //從zookeeper中讀取kafka的broker資訊,只儲存自身例項需要用到的分割槽資訊
            _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        }

        //兩個metrics監控資訊,忽略
        context.registerMetric("kafkaOffset", new IMetric() { ...}, _spoutConfig.metricsTimeBucketSizeInSecs);

        context.registerMetric("kafkaPartition", new IMetric() {...}, _spoutConfig.metricsTimeBucketSizeInSecs);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

以上是kafkaSpout的初始化方法,主要是完成對自身管理分割槽資訊的重新整理。 
這裡有一個問題,就是會建立3個zookeeper客戶端連線,一個用來從kafka中讀取資料,一個儲存offset,一個是metrics監控資訊,每個zookeeper客戶端連線會建立3個執行緒,這樣,光一個kafkaSpout就會存在9個zookeeper執行緒!當worker程序中有多個spout例項的時候,就會產生更多的執行緒,這就會很消耗效能,這個還是建議對zookeeper連線進行合併處理。

系統通過KafkaUtils.calculatePartitionsForTask方法來獲取自己需要管理的分割槽列表:

for (int i = taskIndex; i < numPartitions; i += totalTasks) {
            Partition taskPartition = partitions.get(i);
            taskPartitions.add(taskPartition);
        }
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

其中,taskIndex就對應自身spout例項的序號,比如該spout併發度為3,那麼這個spout例項就可能為0,1,2。當kafka的topic有5個分割槽的時候,第一個spout例項管理0,3的分割槽;第二個spout例項管理編號為1,4的分割槽,第三個spout例項管理編號為2的分割槽。 
taskId儲存在Spout的Open方法的context引數中。context.getThisTaskIndex()

KafkaSpout從Kafka中如何讀取資料併發送

kafkaSpout主要在nextTuple方法中讀取資料並emit。
 public void nextTuple() {
        //獲取自身例項管理的分割槽列表
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        for (int i = 0; i < managers.size(); i++) {

            try {
                //_currPartitionIndex永遠小於manager的大小
                // in case the number of managers decreased
                _currPartitionIndex = _currPartitionIndex % managers.size();
                //獲取資料並emit
                EmitState state = managers.get(_currPartitionIndex).next(_collector);
                /*
                 * 檢查此次資料傳送狀態
                 * 如果沒有取到資料或者取到的資料都已經emit完畢
                 * 那麼就增加_currPartitionIndex值,然後就可以從下個分割槽中讀取資料了。
                 */
                if (state != EmitState.EMITTED_MORE_LEFT) {
                    _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
                }

                /*
                 * 如果還有資料沒有emit,就退出此次迴圈,等待下次nexttuple呼叫
                 * 然後仍然從當前分割槽中取獲取資料並emit
                 */
                if (state != EmitState.NO_EMITTED) {
                    break;
                }
            } catch (FailedFetchException e) {
                LOG.warn("Fetch failed", e);
                _coordinator.refresh();
            }
        }
        //定期儲存offset資料到zookeeper
        long now = System.currentTimeMillis();
        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
            commit();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

資料傳送狀態EmitState一共有三種狀態

  • EMITTED_MORE_LEFT 
    上次取到的資料還沒有emit完畢
  • EMITTED_END, 
    上次取到的資料已經全部emit完畢
  • NO_EMITTED 
    本次沒有取到資料,沒有可供emit的資料

再來看下PartitionManager.next方法,裡面就包含如何獲取資料已經如何emit

public EmitState next(SpoutOutputCollector collector) {
        //如果等待發送的佇列為空,那麼就從kafka中再取一次資料
        if (_waitingToEmit.isEmpty()) {
            fill();
        }
        while (true) {
        //從等待發送的佇列中獲取第一個資料
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
            //如果沒有可供傳送的資料,那麼返回emit狀態為沒有可以emit的資料
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
            //根據KeyValueSchemeAsMultiScheme介面實現,將kafka中取到的資料轉為tuple
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
            if (tups != null) {
            //傳送所有的tuple,因為kafka一條資料可能對應storm的多條
                for (List<Object> tup : tups) {
                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
                }
                break;
            } else {
            //如果tuple轉化失敗,返回null,直接告訴storm該條已經處理成功,即忽略資料錯誤
                ack(toEmit.offset);
            }
        }
        /*
         * 每次從等待佇列中取一條資料反序列化並emit,
         * 然後判斷等待佇列是否還有資料,
         * 如果還有資料,就告訴spout,資料還沒有傳送完,不要切換分割槽
         * 如果資料已經發送完畢,就告訴spout,資料已經發送完畢,可以切換到下個分割槽了。
         */
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

當有資料傳送失敗的時候,失敗的資料又會重新加入到_waitingToEmit佇列中,這樣就會產生一個問題,就是當資料傳送失敗的時候,kakfaSpout會永遠只讀一個分割槽,前天分割槽都不會讀取,從而產生資料消費不均勻的問題。

在0.9.6以前老版本的時候喲一個問題,就是當較多資料emit失敗的時候,會有很多的資料在不斷重試,然後重試不斷超時,又不斷重新加入重試列表,從而導致一個數據傳送的死迴圈。這個問題也就是offset超時的問題。見Storm-643, 這個問題目前在最新版本中已經解決。

KafkaBolt

KafkaBolt就比較簡單,0.10版本還是使用old Producer API。 
Storm所有的配置屬性,都在kafka.broker.properties中儲存著,這就要求在submitTopology的時候,在topologyConf中再put一個kafka.broker.properties屬性,形成一個map中套map的結構。這樣有一點不好的就是一個拓撲中資料只能寫到一個kafka叢集中,不支援同事寫到多個kafka叢集中。不過這個在0.11新版本中已經解決了,kafka.broker.properties被作為了一個區域性變數,可以在不同的bolt例項中儲存不同的配置屬性。 
資料寫入方法如下:

 public void execute(Tuple input) {
        if (TupleUtils.isTick(input)) {
          collector.ack(input);
          return; // Do not try to send ticks to Kafka
        }

        K key = null;
        V message = null;
        String topic = null;
        try {
            //訊息的鍵值,不同的值在kafka中對應不同的分發方式,這個在KafkaBolt的FAQ中有介紹。
            key = mapper.getKeyFromTuple(input);
            //訊息體
            message = mapper.getMessageFromTuple(input);
            //topic名稱
            topic = topicSelector.getTopic(input);
            if(topic != null ) {
                producer.send(new KeyedMessage<K, V>(topic, key, message));
            } else {
                LOG.warn("skipping key = " + key + ", topic selector returned null.");
            }
            collector.ack(input);
        } catch (Exception ex) {
            collector.reportError(ex);
            collector.fail(input);
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

Storm-Kafka FAQ

KafkaSpout

  • KafkaSpout excutor數量和Kafka topic分割槽數量的關係 
    當executor併發度大於topic數量的時候,就會存在有的spout例項可以讀到資料, 有的spout例項讀不到資料。 
    當executor併發度小於topic數量的時候,就會存在一個spout例項對應多個分割槽的情況;kafka會先從一個分割槽中取一次資料,當這次獲取的資料emit完畢之後,就會再從下個分割槽中取資料。 
    當executor併發度等於topic數量的時候,一個spout例項對應一個分割槽。在實際應用中,我們也推薦這種配置方式。
  • 如何從kafka中讀取資料,每次讀取多少資料 
    根據fetchSizeBytes引數的配置,預設每次取1MB資料。
  • 資料讀取失敗如何處理 
    KafkaSpout每個PartitionManager內部儲存一個重試佇列,當資料傳送失敗的時候,加入重試佇列,然後重新發送,直到成功為止。 
    通過maxOffsetBehind引數來解決failed數量過多導致記憶體溢位問題。
  • Topic不存在如何處理 
    直接報錯。
  • 拓撲重新提交,會不會接著上次位置繼續讀取資料 
    重新提交的時候,只要id這個引數不變,那麼就會沿著上次位置繼續讀取資料。
  • zookeeper中儲存的kafka的offset位置有錯誤怎麼辦? 
    會丟擲offsetOutofRange異常,然後預設從kafka分割槽佇列最早位置開始讀取資料。
  • 能不能在一個spout中從多個topic讀取資料? 
    在0.10版本不行,在0.11版本中,支援按照正則方式匹配topic名稱,可以從所有滿足正則條件的topic中讀取資料。
  • topic分割槽主備資訊發生變化,如何處理 
    丟擲異常,然後馬上更新分割槽資訊,再次讀取資料。

KafkaBolt

  • 寫入資料,kafka topic不存在怎麼辦? 
    如果kakfa服務端允許自動建立topic,那麼就會自動建立topic。 
    如果不允許自動建立,那麼就會丟擲異常
  • 如何寫資料到指定分割槽? 
    取決於tupleToKafkaMapper的介面實現。 
    kafka 0.10版本使用的是old producer的API,0.11版本使用的是new Producer的API 
    對於old Producer 
    如果key == null,那麼在kafka中,會隨機寸照一個分割槽去寫入資料,之後只要不重啟,就都會往這個分割槽寫入資料 
    如果key != null,那麼就會在寫入資料的時候,以utils.abs(key.hashCode)%numPartitions規則計算分割槽id 
    對於New Producer 
    如果key = null,那麼就會使用一個遞增的int值,每次傳送資料的時候遞增,然後執行utils.abs(nextValue)%availablePartitions.size(),資料寫入會比較均衡。 
    如果key != null,那麼就會按照Utils.abs(Utils.murmur2(record.key()))%numPartitions的規則計算分割槽。 
    當然,New Producer API也可以手工指定分割槽id。