Kafka Source Code: Server-Side Analysis of SocketServer
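In the previous articles we covered the consumer and the producer; starting with this article we look at the design of the server side. Kafka's network layer is built on multiple threads and multiple Selectors. The core class is SocketServer. For each listening endpoint it contains one Acceptor, which accepts all new connections on that endpoint. Each Acceptor owns several Processor threads, and each Processor thread has its own Selector, which it uses to read requests from its connections and write responses back. Requests are processed by Handler threads, which are shared by the whole broker rather than owned by an Acceptor: a Handler thread processes a request and passes the resulting response back to the Processor thread. Processor threads and Handler threads communicate through the RequestChannel.

Now let's look at the concrete implementation of SocketServer. Its key fields are:

endpoints: the collection of EndPoints. A server usually has several network cards and may be configured with several IP addresses, so Kafka can listen on several ports at the same time. The EndPoint class wraps the host, port, and network protocol to listen with, and one Acceptor is created for each EndPoint.
numProcessorThreads: the number of Processor threads per endpoint
totalProcessorThreads: the total number of Processor threads, i.e. numProcessorThreads multiplied by the number of endpoints
maxQueuedRequests: the maximum number of requests buffered in the RequestChannel's requestQueue
maxConnectionsPerIp: the maximum number of connections allowed from a single IP
maxConnectionsPerIpOverrides: a Map[String, Int] that sets the connection limit for specific IPs; a value here overrides maxConnectionsPerIp for that IP
requestChannel: the queues through which Processor threads and Handler threads exchange data
acceptors: the collection of Acceptor objects
processors: the collection of Processor threads
connectionQuotas: a ConnectionQuotas object that enforces the per-IP connection limits

First, the SocketServer initialization flow: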
// Create the RequestChannel, with one responseQueue per Processor (totalProcessorThreads in all)
val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
// Create the Processor array
private val processors = new Array[Processor](totalProcessorThreads)
// Create the Acceptor map, keyed by EndPoint
private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
// Register a listener with the RequestChannel: when a Handler thread writes to a
// responseQueue, the corresponding Processor thread is woken up to handle it
requestChannel.addResponseListener(id => processors(id).wakeup())

// Core SocketServer initialization code
def startup() {
  this.synchronized {
    // Create connectionQuotas
    connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
    // Socket send buffer size
    val sendBufferSize = config.socketSendBufferBytes
    // Socket receive buffer size
    val recvBufferSize = config.socketReceiveBufferBytes
    val brokerId = config.brokerId
    var processorBeginIndex = 0
    // Iterate over the endpoints
    endpoints.values.foreach { endpoint =>
      val protocol = endpoint.protocolType
      val processorEndIndex = processorBeginIndex + numProcessorThreads
      // Processors processorBeginIndex until processorEndIndex belong to this endpoint
      for (i <- processorBeginIndex until processorEndIndex)
        processors(i) = newProcessor(i, connectionQuotas, protocol)
      // Create an Acceptor for this endpoint
      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
        processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
      acceptors.put(endpoint, acceptor)
      // Start the Acceptor thread
      Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
      // Block until the Acceptor has finished starting up
      acceptor.awaitStartup()
      processorBeginIndex = processorEndIndex
    }
  }
}
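The initialization code mainly initializes the fields introduced above. For every endpoint it creates numProcessorThreads Processors, creates one Acceptor that owns them, starts the Acceptor thread, and waits for it to finish starting before moving on to the next endpoint.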
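To make the index arithmetic in startup() concrete, here is a small Java sketch that computes which slice of the processors array each endpoint receives and the resulting totalProcessorThreads. The helper class ProcessorLayout is hypothetical, and endpoints are reduced to plain strings.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper mirroring the index arithmetic in SocketServer.startup():
// each endpoint gets a contiguous slice [begin, end) of the processors array.
final class ProcessorLayout {
    /** Maps each endpoint to {processorBeginIndex, processorEndIndex}, end exclusive. */
    static Map<String, int[]> assign(List<String> endpoints, int numProcessorThreads) {
        Map<String, int[]> slices = new LinkedHashMap<>();
        int processorBeginIndex = 0;
        for (String endpoint : endpoints) {
            int processorEndIndex = processorBeginIndex + numProcessorThreads;
            slices.put(endpoint, new int[] {processorBeginIndex, processorEndIndex});
            processorBeginIndex = processorEndIndex;
        }
        return slices;
    }

    /** totalProcessorThreads = numProcessorThreads * number of endpoints. */
    static int totalProcessorThreads(List<String> endpoints, int numProcessorThreads) {
        return numProcessorThreads * endpoints.size();
    }
}
```

With two endpoints and numProcessorThreads = 3, the first endpoint gets processors 0 to 2 and the second gets 3 to 5, so the array holds 6 processors in total.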
def shutdown() = {
  info("Shutting down")
  this.synchronized {
    acceptors.values.foreach(_.shutdown)
    processors.foreach(_.shutdown)
  }
  info("Shutdown completed")
}
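Shutdown stops every Acceptor first and then every Processor.

Acceptor and Processor both extend AbstractServerThread, an abstract class that implements the Runnable interface and provides the control methods for starting up and shutting down. It has four key fields:

alive: whether the current thread is still alive
shutdownLatch: whether the thread's shutdown has completed
startupLatch: whether the thread's startup has completed
connectionQuotas: used in the close method, which, given a connectionId, closes the corresponding SocketChannel and decrements the connection count recorded in ConnectionQuotas

Here are its commonly used methods: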
def shutdown(): Unit = {
  // Mark the thread as shut down
  alive.set(false)
  // Wake up the thread
  wakeup()
  // Block until shutdown has completed
  shutdownLatch.await()
}
// Block until startup has completed
def awaitStartup(): Unit = startupLatch.await
// Startup done: release threads blocked in awaitStartup
protected def startupComplete() = {
  startupLatch.countDown()
}
// Shutdown done: release threads blocked in shutdown
protected def shutdownComplete() = shutdownLatch.countDown()
// Close a connection
def close(selector: KSelector, connectionId: String) {
  // Look up the channel to close
  val channel = selector.channel(connectionId)
  if (channel != null) {
    debug(s"Closing selector connection $connectionId")
    val address = channel.socketAddress
    // Decrement the connection count for this address
    if (address != null)
      connectionQuotas.dec(address)
    // Close the connection
    selector.close(connectionId)
  }
}
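The startup/shutdown handshake above is a common pattern built on an atomic "alive" flag and two one-shot CountDownLatches. The following Java sketch reproduces it under simplifying assumptions: ServerThreadSketch and LoopingWorker are hypothetical names, and wakeup() notifies a monitor instead of waking a Selector.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for AbstractServerThread: an "alive" flag plus two
// one-shot latches for the startup and shutdown handshakes.
abstract class ServerThreadSketch implements Runnable {
    private final AtomicBoolean alive = new AtomicBoolean(true);
    private final CountDownLatch startupLatch = new CountDownLatch(1);
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    protected boolean isRunning() { return alive.get(); }
    protected void startupComplete() { startupLatch.countDown(); }
    protected void shutdownComplete() { shutdownLatch.countDown(); }
    boolean hasShutDown() { return shutdownLatch.getCount() == 0; }

    /** Hypothetical hook; a real server thread would wake its selector here. */
    protected abstract void wakeup();

    /** Blocks until run() has called startupComplete(). */
    public void awaitStartup() { await(startupLatch); }

    public void shutdown() {
        alive.set(false);      // ask the run loop to stop
        wakeup();              // break out of any blocking wait
        await(shutdownLatch);  // block until run() has cleaned up
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}

// Example worker: waits in short slices until shutdown, then signals completion.
final class LoopingWorker extends ServerThreadSketch {
    private final Object lock = new Object();

    @Override public void run() {
        startupComplete();
        try {
            while (isRunning()) {
                synchronized (lock) {
                    try {
                        lock.wait(50); // stand-in for selector.select(timeout)
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        } finally {
            shutdownComplete(); // always release shutdown(), even on early exit
        }
    }

    @Override protected void wakeup() {
        synchronized (lock) { lock.notifyAll(); }
    }
}
```

In this sketch shutdown() only returns after run() has left its loop and counted down shutdownLatch, which mirrors why SocketServer.shutdown can stop the Acceptors and Processors one after another and know each one has really finished.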
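The Acceptor accepts connection requests, creates the socket, and assigns it to a Processor. When an Acceptor is constructed, it does the following: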
// Create the NIO Selector
private val nioSelector = NSelector.open()
// Create a ServerSocketChannel
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
this.synchronized {
  // Create and start a thread for each Processor
  processors.foreach { processor =>
    Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
  }
}
Next, the core logic of the Acceptor, its run method:
def run() {
  // Register the ServerSocketChannel with the selector, interested in OP_ACCEPT events
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  // Mark startup as complete
  startupComplete()
  try {
    var currentProcessor = 0
    // Check the running state
    while (isRunning) {
      try {
        // Wait up to 500 ms for I/O events
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          // Iterate over the selected I/O events
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable)
                accept(key, processors(currentProcessor))
              else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              // round robin to the next processor thread
              currentProcessor = (currentProcessor + 1) % processors.length
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(nioSelector.close())
    shutdownComplete()
  }
}
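The Acceptor spreads accepted connections over its Processors with currentProcessor = (currentProcessor + 1) % processors.length. A minimal Java sketch of that round-robin dispatch, assuming connections are modelled as strings and each Processor as a simple list (RoundRobinDispatcher is a hypothetical name):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of round-robin dispatch: each accepted connection goes to
// processors[current], then current advances modulo the number of processors.
final class RoundRobinDispatcher {
    private final List<List<String>> processorQueues = new ArrayList<>();
    private int currentProcessor = 0;

    RoundRobinDispatcher(int numProcessors) {
        for (int i = 0; i < numProcessors; i++) processorQueues.add(new ArrayList<>());
    }

    /** Hands the connection to the current processor and returns that processor's id. */
    int accept(String connection) {
        int chosen = currentProcessor;
        processorQueues.get(chosen).add(connection);
        currentProcessor = (currentProcessor + 1) % processorQueues.size();
        return chosen;
    }

    List<String> queueOf(int processorId) { return processorQueues.get(processorId); }
}
```

With three processors, five connections land on processors 0, 1, 2, 0, 1. Round robin needs no shared load information, which keeps the Acceptor thread cheap, at the cost of ignoring how busy each Processor actually is.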
Acceptor.run first registers for OP_ACCEPT events, then loops, polling for I/O events and handling them. The main work is handling accept events:
def accept(key: SelectionKey, processor: Processor) {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  // Accept the connection, creating a SocketChannel
  val socketChannel = serverSocketChannel.accept()
  try {
    // Increment the connection count for this IP (throws TooManyConnectionsException over the limit)
    connectionQuotas.inc(socketChannel.socket().getInetAddress)
    // Switch to non-blocking mode
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)
    debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
      .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
        socketChannel.socket.getSendBufferSize, sendBufferSize,
        socketChannel.socket.getReceiveBufferSize, recvBufferSize))
    // Hand the SocketChannel over to the processor
    processor.accept(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
      close(socketChannel)
  }
}
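connectionQuotas.inc and dec keep a per-IP counter, with an optional per-IP override of the limit. The Java sketch below shows one way to implement that bookkeeping; ConnectionQuotasSketch and its nested exception are stand-ins, not Kafka's classes, and the real implementation may differ in detail. In this sketch the counter is incremented before the limit check, so a caller that rejects the connection must call dec to undo it.

```java
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;

// Sketch of per-IP connection limits; names are hypothetical stand-ins.
final class ConnectionQuotasSketch {
    static final class TooManyConnectionsException extends RuntimeException {
        TooManyConnectionsException(InetAddress ip, int max) {
            super("Too many connections from " + ip.getHostAddress() + " (max " + max + ")");
        }
    }

    private final int defaultMax;
    private final Map<InetAddress, Integer> overrides;       // per-IP limits
    private final Map<InetAddress, Integer> counts = new HashMap<>();

    ConnectionQuotasSketch(int defaultMax, Map<InetAddress, Integer> overrides) {
        this.defaultMax = defaultMax;
        this.overrides = overrides;
    }

    /** Counts a new connection; throws if the IP is now over its limit. */
    synchronized void inc(InetAddress address) {
        int count = counts.getOrDefault(address, 0) + 1;
        counts.put(address, count);
        int max = overrides.getOrDefault(address, defaultMax);
        if (count > max) throw new TooManyConnectionsException(address, max);
    }

    /** Forgets one connection from this IP. */
    synchronized void dec(InetAddress address) {
        Integer count = counts.get(address);
        if (count == null) throw new IllegalArgumentException("No connection to remove for " + address);
        if (count == 1) counts.remove(address); else counts.put(address, count - 1);
    }

    synchronized int get(InetAddress address) { return counts.getOrDefault(address, 0); }
}
```

The methods are synchronized because, in the code above, inc runs on the Acceptor thread while dec can run on any Processor thread.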
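The Processor reads requests and writes responses back; it takes no part in the actual request-handling logic. Its key fields are:

newConnections: a queue of the newly created SocketChannels assigned to this Processor
inflightResponses: responses that have been handed to the selector for sending but whose send has not yet completed
selector: a KSelector
requestChannel: the queues through which the Processor and the Handler threads exchange data

When the Acceptor hands over a connection, it calls Processor.accept: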
def accept(socketChannel: SocketChannel) {
  newConnections.add(socketChannel)
  wakeup()
}
The Processor first adds the assigned SocketChannel to its own queue and then wakes up its thread to process it:
def wakeup = selector.wakeup()
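Here selector is a KSelector, which ultimately calls wakeup on the underlying NIO Selector.

Now the overall flow of the Processor's run method:

1. Call startupComplete to signal that startup has finished.
2. Set up the newly created SocketChannels in the newConnections queue: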
private def configureNewConnections() {
  while (!newConnections.isEmpty) {
    // Take one pending SocketChannel off the queue
    val channel = newConnections.poll()
    try {
      debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
      // Read the channel's local and remote addresses
      val localHost = channel.socket().getLocalAddress.getHostAddress
      val localPort = channel.socket().getLocalPort
      val remoteHost = channel.socket().getInetAddress.getHostAddress
      val remotePort = channel.socket().getPort
      val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
      // Register with the selector
      selector.register(connectionId, channel)
    } catch {
      // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
      // throwables will be caught in processor and logged as uncaught exceptions.
      case NonFatal(e) =>
        // need to close the channel here to avoid a socket leak.
        close(channel)
        error(s"Processor $id closed connection from ${channel.getRemoteAddress}", e)
    }
  }
}
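selector.register is implemented in the Java class KSelector: it registers the channel for OP_READ and wraps it in a KafkaChannel:
public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
    // Register interest in OP_READ events
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    key.attach(channel);
    // Add it to the channels map
    this.channels.put(id, channel);
}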
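Step 2 relies on a hand-off between two threads: the Acceptor thread calls Processor.accept, which enqueues the channel and wakes the Processor's selector, and the Processor drains newConnections on its next loop iteration. The Java sketch below reproduces that hand-off with a real java.nio.channels.Selector; ProcessorHandoff is a hypothetical name, and connections are plain strings instead of SocketChannels.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the Acceptor -> Processor hand-off (hypothetical class; strings
// stand in for SocketChannels).
final class ProcessorHandoff {
    private final Queue<String> newConnections = new ConcurrentLinkedQueue<>();
    private final Selector selector = openSelector();

    private static Selector openSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Acceptor thread: enqueue the connection, then wake the Processor's selector. */
    void accept(String connection) {
        newConnections.add(connection);
        selector.wakeup();
    }

    /** One Processor iteration: wait up to timeoutMs, then drain newConnections. */
    int pollOnce(long timeoutMs, Queue<String> registered) {
        try {
            selector.select(timeoutMs);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int n = 0;
        String conn;
        while ((conn = newConnections.poll()) != null) {
            registered.add(conn); // stand-in for selector.register(connectionId, channel)
            n++;
        }
        return n;
    }

    void close() {
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because Selector.wakeup() makes the next select() return immediately even when no select is in progress yet, a connection queued just before the Processor blocks is not left waiting for the full timeout.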
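3. Fetch this Processor's responseQueue from the RequestChannel and handle the Responses buffered in it. A Response with SendAction must be sent to the client: the Processor looks up the corresponding KafkaChannel, registers OP_WRITE on it, and points KafkaChannel.send at the Response to be sent; the Response is also taken off the responseQueue and put into inflightResponses. A Response with NoOpAction means there is nothing to send on this connection for now, so OP_READ is registered on the KafkaChannel again and it may continue reading requests. A Response with CloseConnectionAction closes the corresponding connection.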
private def processNewResponses() {
  // Take the next Response queued for this processor
  var curr = requestChannel.receiveResponse(id)
  while (curr != null) {
    try {
      // Handle the Response according to its action type
      curr.responseAction match {
        case RequestChannel.NoOpAction =>
          curr.request.updateRequestMetrics
          trace("Socket server received empty response to send, registering for read: " + curr)
          selector.unmute(curr.request.connectionId)
        case RequestChannel.SendAction =>
          sendResponse(curr)
        case RequestChannel.CloseConnectionAction =>
          curr.request.updateRequestMetrics
          trace("Closing socket connection actively according to the response code.")
          close(selector, curr.request.connectionId)
      }
    } finally {
      curr = requestChannel.receiveResponse(id)
    }
  }
}
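The three-way dispatch in processNewResponses can be modelled as a small state update per connection. In the Java sketch below the Action enum mirrors NoOpAction, SendAction, and CloseConnectionAction, while the muted, inflight, and closed sets are hypothetical stand-ins for the selector's mute state, inflightResponses, and closed connections; it only tracks bookkeeping and performs no I/O.

```java
import java.util.HashSet;
import java.util.Set;

// Bookkeeping-only sketch of processNewResponses()' dispatch (hypothetical names).
final class ResponseDispatchSketch {
    enum Action { NO_OP, SEND, CLOSE }

    final Set<String> muted = new HashSet<>();     // connections not currently read
    final Set<String> inflight = new HashSet<>();  // responses handed to the selector
    final Set<String> closed = new HashSet<>();

    void handle(String connectionId, Action action) {
        switch (action) {
            case NO_OP:
                muted.remove(connectionId);        // unmute: resume reading requests
                break;
            case SEND:
                inflight.add(connectionId);        // queued for write; stays muted until sent
                break;
            case CLOSE:
                muted.remove(connectionId);
                inflight.remove(connectionId);
                closed.add(connectionId);          // close(selector, connectionId)
                break;
        }
    }
}
```

Note that the SEND branch leaves the connection muted; it is unmuted only once the send completes, which happens later in processCompletedSends.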
4. Call Processor.poll to read requests and send responses; poll delegates to KSelector.poll:
private def poll() {
  try selector.poll(300)
  catch {
    case e @ (_: IllegalStateException | _: IOException) =>
      error(s"Closing processor $id due to illegal state or IO exception")
      swallow(closeAll())
      shutdownComplete()
      throw e
  }
}
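Each poll call puts the requests it has read, the sends (responses) it has finished writing, and the connections that have been closed into KSelector's completedReceives, completedSends, and disconnected collections, respectively, for later processing.

5. Call processCompletedReceives to handle the KSelector.completedReceives queue: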
private def processCompletedReceives() {
  // Iterate over the completed receives
  selector.completedReceives.asScala.foreach { receive =>
    try {
      // Look up the corresponding KafkaChannel
      val channel = selector.channel(receive.source)
      // Build the Session for this KafkaChannel (used for authorization)
      val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
        channel.socketAddress)
      // Wrap the NetworkReceive, processor id and authentication info into a RequestChannel.Request
      val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
      // Put the request into RequestChannel.requestQueue for the Handler threads
      requestChannel.sendRequest(req)
      // Unregister OP_READ (mute): no more data is read from this connection for now
      selector.mute(receive.source)
    } catch {
      case e @ (_: InvalidRequestException | _: SchemaException) =>
        // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
        error(s"Closing socket for ${receive.source} because of error", e)
        close(selector, receive.source)
    }
  }
}
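6. Call processCompletedSends to handle the KSelector.completedSends queue. For each completed send, first remove the corresponding Response from inflightResponses, then register OP_READ on the connection again so that reading from it can resume: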
private def processCompletedSends() {
  // Iterate over the completed sends
  selector.completedSends.asScala.foreach { send =>
    // This response has been sent; remove it from inflightResponses
    val resp = inflightResponses.remove(send.destination).getOrElse {
      throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
    }
    resp.request.updateRequestMetrics()
    // Allow this connection to read data again
    selector.unmute(send.destination)
  }
}
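Steps 5 and 6 together form a per-connection mute/unmute cycle: once a request has been read the connection is muted, and it is unmuted only after its response has been fully sent, so each connection has at most one request being processed at a time and responses go out in request order. The Java sketch below models that cycle with plain maps (MuteCycleSketch is hypothetical). A muted connection's bytes simply stay unread in the socket, which the sketch represents by onReceive returning false.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Sketch of the mute/unmute cycle across processCompletedReceives() and
// processCompletedSends(); all names are hypothetical.
final class MuteCycleSketch {
    final Queue<String> requestQueue = new ArrayDeque<>(); // stand-in for RequestChannel.requestQueue
    private final Map<String, Boolean> muted = new HashMap<>();
    private final Map<String, String> inflightResponses = new HashMap<>();

    /** Returns false when the connection is muted, i.e. its data would not be read yet. */
    boolean onReceive(String connectionId, String request) {
        if (muted.getOrDefault(connectionId, false)) return false;
        requestQueue.add(connectionId + ":" + request);
        muted.put(connectionId, true);                 // selector.mute(...)
        return true;
    }

    /** A SendAction response has been handed to the selector. */
    void onResponseQueued(String connectionId, String response) {
        inflightResponses.put(connectionId, response);
    }

    /** The response has been fully written. */
    void onSendCompleted(String connectionId) {
        if (inflightResponses.remove(connectionId) == null)
            throw new IllegalStateException("Send for " + connectionId + " completed, but not in inflightResponses");
        muted.put(connectionId, false);                // selector.unmute(...)
    }

    boolean isMuted(String connectionId) { return muted.getOrDefault(connectionId, false); }
}
```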
7. Call processDisconnected to handle the KSelector.disconnected queue. First remove the Response for that connection, if any, from inflightResponses, then decrement the connection count recorded in ConnectionQuotas:
private def processDisconnected() {
  selector.disconnected.asScala.foreach { connectionId =>
    val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
      throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
    }.remoteHost
    inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
    // the channel has been closed by the selector but the quotas still need to be updated
    connectionQuotas.dec(InetAddress.getByName(remoteHost))
  }
}
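processDisconnected recovers the remote host from the connectionId string that configureNewConnections built. The Java sketch below builds and parses such an id, assuming the layout localHost:localPort-remoteHost:remotePort; ConnectionIdSketch is a hypothetical class, and Kafka's own ConnectionId may format or parse it differently.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical value class, assuming the layout "localHost:localPort-remoteHost:remotePort".
final class ConnectionIdSketch {
    // Greedy host groups let IPv6 literals (which contain ':') parse correctly.
    private static final Pattern FORMAT = Pattern.compile("^(.*):(\\d+)-(.*):(\\d+)$");

    final String localHost;
    final int localPort;
    final String remoteHost;
    final int remotePort;

    ConnectionIdSketch(String localHost, int localPort, String remoteHost, int remotePort) {
        this.localHost = localHost;
        this.localPort = localPort;
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    @Override public String toString() {
        return localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort;
    }

    /** Empty when the string does not match the expected layout. */
    static Optional<ConnectionIdSketch> fromString(String s) {
        Matcher m = FORMAT.matcher(s);
        if (!m.matches()) return Optional.empty();
        return Optional.of(new ConnectionIdSketch(
                m.group(1), Integer.parseInt(m.group(2)),
                m.group(3), Integer.parseInt(m.group(4))));
    }
}
```

Returning Optional.empty() for an unexpected string matches the intent of the getOrElse branch above, which turns a malformed id into an IllegalStateException.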
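8. When SocketServer.shutdown is called to close the whole SocketServer, the alive field is set to false; the run loop then exits, and the Processor closes all its connections and calls shutdownComplete. Here is the implementation of run: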
override def run() {
  // Mark the processor as started
  startupComplete()
  while (isRunning) {
    try {
      // Set up new connections
      configureNewConnections()
      // Handle responses queued in the RequestChannel
      processNewResponses()
      // Read requests and send responses
      poll()
      // Handle the completed receives, completed sends and disconnections
      processCompletedReceives()
      processCompletedSends()
      processDisconnected()
    } catch {
      case e: ControlThrowable => throw e
      case e: Throwable =>
        error("Processor got uncaught exception.", e)
    }
  }
  debug("Closing selector - processor " + id)
  swallowError(closeAll())
  shutdownComplete()
}
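RequestChannel

Processor threads and Handler threads exchange data through the RequestChannel. It contains one requestQueue and several responseQueues, one responseQueue per Processor thread. A Processor thread puts the requests it reads into requestQueue; Handler threads take requests from requestQueue and process them; the responses they produce go into the responseQueue of the corresponding Processor, and that Processor takes them out and sends them to the client.

The core fields of RequestChannel:

requestQueue: the queue through which Processor threads pass requests to Handler threads
responseQueues: an array of queues through which Handler threads pass responses back to Processor threads
numProcessors: the number of Processor threads
queueSize: the maximum number of buffered requests
responseListeners: the list of listeners

A Handler thread returns a response by calling sendResponse: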
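// Put the Response (normally one with SendAction) into the responseQueue of the Processor that owns the connection
def sendResponse(response: RequestChannel.Response) {
  responseQueues(response.processor).put(response)
  // Notify the listeners (SocketServer's listener wakes up that Processor)
  for(onResponse <- responseListeners)
    onResponse(response.processor)
}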
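The shape of RequestChannel described above (one bounded request queue shared by all threads, plus one response queue and a wake-up callback per Processor) can be sketched in Java with java.util.concurrent queues. RequestChannelSketch is a hypothetical class with requests and responses modelled as strings; interrupts are converted to unchecked exceptions for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

// Hypothetical sketch of RequestChannel's queues and listeners.
final class RequestChannelSketch {
    private final BlockingQueue<String> requestQueue;                              // bounded by queueSize
    private final List<BlockingQueue<String>> responseQueues = new ArrayList<>(); // one per Processor
    private final List<IntConsumer> responseListeners = new CopyOnWriteArrayList<>();

    RequestChannelSketch(int numProcessors, int queueSize) {
        this.requestQueue = new ArrayBlockingQueue<>(queueSize);
        for (int i = 0; i < numProcessors; i++) responseQueues.add(new LinkedBlockingQueue<>());
    }

    void addResponseListener(IntConsumer listener) { responseListeners.add(listener); }

    /** Processor side: blocks while queueSize requests are already waiting. */
    void sendRequest(String request) {
        try {
            requestQueue.put(request);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Handler side: waits up to timeoutMs; returns null on timeout. */
    String receiveRequest(long timeoutMs) {
        try {
            return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Handler side: queue for the owning Processor, then notify listeners with its id. */
    void sendResponse(int processor, String response) {
        responseQueues.get(processor).add(response);
        for (IntConsumer onResponse : responseListeners) onResponse.accept(processor);
    }

    /** Processor side: non-blocking; returns null when nothing is queued. */
    String receiveResponse(int processor) { return responseQueues.get(processor).poll(); }
}
```

Because requestQueue is bounded by queueSize in this sketch, a Processor calling sendRequest blocks when the Handlers fall behind, which throttles how fast new requests are accepted.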