1. 程式人生 > >Tomcat 連線數與執行緒池詳解 | BIO/NIO有何不同 | 簡談Kafka中的NIO網路通訊模型

Tomcat 連線數與執行緒池詳解 | BIO/NIO有何不同 | 簡談Kafka中的NIO網路通訊模型

前言

在使用tomcat時,經常會遇到連線數、執行緒數之類的配置問題,要真正理解這些概念,必須先了解Tomcat的聯結器(Connector)。

在前面的文章 詳解Tomcat配置檔案server.xml 中寫到過:Connector的主要功能,是接收連線請求,建立Request和Response物件用於和請求端交換資料;然後分配執行緒讓Engine(也就是Servlet容器)來處理這個請求,並把產生的Request和Response物件傳給Engine。當Engine處理完請求後,也會通過Connector將響應返回給客戶端。

可以說,Servlet容器處理請求,是需要Connector進行排程和控制的,Connector是Tomcat處理請求的主幹,因此Connector的配置和使用對Tomcat的效能有著重要的影響。這篇文章將從Connector入手,討論一些與Connector有關的重要問題,包括NIO/BIO模式、執行緒池、連線數等。

根據協議的不同,Connector可以分為HTTP Connector、AJP Connector等,本文只討論HTTP Connector。

一、Nio、Bio、APR

1、Connector的protocol

Connector在處理HTTP請求時,會使用不同的protocol。不同的Tomcat版本支援的protocol不同,其中最典型的protocol包括BIO、NIO和APR(Tomcat7中支援這3種,Tomcat8增加了對NIO2的支援,而到了Tomcat8.5和Tomcat9.0,則去掉了對BIO的支援)。

BIO是Blocking IO,顧名思義是阻塞的IO;NIO是Non-blocking IO,則是非阻塞的IO。而APR是Apache Portable Runtime,是Apache可移植執行庫,利用本地庫可以實現高可擴充套件性、高效能;Apr是在Tomcat上執行高併發應用的首選模式,但是需要安裝apr、apr-utils、tomcat-native等包。點選檢視 Tomcat Server 配置檔案詳解。

2、如何指定protocol

Connector使用哪種protocol,可以通過元素中的protocol屬性進行指定,也可以使用預設值。

指定的protocol取值及對應的協議如下:

HTTP/1.1:預設值,使用的協議與Tomcat版本有關

org.apache.coyote.http11.Http11Protocol:BIO

org.apache.coyote.http11.Http11NioProtocol:NIO

org.apache.coyote.http11.Http11Nio2Protocol:NIO2

org.apache.coyote.http11.Http11AprProtocol:APR

如果沒有指定protocol,則使用預設值HTTP/1.1,其含義如下:在Tomcat7中,自動選取使用BIO或APR(如果找到APR需要的本地庫,則使用APR,否則使用BIO);在Tomcat8中,自動選取使用NIO或APR(如果找到APR需要的本地庫,則使用APR,否則使用NIO)。

3、BIO/NIO有何不同

無論是BIO,還是NIO,Connector處理請求的大致流程是一樣的:

在accept佇列中接收連線(當客戶端向伺服器傳送請求時,如果客戶端與OS完成三次握手建立了連線,則OS將該連線放入accept佇列);在連線中獲取請求的資料,生成request;呼叫servlet容器處理請求;返回response。為了便於後面的說明,首先明確一下連線與請求的關係:連線是TCP層面的(傳輸層),對應socket;請求是HTTP層面的(應用層),必須依賴於TCP的連線實現;一個TCP連線中可能傳輸多個HTTP請求。

在BIO實現的Connector中,處理請求的主要實體是JIoEndpoint物件。JIoEndpoint維護了Acceptor和Worker:Acceptor接收socket,然後從Worker執行緒池中找出空閒的執行緒處理socket,如果worker執行緒池沒有空閒執行緒,則Acceptor將阻塞。其中Worker是Tomcat自帶的執行緒池,如果通過配置了其他執行緒池,原理與Worker類似。

在NIO實現的Connector中,處理請求的主要實體是NIoEndpoint物件。NIoEndpoint中除了包含Acceptor和Worker外,還是用了Poller,處理流程如下圖所示(圖片來源:http://gearever.iteye.com/blog/1844203)。

乾貨|Tomcat 連線數與執行緒池詳解

Acceptor接收socket後,不是直接使用Worker中的執行緒處理請求,而是先將請求傳送給了Poller,而Poller是實現NIO的關鍵。Acceptor向Poller傳送請求通過佇列實現,使用了典型的生產者-消費者模式。在Poller中,維護了一個Selector物件;當Poller從佇列中取出socket後,註冊到該Selector中;然後通過遍歷Selector,找出其中可讀的socket,並使用Worker中的執行緒處理相應請求。與BIO類似,Worker也可以被自定義的執行緒池代替。點選檢視 Tomcat Server 配置檔案詳解。

通過上述過程可以看出,在NIoEndpoint處理請求的過程中,無論是Acceptor接收socket,還是執行緒處理請求,使用的仍然是阻塞方式;但在“讀取socket並交給Worker中的執行緒”的這個過程中,使用非阻塞的NIO實現,這是NIO模式與BIO模式的最主要區別(其他區別對效能影響較小,暫時略去不提)。而這個區別,在併發量較大的情形下可以帶來Tomcat效率的顯著提升:

目前大多數HTTP請求使用的是長連線(HTTP/1.1預設keep-alive為true),而長連線意味著,一個TCP的socket在當前請求結束後,如果沒有新的請求到來,socket不會立馬釋放,而是等timeout後再釋放。如果使用BIO,“讀取socket並交給Worker中的執行緒”這個過程是阻塞的,也就意味著在socket等待下一個請求或等待釋放的過程中,處理這個socket的工作執行緒會一直被佔用,無法釋放;因此Tomcat可以同時處理的socket數目不能超過最大執行緒數,效能受到了極大限制。而使用NIO,“讀取socket並交給Worker中的執行緒”這個過程是非阻塞的,當socket在等待下一個請求或等待釋放時,並不會佔用工作執行緒,因此Tomcat可以同時處理的socket數目遠大於最大執行緒數,併發效能大大提高。

二、3個引數:acceptCount、maxConnections、maxThreads

再回顧一下Tomcat處理請求的過程:在accept佇列中接收連線(當客戶端向伺服器傳送請求時,如果客戶端與OS完成三次握手建立了連線,則OS將該連線放入accept佇列);在連線中獲取請求的資料,生成request;呼叫servlet容器處理請求;返回response。

相對應的,Connector中的幾個引數功能如下:

1、acceptCount

accept佇列的長度;當accept佇列中連線的個數達到acceptCount時,佇列滿,進來的請求一律被拒絕。預設值是100。

2、maxConnections

Tomcat在任意時刻接收和處理的最大連線數。當Tomcat接收的連線數達到maxConnections時,Acceptor執行緒不會讀取accept佇列中的連線;這時accept佇列中的執行緒會一直阻塞著,直到Tomcat接收的連線數小於maxConnections。如果設定為-1,則連線數不受限制。

預設值與聯結器使用的協議有關:NIO的預設值是10000,APR/native的預設值是8192,而BIO的預設值為maxThreads(如果配置了Executor,則預設值是Executor的maxThreads)。

在windows下,APR/native的maxConnections值會自動調整為設定值以下最大的1024的整數倍;如設定為2000,則最大值實際是1024。

3、maxThreads

請求處理執行緒的最大數量。預設值是200(Tomcat7和8都是的)。如果該Connector綁定了Executor,這個值會被忽略,因為該Connector將使用繫結的Executor,而不是內建的執行緒池來執行任務。

maxThreads規定的是最大的執行緒數目,並不是實際running的CPU數量;實際上,maxThreads的大小比CPU核心數量要大得多。這是因為,處理請求的執行緒真正用於計算的時間可能很少,大多數時間可能在阻塞,如等待資料庫返回資料、等待硬碟讀寫資料等。因此,在某一時刻,只有少數的執行緒真正的在使用物理CPU,大多數執行緒都在等待;因此執行緒數遠大於物理核心數才是合理的。

換句話說,Tomcat通過使用比CPU核心數量多得多的執行緒數,可以使CPU忙碌起來,大大提高CPU的利用率。

4、引數設定

(1)maxThreads的設定既與應用的特點有關,也與伺服器的CPU核心數量有關。通過前面介紹可以知道,maxThreads數量應該遠大於CPU核心數量;而且CPU核心數越大,maxThreads應該越大;應用中CPU越不密集(IO越密集),maxThreads應該越大,以便能夠充分利用CPU。當然,maxThreads的值並不是越大越好,如果maxThreads過大,那麼CPU會花費大量的時間用於執行緒的切換,整體效率會降低。

(2)maxConnections的設定與Tomcat的執行模式有關。如果tomcat使用的是BIO,那麼maxConnections的值應該與maxThreads一致;如果tomcat使用的是NIO,那麼類似於Tomcat的預設值,maxConnections值應該遠大於maxThreads。

(3)通過前面的介紹可以知道,雖然tomcat同時可以處理的連線數目是maxConnections,但伺服器中可以同時接收的連線數為maxConnections+acceptCount 。acceptCount的設定,與應用在連線過高情況下希望做出什麼反應有關係。如果設定過大,後面進入的請求等待時間會很長;如果設定過小,後面進入的請求立馬返回connection refused。點選檢視 Tomcat Server 配置檔案詳解。

三、執行緒池Executor

Executor元素代表Tomcat中的執行緒池,可以由其他元件共享使用;要使用該執行緒池,元件需要通過executor屬性指定該執行緒池。

Executor是Service元素的內嵌元素。一般來說,使用執行緒池的是Connector元件;為了使Connector能使用執行緒池,Executor元素應該放在Connector前面。Executor與Connector的配置舉例如下:

Executor的主要屬性包括:

name:該執行緒池的標記

maxThreads:執行緒池中最大活躍執行緒數,預設值200(Tomcat7和8都是)

minSpareThreads:執行緒池中保持的最小執行緒數,最小值是25

maxIdleTime:執行緒空閒的最大時間,當空閒超過該值時關閉執行緒(除非執行緒數小於minSpareThreads),單位是ms,預設值60000(1分鐘)

daemon:是否後臺執行緒,預設值true

threadPriority:執行緒優先順序,預設值5

namePrefix:執行緒名字的字首,執行緒池中執行緒名字為:namePrefix+執行緒編號

四、檢視當前狀態

上面介紹了Tomcat連線數、執行緒數的概念以及如何設定,下面說明如何檢視伺服器中的連線數和執行緒數。

檢視伺服器的狀態,大致分為兩種方案:(1)使用現成的工具,(2)直接使用Linux的命令檢視。

現成的工具,如JDK自帶的jconsole工具可以方便的檢視執行緒資訊(此外還可以檢視CPU、記憶體、類、JVM基本資訊等),Tomcat自帶的manager,收費工具New Relic等。下圖是jconsole檢視執行緒資訊的介面:

乾貨|Tomcat 連線數與執行緒池詳解

下面說一下如何通過Linux命令列,檢視伺服器中的連線數和執行緒數。

1、連線數

假設Tomcat接收http請求的埠是8083,則可以使用如下語句檢視連線情況:

netstat –nat | grep 8083

結果如下所示:

乾貨|Tomcat 連線數與執行緒池詳解

可以看出,有一個連線處於listen狀態,監聽請求;除此之外,還有4個已經建立的連線(ESTABLISHED)和2個等待關閉的連線(CLOSE_WAIT)。

2、執行緒

ps命令可以檢視程序狀態,如執行如下命令:

ps –e | grep java

結果如下圖:

乾貨|Tomcat 連線數與執行緒池詳解

可以看到,只打印了一個程序的資訊;27989是執行緒id,java是指執行的java命令。這是因為啟動一個tomcat,內部所有的工作都在這一個程序裡完成,包括主執行緒、垃圾回收執行緒、Acceptor執行緒、請求處理執行緒等等。

通過如下命令,可以看到該程序內有多少個執行緒;其中,nlwp含義是number of light-weight process。

ps –o nlwp 27989

乾貨|Tomcat 連線數與執行緒池詳解

可以看到,該程序內部有73個執行緒;但是73並沒有排除處於idle狀態的執行緒。要想獲得真正在running的執行緒數量,可以通過以下語句完成:

ps -eLo pid ,stat | grep 27989 | grep running | wc -l

其中ps -eLo pid ,stat可以找出所有執行緒,並列印其所在的程序號和執行緒當前的狀態;兩個grep命令分別篩選程序號和執行緒狀態;wc統計個數。其中,ps -eLo pid ,stat | grep 27989輸出的結果如下:

乾貨|Tomcat 連線數與執行緒池詳解

圖中只截圖了部分結果;Sl表示大多數執行緒都處於空閒狀態。

=========================================

簡談Kafka中的NIO網路通訊模型

基本上已經較為詳細地將RocketMQ這款分散式訊息佇列的RPC通訊部分的協議格式、訊息編解碼、通訊方式(同步/非同步/單向)、訊息收發流程和Netty的Reactor多執行緒分離處理架構講了一遍。同時,聯想業界大名鼎鼎的另一款開源分散式訊息佇列—Kafka,具備高吞吐量和高併發的特性,其網路通訊層是如何做到訊息的高效傳輸的呢?為了解開自己心中的疑慮,就查閱了Kafka的Network通訊模組的原始碼,乘機會寫本篇文章。

本文主要通過對Kafka原始碼的分析來簡述其Reactor的多執行緒網路通訊模型和總體框架結構,同時簡要介紹Kafka網路通訊層的設計與具體實現。

一、Kafka網路通訊模型的整體框架概述

Kafka的網路通訊模型是基於NIO的Reactor多執行緒模型來設計的。這裡先引用Kafka原始碼中註釋的一段話:

An NIO socket server. The threading model is 1 Acceptor thread that handles new connections. Acceptor has N Processor threads that each have their own selector and read requests from sockets. M Handler threads that handle requests and produce responses back to the processor threads for writing.

相信大家看了上面的這段引文註釋後,大致可以瞭解到Kafka的網路通訊層模型,主要採用了1(1個Acceptor執行緒)+N(N個Processor執行緒)+M(M個業務處理執行緒)。下面的表格簡要的列舉了下(這裡先簡單的看下後面還會詳細說明):

執行緒數 執行緒名 執行緒具體說明
1 kafka-socket-acceptor_%x Acceptor執行緒,負責監聽Client端發起的請求
N kafka-network-thread_%d Processor執行緒,負責對Socket進行讀寫
M kafka-request-handler-_%d Worker執行緒,處理具體的業務邏輯並生成Response返回

Kafka網路通訊層的完整框架圖如下圖所示:

訊息中介軟體—簡談Kafka中的NIO網路通訊模型

剛開始看到上面的這個框架圖可能會有一些不太理解,並不要緊,這裡可以先對Kafka的網路通訊層框架結構有一個大致瞭解。本文後面會結合Kafka的部分重要原始碼來詳細闡述上面的過程。這裡可以簡單總結一下其網路通訊模型中的幾個重要概念:

(1) Acceptor:1個接收執行緒,負責監聽新的連線請求,同時註冊OPACCEPT 事件,將新的連線按照"round robin"方式交給對應的 Processor 執行緒處理;

(2) Processor:N個處理器執行緒,其中每個 Processor 都有自己的 selector,它會向 Acceptor 分配的 SocketChannel 註冊相應的 OPREAD 事件,N 的大小由“num.networker.threads”決定;

(3) KafkaRequestHandler:M個請求處理執行緒,包含線上程池—KafkaRequestHandlerPool內部,從RequestChannel的全域性請求佇列—requestQueue中獲取請求資料並交給KafkaApis處理,M的大小由“num.io.threads”決定;

(4) RequestChannel:其為Kafka服務端的請求通道,該資料結構中包含了一個全域性的請求佇列 requestQueue和多個與Processor處理器相對應的響應佇列responseQueue,提供給Processor與請求處理執行緒KafkaRequestHandler和KafkaApis交換資料的地方;

(5) NetworkClient:其底層是對 Java NIO 進行相應的封裝,位於Kafka的網路介面層。Kafka訊息生產者物件—KafkaProducer的send方法主要呼叫NetworkClient完成訊息傳送;

(6) SocketServer:其是一個NIO的服務,它同時啟動一個Acceptor接收執行緒和多個Processor處理器執行緒。提供了一種典型的Reactor多執行緒模式,將接收客戶端請求和處理請求相分離;

(7) KafkaServer:代表了一個Kafka Broker的例項;其startup方法為例項啟動的入口;

(8) KafkaApis:Kafka的業務邏輯處理Api,負責處理不同型別的請求;比如“傳送訊息”、“獲取訊息偏移量—offset”和“處理心跳請求”等;

二、Kafka網路通訊層的設計與具體實現

這一節將結合Kafka網路通訊層的原始碼來分析其設計與實現,這裡主要詳細介紹網路通訊層的幾個重要元素——SocketServer、Acceptor、Processor、RequestChannel、KafkaRequestHandler 和 KafkaApis。本文分析的原始碼部分均基於 Kafka 的 0.11.0 版本。

1、SocketServer

SocketServer是接收客戶端Socket請求連線、處理請求並返回處理結果的核心類,Acceptor及Processor的初始化、處理邏輯都是在這裡實現的。在KafkaServer例項啟動時會呼叫其startup的初始化方法,會初始化1個 Acceptor和N個Processor執行緒(每個EndPoint都會初始化,一般來說一個Server只會設定一個埠),其實現如下:

  1. def startup {

  2. this.synchronized {

  3. connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

  4. val sendBufferSize = config.socketSendBufferBytes

  5. val recvBufferSize = config.socketReceiveBufferBytes

  6. val brokerId = config.brokerId

  7. var processorBeginIndex = 0

  8. // 一個broker一般只設置一個埠

  9. config.listeners.foreach { endpoint =>

  10. val listenerName = endpoint.listenerName

  11. val securityProtocol = endpoint.securityProtocol

  12. val processorEndIndex = processorBeginIndex + numProcessorThreads

  13. //N 個 processor

  14. for (i <- processorBeginIndex until processorEndIndex)

  15. processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)

  16. //1個 Acceptor

  17. val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,

  18. processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)

  19. acceptors.put(endpoint, acceptor)

  20. KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start

  21. acceptor.awaitStartup

  22. processorBeginIndex = processorEndIndex

  23. }

  24. }

2、Acceptor

Acceptor是一個繼承自抽象類AbstractServerThread的執行緒類。Acceptor的主要任務是監聽並且接收客戶端的請求,同時建立資料傳輸通道—SocketChannel,然後以輪詢的方式交給一個後端的Processor執行緒處理(具體的方式是新增socketChannel至併發佇列並喚醒Processor執行緒處理)。

在該執行緒類中主要可以關注以下兩個重要的變數:

(1) nioSelector:通過NSelector.open方法建立的變數,封裝了JAVA NIO Selector的相關操作;

(2) serverChannel:用於監聽埠的服務端Socket套接字物件;

下面來看下Acceptor主要的run方法的原始碼:

  1. def run {

  2. //首先註冊OP_ACCEPT事件

  3. serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)

  4. startupComplete

  5. try {

  6. var currentProcessor = 0

  7. //以輪詢方式查詢並等待關注的事件發生

  8. while (isRunning) {

  9. try {

  10. val ready = nioSelector.select(500)

  11. if (ready > 0) {

  12. val keys = nioSelector.selectedKeys

  13. val iter = keys.iterator

  14. while (iter.hasNext && isRunning) {

  15. try {

  16. val key = iter.next

  17. iter.remove

  18. if (key.isAcceptable)

  19. //如果事件發生則呼叫accept方法對OP_ACCEPT事件處理

  20. accept(key, processors(currentProcessor))

  21. else

  22. throw new IllegalStateException("Unrecognized key state for acceptor thread.")

  23. //輪詢演算法

  24. // round robin to the next processor thread

  25. currentProcessor = (currentProcessor + 1) % processors.length

  26. } catch {

  27. case e: Throwable => error("Error while accepting connection", e)

  28. }

  29. }

  30. }

  31. }

  32. //程式碼省略

  33. }

  34. def accept(key: SelectionKey, processor: Processor) {

  35. val serverSocketChannel = key.channel.asInstanceOf[ServerSocketChannel]

  36. val socketChannel = serverSocketChannel.accept

  37. try {

  38. connectionQuotas.inc(socketChannel.socket.getInetAddress)

  39. socketChannel.configureBlocking(false)

  40. socketChannel.socket.setTcpNoDelay(true)

  41. socketChannel.socket.setKeepAlive(true)

  42. if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)

  43. socketChannel.socket.setSendBufferSize(sendBufferSize)

  44. processor.accept(socketChannel)

  45. } catch {

  46. //省略部分程式碼

  47. }

  48. }

  49. def accept(socketChannel: SocketChannel) {

  50. newConnections.add(socketChannel)

  51. wakeup

  52. }

在上面原始碼中可以看到,Acceptor執行緒啟動後,首先會向用於監聽埠的服務端套接字物件—ServerSocketChannel上註冊OPACCEPT 事件。然後以輪詢的方式等待所關注的事件發生。如果該事件發生,則呼叫accept方法對OPACCEPT事件進行處理。這裡,Processor是通過round robin方法選擇的,這樣可以保證後面多個Processor執行緒的負載基本均勻。 Acceptor的accept方法的作用主要如下:

(1) 通過SelectionKey取得與之對應的serverSocketChannel例項,並呼叫它的accept方法與客戶端建立連線;

(2) 呼叫connectionQuotas.inc方法增加連線統計計數;並同時設定第 (1) 步中建立返回的socketChannel屬性(如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等) ;

(3) 將socketChannel交給processor.accept方法進行處理。這裡主要是將socketChannel加入Processor處理器的併發佇列newConnections佇列中,然後喚醒Processor執行緒從佇列中獲取socketChannel並處理。其中,newConnections會被Acceptor執行緒和Processor執行緒併發訪問操作,所以newConnections是ConcurrentLinkedQueue佇列(一個基於連結節點的無界執行緒安全佇列)

3、Processor

Processor同Acceptor一樣,也是一個執行緒類,繼承了抽象類AbstractServerThread。其主要是從客戶端的請求中讀取資料和將KafkaRequestHandler處理完響應結果返回給客戶端。在該執行緒類中主要關注以下幾個重要的變數:

(1) newConnections:在上面的Acceptor一節中已經提到過,它是一種ConcurrentLinkedQueue[SocketChannel]型別的佇列,用於儲存新連線交由Processor處理的socketChannel;

(2) inflightResponses:是一個Map[String, RequestChannel.Response]型別的集合,用於記錄尚未傳送的響應;

(3) selector:是一個型別為KSelector變數,用於管理網路連線; 下面先給出Processor處理器執行緒run方法執行的流程圖:

訊息中介軟體—簡談Kafka中的NIO網路通訊模型

從上面的流程圖中能夠可以看出Processor處理器執行緒在其主流程中主要完成了這樣子幾步操作:

(1) 處理newConnections佇列中的socketChannel。遍歷取出佇列中的每個socketChannel並將其在selector上註冊OPREAD事件;

(2) 處理RequestChannel中與當前Processor對應響應佇列中的Response。在這一步中會根據responseAction的型別(NoOpAction/SendAction/CloseConnectionAction)進行判斷,若為“NoOpAction”,表示該連線對應的請求無需響應;若為“SendAction”,表示該Response需要傳送給客戶端,則會通過“selector.send”註冊OPWRITE事件,並且將該Response從responseQueue響應佇列中移至inflightResponses集合中;“CloseConnectionAction”,表示該連線是要關閉的;

(3) 呼叫selector.poll方法進行處理。該方法底層即為呼叫nioSelector.select方法進行處理。

(4) 處理已接受完成的資料包佇列—completedReceives。在processCompletedReceives方法中呼叫“requestChannel.sendRequest”方法將請求Request新增至requestChannel的全域性請求佇列—requestQueue中,等待KafkaRequestHandler來處理。同時,呼叫“selector.mute”方法取消與該請求對應的連線通道上的OPREAD事件;

(5) 處理已傳送完的佇列—completedSends。當已經完成將response傳送給客戶端,則將其從inflightResponses移除,同時通過呼叫“selector.unmute”方法為對應的連線通道重新註冊OPREAD事件;

(6) 處理斷開連線的佇列。將該response從inflightResponses集合中移除,同時將connectionQuotas統計計數減1;

4、RequestChannel

在Kafka的網路通訊層中,RequestChannel為Processor處理器執行緒與KafkaRequestHandler執行緒之間的資料交換提供了一個數據緩衝區,是通訊過程中Request和Response快取的地方。因此,其作用就是在通訊中起到了一個數據緩衝佇列的作用。Processor執行緒將讀取到的請求新增至RequestChannel的全域性請求佇列—requestQueue中;KafkaRequestHandler執行緒從請求佇列中獲取並處理,處理完以後將Response新增至RequestChannel的響應佇列—responseQueue中,並通過responseListeners喚醒對應的Processor執行緒,最後Processor執行緒從響應佇列中取出後傳送至客戶端。

5、KafkaRequestHandler

KafkaRequestHandler也是一種執行緒類,在KafkaServer例項啟動時候會例項化一個執行緒池—KafkaRequestHandlerPool物件(包含了若干個KafkaRequestHandler執行緒),這些執行緒以守護執行緒的方式在後臺執行。在KafkaRequestHandler的run方法中會迴圈地從RequestChannel中阻塞式讀取request,讀取後再交由KafkaApis來具體處理。

6、KafkaApis

KafkaApis是用於處理對通訊網路傳輸過來的業務訊息請求的中心轉發元件。該元件反映出Kafka Broker Server可以提供哪些服務。

三、總結

仔細閱讀Kafka的NIO網路通訊層的原始碼過程中還是可以收穫不少關於NIO網路通訊模組的關鍵技術。Apache的任何一款開源中介軟體都有其設計獨到之處,值得借鑑和學習。對於任何一位使用Kafka這款分散式訊息佇列的同學來說,如果能夠在一定實踐的基礎上,再通過閱讀其原始碼能起到更為深入理解的效果,對於大規模Kafka叢集的效能調優和問題定位都大有裨益。 對於剛接觸Kafka的同學來說,想要自己掌握其NIO網路通訊層模型的關鍵設計,還需要不斷地使用本地環境進行debug除錯和閱讀原始碼反覆思考