1. 程式人生 > >從Kafka的一次broker假死介紹Kafka架構和DefaultPartitioner

從Kafka的一次broker假死介紹Kafka架構和DefaultPartitioner

最近公司的kafka叢集出現了節點已經失效但是節點程序和埠都還在的情況,目前我們的系統監控只是做到了程序監控,即為整個叢集的每臺機群建立程序快照,如果程序(如NameNodekakfa broker)丟失,則報警並立刻自動重啟程序。但是這次的kafka事故程序和埠都還在,因此報警系統沒有能夠及時報警,因此對此次事故發生的過程和解決方式做詳細的分析。 首先,我們一個同學使用kafka的過程中發現訊息無法消費,因此進入進群進行如下檢查: 程序和埠:我們的kafka的3個broker,程序和埠都在,正常使用kakfa-console-producer進行訊息的生產,丟擲異常 使用kakfa-console-consumer

進行訊息的消費,丟擲異常 使用kafka-topics --describe進行topic的詳細情況的分析,發現,partition 和 Isr(In-Sync Replication)竟然只剩下一臺機器 我們知道,kafka在建立topic的時候會指定partition數量和replication數量,對於每一個partition,都會有一個broker作為leader broker,剩餘的broker作為slave broker。我們猜想在我們的程式碼中生產的訊息應該已經丟失。因此進行驗證。在緊急重啟了假死的兩臺broker以後,我們開始對訊息丟失情況進行驗證,令人驚訝的是,沒有發生訊息丟失。但是,為了以防萬一,無論訊息是否丟失,我們都必須找到足夠的證據。

我們的topic屬性是2個partition、2個replication組成,當我們發現從這個topic消費訊息發生異常的時候,我們列印了這個topic的描述資訊:

[[email protected]-10-120-241-50 kafka]$ bin/kafka-topics.sh --describe --topic wuchang1 --zookeeper 10.120.241.50:2181
Topic:wuchang1  PartitionCount:2        ReplicationFactor:2     Configs:
        Topic: wuchang1
Partition: 0 Leader: 110 Replicas: 110,50 Isr: 110 Topic: wuchang1 Partition: 1 Leader: -1 Replicas: 50,82 Isr:

這個資訊其實是我們在測試環境復現出來的現場。我覺得,一個資深的軟體工程師,非常注重對事故現場的復現,因為只有成功地復現問題,才能根本地解決問題。在正常情況下,3個broker工作正常,它的描述資訊是這樣的:

[[email protected]-10-120-241-50 kafka]$ bin/kafka-topics.sh --describe --topic wuchang1 --zookeeper 10.120.241.50:2181
Topic:wuchang1  PartitionCount:2        ReplicationFactor:2     Configs:
        Topic: wuchang1 Partition: 0    Leader: 110     Replicas: 110,50        Isr: 110,50
        Topic: wuchang1 Partition: 1    Leader: 50      Replicas: 50,82 Isr: 50,82

為了讓我們的kafka cluster能夠容忍部分機器宕機,我們的生產環境和測試環境打開了leader 自動選舉:auto.leader.rebalance.enable=true
這樣,當任何一個TopicPartition的leader丟失,Controller會啟動一個監控執行緒監控所有partition的Leader狀態,如果發現某個Topic-Partition的leader丟失,則該執行緒會為該Leader啟動重新選舉,程式碼在KafkaController.scala中:

  def onControllerFailover() {
   //省略
      if (config.autoLeaderRebalanceEnable) {
        info("starting the partition rebalance scheduler")
        autoRebalanceScheduler.startup()
        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
          5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
      }
      deleteTopicManager.start()
    }
    else
      info("Controller has been shut down, aborting startup/failover")
  }

方法checkAndTriggerPartitionRebalance()就是完成對所有Topic-Partition的leader檢查,這個作為引數被定時呼叫,注意,Controller與Leader不同,Leader是針對某個Topic-Partition而言,而Controller是整個叢集的Controller
為了能夠在自動leader選舉開啟的情況下讓某個Topic-Partition失去leader,我們將這個topic的partition 1對應的兩個replication 全部kill,這樣,即使自動leader檢查開啟,由於partition-1 已經不存在任何一個活著的replication,因此無從選舉出一個leader,此時,這個partition已經不再工作,partition-0也只有僅剩的一個broker來作為leader。
我們當時在測試環境復現問題的時候,在自動leader選舉開啟的情況下,只要某個partition的replication中有一個還活著(即ISR中還有任何一個broker),這個broker就會被自動選舉為leader。這就是Kafka高可用性的一個體現。只有當一個topic的全部replication全部丟失,這個kafka的這個Topic-Partitioin才會變為不可用狀態。
反過來看,如果我們的topic的replication-factor設定為2,那麼,在自動leader rebalance開啟的情況下,任何兩臺broker丟失,都不會對任何partition造成影響,除非這個Topic-Partition的三個replication全部掛掉。

現在現場已經復現,我們就來驗證這種情況下訊息是否丟失。

我們線上環境生產訊息的程式碼來源於nginx lua外掛,用來將nginx收的的使用者訪問資訊傳送到kafka:

    topic = args["pbtype"]
    -- topic = "LivyRoomMsg"
    if (topic == "LivyRoomMsg") then
        msgJson = Convert_GjsWebLiveRoomWechatUserLogin(msgJson,args["messageSentTime"])
    end
    local ok, err = bp:send(topic, nil, msgJson)

從程式碼片段中我們可以看到,傳送訊息的時候,key為null。我們使用java程式碼,同樣設定key為null,傳送訊息到我們測試環境的現場,的確訊息未丟失,傳送的所有訊息都被打倒到了leader存在的partition上面了。
我們知道,Kafka通過key資訊決定了訊息傳送到哪個broker,我們使用的是預設的Partitioner, Kafka預設的Partitioner是DefaultPartitioner,核心方法是partition():

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) { //從可用的partitioner中選擇一個
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {//從所有叢集中選擇一個
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {// 只要有key , 就按照key去確定
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

  /**
     * topicCounterMap維護了每個topic的一個計數器,這個計數器用來通過Round-Robin方式選擇一個partition
     * @param topic
     * @return
     */
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(new Random().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

預設partitioner的訊息分派邏輯是:

  1. 如果存在key,則通過Round-Robin的方式,從該topic的所有partition中選擇一個分割槽,即,無論現在分割槽的狀態如何,一旦key確定,對應的broker就確定了,到5;
  2. 如果key為空,則到3;
  3. 如果這個topic還存在可用的Partition(還存在leader的partition),則通過Round-Robin的方式,以這個topic遞增的隨機數作為種子,從這些可用的partition中選擇一個partition,將訊息傳送到這個partition,否則到4;
  4. 如果這個topic沒有任何一個可用的Partition,則通過Round-Robin的方式,以這個topic遞增的隨機數作為種子,從所有partition中選擇一個partition傳送訊息。很顯然,如果選擇的partition不可用,訊息傳送失敗;
  5. 退出

DefaultPartition 使用 topicCounterMap來維護每個topic用來通過Round-Robin方式選擇partition的序列號,key是所有topic的名字,value是一個整數計數器,每次進行一次選擇則自增1,保證所有partition被依次使用到。

在我們使用bin/kafka-console-producer.sh的命令列工具生產訊息的時候,其實最終也是呼叫了。本文中,我並不急於讓大家知道問題原因,而希望逐步撥雲見日,讓大家從程式碼層面循序漸漸,逐步接近問題真想。這樣做,不僅僅能夠找到問題原因,更能夠學到知識,而不僅僅是確認了或者解決了一個問題。

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "[email protected]"

執行kakfa-console-producer.sh時,預設java程序堆記憶體大小為512M,當然,如果我們可以自行設定和修改。實際執行的,是ConsoleProducer類,顯然,ConsoleProducer負責從命令列中讀取我們輸入的訊息,然後生產到Kafka Server。看過$KAFKA_HOME/bin目錄下面的指令碼程式碼你就會知道,kafka的程式碼重用做得非常好,即使是指令碼,也充分重用。$KAFKA_HOME/bin/kafka-run-class.sh是一個公共啟動類,無論我們呼叫kakfa-console-producer.shkakfa-console-consumer.shkafka-topics.sh等等指令碼,都是通過kafka-run-class.sh執行起來的,只需要告訴kafka-run-class.sh需要啟動的java類以及額外的啟動引數,kafka-run-class.sh就會執行這個java類,新增上這些額外的啟動引數,以及一些共用的、必須的classpath。 因此,我們繼續來看ConsoleProducer的實現,瞭解這個類的實現機制,直接關係到我們最常用的kafka-console-producer的命令的行為:

  def main(args: Array[String]) {

    try {
        val config = new ProducerConfig(args)
        val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
        reader.init(System.in, getReaderProps(config))

        val producer =
          if(config.useOldProducer) {
            new OldProducer(getOldProducerProps(config))
          } else {
            //NewShinyProducer只是對org.apache.kafka.clients.producer.KafkaProducer
            //進行了簡單封裝,底層還是用org.apache.kafka.clients.producer.KafkaProducer傳送訊息
            new NewShinyProducer(getNewProducerProps(config))
          }
        //省略
        var message: ProducerRecord[Array[Byte], Array[Byte]] = null
        do {
          //reader的預設實現類是LineMessageReader,一行一行讀取使用者在命令列中的輸入
          message = reader.readMessage()
          if (message != null)
            //在沒有特殊指定訊息的key的情況下,key為空
            producer.send(message.topic, message.key, message.value)
        } while (message != null)
    } catch {
      //省略
    }
    Exit.exit(0)
  }

LineMessageReader是訊息讀取的實現類,用來讀取我們在命令列中輸入的Kafka訊息:

  class LineMessageReader extends MessageReader {
    var topic: String = null
    var reader: BufferedReader = null
    var parseKey = false
    var keySeparator = "\t"
    var ignoreError = false
    var lineNumber = 0

    override def init(inputStream: InputStream, props: Properties) {
      topic = props.getProperty("topic")
      //如果需要指定key,則在kafka-console-producer中增加引數--property "parse.key=true",--property "key.separator=:"
      //用來告訴kafka是否使用key以及分割訊息和key的分隔符
      if (props.containsKey("parse.key"))
        parseKey = props.getProperty("parse.key").trim.equalsIgnoreCase("true")
      if (props.containsKey("key.separator"))
        keySeparator = props.getProperty("key.separator")
      if (props.containsKey("ignore.error"))//是否忽略錯誤
        ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
      reader = new BufferedReader(new InputStreamReader(inputStream))
    }

    override def readMessage() = {
      lineNumber += 1
      print(">")
      (reader.readLine(), parseKey) match {
        case (null, _) => null
        case (line, true) =>
          line.indexOf(keySeparator) match {
            case -1 => //在使用者輸入的訊息中沒有找到keySeparator
              if (ignoreError) new ProducerRecord(topic, line.getBytes)
              else throw new KafkaException(s"No key found on line $lineNumber: $line")
            case n => //找到keySeparator定義的字元,則提取訊息體和key,組裝成為ProducerRecord物件
              val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
              new ProducerRecord(topic, line.substring(0, n).getBytes, value)
          }
        case (line, false) =>//使用者沒有開啟parse.key功能,則設定key為null
          new ProducerRecord(topic, line.getBytes)
      }
    }
  }

LineMessageReader的主要職責是讀取命令列中使用者的輸入,然後使用KafkaProducer把訊息傳送出去。我們可以通過引數parse.key=true以及key.separator=:來告訴kakfa我們會顯式指定key。絕大多數情況下,除非有特殊需求,我們都不會使用如此繁複冗長的引數。因此,實際上,我使用如下命令進行訊息的生產時,效果和我在程式碼中使用KafkaProducer進行訊息的生產並將key設定為null的效果一樣,Kafka都會通過使用DefaultPartitioner來進行訊息的分派,由於key為null,將選擇任何一個活著的broker,因此,雖然我們Kafka的某個topic的部分partition的leader丟失,訊息卻不會丟失。

其實,我們在使用Kafka過程中,我們會以為我們使用比如一個每次遞增1的key,可以實現訊息分派的負載均衡,即訊息會幾乎均勻地分佈到所有的partition上面去。但是其實這樣做可能會造成訊息的丟失,更好的做法,就是直接不指定key,此時Kafka會幫助我們在所有或者的broker中選擇一個進行訊息分派,不會造成訊息丟失,同時負載均衡Kafka也幫我們完成了。
Kafka的key的使用是用來滿足定製化的分派規則而不是訊息均勻分派,比如:
1. 我們希望這個topic的所有訊息打到同一個partition,這時候我們可以指定一個不變的任意的key,根據DefaultPartitioner的實現,訊息會固定打到某個partition;
2. 我們希望根據訊息的內容完全定製化地控制這個訊息對應的partition,這時候我們需要自己實現一個Partitioner。如果我們看過DefaultPartitioner的實現,那麼實現自己的定製化的Partitioner就太簡單了。

總體來說本文的這些程式碼難度不是很大,但是對於我們理解Kafka的執行機制從而正確地、毫無誤解地使用Kafka非常有幫助。相比我在部落格中介紹的Hadoop、Yarn的排程求、資源管理器程式碼,這段程式碼非常容易理解,但是也可以從中看到Kafka程式碼的優雅和規範,良好的介面定義帶來良好的可擴充套件性。

相關推薦

Kafkabroker介紹Kafka架構DefaultPartitioner

最近公司的kafka叢集出現了節點已經失效但是節點程序和埠都還在的情況,目前我們的系統監控只是做到了程序監控,即為整個叢集的每臺機群建立程序快照,如果程序(如NameNode、kakfa broker)丟失,則報警並立刻自動重啟程序。但是這次的kafka事故程序

線上mysql鎖分析

一、現象 發運車次呼叫發車介面時發生異常,後臺丟擲資料庫死鎖日誌。   二、原因分析   通過日誌可以看出事務T1等待 heap no 8的行鎖 (X locks 排他鎖)                 事務T2持有heap no 8的行鎖,等待heap no 7的行鎖 兩個更新運

原始碼徹底理解Android的訊息機制

情景重現 button.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v

cgi服務卡的問題排查記錄

問題現象 cgi服務無法處理請求,cpu偶爾飆高。 問題排查記錄 檢視呼叫棧 首先jstack 檢視程序的當前呼叫棧,發現很多執行緒處於Blocked狀態。 jstack pid > stack.txt 檢視gc情況 cpu偶

Logstash資料庫同步多張表

一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。  博友前天線上提問,說明這塊理解的還不夠透徹。  我整理下,  一是為了儘快解決博友問題,  二是加深記憶,便於未來產品開發中快速上手。 1、同步原理 原有ES專欄中有詳解,不再贅述。詳細請參

查詢sqlserver鎖的經歷

查詢bug是程式設計師的家常便飯,我身邊的人喜歡讓使用者來重現問題。當然他們也會從正式伺服器上下載錯誤log,然後嘗試分析log,不過當錯誤不是那種不經思考就可識別的情況,他們就會將問題推向使用者,甚至怪罪程式依賴的平臺。他們常用的藉口就是“這個問題很難重現,需要持續監控

被“呼你”電話騷擾的反騷擾經歷

一、事件回放 2018 年 7 月 23 日下午 6:23,接到了來自 010-53565784 的電話。對方聲稱是愛上街催收的,要求我通知薛**及時還清在愛上街 app 上借款。同時,還告知說薛**借款時將我的手機號填寫為緊急聯絡人。一肚子氣啊,自己交友不慎啊。就掛了電話

Mysql線上

Mysql死鎖日誌解讀(SHOW ENGINE INNODB STATUS;)2018-02-01 09:20:25 2b113e040700 INNODB MONITOR OUTPUT ===================================== Per se

MySQL 線上鎖分析實戰

> 關鍵詞:MySQL Index Merge ## 前言 MySQL 的鎖機制相信大家在學習 MySQL 的時候都有簡單的瞭解過,那既然有鎖就必定繞不開死鎖這個問題。其實 MySQL 在大部分場景下是不會存在死鎖問題的(比如併發量不高,SQL 寫得不至於太拉胯的情況),但是在高併發的業務場景下,一

記錄apache服務器啟動報錯解決方法

受限 png www img oot 端口 使用 rwx 環境 問題描述:在liunx系統上安裝軟件時需要較大的權限,一般用戶是不能隨便安裝的。為了省事,在安裝lamp環境時,整個過程都是以root身份安裝各種軟件的。最後整個環境是安裝成功,但是像apache這樣的服務器如

React 重要的重構:認識非同步渲染架構 Fiber

Diff 演算法 熟悉 react 的朋友都知道,在 react 中有個核心的演算法,叫 diff 演算法。web 介面由 dom 樹組成,不同的 dom 樹會渲染出不同的介面。react 使用 virtual dom 來表示 dom 樹,而 diff 演算法就是用於比較 virtual dom 樹的區別,

刪資料而認識的CountDownLatchCyclicBarrier

公司之前有個任務,要求刪除一張資料庫表裡面2018/2/1之前的資料。這張表裡面存放的是車輛定位資料,一輛車每天能產生4000+條定位資料,所以整個表蠻大的,有65億+條資料。而且還有要求:根據每個地區要統計出來這個地區刪除了多少條資料。其中2月1號之前的有10億多條。當然這

徹底解決Java的值傳遞引用傳遞

本文旨在用最通俗的語言講述最枯燥的基本知識 學過Java基礎的人都知道:值傳遞和引用傳遞是初次接觸Java時的一個難點,有時候記得了語法卻記不得怎麼實際運用,有時候會的了運用卻解釋不出原理,而且坊間討論的話題又是充滿爭議:有的論壇帖子說Java只有值傳遞,有的部落格說兩者皆

記我的配置Apache伺服器的域名解析泛域名解析過程

配置apache的多域名解析,需要用到下面的東西: C:\WINDOWS\system32\drivers\etc\hosts   (DNS域名解析的檔案) Apache2.2.11\conf\httpd.conf Apache2.2.11\conf\extra\httpd

重新學習Python——錯誤、除錯測試

錯誤 一種用try...except...finally捕獲錯誤並用raise丟擲 除錯 assert 斷言 凡是用print來輔助檢視的地方,都可以用斷言(assert)來替代: # err.py def foo(s): n = int(s)

sql語句優化進行多條記錄的-----插入修改

更新: update t_student set name = 'timy' where id = 10 現在我要更新ID為10、12 、13的age等於10、12、13 UPDATE t_student SET age= CASEWHEN id 10 THEN10WHE

關於失敗的專案開發的反思總結

這次教訓比較深刻,磨刀不誤砍柴工也是這個道理,最大的體會就是:相比較技術而言,解決問題的思想和方式更為重要。在開發一項公司活動產品的過程中,我因為建表太過膚淺,不規範,導致後期開發的過程中,程式碼越來越

記錄錯誤處理 (xml序列化反序列化相關)

vfl last events all 長度 pat vid pac ria XML序列化後,反序列化時出現錯誤 報錯現象 System.InvalidOperationException: XML 文檔(40, 11)中有錯誤。 ---> System.Xml.X

徹底搞懂Vue針對陣列雙向繫結(MVVM)的處理方式

歡迎關注我的部落格:https://github.com/wangweianger/myblog Vue內部實現了一組觀察陣列的

libzip開發筆記():libzip庫介紹、編譯工程模板

  前言   Qt使用一些壓縮解壓功能,選擇libzip庫,libzip庫比較原始,也是很多其他庫的基礎支撐庫。   libzip   libzip是一個C庫,用於讀取,建立和修改zip檔案。可以從資料緩衝區,檔案或直接從其他zip歸檔檔案直接複製的壓縮資料中新增檔案。在不關