
Spark Communication Mechanism: 1) Spark 1.3 vs Spark 1.6 Source Code Analysis

A while ago I read through the Spark 1.3 source code; its RPC communication is built on Akka. Spark 1.6, however, ships two implementations: Netty (the default) and Akka.
Below, using Spark 1.3's Akka mechanism for comparison, let's see how the Master in Spark 1.6 communicates with the Worker.

First, look at the Master class in Spark 1.6 (its 1.3 counterpart is shown below it for comparison):

private[deploy] class Master( //v1.6
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable
private[spark] class Master( //v1.3
    host: String,
    port: Int,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends Actor with ActorLogReceive with Logging with LeaderElectable
In Spark 1.3, Master extends Akka's Actor directly, whereas in Spark 1.6 Master extends the trait ThreadSafeRpcEndpoint. Here is ThreadSafeRpcEndpoint:
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint //v1.6
ThreadSafeRpcEndpoint in turn extends the trait RpcEndpoint, which is the important one here. Part of its code:
private[spark] trait RpcEndpoint { //v1.6
  // Only the parts we care about are kept
  /**
   * The [[RpcEnv]] that this [[RpcEndpoint]] is registered to.
   */
  val rpcEnv: RpcEnv

  /**
   * The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
   * called. And `self` will become `null` when `onStop` is called.
   *
   * Note: Because before `onStart`, [[RpcEndpoint]] has not yet been registered and there is not
   * valid [[RpcEndpointRef]] for it. So don't call `self` before `onStart` is called.
   */
  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

  /**
   * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply]]. If receiving an
   * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
   */
  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

  /**
   * Invoked before [[RpcEndpoint]] starts to handle any message.
   */
  def onStart(): Unit = {
    // By default, do nothing.
  }
}

RpcEndpoint has one key member, rpcEnv: an RpcEndpoint registers itself with an RpcEnv under a name in order to send and receive messages (this trait is covered in more detail later). The methods of RpcEndpoint were designed so that both Akka and Netty can sit behind them, and they can be compared with Akka's Actor (a small sketch follows the mapping):

preStart (Akka, 1.3)  <---->  onStart (1.6): invoked when the endpoint starts

receiveWithLogging (Akka, 1.3)  <---->  receive / receiveAndReply (1.6): used to receive messages
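
To make the mapping concrete, here is a minimal sketch, not taken from the Spark source, of what a custom endpoint looks like against the 1.6 abstraction; EchoEndpoint and its messages are hypothetical, and since org.apache.spark.rpc is private[spark], such code only compiles inside the org.apache.spark package hierarchy.

package org.apache.spark.rpc

// Hypothetical endpoint that echoes strings back; not part of Spark itself.
private[spark] class EchoEndpoint(override val rpcEnv: RpcEnv)
  extends ThreadSafeRpcEndpoint {

  // Counterpart of Akka's preStart: runs before any message is handled.
  override def onStart(): Unit = {
    // nothing to initialize in this sketch
  }

  // Fire-and-forget messages arrive here (sent via RpcEndpointRef.send).
  override def receive: PartialFunction[Any, Unit] = {
    case msg: String => println(s"got one-way message: $msg")
  }

  // Ask-style messages arrive here (RpcEndpointRef.ask / askWithRetry);
  // the reply goes back through the RpcCallContext.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(s"echo: $msg")
  }
}

Registering such an endpoint would look like rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv)), which is exactly what the Master does for itself in startRpcEnvAndEndpoint below.
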

Back to Master's main method:

  def main(argStrings: Array[String]) {  //v1.6
    SignalLogger.register(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }
It calls startRpcEnvAndEndpoint to create an rpcEnv instance; here is Master.startRpcEnvAndEndpoint:
  /**
   * Start the Master and return a three tuple of:
   *   (1) The Master RpcEnv
   *   (2) The web UI bound port
   *   (3) The REST server bound port, if any
   */
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}
It calls RpcEnv.create to build the rpcEnv, so next follow it into RpcEnv.create:
  def create( //v1.6
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean = false): RpcEnv = {
    // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
    val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
    getRpcEnvFactory(conf).create(config)
  }
}
It first calls getRpcEnvFactory, which reads the configuration to obtain the RpcEnvFactory used to create the rpcEnv. The getRpcEnvFactory code:
  private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
    val rpcEnvNames = Map(
      "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
      "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
    val rpcEnvName = conf.get("spark.rpc", "netty")
    val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
    Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
  }
As you can see, the factory has two implementations, akka and netty, and the configuration (spark.rpc) decides which communication mechanism is used; a configuration sketch follows the two factory implementations below. The chosen RpcEnvFactory (in fact, one of its subclasses) is then instantiated via reflection. Next, the RpcEnvFactory trait:
private[spark] trait RpcEnvFactory {

  def create(config: RpcEnvConfig): RpcEnv
}
Its design is simple, with just one method. Below are its two implementations, AkkaRpcEnvFactory and NettyRpcEnvFactory:
private[spark] class AkkaRpcEnvFactory extends RpcEnvFactory {

  def create(config: RpcEnvConfig): RpcEnv = {
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
      config.name, config.host, config.port, config.conf, config.securityManager)
    actorSystem.actorOf(Props(classOf[ErrorMonitor]), "ErrorMonitor")
    new AkkaRpcEnv(actorSystem, config.securityManager, config.conf, boundPort)
  }
}
private[netty] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {

  def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }
}
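
Given the factory lookup above, switching between the two backends is purely a matter of configuration. A minimal sketch, with a made-up application name; "netty" is the default, so setting the key explicitly is only needed to fall back to Akka, and per getRpcEnvFactory a fully qualified factory class name is also accepted:

import org.apache.spark.SparkConf

object RpcConfSketch {
  // "netty" is the default; set "akka" to fall back to the Akka implementation,
  // or give a fully qualified RpcEnvFactory class name (see getRpcEnvFactory above).
  val conf = new SparkConf()
    .setAppName("rpc-demo")        // hypothetical application name
    .set("spark.rpc", "akka")
}
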
With this abstraction layer in place, let's compare against Spark 1.3's Akka Actor and see how the Master communicates with the Worker in Spark 1.6.

In Spark 1.3, the Worker's preStart method eventually calls tryRegisterAllMasters to register with the Master; its code:

private def tryRegisterAllMasters() { //v1.3
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    val actor = context.actorSelection(masterAkkaUrl)
    actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
  }
}
On the Master side, the registration message from the Worker is handled in receiveWithLogging, and a registration-success message is sent back to the Worker:
override def receiveWithLogging = { //v1.3
  // Only the parts we care about are kept......
  case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
  {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      sender, workerUiPort, publicAddress)
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      // Notify the worker that registration succeeded
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      schedule()
    }
  }
}
In Spark 1.6, the Worker's onStart method calls registerWithMaster to register with the Master; here is registerWithMaster:
  private def registerWithMaster() { //v1.6
    // Only the parts we care about are kept......
    registrationRetryTimer match {
      case None =>
        registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
          new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              Option(self).foreach(_.send(ReregisterWithMaster))
            }
          },
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          TimeUnit.SECONDS))
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }
  }
Inside, a scheduled thread pool is used to repeatedly send the registration message toward the possible Masters; the self used here is a method:
  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }
self is used to obtain the endpoint's own RpcEndpointRef, the object through which RPC communication happens; the send call in registerWithMaster plays the same role as actor ! RegisterWorker did in Spark 1.3.
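
To see the two message styles side by side, here is a hedged sketch; Ping, Pong, MessageStylesSketch and masterRef are all made up, with masterRef standing for an RpcEndpointRef obtained from the rpcEnv, and as before the rpc package is private[spark], so this only compiles inside org.apache.spark.

package org.apache.spark.rpc

case class Ping(id: Int)
case class Pong(id: Int)

object MessageStylesSketch {
  def talkTo(masterRef: RpcEndpointRef): Unit = {
    // Fire-and-forget: the 1.6 counterpart of Akka's `actor ! msg`;
    // it is handled by the remote endpoint's receive.
    masterRef.send(Ping(1))
    // Request/response with retries: the counterpart of Akka's ask pattern;
    // it is handled by the remote endpoint's receiveAndReply via context.reply.
    val pong = masterRef.askWithRetry[Pong](Ping(2))
    println(s"got reply ${pong.id}")
  }
}
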
Now look at how the Master receives and handles this registration request, in Master.receiveAndReply (an end-to-end sketch follows the code):
// receive only handles a message, while receiveAndReply handles it and also sends a reply back
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RegisterWorker(
        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        context.reply(MasterInStandby)
      } else if (idToWorker.contains(id)) {
        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          workerRef, workerUiPort, publicAddress)
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          // Notify the worker that registration succeeded
          context.reply(RegisteredWorker(self, masterWebUiUrl))
          schedule()
        } else {
          val workerAddress = worker.endpoint.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress))
        }
      }
    }
    //.....
}
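
Putting the pieces together, here is a hedged end-to-end sketch of the ask-and-reply round trip, reusing the hypothetical EchoEndpoint from earlier; the system name is made up, port 0 should let Utils.startServiceOnPort pick a free port, and again this only compiles inside the org.apache.spark namespace.

package org.apache.spark.rpc

import org.apache.spark.{SecurityManager, SparkConf}

object EchoRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val securityMgr = new SecurityManager(conf)
    // Same call the Master makes in startRpcEnvAndEndpoint.
    val rpcEnv = RpcEnv.create("echo-system", "localhost", 0, conf, securityMgr)
    // Register the endpoint under a name and get back its RpcEndpointRef.
    val ref = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
    // askWithRetry blocks until the endpoint answers through context.reply.
    val answer = ref.askWithRetry[String]("hello")
    println(answer)   // expected: "echo: hello"
    rpcEnv.shutdown()
    rpcEnv.awaitTermination()
  }
}
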

Closing note: Spark 1.6 only lifts the communication mechanism to a higher level of abstraction; the core DAGScheduler and TaskScheduler have not fundamentally changed. If you are comfortable with the Spark 1.3 source, you can still read the Spark 1.6 or Spark 2.0 source without trouble.