
Kafka Source Code Analysis, Server Side: SocketServer

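The previous articles covered the consumer and the producer. Starting with this one, we look at the design of the server side.

Kafka's network layer is built on multiple threads and multiple Selectors. The core class is SocketServer. It holds one Acceptor per endpoint, which accepts and handles all new connections on that endpoint. Each Acceptor has several Processor threads, and each Processor thread owns its own Selector, which it uses to read requests from connections and write responses back. Handler threads process the requests and return the resulting responses to the Processor threads. Processor threads and Handler threads communicate through the RequestChannel.

Let's look at how SocketServer is implemented, starting with its fields:

endpoints: the collection of EndPoints. A server usually has several network interfaces and can be configured with several IP addresses, so Kafka can listen on several ports at once. The EndPoint class wraps the host and port to listen on and the network protocol to use. One Acceptor is created for each EndPoint.
numProcessorThreads: the number of Processor threads per EndPoint.
totalProcessorThreads: the total number of Processor threads across all EndPoints.
maxQueuedRequests: the maximum number of requests buffered in the RequestChannel's requestQueue.
maxConnectionsPerIp: the maximum number of connections allowed from each IP address.
maxConnectionsPerIpOverrides: a Map<String,Int> that sets the maximum number of connections for specific IP addresses. A value given here overrides maxConnectionsPerIp for that address.
requestChannel: the queues through which Processor threads and Handler threads exchange data.
acceptors: the collection of Acceptor objects.
processors: the collection of Processor threads.
connectionQuotas: a ConnectionQuotas object that enforces the per-IP connection limit.

We start with the initialization flow of SocketServer: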

// Create the RequestChannel, which holds totalProcessorThreads response queues
val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
// Create the Processor array
private val processors = new Array[Processor](totalProcessorThreads)
// Create the Acceptor map
private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
// Register a listener on the RequestChannel: when a Handler thread writes to a response queue,
// the listener wakes up the corresponding Processor thread so that it handles the response
requestChannel.addResponseListener(id => processors(id).wakeup())
// Core initialization code of SocketServer
def startup() {
    this.synchronized {
      // Create connectionQuotas
      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
      // Socket send buffer size
      val sendBufferSize = config.socketSendBufferBytes
      // Socket receive buffer size
      val recvBufferSize = config.socketReceiveBufferBytes
      val brokerId = config.brokerId

      var processorBeginIndex = 0
      // Iterate over the endpoints
      endpoints.values.foreach { endpoint =>
        val protocol = endpoint.protocolType
        val processorEndIndex = processorBeginIndex + numProcessorThreads
        // Indexes processorBeginIndex (inclusive) to processorEndIndex (exclusive) belong to the current endpoint
        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, protocol)
        // Create an Acceptor for the current endpoint
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        // Start the acceptor
        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
        // Block until the acceptor has finished starting
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
    }
}

The initialization code mainly sets up the fields introduced above. The matching shutdown method is:
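The way startup hands each endpoint a contiguous slice of the processors array can be shown in isolation. The following is a minimal sketch, not Kafka code: `ProcessorRanges`, `assign`, and the endpoint names are hypothetical, and endpoints are reduced to plain strings.

```scala
// Sketch only: ProcessorRanges and the endpoint names are hypothetical.
object ProcessorRanges {
  // For each endpoint, returns the half-open index range [begin, end) of its processors,
  // advancing the begin index by numProcessorThreads per endpoint as startup() does.
  def assign(endpoints: Seq[String], numProcessorThreads: Int): Map[String, Range] = {
    var processorBeginIndex = 0
    endpoints.map { endpoint =>
      val processorEndIndex = processorBeginIndex + numProcessorThreads
      val range = processorBeginIndex until processorEndIndex
      processorBeginIndex = processorEndIndex
      endpoint -> range
    }.toMap
  }
}
```

With two endpoints and three threads per endpoint, the first endpoint would own indexes 0 to 2 and the second 3 to 5, so a processor id alone identifies both the thread and its response queue.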

def shutdown() = {
    info("Shutting down")
    this.synchronized {
      acceptors.values.foreach(_.shutdown)
      processors.foreach(_.shutdown)
    }
    info("Shutdown completed")
  }

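Shutdown closes every Acceptor and every Processor.

Both Acceptor and Processor extend AbstractServerThread, an abstract class that implements Runnable and provides methods for controlling startup and shutdown. It has four key fields:

alive: whether the current thread is still running.
shutdownLatch: whether the current thread has completed its shutdown.
startupLatch: whether the current thread has completed its startup.
connectionQuotas: used by the close method, which closes the SocketChannel for the given connection ID and decrements the connection count recorded in ConnectionQuotas.

Here are its most commonly used methods: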

def shutdown(): Unit = {
    // Mark the thread as shutting down
    alive.set(false)
    // Wake up the current thread
    wakeup()
    // Block until shutdown has completed
    shutdownLatch.await()
  }
// Block until startup has completed
def awaitStartup(): Unit = startupLatch.await
// Startup has completed: release the threads waiting in awaitStartup
protected def startupComplete() = {
    startupLatch.countDown()
  }

// Shutdown has completed: release the threads waiting in shutdown
protected def shutdownComplete() = shutdownLatch.countDown()
// Close a connection
def close(selector: KSelector, connectionId: String) {
    // Look up the channel to close
    val channel = selector.channel(connectionId)
    if (channel != null) {
      debug(s"Closing selector connection $connectionId")
      val address = channel.socketAddress
      // Update the connection count
      if (address != null)
        connectionQuotas.dec(address)
      // Close the connection
      selector.close(connectionId)
    }
  }
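The startup and shutdown handshake above rests on two CountDownLatch instances and an atomic flag. The following is a minimal sketch of that pattern under simplifying assumptions: `LatchedWorker` is a hypothetical stand-in for AbstractServerThread, and a short sleep replaces the selector poll, so no wakeup call is needed.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Sketch only: LatchedWorker is a hypothetical stand-in for AbstractServerThread.
class LatchedWorker extends Runnable {
  private val startupLatch = new CountDownLatch(1)
  private val shutdownLatch = new CountDownLatch(1)
  private val alive = new AtomicBoolean(true)

  // Blocks the caller until run() has signalled that startup is complete.
  def awaitStartup(): Unit = startupLatch.await()

  // Asks the loop to stop, then blocks until run() has finished cleaning up.
  def shutdown(): Unit = {
    alive.set(false)
    shutdownLatch.await()
  }

  def isShutdownComplete: Boolean = shutdownLatch.getCount == 0

  override def run(): Unit = {
    startupLatch.countDown()              // startup complete: releases awaitStartup()
    try {
      while (alive.get) Thread.sleep(10)  // stand-in for a selector poll with a timeout
    } finally {
      shutdownLatch.countDown()           // shutdown complete: releases shutdown()
    }
  }
}
```

A caller would start the worker on its own thread, call `awaitStartup()` before relying on it, and call `shutdown()` to stop it and wait for the cleanup to finish.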

The Acceptor accepts connection requests, creates the socket, and assigns it to a Processor. Its constructor body does the following:

  // Create the NIO Selector
  private val nioSelector = NSelector.open()
  // Create a ServerSocketChannel
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)

  this.synchronized {
    // Create and start one thread for each Processor
    processors.foreach { processor =>
      Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
    }
  }

The core logic of the Acceptor is its run method:

def run() {
    // Register the ServerSocketChannel with the selector, with interest in OP_ACCEPT
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    // Signal that startup has completed
    startupComplete()
    try {
      var currentProcessor = 0
      // Check the running state
      while (isRunning) {
        try {
          // Poll for I/O events, blocking for at most 500 ms
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            // Iterate over the selected I/O events
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable)
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally {
      debug("Closing server socket and selector.")
      swallowError(serverChannel.close())
      swallowError(nioSelector.close())
      shutdownComplete()
    }
  }
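The round-robin step in the loop above decides which Processor receives each accepted connection. A minimal sketch of just that arithmetic follows; `RoundRobin` and `assign` are hypothetical names, and the sketch assumes every accept succeeds.

```scala
// Sketch only: RoundRobin is a hypothetical helper, not part of Kafka.
object RoundRobin {
  // Returns the processor index chosen for each of `connections` accepted connections.
  def assign(connections: Int, numProcessors: Int): Seq[Int] = {
    var currentProcessor = 0
    (0 until connections).map { _ =>
      val chosen = currentProcessor
      // Same update as in Acceptor.run: advance and wrap around
      currentProcessor = (currentProcessor + 1) % numProcessors
      chosen
    }
  }
}
```

Calling `RoundRobin.assign(5, 3)` yields the indexes 0, 1, 2, 0, 1, so connections are spread evenly regardless of how busy each Processor is.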

The method first registers interest in OP_ACCEPT, then loops, polling for I/O events and handling them. The handling consists mainly of processing accept events:

def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    // Create the SocketChannel
    val socketChannel = serverSocketChannel.accept()
    try {
      // Increment the connection count
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      // Switch to non-blocking mode
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      socketChannel.socket().setSendBufferSize(sendBufferSize)

      debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
            .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
                  socketChannel.socket.getSendBufferSize, sendBufferSize,
                  socketChannel.socket.getReceiveBufferSize, recvBufferSize))
      // Hand the SocketChannel over to the processor
      processor.accept(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>
        info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
        close(socketChannel)
    }
  }
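The article does not show ConnectionQuotas itself, so here is a minimal sketch of how a per-IP counter with overrides could behave. `SimpleQuotas` and `QuotaExceeded` are hypothetical names, and the design (increment first, then check, so a rejected caller still has to call `dec` when it closes the socket) is an assumption made for illustration rather than a copy of Kafka's class.

```scala
import java.net.InetAddress
import scala.collection.mutable

// Sketch only: SimpleQuotas and QuotaExceeded are hypothetical, not Kafka classes.
class QuotaExceeded(val ip: InetAddress, val count: Int)
  extends RuntimeException(s"Too many connections from $ip (max = $count)")

class SimpleQuotas(defaultMax: Int, overrides: Map[String, Int]) {
  private val counts = mutable.Map[InetAddress, Int]()

  // Records one more connection, then rejects it if the limit for this IP is exceeded.
  def inc(address: InetAddress): Unit = counts.synchronized {
    val count = counts.getOrElse(address, 0) + 1
    counts.put(address, count)
    // A per-IP override, when present, replaces the default limit
    val max = overrides.getOrElse(address.getHostAddress, defaultMax)
    if (count > max) throw new QuotaExceeded(address, max)
  }

  // Records that one connection from this IP was closed.
  def dec(address: InetAddress): Unit = counts.synchronized {
    val count = counts.getOrElse(address,
      throw new IllegalArgumentException(s"No connections recorded for $address"))
    if (count == 1) counts.remove(address) else counts.put(address, count - 1)
  }

  def get(address: InetAddress): Int = counts.synchronized(counts.getOrElse(address, 0))
}
```

In this sketch the count stays incremented when `inc` throws, which mirrors the shape of accept above: the exception handler closes the socket, and closing is where the count would be decremented.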

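The Processor reads requests and writes responses back. It takes no part in the actual request-handling logic. Its key fields are:

newConnections: the newly created SocketChannels assigned to this Processor.
inflightResponses: the responses that have not finished sending yet.
selector: a KSelector.
requestChannel: the queues that carry data between the Processor and the Handler threads.

The Acceptor hands a connection over by calling accept: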

def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)
    wakeup()
  }

The Processor first adds the assigned SocketChannel to its own queue, then wakes up its thread to do the work:

def wakeup = selector.wakeup()
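The queue-then-wakeup handoff can be tried with plain java.nio, without any Kafka classes. The following is a minimal sketch in which `WakeupHandoff` and `handOff` are hypothetical names and a string stands in for the SocketChannel. It relies on the documented behaviour of `java.nio.channels.Selector.wakeup`: a wakeup issued before or during a select call makes that select return.

```scala
import java.nio.channels.Selector
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference

// Sketch only: WakeupHandoff is hypothetical; a String stands in for a SocketChannel.
object WakeupHandoff {
  // Hands `item` to a thread blocked in select() and returns what that thread found.
  def handOff(item: String): Option[String] = {
    val selector = Selector.open()
    val newConnections = new ConcurrentLinkedQueue[String]()
    val seen = new AtomicReference[Option[String]](None)
    val processor = new Thread(new Runnable {
      override def run(): Unit = {
        selector.select()                        // no channels registered: returns only on wakeup()
        seen.set(Option(newConnections.poll()))  // pick up the work queued by the acceptor side
      }
    })
    processor.start()
    newConnections.add(item)  // 1. enqueue first
    selector.wakeup()         // 2. then wake the selector thread
    processor.join()
    selector.close()
    seen.get
  }
}
```

The order matters: enqueueing before calling wakeup guarantees that the woken thread finds the item, whichever thread runs first.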

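The selector here is a KSelector, which ultimately calls the NIO Selector (NSelector) to perform the wakeup.

Next, let's walk through the overall flow of the Processor's run method:

1. Call startupComplete to signal that initialization has finished.
2. Process the newly created SocketChannels in the newConnections queue.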

private def configureNewConnections() {
    while (!newConnections.isEmpty) {
      // Take one unprocessed SocketChannel from the queue
      val channel = newConnections.poll()
      try {
        debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
        // Collect the local and remote address information of the channel
        val localHost = channel.socket().getLocalAddress.getHostAddress
        val localPort = channel.socket().getLocalPort
        val remoteHost = channel.socket().getInetAddress.getHostAddress
        val remotePort = channel.socket().getPort
        val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
        // Register the channel with the selector
        selector.register(connectionId, channel)
      } catch {
        // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
        // throwables will be caught in processor and logged as uncaught exceptions.
        case NonFatal(e) =>
          // need to close the channel here to avoid a socket leak.
          close(channel)
          error(s"Processor $id closed connection from ${channel.getRemoteAddress}", e)
      }
    }
  }

// KSelector.register (Java), called by configureNewConnections above
public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
        // Register interest in OP_READ
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        key.attach(channel);
        // Add the channel to the map
        this.channels.put(id, channel);
    }

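3. Take this Processor's responseQueue from the RequestChannel and process the Responses buffered in it. If a Response is a SendAction, it must be sent to the client: the Processor looks up the matching KafkaChannel, registers OP_WRITE for it, and points the KafkaChannel.send field at the Response to be sent. The Response is also removed from the responseQueue and placed in inflightResponses. If a Response is a NoOpAction, there is nothing to send on this connection for now, so OP_READ is registered for the KafkaChannel to let it keep reading requests. If a Response is a CloseConnectionAction, the corresponding connection is closed.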

private def processNewResponses() {
    // Fetch the next Response queued for this processor
    var curr = requestChannel.receiveResponse(id)
    while (curr != null) {
      try {
        // Choose the handling according to the Response's action
        curr.responseAction match {
          case RequestChannel.NoOpAction =>
            curr.request.updateRequestMetrics
            trace("Socket server received empty response to send, registering for read: " + curr)
            selector.unmute(curr.request.connectionId)
          case RequestChannel.SendAction =>
            sendResponse(curr)
          case RequestChannel.CloseConnectionAction =>
            curr.request.updateRequestMetrics
            trace("Closing socket connection actively according to the response code.")
            close(selector, curr.request.connectionId)
        }
      } finally {
        curr = requestChannel.receiveResponse(id)
      }
    }
  }
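The poll-until-null loop and the three-way dispatch can be modelled with a plain queue. The following is a minimal sketch: `ResponseDrain`, its `Response` case class, and the returned description strings are hypothetical, and the real side effects on the selector are replaced by text so that the control flow is easy to follow.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Sketch only: ResponseDrain and its types are hypothetical simplifications.
object ResponseDrain {
  sealed trait Action
  case object SendAction extends Action
  case object NoOpAction extends Action
  case object CloseConnectionAction extends Action
  final case class Response(connectionId: String, action: Action)

  // Drains the queue without blocking and describes what would be done for each response.
  def drain(queue: LinkedBlockingQueue[Response]): List[String] = {
    val steps = List.newBuilder[String]
    var curr = queue.poll()  // poll() returns null when the queue is empty
    while (curr != null) {
      steps += (curr.action match {
        case NoOpAction            => s"unmute ${curr.connectionId}"   // nothing to send, resume reading
        case SendAction            => s"send to ${curr.connectionId}"  // hand the response to the selector
        case CloseConnectionAction => s"close ${curr.connectionId}"
      })
      curr = queue.poll()
    }
    steps.result()
  }
}
```

Because the action type is a sealed trait, the compiler can warn when a match forgets one of the three cases.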

4. Call the Processor's poll method to read requests and send responses. Underneath, it calls KSelector.poll.

private def poll() {
    try selector.poll(300)
    catch {
      case e @ (_: IllegalStateException | _: IOException) =>
        error(s"Closing processor $id due to illegal state or IO exception")
        swallow(closeAll())
        shutdownComplete()
        throw e
    }
  }

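Each call places the requests that were read, the sends that completed, and the connections that were dropped into the selector's completedReceives, completedSends, and disconnected collections, where they wait to be processed.

5. Call processCompletedReceives to process the KSelector.completedReceives collection.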

private def processCompletedReceives() {
    // Iterate over the completed receives
    selector.completedReceives.asScala.foreach { receive =>
      try {
        // Look up the corresponding KafkaChannel
        val channel = selector.channel(receive.source)
        // Create the Session for this KafkaChannel, which is used for access control
        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
          channel.socketAddress)
        // Wrap the NetworkReceive, the processor id, and the authentication information in a RequestChannel.Request
        val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
        // Put the RequestChannel.Request on RequestChannel.requestQueue, where it waits to be handled
        requestChannel.sendRequest(req)
        // Cancel the OP_READ registration so that the connection stops reading data
        selector.mute(receive.source)
      } catch {
        case e @ (_: InvalidRequestException | _: SchemaException) =>
          // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
          error(s"Closing socket for ${receive.source} because of error", e)
          close(selector, receive.source)
      }
    }
  }

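6. Call processCompletedSends to process the KSelector.completedSends collection. It first removes the corresponding Response from inflightResponses, then re-registers OP_READ for the connection so that data can be read from it again.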

private def processCompletedSends() {
    // Iterate over the completed sends
    selector.completedSends.asScala.foreach { send =>
      // This response has been sent, so remove it from inflightResponses
      val resp = inflightResponses.remove(send.destination).getOrElse {
        throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
      }
      resp.request.updateRequestMetrics()
      // Allow this connection to read data again
      selector.unmute(send.destination)
    }
  }
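Taken together, the mute in processCompletedReceives and the unmute in processCompletedSends mean that a connection has at most one request being processed at a time. The following is a minimal sketch of that bookkeeping; `ConnectionModel` and its method names are hypothetical, and responses are plain strings.

```scala
import scala.collection.mutable

// Sketch only: ConnectionModel is a hypothetical model of the Processor's bookkeeping.
class ConnectionModel {
  private val muted = mutable.Set[String]()
  private val inflightResponses = mutable.Map[String, String]()

  def canRead(conn: String): Boolean = !muted.contains(conn)

  // Like processCompletedReceives: after a request is queued, stop reading from the connection.
  def requestReceived(conn: String): Unit = muted += conn

  // Like sending a response: remember it until the selector reports the send as complete.
  def responseQueued(conn: String, response: String): Unit = inflightResponses.put(conn, response)

  // Like processCompletedSends: forget the response and resume reading.
  def sendCompleted(conn: String): String = {
    val resp = inflightResponses.remove(conn).getOrElse {
      throw new IllegalStateException(s"Send for $conn completed, but not in inflightResponses")
    }
    muted -= conn
    resp
  }
}
```

In this model a second `sendCompleted` for the same connection fails, just as the IllegalStateException in the original method signals a send that was never recorded.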

7. Call processDisconnected to process the KSelector.disconnected collection. It first removes the Response for that connection from inflightResponses, then decrements the connection count recorded in ConnectionQuotas.

private def processDisconnected() {
    selector.disconnected.asScala.foreach { connectionId =>
      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }.remoteHost
      inflightResponses.remove(connectionId).foreach(_.request.updateRequestMetrics())
      // the channel has been closed by the selector but the quotas still need to be updated
      connectionQuotas.dec(InetAddress.getByName(remoteHost))
    }
  }
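processDisconnected recovers the remote host by parsing the connection ID string, so the ID has to round-trip through toString and fromString. The article does not show ConnectionId, so the following is a minimal sketch that assumes a `localHost:localPort-remoteHost:remotePort` layout. `ConnId` and `ConnIdCodec` are hypothetical names, and the layout is an assumption, not a statement about Kafka's actual format.

```scala
// Sketch only: ConnId, ConnIdCodec, and the string layout are assumptions for illustration.
final case class ConnId(localHost: String, localPort: Int, remoteHost: String, remotePort: Int) {
  override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort"
}

object ConnIdCodec {
  private val Layout = """(.+):(\d+)-(.+):(\d+)""".r

  // Returns None instead of throwing when the string does not have the expected layout.
  def parse(s: String): Option[ConnId] = s match {
    case Layout(lh, lp, rh, rp) => Some(ConnId(lh, lp.toInt, rh, rp.toInt))
    case _                      => None
  }
}
```

Returning an Option lets the caller decide how to react to a malformed ID, which is what the `getOrElse { throw ... }` in processDisconnected does.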

8. When SocketServer.shutdown is called to shut down the whole SocketServer, the alive field is set to false, which ends the loop.

Here is the actual implementation:

override def run() {
    // Signal that the processor has finished starting
    startupComplete()
    while (isRunning) {
      try {
        // Set up new connections
        configureNewConnections()
        // Process the responses buffered in the RequestChannel
        processNewResponses()
        // Read requests and send responses
        poll()
        // Process the results collected by the selector
        processCompletedReceives()
        processCompletedSends()
        processDisconnected()
      } catch {
        case e: ControlThrowable => throw e
        case e: Throwable =>
          error("Processor got uncaught exception.", e)
      }
    }

    debug("Closing selector - processor " + id)
    swallowError(closeAll())
    shutdownComplete()
  }

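RequestChannel

Processor threads and Handler threads pass data to each other through the RequestChannel. It contains one requestQueue and several responseQueues, one per Processor thread. A Processor thread puts the requests it reads into the requestQueue, and Handler threads take requests from the requestQueue and process them. The response produced by a Handler thread is stored in the responseQueue of the Processor that owns the connection, and that Processor thread takes the response from its responseQueue and sends it to the client.

The core fields of RequestChannel are:

requestQueue: the queue through which Processor threads pass requests to Handler threads.
responseQueues: an array of queues through which Handler threads pass responses to Processor threads.
numProcessors: the number of Processor threads.
queueSize: the maximum number of buffered requests.
responseListeners: the list of listeners.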

// Add a Response of type SendAction to the response queue of its processor
def sendResponse(response: RequestChannel.Response) {
    responseQueues(response.processor).put(response)
    // Invoke the listeners
    for(onResponse <- responseListeners)
      onResponse(response.processor)
  }
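The queue layout described in this section can be put together in a few lines. The following is a minimal sketch, not the real class: `MiniRequestChannel` is a hypothetical name, requests and responses are plain strings, and the choice of a bounded ArrayBlockingQueue for requests and unbounded LinkedBlockingQueues for responses is an assumption made for illustration.

```scala
import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue}

// Sketch only: MiniRequestChannel is hypothetical; requests and responses are plain strings.
class MiniRequestChannel(numProcessors: Int, queueSize: Int) {
  // One bounded queue shared by all processors and handlers
  private val requestQueue = new ArrayBlockingQueue[String](queueSize)
  // One response queue per processor, indexed by processor id
  private val responseQueues = Array.fill(numProcessors)(new LinkedBlockingQueue[String]())
  private var responseListeners: List[Int => Unit] = Nil

  def addResponseListener(onResponse: Int => Unit): Unit =
    responseListeners = onResponse :: responseListeners

  // Processor side: blocks when queueSize requests are already waiting.
  def sendRequest(request: String): Unit = requestQueue.put(request)

  // Handler side: blocks until a request is available.
  def receiveRequest(): String = requestQueue.take()

  // Handler side: queue the response, then notify the listeners with the processor id.
  def sendResponse(processor: Int, response: String): Unit = {
    responseQueues(processor).put(response)
    responseListeners.foreach(listener => listener(processor))
  }

  // Processor side: non-blocking, returns null when nothing is queued.
  def receiveResponse(processor: Int): String = responseQueues(processor).poll()
}
```

A listener registered with `addResponseListener` plays the role of the `processors(id).wakeup()` callback installed by SocketServer, so a Processor blocked in its selector learns that a response is waiting.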