1. 程式人生 > >如何實現一個簡單易用且可靠的訊息佇列框架?

如何實現一個簡單易用且可靠的訊息佇列框架?

作者:李豔鵬

編輯:Gary

訊息佇列在網際網路領域裡得到了廣泛的應用,它多應用在非同步處理、模組之間的解偶和高併發的消峰等場景,訊息佇列中表現最好的當屬Apache開源專案Kafka,Kafka使用支援高併發的Scala語言開發,利用作業系統的快取原理達到高效能,並且天生具有可分割槽,分散式的特點,而且有不同語言的客戶端,使用起來非常的方便。

Kclient是Kafka生產者客戶端和消費者客戶端的一個簡單易用的框架,它具有高效整合、高效能、高穩定的高階特點。

在繼續閱讀kclient的功能特性、架構設計和使用方法之前,讀者需要對Kafka進行基本的學習和了解。如果讀者是Kafka的初學者,而且英文還不錯,也可以直接參考Kafka官方線上文件:Kafka 0.8.2 Documentation,如果對英文不感性趣,可以在網上搜索Kakfa的中文資料進行學習,內容需要涵蓋Kafka的使用嚮導以及利用作業系統快取的“空中接力”、持久化、分片機制、高可用等原理。

本文包含了背景介紹、功能特性、架構設計、使用指南、API簡介、後臺監控和管理、訊息處理機模板專案、以及效能壓測相關章節。如果你想使用kclient快速的構建Kafka處理機服務,請參考訊息處理機模板專案章節; 如果你想了解kclient的其他使用方式、功能特性、監控和管理等,請參考功能特性、使用指南、API簡介、後臺監控和管理等章節; 如果你想更深入的理解kclient的架構設計和效能指標,請參考架構設計和效能壓測章節。

功能特性

1. 簡單易用

簡化了Kafka客戶端API的使用方法, 特別是對消費端開發,消費端開發者只需要實現MessageHandler介面或者相關子類,在實現中處理訊息完成業務邏輯,並且在主執行緒中啟動封裝的消費端伺服器即可。它提供了各種常用的MessageHandler,框架自動轉換訊息到領域物件模型或者JSON物件等資料結構,讓開發者更專注於業務處理。如果使用服務原始碼註解的方式宣告訊息處理機的後臺,可以將一個通用的服務方法直接轉變成具有完善功能的處理Kafka訊息佇列的處理機,使用起來極其簡單,程式碼看起來一目瞭然,在框架級別通過多種執行緒池技術保證了處理機的高效能。

在使用方面,它提供了多種使用方式:

  1. 直接使用Java API;
  2. 與Spring環境無縫整合;
  3. 服務原始碼註解,通過註解宣告方式啟動Kafka訊息佇列的處理機。

除此之外,它基於註解提供了訊息處理機的模板專案,可以根據模板專案通過配置快速開發Kafka的訊息處理機。

2. 高效能

為了在不同的業務場景下實現高效能, 它提供不同的執行緒模型:

為了在不同的業務場景下實現高效能, 它提供不同的執行緒模型:

  1. 適合輕量級服務的同步執行緒模型;
  2. 適合IO密集型服務的非同步執行緒模型(細分為所有消費者流共享執行緒池和每個流獨享執行緒池)。

另外,非同步模型中的執行緒池也支援確定數量執行緒的執行緒池和執行緒數量可伸縮的執行緒池。

3. 高穩定性

框架級別處理了常見的異常,計入錯誤日誌,可用於錯誤手工恢復或者洗資料,並實現了優雅關機和重啟等功能。

架構設計

1. 執行緒模型

同步執行緒模型

在這種執行緒模型中,客戶端為每一個消費者流使用一個執行緒,每個執行緒負責從Kafka佇列裡消費訊息,並且在同一個執行緒裡進行業務處理。我們把這些執行緒稱為消費執行緒,把這些執行緒所在的執行緒池叫做訊息消費執行緒池。這種模型之所以在訊息消費執行緒池處理業務,是因為它多用於處理輕量級別的業務,例如:快取查詢、本地計算等。

非同步執行緒模型

在這種執行緒模型中,客戶端為每一個消費者流使用一個執行緒,每個執行緒負責從Kafka佇列裡消費訊息,並且傳遞消費得到的訊息到後端的非同步執行緒池,在非同步執行緒池中處理業務。我們仍然把前面負責消費訊息的執行緒池稱為訊息消費執行緒池,把後面的非同步執行緒池稱為非同步業務執行緒池。這種執行緒模型適合重量級的業務,例如:業務中有大量的IO操作、網路IO操作、複雜計算、對外部系統的呼叫等。

後端的非同步業務執行緒池又細分為所有消費者流共享執行緒池和每個流獨享執行緒池。下面詳細介紹下。

所有消費者流共享執行緒池:所有消費者流共享執行緒池對比每個流獨享執行緒池,建立更少的執行緒池物件,能節省些許的記憶體,但是,由於多個流共享同一個執行緒池,在資料量較大的時候,流之間的處理可能互相影響。例如,一個業務使用2個區和兩個流,他們一一對應,通過生產者指定定製化的雜湊函式替換預設的key-hash, 實現一個流(區)用來處理普通使用者,另外一個流(區)用來處理VIP使用者,如果兩個流共享一個執行緒池,當普通使用者的訊息大量產生的時候,VIP使用者只有少量,並且排在了佇列的後頭,就會產生餓死的情況。這個場景是可以使用多個topic來解決,一個普通使用者的topic,一個VIP使用者的topic,但是這樣又要多維護一個topic,客戶端傳送的時候需要顯式的進行判斷topic目標,也沒有多少好處。

每個流獨享執行緒池:每個流獨享執行緒池使用不同的非同步業務執行緒池來處理不同的流裡面的訊息,互相隔離,互相獨立,不互相影響,對於不同的流(區)的優先順序不同的情況,或者訊息在不同流(區)不均衡的情況下表現會更好,當然,建立多個執行緒池會多使用些許記憶體,但是這並不是一個大問題。

另外,非同步業務執行緒池支援確定數量執行緒的執行緒池和執行緒數量可伸縮的執行緒池。

  • 核心業務硬體資源有保證,核心服務有專享的資源池,或者線上流量可預測,請使用固定數量的執行緒池。
  • 非核心業務一般混布,資源互相調配,線上流量不固定等情況請使用執行緒數量可伸縮的執行緒池。

2. 異常處理

對於訊息處理過程中產生的業務異常,當前在業務處理的上層捕捉了Throwable, 在專用的錯誤恢復日誌中記錄出錯的訊息,後續可根據錯誤恢復日誌人工處理錯誤訊息,也可重做或者洗資料。TODO:考慮實現異常Listener體系結構, 對異常處理實現監聽者模式,異常處理器可插拔等,預設列印錯誤日誌。

由於預設的異常處理中,捕捉異常,在專用的錯誤回覆日誌中記錄錯誤,並且繼續處理下一個訊息。考慮到可能上線失敗,或者上游訊息格式出錯,導致所有訊息處理都出錯,堆滿錯誤恢復日誌的情況,我們需要訴諸於報警和監控系統來解決。

3. 優雅關機

由於消費者本身是一個事件驅動的伺服器,類似Tomcat,Tomcat接收HTTP請求返回HTTP響應,Consumer則接收Kafka訊息,然後處理業務後返回,也可以將處理結果傳送到下一個訊息佇列。所以消費者本身是非常複雜的,除了執行緒模型,異常處理,效能,穩定性,可用性等都是需要思考點。既然消費者是一個後臺的伺服器,我們需要考慮如何優雅的關機,也就是在消費者伺服器在處理訊息的時候,如果關機才能不導致處理的訊息中斷而丟失。

優雅關機的重點在於解決如下3個問題:

  • 如何知道JVM要退出?
  • 如何阻止Daemon的執行緒在JVM退出被殺掉而導致訊息丟失?
  • 如果Worker執行緒在阻塞,如何喚起並退出?

第一個問題: 如果一個後臺程式執行在控制檯的前臺,通過Ctrl + C可以傳送退出訊號給JVM,也可以通過kill -2 PS_IS 或者 kill -15 PS_IS傳送退出訊號,但是不能傳送kill -9 PS_IS, 否則程序會無條件強制退出。JVM收到退出訊號後,會呼叫註冊的鉤子,我們通過的註冊的JVM退出鉤子進行優雅關機。

第二個問題: 執行緒分為Daemon執行緒和非Daemon執行緒,一個執行緒預設繼承父執行緒的Daemon屬性,如果當前執行緒池是由Daemon執行緒建立的,則Worker執行緒是Daemon執行緒。如果Worker執行緒是Daemon執行緒,我們需要在JVM退出鉤子中等待Worker執行緒完成當前手頭處理的訊息,再退出JVM。如果不是Daemon執行緒,即使JVM收到退出訊號,也得等待Worker執行緒退出後再退出,不會丟掉正在處理的訊息。

第三個問題: 在Worker執行緒從Kafka伺服器消費訊息的時候,Worker執行緒可能處於阻塞,這時需要中斷執行緒以退出,沒有訊息被丟掉。在Worker執行緒處理業務時有可能有阻塞,例如:IO,網路IO,在指定退出時間內沒有完成,我們也需要中斷執行緒退出,這時會產生一個InterruptedException, 在異常處理的預設處理器中被捕捉,並寫入錯誤日誌,Worker執行緒隨後退出。

使用指南

kclient提供了三種使用方法,對於每一種方法,按照下面的步驟可快速構建Kafka生產者和消費者程式。

前置步驟

1) 下載原始碼後在專案根目錄執行如下命令安裝打包檔案到你的Maven本地庫。

mvn install

2) 在你的專案pom.xml檔案中新增對kclient的依賴。

kclient

3) 根據Kafka官方文件搭建Kafka環境,並建立兩個Topic, test1和test2。

4) 然後,從Kafka安裝目錄的config目錄下拷貝kafka-consumer.properties和kafka-producer.properties到你的專案類路徑下,通常是src/main/resources目錄。

Java API

Java API提供了最直接,最簡單的使用kclient的方法。

構建Producer示例:

Producer

構建Consumer示例:

Consumer

Spring環境整合

kclient可以與Spring環境無縫整合,你可以像使用Spring Bean一樣來使用KafkaProducer和KafkaConsumer。

構建Producer示例:

Producer

構建Consumer示例:

Consumer

服務原始碼註解

kclient提供了類似Spring宣告式的程式設計方法,使用註解宣告Kafka處理器方法,所有的執行緒模型、異常處理、服務啟動和關閉等都由後臺服務自動完成,極大程度的簡化了API的使用方法,提高了開發者的工作效率。

註解宣告Kafka訊息處理器:

Kafka

註解啟動程式:

註解Spring環境配置:

API簡介

Producer API

KafkaProducer類提供了豐富的API來發送不同型別的訊息,它支援傳送字串訊息,傳送一個普通的Bean,以及傳送JSON物件等。在這些API中可以指定傳送到某個Topic,也可以不指定而使用預設的Topic。對於傳送的資料,支援帶Key值的訊息和不帶Key值的訊息。

傳送字串訊息:

 Producer API

傳送Bean訊息:

Bean

傳送JSON物件訊息:

JSON

Consumer API

KafkaConsumer類提供了豐富的建構函式用來指定Kafka消費者伺服器的各項引數,包括執行緒池策略,執行緒池型別,流數量等等。

使用PROPERTIES檔案初始化:

使用PROPERTIES物件初始化以及訊息處理器註解、訊息處理機模板專案可以檢視以下連結繼續閱讀:

效能壓測

Benchmark應該覆蓋推送QPS、接收處理QPS以及單執行緒、多執行緒生產者的用例。

用例1: 輕量級服務同步執行緒模型和非同步執行緒模型的效能對比。

用例2: 重量級服務同步執行緒模型和非同步執行緒模型的效能對比。

用例3: 重量級服務非同步執行緒模型中所有消費者流共享執行緒池和每個流獨享執行緒池的效能對比。

用例4: 重量級服務非同步執行緒模型中每個流獨享執行緒池的對比的確定數量執行緒的執行緒池和執行緒數量可伸縮的執行緒池的效能對比。

由於筆者在發文的時候還沒有時間完成前面四種場景的壓測,暫時留給讀者親自動手,作為一個練習。

更多思考

儘管本文設計和實現的kclient專案提供了許多高階功能,並且使用起來方便,而且筆者在幾家公司裡在線上進行了應用,已經發揮了不少的作用,但是,還有一些細節需要提高。

用例1:kclient處理器專案中管理Restful服務暫時只提供了獲得狀態的API,需要進行進一步的豐富,增加對執行緒池的監控,以及訊息處理效能的監控。

用例2:當前註解@ErrorHandler裡面的exception引數為必選,完全可以使用方法第一引數進行推導,簡化開發人員配置的工作。

用例3:模板專案還不完善,需要增加啟動和關閉指令碼,這樣讀者可以直接拷貝使用。

用例4:儘管線上應用已經證明了kclient沒有效能問題,但是開發一款中介軟體系統是需要閉環的,需要儘快把效能壓測這塊昨晚並且形成壓測報表。

文章來自微信公眾號:聊聊架構