[Spark] Spark Message Communication Architecture

Outline:

  • Preface
  • Key concepts
  • Spark RpcEnv
  • Spark RpcEndpoint
  • Spark RpcEndpointRef
  • Class diagram: RpcEnv and RpcEndpoint
  • Dispatcher and Inbox
  • Outbox
  • Sequence diagrams

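1. Preface

The earlier article on the Spark event bus covered communication inside a single component. How, then, do the components of a Spark cluster talk to each other? Spark ships a built-in RPC framework for inter-component communication. Before Spark 2.0.0 this was built on Akka. I have not used Akka myself, but the community regards it as an excellent open-source distributed framework. So why did Spark drop it? The main reason was to resolve version conflicts between the Akka used in a user's Spark application and the Akka bundled with Spark.

A caveat first: I have not worked with Netty before, so this analysis does not go very deep. It describes the overall shape of Spark's RPC framework; how the framework uses Netty underneath is out of scope.

My knowledge is limited, so I will keep learning as I go.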

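2. Key concepts

  • RpcEnv: the RPC environment. The environment that every RPC endpoint depends on at run time is called the RpcEnv.
  • RpcEndpoint: an RPC endpoint. Spark treats every communicating entity as an RPC endpoint. Each one implements the RpcEndpoint trait and defines its own messages and handling logic according to its needs.
  • Dispatcher: the message dispatcher, responsible for delivering each RpcMessage to the right RpcEndpoint. It looks up the Inbox of the endpoint named by the client, posts the message into it, and puts that endpoint's EndpointData on a LinkedBlockingQueue. MessageLoop threads running in the Dispatcher's thread pool take entries from this queue and process the corresponding Inbox. Because the queue is blocking, the threads block while nothing is pending and resume as soon as a message arrives.
  • Inbox: one inbox per local endpoint. An Inbox holds a linked list of InboxMessage. InboxMessage has many subclasses: an RpcMessage from a remote call, a fire-and-forget OneWayMessage from a remote call, and messages for events such as endpoint start-up and connections being established or lost. The Inbox pattern-matches on these messages and calls the corresponding RpcEndpoint method.
  • Outbox: one outbox per remote endpoint address. NettyRpcEnv holds a ConcurrentHashMap[RpcAddress, Outbox]. Once a message is placed in an Outbox, it is sent out through a TransportClient.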

3. Spark RpcEnv

Every Spark program starts from a SparkContext. When the SparkContext is initialized, it creates a SparkEnv through the createSparkEnv method, and one step of creating the SparkEnv is creating the RpcEnv.

val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
  securityManager, clientMode = !isDriver)

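So Spark's communication starts from the RpcEnv.

3.1 Creating the RpcEnv

RpcEnv is, as the name suggests, the RPC environment. It is an abstract class that declares abstract methods for starting, stopping and shutting down the RPC framework. Its companion object RpcEnv defines two create methods: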

  def create(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean = false): RpcEnv = {
    create(name, host, host, port, conf, securityManager, clientMode)
  }

  def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean): RpcEnv = {
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
      clientMode)
    new NettyRpcEnvFactory().create(config)
  }

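The core of create() is that an RpcEnvFactory builds the RpcEnv through a factory method.

RpcEnv has one implementation in Spark, NettyRpcEnv, and RpcEnvFactory likewise has one implementation, NettyRpcEnvFactory. This is a typical factory pattern.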
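
The factory arrangement can be pictured with a small self-contained sketch. Every name in it (ToyConfig, ToyRpcEnv, ToyRpcEnvFactory, ToyNettyRpcEnv) is a hypothetical stand-in rather than a Spark class; it only mirrors the shape of the two create methods, where a convenience entry point builds a config object and hands it to the single concrete factory.

```scala
object FactorySketch {
  // Hypothetical stand-ins for RpcEnvConfig, RpcEnv and RpcEnvFactory.
  final case class ToyConfig(name: String, host: String, port: Int, clientMode: Boolean)

  trait ToyRpcEnv { def describe: String }

  trait ToyRpcEnvFactory { def create(config: ToyConfig): ToyRpcEnv }

  // The single concrete implementation, like the NettyRpcEnv / NettyRpcEnvFactory pair.
  final class ToyNettyRpcEnv(config: ToyConfig) extends ToyRpcEnv {
    def describe: String = {
      val mode = if (config.clientMode) "client" else "server"
      s"${config.name}@${config.host}:${config.port} ($mode)"
    }
  }

  object ToyNettyRpcEnvFactory extends ToyRpcEnvFactory {
    def create(config: ToyConfig): ToyRpcEnv = new ToyNettyRpcEnv(config)
  }

  // Entry point that hides the concrete factory from callers.
  def create(name: String, host: String, port: Int, clientMode: Boolean = false): ToyRpcEnv =
    ToyNettyRpcEnvFactory.create(ToyConfig(name, host, port, clientMode))
}
```

Callers only ever see the ToyRpcEnv trait, so a second implementation could be swapped in by changing one line in create.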

3.2 Starting the RpcEnv

Now look at the create method of NettyRpcEnvFactory:

private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {

  def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
    // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager)
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }
}

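Unless clientMode is true, NettyRpcEnvFactory.create starts a server on the bind address and port as soon as it is called.

The clientMode parameter defaults to false, and tracing the call shows that SparkEnv passes clientMode = !isDriver. The source has no comment explaining what it means. Judging from the create method above, when clientMode is true the startServer branch is skipped, so that RpcEnv does not listen on a port.

In the default case, then, execution enters nettyEnv.startServer(config.bindAddress, actualPort):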

  def startServer(bindAddress: String, port: Int): Unit = {
    val bootstraps: java.util.List[TransportServerBootstrap] =
      if (securityManager.isAuthenticationEnabled()) {
        java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
      } else {
        java.util.Collections.emptyList()
      }
    server = transportContext.createServer(bindAddress, port, bootstraps)
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }

3.3 setupEndpoint

The method to focus on in RpcEnv is setupEndpoint. It registers an RpcEndpoint with the dispatcher. A name must be given at registration, and clients use this name to route to the endpoint.

  /**
   * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not
   * guarantee thread-safety.
   */
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
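
Name-based registration can be sketched without Spark at all. The block below is a minimal illustration, assuming a hypothetical ToyRef in place of RpcEndpointRef and a plain ConcurrentHashMap as the registry; the duplicate-name check uses putIfAbsent, which is also what the Dispatcher code later in this article does.

```scala
import java.util.concurrent.ConcurrentHashMap

object RegistrySketch {
  // Hypothetical stand-in for RpcEndpointRef.
  final case class ToyRef(name: String)

  private val endpoints = new ConcurrentHashMap[String, ToyRef]()

  // Register under a unique name; a second registration with the same name is rejected.
  def setupEndpoint(name: String): ToyRef = {
    val ref = ToyRef(name)
    if (endpoints.putIfAbsent(name, ref) != null) {
      throw new IllegalArgumentException(s"There is already an endpoint called $name")
    }
    ref
  }

  // Name-based lookup, which is what client-side routing relies on.
  def lookup(name: String): Option[ToyRef] = Option(endpoints.get(name))
}
```

For example, RegistrySketch.setupEndpoint("Master") succeeds once, and a second call with the same name throws IllegalArgumentException.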

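What is an endpoint? That comes next.

4. Spark RpcEndpoint

Besides the overall communication framework, there must also be entities that communicate. In Spark, RpcEndpoint is the abstraction over all of them. RpcEndpoint is a trait that defines a set of functions, which are triggered when messages arrive.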

private[spark] trait RpcEndpoint {

  /**
   * The [[RpcEnv]] that this [[RpcEndpoint]] is registered to.
   */
  val rpcEnv: RpcEnv

  /**
   * The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
   * called. And `self` will become `null` when `onStop` is called.
   *
   * Note: Because before `onStart`, [[RpcEndpoint]] has not yet been registered and there is not
   * valid [[RpcEndpointRef]] for it. So don't call `self` before `onStart` is called.
   */
  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

  /**
   * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]. If receiving a
   * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
   */
  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

  /**
   * Process messages from [[RpcEndpointRef.ask]]. If receiving a unmatched message,
   * [[SparkException]] will be thrown and sent to `onError`.
   */
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }

  /**
   * Invoked when any exception is thrown during handling messages.
   */
  def onError(cause: Throwable): Unit = {
    // By default, throw e and let RpcEnv handle it
    throw cause
  }

  /**
   * Invoked when `remoteAddress` is connected to the current node.
   */
  def onConnected(remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }

  /**
   * Invoked when `remoteAddress` is lost.
   */
  def onDisconnected(remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }

  /**
   * Invoked when some network error happens in the connection between the current node and
   * `remoteAddress`.
   */
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }

  /**
   * Invoked before [[RpcEndpoint]] starts to handle any message.
   */
  def onStart(): Unit = {
    // By default, do nothing.
  }

  /**
   * Invoked when [[RpcEndpoint]] is stopping. `self` will be `null` in this method and you cannot
   * use it to send or ask messages.
   */
  def onStop(): Unit = {
    // By default, do nothing.
  }

  /**
   * A convenient method to stop [[RpcEndpoint]].
   */
  final def stop(): Unit = {
    val _self = self
    if (_self != null) {
      rpcEnv.stop(_self)
    }
  }
}

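4.1 RpcEndpoint methods

  • receive: receives and handles a message without replying
  • receiveAndReply: handles a message and then replies to the caller
  • onError: called when an exception is thrown while handling a message
  • onConnected: called when a remote client connects to the current node
  • onDisconnected: called when a remote client loses its connection to the current node
  • onNetworkError: called to handle a network error on a connection
  • onStart: called before the RpcEndpoint handles any message; a place for preparation work
  • onStop: called when the RpcEndpoint is stopping; a place for clean-up work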
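
The split between receive and receiveAndReply is easier to see in a toy endpoint. The sketch below does not use Spark's private[spark] classes: ToyEndpoint, ToyCallContext, Ping and Echo are all hypothetical names, and the context simply records the reply instead of sending it over the network.

```scala
object EndpointSketch {
  // Hypothetical messages, not part of Spark.
  case object Ping
  final case class Echo(text: String)

  // Toy stand-in for RpcCallContext: it just records the reply.
  final class ToyCallContext {
    var replied: Option[Any] = None
    def reply(response: Any): Unit = { replied = Some(response) }
  }

  // Toy stand-in for RpcEndpoint with its two message handlers.
  trait ToyEndpoint {
    def receive: PartialFunction[Any, Unit]
    def receiveAndReply(context: ToyCallContext): PartialFunction[Any, Unit]
  }

  final class CountingEndpoint extends ToyEndpoint {
    var pings = 0

    // One-way messages: handle them, send nothing back.
    def receive: PartialFunction[Any, Unit] = {
      case Ping => pings += 1
    }

    // Request/response messages: answer through the context.
    def receiveAndReply(context: ToyCallContext): PartialFunction[Any, Unit] = {
      case Echo(text) => context.reply(text.toUpperCase)
    }
  }
}
```

Because both handlers are partial functions, a caller can test isDefinedAt before applying them, which is how an unmatched message can be detected and reported.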

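4.2 RpcEndpoint inheritance hierarchy

As the diagram shows, Master, Worker and others are all RpcEndpoints. I will not go through what each class is or does here; they will be introduced when they come up (to be honest, I do not know them all yet and will keep studying).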

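5. Spark RpcEndpointRef

An RpcEndpointRef is a reference to an RpcEndpoint. To send a request to a remote RpcEndpoint, you must first obtain its RpcEndpointRef.

An RpcEndpointRef identifies the endpoint by name, host and port, with an address of the form spark://name@host:port.

Here is the class itself: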

private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {

  private[this] val maxRetries = RpcUtils.numRetries(conf)
  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

  /**
   * return the address for the [[RpcEndpointRef]]
   */
  def address: RpcAddress

  def name: String

  /**
   * Sends a one-way asynchronous message. Fire-and-forget semantics.
   */
  def send(message: Any): Unit

  /**
   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
   * receive the reply within the specified timeout.
   *
   * This method only sends the message once and never retries.
   */
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  /**
   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
   * receive the reply within a default timeout.
   *
   * This method only sends the message once and never retries.
   */
  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

  /**
   * Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
   * timeout, or throw a SparkException if this fails even after the default number of retries.
   * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
   * method retries, the message handling in the receiver side should be idempotent.
   *
   * Note: this is a blocking action which may cost a lot of time,  so don't call it in a message
   * loop of [[RpcEndpoint]].
   *
   * @param message the message to send
   * @tparam T type of the reply message
   * @return the reply message from the corresponding [[RpcEndpoint]]
   */
  def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)

  /**
   * Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
   * specified timeout, throw a SparkException if this fails even after the specified number of
   * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
   * retries, the message handling in the receiver side should be idempotent.
   *
   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
   * loop of [[RpcEndpoint]].
   *
   * @param message the message to send
   * @param timeout the timeout duration
   * @tparam T type of the reply message
   * @return the reply message from the corresponding [[RpcEndpoint]]
   */
  def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
    // TODO: Consider removing multiple attempts
    var attempts = 0
    var lastException: Exception = null
    while (attempts < maxRetries) {
      attempts += 1
      try {
        val future = ask[T](message, timeout)
        val result = timeout.awaitResult(future)
        if (result == null) {
          throw new SparkException("RpcEndpoint returned null")
        }
        return result
      } catch {
        case ie: InterruptedException => throw ie
        case e: Exception =>
          lastException = e
          logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
      }

      if (attempts < maxRetries) {
        Thread.sleep(retryWaitMs)
      }
    }

    throw new SparkException(
      s"Error sending message [message = $message]", lastException)
  }

}

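Its main methods:

  • address: returns the address of the endpoint
  • send: one-way asynchronous send with fire-and-forget semantics; no state is recorded and no reply is expected
  • ask[T: ClassTag](message: Any): calls ask[T: ClassTag](message: Any, timeout: RpcTimeout) with the default timeout and returns a Future[T] for the reply
  • askWithRetry[T: ClassTag](message: Any): unlike ask, blocks until the reply arrives and retries on failure up to the configured number of attempts, so message handling on the receiver side should be idempotent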
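
The retry loop in askWithRetry can be reproduced with nothing but the Scala standard library. This is a minimal sketch under stated assumptions: the ask parameter is a hypothetical function returning a Future, the millisecond parameters stand in for RpcTimeout and the RpcUtils settings, and a plain RuntimeException replaces SparkException.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object AskRetrySketch {
  // Call `ask` up to maxRetries times, blocking on each attempt for at most timeoutMs.
  def askWithRetry[T](maxRetries: Int, retryWaitMs: Long, timeoutMs: Long)(
      ask: () => Future[T]): T = {
    var attempts = 0
    var lastError: Throwable = null
    while (attempts < maxRetries) {
      attempts += 1
      try {
        return Await.result(ask(), timeoutMs.millis)
      } catch {
        // InterruptedException is not NonFatal, so it propagates instead of being retried.
        case NonFatal(e) => lastError = e
      }
      if (attempts < maxRetries) Thread.sleep(retryWaitMs)
    }
    throw new RuntimeException(s"No reply after $attempts attempts", lastError)
  }
}
```

Since the same message may be delivered more than once, this pattern only makes sense when the receiver handles it idempotently, which is the point the Scaladoc above makes.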

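6. Class diagram: RpcEnv and RpcEndpoint

On the server side, RpcEnv is the runtime environment of an RpcEndpoint and manages the endpoint's whole life cycle. It registers and unregisters endpoints, parses TCP-layer packets and deserializes them into RpcMessage objects, routes each request to the named endpoint, and invokes its business logic. If the endpoint needs to respond, the returned object is serialized and sent back to the remote peer over the TCP layer. If the endpoint throws an exception, RpcCallContext.sendFailure is called to send the exception back.

On the client side, RpcEnv is how you obtain a reference to an RpcEndpoint, that is, an RpcEndpointRef.

RpcEnv instances are created by an RpcEnvFactory, which currently has a single subclass, NettyRpcEnvFactory. Unless clientMode is true, NettyRpcEnvFactory.create starts a server on the bind address and port as soon as it is called.

NettyRpcEnv is created by NettyRpcEnvFactory.create and is the bridge between Spark core and org.apache.spark.spark-network-common. Its core method setupEndpoint registers an endpoint with the Dispatcher. setupEndpointRef first asks the RpcEndpointVerifier to check whether the named endpoint exists locally or remotely, and only then creates the RpcEndpointRef.

7. Dispatcher and Inbox

NettyRpcEnv contains a Dispatcher. It mainly serves the server side: it routes each message to the right RpcEndpoint and invokes that endpoint's business logic.

7.1 Important fields of Dispatcher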

  private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    val inbox = new Inbox(ref, endpoint)
  }

  private val endpoints: ConcurrentMap[String, EndpointData] =
    new ConcurrentHashMap[String, EndpointData]
  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
    new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

  // Track the receivers whose inboxes may contain messages.
  private val receivers = new LinkedBlockingQueue[EndpointData]

  • EndpointData: a class bundling a name, an RpcEndpoint, an RpcEndpointRef and an Inbox, so every RpcEndpoint has its own Inbox for holding InboxMessage objects.
  • endpoints: a ConcurrentHashMap holding the name -> EndpointData mapping.
  • endpointRefs: a ConcurrentMap holding the RpcEndpoint -> RpcEndpointRef mapping.
  • receivers: a LinkedBlockingQueue of the EndpointData whose inboxes may contain messages.

7.2 How the Dispatcher schedules work

  private val threadpool: ThreadPoolExecutor = {
    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
      math.max(2, Runtime.getRuntime.availableProcessors()))
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
    for (i <- 0 until numThreads) {
      pool.execute(new MessageLoop)
    }
    pool
  }

  /** Message loop used for dispatching messages. */
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            val data = receivers.take()
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }

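When a Dispatcher is constructed it creates a thread pool. The number of threads is the value of spark.rpc.netty.dispatcher.numThreads if set, otherwise math.max(2, Runtime.getRuntime.availableProcessors()).

Each thread in the pool runs a MessageLoop that repeatedly takes an EndpointData from the receivers blocking queue. If receivers is empty the thread blocks; otherwise it processes the messages in that EndpointData's Inbox.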
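
The take-and-process loop can be tried out in a stripped-down form. The sketch below is an illustration, not Spark's implementation: ToyInbox and ToyDispatcher are hypothetical, messages are plain strings, and the inbox serializes processing with a single lock instead of the thread-counting logic of the real Inbox. It also does not catch handler exceptions the way MessageLoop does.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, LinkedBlockingQueue, ThreadFactory}

object DispatcherSketch {
  // Toy inbox (hypothetical): pending messages plus the handler that consumes them.
  final class ToyInbox(handler: String => Unit) {
    private val messages = new java.util.LinkedList[String]()

    def post(message: String): Unit = synchronized { messages.add(message) }

    // Handle everything queued so far; the lock allows one thread per inbox at a time.
    def process(): Unit = synchronized {
      while (!messages.isEmpty) handler(messages.poll())
    }
  }

  final class ToyDispatcher(numThreads: Int) {
    private val inboxes = new ConcurrentHashMap[String, ToyInbox]()
    // Inboxes that may contain messages, playing the role of `receivers`.
    private val receivers = new LinkedBlockingQueue[ToyInbox]()

    private val daemonThreads = new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "toy-dispatcher-loop")
        t.setDaemon(true)
        t
      }
    }
    private val pool = Executors.newFixedThreadPool(numThreads, daemonThreads)

    private val messageLoop = new Runnable {
      def run(): Unit = {
        try {
          while (true) receivers.take().process() // take() blocks while nothing is pending
        } catch {
          case _: InterruptedException => () // raised by shutdownNow(); leave the loop
        }
      }
    }
    (0 until numThreads).foreach(_ => pool.execute(messageLoop))

    def register(name: String, handler: String => Unit): Unit =
      inboxes.put(name, new ToyInbox(handler))

    // Returns false if no endpoint with this name is registered.
    def post(name: String, message: String): Boolean = {
      val inbox = inboxes.get(name)
      if (inbox == null) false
      else { inbox.post(message); receivers.offer(inbox); true }
    }

    def shutdown(): Unit = pool.shutdownNow()
  }
}
```

Note the order inside post: the message goes into the inbox first and the inbox is offered to the queue second, so a loop thread that wakes up always finds something to process.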

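How the Inbox processes them is omitted here for reasons of space.

7.3 Where Inbox messages come from

MessageLoop threads keep consuming messages from the Inbox of each EndpointData. Where do those messages come from?

  • (1) registerRpcEndpoint: registers an RpcEndpoint
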
  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      val data = endpoints.get(name)
      endpointRefs.put(data.endpoint, data.ref)
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef
  }

Registering an endpoint puts its EndpointData into receivers. Each new EndpointData creates its own Inbox, and the Inbox adds an OnStart message to its messages list on construction, which a MessageLoop thread then consumes.

  // OnStart should be the first message to process
  inbox.synchronized {
    messages.add(OnStart)
  }

  • (2) unregisterRpcEndpoint: unregisters an RpcEndpoint

  // Should be idempotent
  private def unregisterRpcEndpoint(name: String): Unit = {
    val data = endpoints.remove(name)
    if (data != null) {
      data.inbox.stop()
      receivers.offer(data)  // for the OnStop message
    }
    // Don't clean `endpointRefs` here because it's possible that some messages are being processed
    // now and they can use `getRpcEndpointRef`. So `endpointRefs` will be cleaned in Inbox via
    // `removeRpcEndpointRef`.
  }

Unregistering removes the EndpointData from endpoints. The inbox then adds an OnStop message to messages, and after receivers.offer(data) a MessageLoop thread processes it.

  def stop(): Unit = inbox.synchronized {
    // The following codes should be in `synchronized` so that we can make sure "OnStop" is the last
    // message
    if (!stopped) {
      // We should disable concurrent here. Then when RpcEndpoint.onStop is called, it's the only
      // thread that is processing messages. So `RpcEndpoint.onStop` can release its resources
      // safely.
      enableConcurrent = false
      stopped = true
      messages.add(OnStop)
      // Note: The concurrent events in messages will be processed one by one.
    }
  }
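
The ordering guarantees around OnStart and OnStop can be checked with a toy inbox. In the sketch below, ToyInbox, ToyMessage and Payload are hypothetical names, and drain is an invented helper that returns the queued messages for inspection; it only models the queueing rules, not the actual processing.

```scala
object InboxSketch {
  sealed trait ToyMessage
  case object OnStart extends ToyMessage
  case object OnStop extends ToyMessage
  final case class Payload(text: String) extends ToyMessage

  final class ToyInbox {
    private val messages = new java.util.LinkedList[ToyMessage]()
    private var stopped = false
    private var droppedCount = 0

    // OnStart is queued at construction, so it is always the first message.
    messages.add(OnStart)

    def post(message: ToyMessage): Unit = synchronized {
      if (stopped) {
        droppedCount += 1 // OnStop is already queued: drop late messages
      } else {
        messages.add(message)
      }
    }

    // Idempotent: only the first call queues OnStop.
    def stop(): Unit = synchronized {
      if (!stopped) {
        stopped = true
        messages.add(OnStop) // nothing can be queued after this
      }
    }

    def dropped: Int = synchronized { droppedCount }

    // Remove and return everything queued so far, in processing order.
    def drain(): List[ToyMessage] = synchronized {
      val out = List.newBuilder[ToyMessage]
      while (!messages.isEmpty) out += messages.poll()
      out.result()
    }
  }
}
```

Because post and stop share one lock, OnStop is guaranteed to be the last message in the list, which is what lets an endpoint release its resources safely in onStop.
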

  • (3) postMessage: submits a message to the named RpcEndpoint

  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      val data = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        data.inbox.post(message)
        receivers.offer(data)
        None
      }
    }
    // We don't need to call `onStop` in the `synchronized` block
    error.foreach(callbackIfStopped)
  }

Posting first delivers the message to the Inbox of the target endpoint and then calls receivers.offer(data), after which a MessageLoop consumes it.

  def post(message: InboxMessage): Unit = inbox.synchronized {
    if (stopped) {
      // We already put "OnStop" into "messages", so we should drop further messages
      onDrop(message)
    } else {
      messages.add(message)
      false
    }
  }

  • (4) stop: stops the Dispatcher

  def stop(): Unit = {
    synchronized {
      if (stopped) {
        return
      }
      stopped = true
    }
    // Stop all endpoints. This will queue all endpoints for processing by the message loops.
    endpoints.keySet().asScala.foreach(unregisterRpcEndpoint)
    // Enqueue a message that tells the message loops to stop.
    receivers.offer(PoisonPill)
    threadpool.shutdown()
  }

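stop calls unregisterRpcEndpoint for every endpoint, following the logic above. It then offers a PoisonPill to receivers, an EndpointData whose fields are all null, which makes the MessageLoop threads exit (see the MessageLoop code earlier), and finally shuts down the thread pool.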
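
Why a single PoisonPill is enough to stop several loops becomes clear in a small experiment. The sketch below is a stand-alone illustration with hypothetical names: the queue carries Option[String], None plays the role of the pill, and each worker puts the pill back before exiting so that the next worker sees it too.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PoisonPillSketch {
  // Starts `workers` threads, feeds them `items` followed by ONE pill, and
  // returns (number of items handled, whether every worker exited in time).
  def run(workers: Int, items: Seq[String]): (Int, Boolean) = {
    val queue = new LinkedBlockingQueue[Option[String]]()
    val handled = new AtomicInteger(0)
    val exited = new CountDownLatch(workers)

    (0 until workers).foreach { _ =>
      val t = new Thread(new Runnable {
        def run(): Unit = {
          var running = true
          while (running) {
            queue.take() match {
              case None =>
                queue.offer(None) // put the pill back so the other workers see it
                running = false
              case Some(_) =>
                handled.incrementAndGet()
            }
          }
          exited.countDown()
        }
      })
      t.setDaemon(true)
      t.start()
    }

    items.foreach(item => queue.offer(Some(item)))
    queue.offer(None) // a single pill stops all workers
    val allExited = exited.await(5, TimeUnit.SECONDS)
    (handled.get(), allExited)
  }
}
```

The queue is FIFO, so every item offered before the pill is taken before any worker can see the pill.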

7.4 Request flow through Dispatcher and Inbox

8. Outbox

NettyRpcEnv has an outboxes field:

  /**
   * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
   * we just put messages to its [[Outbox]] to implement a non-blocking `send` method.
   */
  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()

It caches the mapping from every remote RpcAddress to its Outbox, so each remote address has one Outbox.

When an endpoint sends a message to a remote endpoint, the call reaches postToOutbox:

  private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
    if (receiver.client != null) {
      message.sendWith(receiver.client)
    } else {
      require(receiver.address != null,
        "Cannot send message to client endpoint with no listen address.")
      val targetOutbox = {
        val outbox = outboxes.get(receiver.address)
        if (outbox == null) {
          val newOutbox = new Outbox(this, receiver.address)
          val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
          if (oldOutbox == null) {
            newOutbox
          } else {
            oldOutbox
          }
        } else {
          outbox
        }
      }
      if (stopped.get) {
        // It's possible that we put `targetOutbox` after stopping. So we need to clean it.
        outboxes.remove(receiver.address)
        targetOutbox.stop()
      } else {
        targetOutbox.send(message)
      }
    }
  }

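In postToOutbox, if receiver.client is not null the message is sent directly through that client. Otherwise the Outbox for the receiver's address is looked up in outboxes, created and cached if absent, and its send method is called: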

  def send(message: OutboxMessage): Unit = {
    val dropped = synchronized {
      if (stopped) {
        true
      } else {
        messages.add(message)
        false
      }
    }
    if (dropped) {
      message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
    } else {
      drainOutbox()
    }
  }

Finally, drainOutbox sends the queued messages through a TransportClient.
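
The get-then-putIfAbsent idiom used for outboxes can be isolated in a few lines. In this sketch ToyOutbox is a hypothetical class keyed by a plain string address; it only buffers messages and reports whether a send was accepted, and it does not model draining or the network.

```scala
import java.util.concurrent.ConcurrentHashMap

object OutboxSketch {
  // Toy outbox (hypothetical): buffers messages for one remote address.
  final class ToyOutbox(val address: String) {
    private val messages = new java.util.LinkedList[String]()
    private var stopped = false

    // Returns false when the message is dropped because the outbox is stopped.
    def send(message: String): Boolean = synchronized {
      if (stopped) false
      else { messages.add(message); true }
    }

    def stop(): Unit = synchronized { stopped = true }

    def pending: Int = synchronized { messages.size }
  }

  private val outboxes = new ConcurrentHashMap[String, ToyOutbox]()

  // Concurrent callers end up sharing one outbox per address: whoever loses
  // the putIfAbsent race discards its own instance and uses the winner's.
  def outboxFor(address: String): ToyOutbox = {
    val existing = outboxes.get(address)
    if (existing != null) existing
    else {
      val created = new ToyOutbox(address)
      val raced = outboxes.putIfAbsent(address, created)
      if (raced == null) created else raced
    }
  }
}
```

Two calls to outboxFor with the same address return the same instance, so all messages for one remote peer are queued in a single place.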

9. Sequence diagrams

9.1 Endpoint start-up sequence diagram

9.2 Endpoint send and ask sequence diagram

9.3 Endpoint receive sequence diagram