
Learning Kafka with Me: The NIO Communication Mechanism

It has been a while since I shared anything technical, so today I am taking some spare time to write about Kafka's communication layer for us to study together.

1. Overall Structure of Kafka's Communication Mechanism

This architecture follows the SEDA multi-threaded model we covered earlier; see:
http://www.jianshu.com/p/e184fdc0ade4
1. For the broker, the number of client connections is limited and large numbers of new connections are not created frequently, so a single Acceptor thread is more than enough to handle new connections.
2. Kafka's high throughput requires the broker to receive and send data quickly, so a pool of processor threads handles the reads and writes, handing client data off to buffers so that client requests do not pile up.
3. Kafka performs frequent disk operations that block or wait on IO, so the number of IO threads is usually set to twice the number of processor threads, and can be tuned for the runtime environment.
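The division of labor above can be sketched as a toy pipeline. All class and method names below are illustrative, not Kafka's actual code: one acceptor hands pseudo-connections round-robin to N processor threads, which push work onto a shared request queue that the handler threads would drain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy version of the SEDA layout: 1 acceptor -> N processor queues -> shared
// request queue -> M handlers. Illustrative only, not Kafka's classes.
public class SedaSketch {
    static final int NUM_PROCESSORS = 3;                  // like num.network.threads (default 3)
    static final int NUM_IO_THREADS = NUM_PROCESSORS * 2; // the 2x rule of thumb from the text

    // Feed `connections` pseudo-connections through the pipeline; collect the requests.
    public static List<String> pipeline(int connections) throws InterruptedException {
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
        List<BlockingQueue<String>> processorQueues = new ArrayList<>();
        for (int i = 0; i < NUM_PROCESSORS; i++) {
            BlockingQueue<String> q = new LinkedBlockingQueue<>();
            processorQueues.add(q);
            // Processor thread: move work from its own queue to the shared request queue.
            Thread t = new Thread(() -> {
                try { while (true) requestQueue.put(q.take() + ":read"); }
                catch (InterruptedException e) { /* shutdown */ }
            });
            t.setDaemon(true);
            t.start();
        }
        int current = 0;
        for (int c = 0; c < connections; c++) {
            processorQueues.get(current).put("conn" + c);      // acceptor hands off
            current = (current + 1) % processorQueues.size();  // round-robin
        }
        List<String> handled = new ArrayList<>();
        for (int c = 0; c < connections; c++) handled.add(requestQueue.take());
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(pipeline(6));
    }
}
```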

2. Overall Design Sequence Diagram of SocketServer

[Figure: Kafka communication sequence diagram]

Notes:

Kafka's SocketServer is built on Java NIO and follows the Reactor pattern: one Acceptor accepts client connections, N Processor threads read and write data, and M Handler threads run the business logic. Queues buffer requests both between the Acceptor and the Processors and between the Processors and the Handlers.

Below we walk through the source code of each part of this design.

2.1 Startup and Initialization

def startup() {
    val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
    for(i <- 0 until numProcessorThreads) {
      processors(i) = new Processor(i, time, maxRequestSize, aggregateIdleMeter,
        newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
        numProcessorThreads, requestChannel, quotas, connectionsMaxIdleMs)
      Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
    }
    newGauge("ResponsesBeingSent", new Gauge[Int] {
      def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
    })
    // register the processor threads for notification of responses
    requestChannel.addResponseListener((id: Int) => processors(id).wakeup())
    // start accepting connections
    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
    Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
    acceptor.awaitStartup
    info("Started")
  }

Notes:

The ConnectionQuotas object enforces per-IP connection limits. startup() creates one Acceptor listener thread and initializes N Processor threads; processors is an array of threads that serves as a thread pool, with a default size of three. The Acceptor thread and each of the N Processor threads create their own multiplexer via Selector.open(). The relevant code is below:

val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue));

val serverChannel = openServerSocket(host, port);

The value can range from 1 up to Int.MaxValue.

2.2 The Acceptor Thread

def run() {
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    startupComplete()
    var currentProcessor = 0
    while(isRunning) {
      val ready = selector.select(500)
      if(ready > 0) {
        val keys = selector.selectedKeys()
        val iter = keys.iterator()
        while(iter.hasNext && isRunning) {
          var key: SelectionKey = null
          try {
            key = iter.next
            iter.remove()
            if(key.isAcceptable)
               accept(key, processors(currentProcessor))
            else
               throw new IllegalStateException("Unrecognized key state for acceptor thread.")

            // round robin to the next processor thread
            currentProcessor = (currentProcessor + 1) % processors.length
          } catch {
            case e: Throwable => error("Error while accepting connection", e)
          }
        }
      }
    }
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(selector.close())
    shutdownComplete()
  }

2.2.1 Registering the OP_ACCEPT Event

serverChannel.register(selector, SelectionKey.OP_ACCEPT);
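In plain Java NIO, this setup looks like the sketch below (a hypothetical helper, not Kafka's openServerSocket): the server channel must be put into non-blocking mode before it can be registered for OP_ACCEPT with the thread's private selector.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Minimal sketch of server-socket setup plus per-thread Selector.open().
// Illustrative helper, not Kafka's code.
public class AcceptorSetup {
    // Bind, register for OP_ACCEPT, and return the registered interest ops.
    public static int setupAndQueryOps(int port) throws IOException {
        Selector selector = Selector.open();          // this thread's private multiplexer
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);              // required before register()
        server.bind(new InetSocketAddress(port));     // port 0 = pick an ephemeral port
        SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);
        int ops = key.interestOps();
        server.close();
        selector.close();
        return ops;                                   // == SelectionKey.OP_ACCEPT
    }

    public static void main(String[] args) throws IOException {
        System.out.println(setupAndQueryOps(0) == SelectionKey.OP_ACCEPT);
    }
}
```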

2.2.2 Internal Logic

This is synchronous non-blocking logic: the selector is polled every 500 ms. For background on synchronous non-blocking IO, see http://www.jianshu.com/p/e9c6690c0737
When a connection request arrives, a Processor thread is chosen round-robin to handle it:

currentProcessor = (currentProcessor + 1) % processors.length

The new connection's SocketChannel is then added to the newConnections queue, and accept returns:

def accept(socketChannel: SocketChannel) {
  newConnections.add(socketChannel)
  wakeup()
}

// newConnections is a thread-safe queue holding SocketChannel instances
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
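The handoff between the acceptor and a processor can be sketched as follows, with illustrative names (the real code calls selector.wakeup() to interrupt the processor's select, and channel.register() when draining):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// The accept() handoff in miniature: the acceptor enqueues the new channel on a
// lock-free queue and wakes the processor so it picks the connection up on its
// next loop iteration. Illustrative sketch, not Kafka's code.
public class Handoff<T> {
    private final Queue<T> newConnections = new ConcurrentLinkedQueue<>();
    private boolean wokenUp = false;          // stand-in for selector.wakeup()

    public void accept(T channel) {
        newConnections.add(channel);          // thread-safe, non-blocking add
        wokenUp = true;                       // selector.wakeup() in the real code
    }

    // What configureNewConnections() does: drain and "register" each channel.
    public int drain() {
        int registered = 0;
        while (newConnections.poll() != null) {
            registered++;                     // channel.register(selector, OP_READ) here
        }
        return registered;
    }

    public boolean wasWoken() { return wokenUp; }
}
```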

2.3 kafka.net.Processor

override def run() {
    startupComplete()
    while(isRunning) {
      // setup any new connections that have been queued up
      configureNewConnections()
      // register any new responses for writing
      processNewResponses()
      val startSelectTime = SystemTime.nanoseconds
      val ready = selector.select(300)
      currentTimeNanos = SystemTime.nanoseconds
      val idleTime = currentTimeNanos - startSelectTime
      idleMeter.mark(idleTime)
      // We use a single meter for aggregate idle percentage for the thread pool.
      // Since meter is calculated as total_recorded_value / time_window and
      // time_window is independent of the number of threads, each recorded idle
      // time should be discounted by # threads.
      aggregateIdleMeter.mark(idleTime / totalProcessorThreads)

      trace("Processor id " + id + " selection time = " + idleTime + " ns")
      if(ready > 0) {
        val keys = selector.selectedKeys()
        val iter = keys.iterator()
        while(iter.hasNext && isRunning) {
          var key: SelectionKey = null
          try {
            key = iter.next
            iter.remove()
            if(key.isReadable)
              read(key)
            else if(key.isWritable)
              write(key)
            else if(!key.isValid)
              close(key)
            else
              throw new IllegalStateException("Unrecognized key state for processor thread.")
          } catch {
            case e: EOFException => {
              info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
              close(key)
            } case e: InvalidRequestException => {
              info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
              close(key)
            } case e: Throwable => {
              error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
              close(key)
            }
          }
        }
      }
      maybeCloseOldestConnection
    }
    debug("Closing selector.")
    closeAll()
    swallowError(selector.close())
    shutdownComplete()
  }

Let's first look closely at the configureNewConnections method:

private def configureNewConnections() {
    while(newConnections.size() > 0) {
      val channel = newConnections.poll()
      debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
      channel.register(selector, SelectionKey.OP_READ)
    }
  }

It loops while newConnections is non-empty, polling off each channel and registering it for the OP_READ event.
Back in the main loop, let's look at the read method.

def read(key: SelectionKey) {
    lruConnections.put(key, currentTimeNanos)
    val socketChannel = channelFor(key)
    var receive = key.attachment.asInstanceOf[Receive]
    if(key.attachment == null) {
      receive = new BoundedByteBufferReceive(maxRequestSize)
      key.attach(receive)
    }
    val read = receive.readFrom(socketChannel)
    val address = socketChannel.socket.getRemoteSocketAddress();
    trace(read + " bytes read from " + address)
    if(read < 0) {
      close(key)
    } else if(receive.complete) {
      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
      requestChannel.sendRequest(req)
      key.attach(null)
      // explicitly reset interest ops to not READ, no need to wake up the selector just yet
      key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
    } else {
      // more reading to be done
      trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_READ)
      wakeup()
    }
  }

Notes

1. The current SelectionKey and the loop timestamp are put into an LRU map, so that connection resources can be reclaimed during later checks.
2. A BoundedByteBufferReceive object performs the actual read via its readFrom method, which returns the number of bytes read.

  • If the read is complete (receive.complete), the assembled Request object is put onto the RequestQueue via requestChannel.sendRequest(req).
  • If the read is not yet complete, the selector keeps listening for OP_READ on this key.
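The accumulate-until-complete behavior of BoundedByteBufferReceive can be sketched as below. This is a hypothetical re-implementation, fed from byte arrays instead of a SocketChannel: a 4-byte size header arrives first, then the payload, possibly spread over several partial reads.

```java
import java.nio.ByteBuffer;

// Sketch of the size-prefixed receive: read a 4-byte size header, allocate the
// payload buffer, and fill it across multiple partial reads. Illustrative only.
public class BoundedReceive {
    private final ByteBuffer sizeBuf = ByteBuffer.allocate(4);
    private ByteBuffer contentBuf;   // allocated once the size header is complete

    // Consume one "partial read"; returns true once the whole message has arrived.
    public boolean feed(byte[] chunk) {
        ByteBuffer in = ByteBuffer.wrap(chunk);
        if (sizeBuf.hasRemaining()) {            // still reading the size header
            while (sizeBuf.hasRemaining() && in.hasRemaining()) sizeBuf.put(in.get());
            if (!sizeBuf.hasRemaining()) {
                sizeBuf.flip();
                // The real class also enforces maxRequestSize here ("bounded").
                contentBuf = ByteBuffer.allocate(sizeBuf.getInt());
            }
        }
        if (contentBuf != null) {                // copy whatever payload bytes arrived
            while (contentBuf.hasRemaining() && in.hasRemaining()) contentBuf.put(in.get());
        }
        return complete();
    }

    public boolean complete() {
        return contentBuf != null && !contentBuf.hasRemaining();
    }
}
```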

2.4 kafka.server.KafkaRequestHandler

def run() {
    while(true) {
      try {
        var req : RequestChannel.Request = null
        while (req == null) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = SystemTime.nanoseconds
          req = requestChannel.receiveRequest(300)
          val idleTime = SystemTime.nanoseconds - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }

        if(req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(
            id, brokerId))
          return
        }
        req.requestDequeueTimeMs = SystemTime.milliseconds
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
        apis.handle(req)
      } catch {
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }

Notes

KafkaRequestHandler is also an event-processing thread: it loops reading Request objects from the requestQueue with a 300 ms poll timeout, hands each request to apis.handle for processing, and the resulting response is placed on the responseQueue. The dispatch code looks like this:

try{
      trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
      request.requestId match {
        case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
        case RequestKeys.FetchKey => handleFetchRequest(request)
        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
        case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId)
      }
    } catch {
      case e: Throwable =>
        request.requestObj.handleError(e, requestChannel, request)
        error("error when handling request %s".format(request.requestObj), e)
    } finally
      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  }

The request keys map to handlers as follows:

Request key                          Description                        Request class
RequestKeys.ProduceKey               producer request                   ProducerRequest
RequestKeys.FetchKey                 consumer fetch request             FetchRequest
RequestKeys.OffsetsKey               topic offset request               OffsetRequest
RequestKeys.MetadataKey              topic metadata request             TopicMetadataRequest
RequestKeys.LeaderAndIsrKey          leader and ISR update request      LeaderAndIsrRequest
RequestKeys.StopReplicaKey           stop-replica request               StopReplicaRequest
RequestKeys.UpdateMetadataKey        metadata update request            UpdateMetadataRequest
RequestKeys.ControlledShutdownKey    controlled-shutdown request        ControlledShutdownRequest
RequestKeys.OffsetCommitKey          offset commit request              OffsetCommitRequest
RequestKeys.OffsetFetchKey           consumer offset fetch request      OffsetFetchRequest
RequestKeys.ConsumerMetadataKey      consumer metadata request          ConsumerMetadataRequest
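The handler loop in 2.4 reduces to a poll-with-timeout pattern. Here is a minimal sketch with illustrative names, where a String constant stands in for the RequestChannel.AllDone shutdown sentinel:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// The handler loop in miniature: poll the request queue with a 300 ms timeout,
// loop on null (timeout), and exit on a sentinel. Illustrative, not Kafka's code.
public class HandlerLoop {
    public static final String ALL_DONE = "ALL_DONE";   // shutdown sentinel

    // Drain requests until the sentinel arrives; return how many were handled.
    public static int run(BlockingQueue<String> requestQueue) throws InterruptedException {
        int handled = 0;
        while (true) {
            String req = null;
            while (req == null) {
                // like requestChannel.receiveRequest(300): null means timeout, retry
                req = requestQueue.poll(300, TimeUnit.MILLISECONDS);
            }
            if (req == ALL_DONE) return handled;   // reference check, like `req eq AllDone`
            handled++;                             // apis.handle(req) in the real code
        }
    }
}
```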

2.5 Processor Response Handling

private def processNewResponses() {
  var curr = requestChannel.receiveResponse(id)
  while(curr != null) {
    val key = curr.request.requestKey.asInstanceOf[SelectionKey]
    curr.responseAction match {
      case RequestChannel.SendAction => {
        key.interestOps(SelectionKey.OP_WRITE)
        key.attach(curr)
      }
    }
    curr = requestChannel.receiveResponse(id)
  }
}

Back in the Processor thread class: while read() feeds requests into the request channel, processNewResponses() handles the Responses that the Handlers have produced for clients. It takes each Response off the responseQueue in requestChannel, registers the OP_WRITE event on the connection's key, and the data is then written back to the client.
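The interest-ops bookkeeping around this round trip can be sketched as pure bit arithmetic (a hypothetical helper, no real channel involved): OP_WRITE is turned on when a response is queued, and turned off again once the response has been fully written, mirroring how read() clears OP_READ with `key.interestOps & ~SelectionKey.OP_READ`.

```java
import java.nio.channels.SelectionKey;

// Interest-ops bookkeeping around a response. Illustrative helper, not Kafka's code.
public class InterestOps {
    // When a response is queued for this connection: start listening for writability.
    public static int onResponseQueued(int ops) {
        return ops | SelectionKey.OP_WRITE;
    }

    // When the response has been fully sent: stop listening for writability.
    public static int onResponseSent(int ops) {
        return ops & ~SelectionKey.OP_WRITE;
    }
}
```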


小程故事多
Payments-domain expert, focused on distributed systems and big data
Personal tech blog: flychao88.iteye.com