1. 程式人生 > >Apache Kafka:下一代分布式消息系統

Apache Kafka:下一代分布式消息系統

嚴重 依賴 ring 簡介 mes 傳統 aid .com bit

簡介

Apache Kafka是分布式發布-訂閱消息系統。它最初由LinkedIn公司開發,之後成為Apache項目的一部分。Kafka是一種快速、可擴展的、設計內在就是分布式的,分區的和可復制的提交日誌服務。

Apache Kafka與傳統消息系統相比,有以下不同:

  • 它被設計為一個分布式系統,易於向外擴展;
  • 它同時為發布和訂閱提供高吞吐量;
  • 它支持多訂閱者,當失敗時能自動平衡消費者;
  • 它將消息持久化到磁盤,因此可用於批量消費,例如ETL,以及實時應用程序。

本文我將重點介紹Apache Kafka的架構、特性和特點,幫助我們理解Kafka為何比傳統消息服務更好。

我將比較Kafak和傳統消息服務RabbitMQ、Apache ActiveMQ的特點,討論一些Kafka優於傳統消息服務的場景。在最後一節,我們將探討一個進行中的示例應用,展示Kafka作為消息服務器的用途。這個示例應用的完整源代碼在GitHub。關於它的詳細討論在本文的最後一節。

架構

首先,我介紹一下Kafka的基本概念。它的架構包括以下組件:

  • 話題(Topic)是特定類型的消息流。消息是字節的有效負載(Payload),話題是消息的分類名或種子(Feed)名。
  • 生產者(Producer)是能夠發布消息到話題的任何對象。
  • 已發布的消息保存在一組服務器中,它們被稱為代理(Broker)或Kafka集群。
  • 消費者可以訂閱一個或多個話題,並從Broker拉數據,從而消費這些已發布的消息。

技術分享

圖1:Kafka生產者、消費者和代理環境

生產者可以選擇自己喜歡的序列化方法對消息內容編碼。為了提高效率,生產者可以在一個發布請求中發送一組消息。下面的代碼演示了如何創建生產者並發送消息。

生產者示例代碼:

producer = new Producer(…); 
message = new Message(“test message str”.getBytes()); 
set = new MessageSet(message); 
producer.send(“topic1”, set); 

為了訂閱話題,消費者首先為話題創建一個或多個消息流。發布到該話題的消息將被均衡地分發到這些流。每個消息流為不斷產生的消息提供了叠代接口。然後消費者叠代流中的每一條消息,處理消息的有效負載。與傳統叠代器不同,消息流叠代器永不停止。如果當前沒有消息,叠代器將阻塞,直到有新的消息發布到該話題。Kafka同時支持點到點分發模型(Point-to-point delivery model),即多個消費者共同消費隊列中某個消息的單個副本,以及發布-訂閱模型(Publish-subscribe model),即多個消費者接收自己的消息副本。下面的代碼演示了消費者如何使用消息。

消費者示例代碼:

streams[] = Consumer.createMessageStreams(“topic1”, 1) 
for (message : streams[0]) { 
bytes = message.payload(); 
// do something with the bytes 
} 

Kafka的整體架構如圖2所示。因為Kafka內在就是分布式的,一個Kafka集群通常包括多個代理。為了均衡負載,將話題分成多個分區,每個代理存儲一或多個分區。多個生產者和消費者能夠同時生產和獲取消息。

技術分享

圖2:Kafka架構

Kafka存儲

Kafka的存儲布局非常簡單。話題的每個分區對應一個邏輯日誌。物理上,一個日誌為相同大小的一組分段文件。每次生產者發布消息到一個分區,代理就將消息追加到最後一個段文件中。當發布的消息數量達到設定值或者經過一定的時間後,段文件真正寫入磁盤中。寫入完成後,消息公開給消費者。

與傳統的消息系統不同,Kafka系統中存儲的消息沒有明確的消息Id。

消息通過日誌中的邏輯偏移量來公開。這樣就避免了維護配套密集尋址,用於映射消息ID到實際消息地址的隨機存取索引結構的開銷。消息ID是增量的,但不連續。要計算下一消息的ID,可以在其邏輯偏移的基礎上加上當前消息的長度。

消費者始終從特定分區順序地獲取消息,如果消費者知道特定消息的偏移量,也就說明消費者已經消費了之前的所有消息。消費者向代理發出異步拉請求,準備字節緩沖區用於消費。每個異步拉請求都包含要消費的消息偏移量。Kafka利用sendfile API高效地從代理的日誌段文件中分發字節給消費者。

技術分享

圖3:Kafka存儲架構

Kafka代理

與其它消息系統不同,Kafka代理是無狀態的。這意味著消費者必須維護已消費的狀態信息。這些信息由消費者自己維護,代理完全不管。這種設計非常微妙,它本身包含了創新。

  • 從代理刪除消息變得很棘手,因為代理並不知道消費者是否已經使用了該消息。Kafka創新性地解決了這個問題,它將一個簡單的基於時間的SLA應用於保留策略。當消息在代理中超過一定時間後,將會被自動刪除。
  • 這種創新設計有很大的好處,消費者可以故意倒回到老的偏移量再次消費數據。這違反了隊列的常見約定,但被證明是許多消費者的基本特征。

ZooKeeper與Kafka

考慮一下有多個服務器的分布式系統,每臺服務器都負責保存數據,在數據上執行操作。這樣的潛在例子包括分布式搜索引擎、分布式構建系統或者已知的系統如Apache Hadoop。所有這些分布式系統的一個常見問題是,你如何在任一時間點確定哪些服務器活著並且在工作中。最重要的是,當面對這些分布式計算的難題,例如網絡失敗、帶寬限制、可變延遲連接、安全問題以及任何網絡環境,甚至跨多個數據中心時可能發生的錯誤時,你如何可靠地做這些事。這些正是Apache ZooKeeper所關註的問題,它是一個快速、高可用、容錯、分布式的協調服務。你可以使用ZooKeeper構建可靠的、分布式的數據結構,用於群組成員、領導人選舉、協同工作流和配置服務,以及廣義的分布式數據結構如鎖、隊列、屏障(Barrier)和鎖存器(Latch)。許多知名且成功的項目依賴於ZooKeeper,其中包括HBase、Hadoop 2.0、Solr Cloud、Neo4J、Apache Blur(Incubating)和Accumulo。

ZooKeeper是一個分布式的、分層級的文件系統,能促進客戶端間的松耦合,並提供最終一致的,類似於傳統文件系統中文件和目錄的Znode視圖。它提供了基本的操作,例如創建、刪除和檢查Znode是否存在。它提供了事件驅動模型,客戶端能觀察特定Znode的變化,例如現有Znode增加了一個新的子節點。ZooKeeper運行多個ZooKeeper服務器,稱為Ensemble,以獲得高可用性。每個服務器都持有分布式文件系統的內存復本,為客戶端的讀取請求提供服務。

技術分享

圖4:ZooKeeper Ensemble架構

上圖4展示了典型的ZooKeeper ensemble,一臺服務器作為Leader,其它作為Follower。當Ensemble啟動時,先選出Leader,然後所有Follower復制Leader的狀態。所有寫請求都通過Leader路由,變更會廣播給所有Follower。變更廣播被稱為原子廣播。

Kafka中ZooKeeper的用途:正如ZooKeeper用於分布式系統的協調和促進,Kafka使用ZooKeeper也是基於相同的原因。ZooKeeper用於管理、協調Kafka代理。每個Kafka代理都通過ZooKeeper協調其它Kafka代理。當Kafka系統中新增了代理或者某個代理故障失效時,ZooKeeper服務將通知生產者和消費者。生產者和消費者據此開始與其它代理協調工作。Kafka整體系統架構如圖5所示。

技術分享

圖5:Kafka分布式系統的總體架構

Apache Kafka對比其它消息服務

讓我們了解一下使用Apache Kafka的兩個項目,以對比其它消息服務。這兩個項目分別是LinkedIn和我的項目:

LinkedIn的研究

LinkedIn團隊做了個實驗研究,對比Kafka與Apache ActiveMQ V5.4和RabbitMQ V2.4的性能。他們使用ActiveMQ默認的消息持久化庫Kahadb。LinkedIn在兩臺Linux機器上運行他們的實驗,每臺機器的配置為8核2GHz、16GB內存,6個磁盤使用RAID10。兩臺機器通過1GB網絡連接。一臺機器作為代理,另一臺作為生產者或者消費者。

生產者測試

LinkedIn團隊在所有系統中配置代理,異步將消息刷入其持久化庫。對每個系統,運行一個生產者,總共發布1000萬條消息,每條消息200字節。Kafka生產者以1和50批量方式發送消息。ActiveMQ和RabbitMQ似乎沒有簡單的辦法來批量發送消息,LinkedIn假定它的批量值為1。結果如下面的圖6所示:

技術分享

圖6:LinkedIn的生產者性能實驗結果

Kafka性能要好很多的主要原因包括:

  • Kafka不等待代理的確認,以代理能處理的最快速度發送消息。
  • Kafka有更高效的存儲格式。平均而言,Kafka每條消息有9字節的開銷,而ActiveMQ有144字節。其原因是JMS所需的沈重消息頭,以及維護各種索引結構的開銷。LinkedIn註意到ActiveMQ一個最忙的線程大部分時間都在存取B-Tree以維護消息元數據和狀態。

消費者測試

為了做消費者測試,LinkedIn使用一個消費者獲取總共1000萬條消息。LinkedIn讓所有系統每次拉請求都預獲取大約相同數量的數據,最多1000條消息或者200KB。對ActiveMQ和RabbitMQ,LinkedIn設置消費者確認模型為自動。結果如圖7所示。

技術分享

圖7:LinkedIn的消費者性能實驗結果

Kafka性能要好很多的主要原因包括:

  • Kafka有更高效的存儲格式;在Kafka中,從代理傳輸到消費者的字節更少。
  • ActiveMQ和RabbitMQ兩個容器中的代理必須維護每個消息的傳輸狀態。LinkedIn團隊註意到其中一個ActiveMQ線程在測試過程中,一直在將KahaDB頁寫入磁盤。與此相反,Kafka代理沒有磁盤寫入動作。最後,Kafka通過使用sendfile API降低了傳輸開銷。

目前,我正在工作的一個項目提供實時服務,從消息中快速並準確地提取場外交易市場(OTC)定價內容。這是一個非常重要的項目,處理近25種資產類別的財務信息,包括債券、貸款和ABS(資產擔保證券)。項目的原始信息來源涵蓋了歐洲、北美、加拿大和拉丁美洲的主要金融市場領域。下面是這個項目的一些統計,說明了解決方案中包括高效的分布式消息服務是多麽重要:

  • 每天處理的消息數量超過1,300,000;
  • 每天解析的OTC價格數量超過12,000,000;
  • 支持超過25種資產類別;
  • 每天解析的獨立票據超過70,000。

消息包含PDF、Word文檔、Excel及其它格式。OTC定價也可能要從附件中提取。

由於傳統消息服務器的性能限制,當處理大附件時,消息隊列變得非常大,我們的項目面臨嚴重的問題,JMSqueue一天需要啟動2-3次。重啟JMS隊列可能丟失隊列中的全部消息。項目需要一個框架,不論解析器(消費者)的行為如何,都能夠保住消息。Kafka的特性非常適用於我們項目的需求。

當前項目具備的特性:

  1. 使用Fetchmail獲取遠程郵件消息,然後由Procmail過濾並處理,例如單獨分發基於附件的消息。
  2. 每條消息從單獨的文件獲取,該文件被處理(讀取和刪除)為一條消息插入到消息服務器中。
  3. 消息內容從消息服務隊列中獲取,用於解析和提取信息。

示例應用

這個示例應用是基於我在項目中使用的原始應用修改後的版本。我已經刪除日誌的使用和多線程特性,使示例應用的工件盡量簡單。示例應用的目的是展示如何使用Kafka生產者和消費者的API。應用包括一個生產者示例(簡單的生產者代碼,演示Kafka生產者API用法並發布特定話題的消息),消費者示例(簡單的消費者代碼,用於演示Kafka消費者API的用法)以及消息內容生成API(在特定路徑下生成消息內容到文件的API)。下圖展示了各組件以及它們與系統中其它組件間的關系。

技術分享

圖8:示例應用組件架構

示例應用的結構與Kafka源代碼中的例子程序相似。應用的源代碼包含Java源程序文件夾‘src’和‘config‘文件夾,後者包括幾個配置文件和一些Shell腳本,用於執行示例應用。要運行示例應用,請參照ReadMe.md文件或GitHub網站Wiki頁面的說明。

程序構建可以使用Apache Maven,定制也很容易。如果有人想修改或定制示例應用的代碼,有幾個Kafka構建腳本已經過修改,可用於重新構建示例應用代碼。關於如何定制示例應用的詳細描述已經放在項目GitHub的Wiki頁面。

現在,讓我們看看示例應用的核心工件。

Kafka生產者代碼示例

/** 
 * Instantiates a new Kafka producer. 
 * 
 * @param topic the topic 
 * @param directoryPath the directory path 
 */ 
public KafkaMailProducer(String topic, String directoryPath) { 
       props.put("serializer.class", "kafka.serializer.StringEncoder"); 
       props.put("metadata.broker.list", "localhost:9092"); 
       producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props)); 
       this.topic = topic; 
       this.directoryPath = directoryPath; 
} 

public void run() { 
      Path dir = Paths.get(directoryPath); 
      try { 
           new WatchDir(dir).start(); 
           new ReadDir(dir).start(); 
      } catch (IOException e) { 
           e.printStackTrace(); 
      } 
} 

上面的代碼片斷展示了Kafka生產者API的基本用法,例如設置生產者的屬性,包括發布哪個話題的消息,可以使用哪個序列化類以及代理的相關信息。這個類的基本功能是從郵件目錄讀取郵件消息文件,然後作為消息發布到Kafka代理。目錄通過java.nio.WatchService類監視,一旦新的郵件消息Dump到該目錄,就會被立即讀取並作為消息發布到Kafka代理。

Kafka消費者代碼示例

public KafkaMailConsumer(String topic) { 
       consumer = 
Kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); 
       this.topic = topic; 
} 

/** 
 * Creates the consumer config. 
 * 
 * @return the consumer config 
 */ 
private static ConsumerConfig createConsumerConfig() { 
      Properties props = new Properties(); 
      props.put("zookeeper.connect", KafkaMailProperties.zkConnect); 
      props.put("group.id", KafkaMailProperties.groupId); 
      props.put("zookeeper.session.timeout.ms", "400"); 
      props.put("zookeeper.sync.time.ms", "200"); 
      props.put("auto.commit.interval.ms", "1000"); 
      return new ConsumerConfig(props); 
} 

public void run() { 
      Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
      topicCountMap.put(topic, new Integer(1)); 
      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
      KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); 
      ConsumerIterator<byte[], byte[]> it = stream.iterator();
      while (it.hasNext()) 
      System.out.println(new String(it.next().message())); 
}

上面的代碼演示了基本的消費者API。正如我們前面提到的,消費者需要設置消費的消息流。在Run方法中,我們進行了設置,並在控制臺打印收到的消息。在我的項目中,我們將其輸入到解析系統以提取OTC定價。

在當前的質量保證系統中,我們使用Kafka作為消息服務器用於概念驗證(Proof of Concept,POC)項目,它的整體性能優於JMS消息服務。其中一個我們感到非常興奮的特性是消息的再消費(re-consumption),這讓我們的解析系統可以按照業務需求重新解析某些消息。基於Kafka這些很好的效果,我們正計劃使用它,而不是用Nagios系統,去做日誌聚合與分析。

總結

Kafka是一種處理大量數據的新型系統。Kafka基於拉的消費模型讓消費者以自己的速度處理消息。如果處理消息時出現了異常,消費者始終可以選擇再消費該消息。

Apache Kafka:下一代分布式消息系統