1. 程式人生 > >Spark原始碼分析之Master註冊機制原理

Spark原始碼分析之Master註冊機制原理

一 Worker向Master註冊


1.1 Worker啟動,呼叫registerWithMaster,向Master註冊

當worker啟動的時候,會呼叫registerWithMaster方法

# 註冊狀態置為false

# 嘗試向所有master註冊

# 後臺執行緒定時排程,傳送ReregisterWithMaster請求,如果之前已經註冊成功,則下一次來註冊,則啥也不做

/**
 * Starts registration with the master(s) unless a retry timer is already in place.
 *
 * When `registrationRetryTimer` is empty, the registration state is reset, registration is
 * attempted against every master via `tryRegisterAllMasters()`, and a fixed-rate task is
 * scheduled that sends `ReregisterWithMaster` to this endpoint. When a timer already exists,
 * nothing is started and only an informational log line is written.
 */
private def registerWithMaster(): Unit = {
  registrationRetryTimer match {
    // No timer yet: registration has not been started, so start it now.
    case None =>
      // Registration state starts out as false.
      registered = false
      // Try to register with all masters.
      registerMasterFutures = tryRegisterAllMasters()
      // Reset the connection attempt counter.
      connectionAttemptCount = 0
      // Periodically send ReregisterWithMaster to this endpoint from the scheduler.
      registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
        new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            // Option(...) guards against `self` being null.
            Option(self).foreach(_.send(ReregisterWithMaster))
          }
        },
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        TimeUnit.SECONDS))
    // A retry timer already exists: do nothing.
    case Some(_) =>
      logInfo("Not spawning another attempt to register with the master, since there is an" +
        " attempt scheduled already.")
  }
}

/**
 * Submits one registration task per master address to `registerMasterThreadPool`.
 *
 * Each task resolves the master's endpoint reference and calls
 * `registerWithMaster(masterEndpoint)`. An `InterruptedException` is treated as cancellation
 * and ignored; other non-fatal errors are logged as warnings.
 *
 * @return the futures returned by `submit`, one per master address
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          // Build the master's endpoint reference, used to send it messages/requests.
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          // Register with this particular master.
          registerWithMaster(masterEndpoint)
        } catch {
          case ie: InterruptedException => // Cancelled
          case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
        }
      }
    })
  }
}
/**
 * Sends a `RegisterWorker` request to the given master and handles the outcome.
 *
 * On success the response is passed to `handleRegisterResponse`; on failure the error is
 * logged and the process exits with status 1.
 *
 * @param masterEndpoint endpoint reference of the master to register with
 */
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  // Ask the master to register this worker.
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // The ask succeeded: process the master's response.
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      // The ask failed: log and exit.
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}

1.2 Master接收到Worker的RegisterWorker請求,則開始註冊worker

# 檢查worker是否已經註冊過,如果已經註冊過,返回註冊失敗的RegisterWorkerFailed訊息
# 檢查master所維護的worker節點中是否有DEAD狀態的worker,如果有則移除這些worker
# 檢查RpcAddress->Worker的對映是否包含這個RpcAddress,如果包含檢查狀態是否是為UNKNOWN狀態,如果是則移除
# 把這個worker新增到Master所維護的與worker相關列表或者集合中
# 然後向Worker傳送RegisteredWorker訊息,表示註冊已成功
# 重新呼叫schedule方法,開始進行排程,讓worker開始幹活
// A master in STANDBY state does not register workers; it replies MasterInStandby.
if (state == RecoveryState.STANDBY) {
  context.reply(MasterInStandby)
} else if (idToWorker.contains(id)) {
  // The worker id is already present in the id -> WorkerInfo map: reject it as a duplicate.
  context.reply(RegisterWorkerFailed("Duplicate worker ID"))
} else {
  // Not in standby and the worker id is new: build the WorkerInfo and try to register it.
  val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
    workerRef, workerWebUiUrl)
  if (registerWorker(worker)) {
    // Registration succeeded: persist the worker, confirm to it, then run scheduling.
    persistenceEngine.addWorker(worker)
    context.reply(RegisteredWorker(self, masterWebUiUrl))
    schedule()
  } else {
    // Registration was refused: log it and reply RegisterWorkerFailed to the worker.
    val workerAddress = worker.endpoint.address
    logWarning("Worker registration failed. Attempted to re-register worker at same " +
      "address: " + workerAddress)
    context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
      + workerAddress))
  }
}

1.3Worker收到Master返回的註冊結果,呼叫handleRegisterResponse處理結果

# 如果接收RegisteredWorker訊息,則更新註冊狀態;後臺執行緒開始定時向master傳送心跳;向master傳送WorkerLatestState訊息,彙報worker當前的executor和driver狀態;

# 如果接收RegisterWorkerFailed訊息,則退出

/**
 * Handles the master's reply to a `RegisterWorker` request. Runs under `synchronized`.
 *
 *  - `RegisteredWorker`: marks the worker as registered, switches to the given master,
 *    schedules periodic `SendHeartbeat` messages (and `WorkDirCleanup` messages when
 *    `CLEANUP_ENABLED`), then sends the master a `WorkerLatestState` message.
 *  - `RegisterWorkerFailed`: exits the process if the worker is not registered yet.
 *  - `MasterInStandby`: ignored.
 *
 * @param msg the registration response received from the master
 */
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  msg match {
    // Registration succeeded.
    case RegisteredWorker(masterRef, masterWebUiUrl) =>
      logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
      // Update the registered flag, then record the master we registered with.
      registered = true
      changeMaster(masterRef, masterWebUiUrl)
      // Periodically send SendHeartbeat to this endpoint, starting immediately.
      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(SendHeartbeat)
        }
      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
      // When cleanup is enabled, also periodically send WorkDirCleanup to this endpoint.
      if (CLEANUP_ENABLED) {
        logInfo(
          s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(WorkDirCleanup)
          }
        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
      }
      // Describe every executor this worker currently holds.
      val execs = executors.values.map { e =>
        new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
      }
      // Report this worker's current executors and driver ids to the master.
      masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))

    // Registration failed.
    case RegisterWorkerFailed(message) =>
      // Only exit if this worker has not been registered yet.
      if (!registered) {
        logError("Worker registration failed: " + message)
        System.exit(1)
      }

    // The master is in standby: nothing to do.
    case MasterInStandby =>
      // Ignore. Master not yet ready.
  }
}

二Driver向Master註冊

在用spark-submit提交應用程式的時候,會呼叫SparkSubmit這個類,SparkSubmit會呼叫prepareSubmitEnvironment準備提交環境,在這個時候會設定叢集管理者即Cluster Manager;然後根據部署模式是standalone叢集模式,且不是使用rest方式提交,則會初始化org.apache.spark.deploy.Client這個類,並且給定launch引數

2.1 客戶端向Master發起提交driver的請求

Client在啟動的時候會呼叫onStart方法,然後根據給定指令是launch還是kill傳送對應的訊息。

如果是launch:

則最終會呼叫ayncSendToMasterAndForwardReply向master傳送RequestSubmitDriver訊息

如果是kill:

則最終會呼叫ayncSendToMasterAndForwardReply向master傳送RequestKillDriver訊息

// Dispatch on the requested client command: "launch" submits a driver, "kill" stops one.
driverArgs.cmd match {
  case "launch" =>
    // Reads a system property holding a path list and splits it into entries;
    // yields an empty sequence when the property is not set.
    def pathEntries(propertyKey: String): Seq[String] =
      sys.props.get(propertyKey).toSeq.flatMap(_.split(java.io.File.pathSeparator))

    // Main class of the command that is submitted on the driver's behalf.
    val wrapperClass = "org.apache.spark.deploy.worker.DriverWrapper"

    val classPath = pathEntries("spark.driver.extraClassPath")
    val libraryPath = pathEntries("spark.driver.extraLibraryPath")

    // User-supplied JVM options, split into individual arguments (empty when unset).
    val userJavaOpts = sys.props.get("spark.driver.extraJavaOptions") match {
      case Some(raw) => Utils.splitCommandString(raw)
      case None => Seq.empty
    }
    val allJavaOpts = Utils.sparkJavaOpts(conf) ++ userJavaOpts

    // Placeholder arguments, then the user's main class and its own options.
    val wrapperArgs =
      Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions
    val command =
      new Command(wrapperClass, wrapperArgs, sys.env, classPath, libraryPath, allJavaOpts)

    val submitRequest = RequestSubmitDriver(
      new DriverDescription(
        driverArgs.jarUrl,
        driverArgs.memory,
        driverArgs.cores,
        driverArgs.supervise,
        command))
    ayncSendToMasterAndForwardReply[SubmitDriverResponse](submitRequest)

  case "kill" =>
    // Ask the master to kill the driver with the given id.
    ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverArgs.driverId))
}
}

2.2Master接收客戶端的RequestSubmitDriver訊息,開始註冊driver

# 建立driver

# 持久化引擎新增driver

# 將driver新增到master所維護的driver相關集合或者列表中

# 呼叫schedule開始排程資源

# 向Client傳送SubmitDriverResponse訊息

// Handles a client's request to submit a driver.
case RequestSubmitDriver(description) =>
  // Only a master in ALIVE state accepts driver submissions; otherwise reply with a failure.
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    context.reply(SubmitDriverResponse(self, false, None, msg))
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    // Create the driver from its description.
    val driver = createDriver(description)
    // Record the driver in the persistence engine.
    persistenceEngine.addDriver(driver)
    // Track the driver in both the waiting list and the set of all drivers.
    waitingDrivers += driver
    drivers.add(driver)
    // Run scheduling now that a new driver is waiting.
    schedule()
    // Reply to the client with a success response carrying the driver id.
    context.reply(SubmitDriverResponse(self, true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}"))
  }

三Application向Master註冊


3.1 構建StandaloneAppClient,然後向Master註冊應用程式

在Standalone模式下,Driver是通過StandaloneSchedulerBackend來和Master進行資源請求協商的.

# SparkContext在初始化的時候會呼叫createTaskScheduler方法建立TaskSchedulerImpl和StandaloneSchedulerBackend

# 呼叫TaskSchedulerImpl的start方法啟動TaskScheduler

// Create and start the scheduler
// createTaskScheduler returns a pair, bound here to `sched` and `ts`.
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
// The DAGScheduler is constructed after the two assignments above.
_dagScheduler = new DAGScheduler(this)
// Send TaskSchedulerIsSet to the heartbeat receiver before starting the task scheduler.
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
_taskScheduler.start()

# 啟動TaskScheduler的時候,首先就會啟動StandaloneSchedulerBackend

/**
 * Starts the backend and, when not running locally and "spark.speculation" is enabled,
 * schedules a periodic task that calls `checkSpeculatableTasks()`.
 */
override def start() {
  backend.start()

  val speculationEnabled = !isLocal && conf.getBoolean("spark.speculation", false)
  if (speculationEnabled) {
    logInfo("Starting speculative execution thread")
    // Periodic check; wrapped in tryOrStopSparkContext with the owning SparkContext.
    val speculationCheck = new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }
    speculationScheduler.scheduleAtFixedRate(
      speculationCheck,
      SPECULATION_INTERVAL_MS,
      SPECULATION_INTERVAL_MS,
      TimeUnit.MILLISECONDS)
  }
}

# 啟動StandaloneSchedulerBackend就會建立StandaloneAppClient,並且啟動它

// Excerpt of the scheduler backend's start(): builds the application description,
// then creates and starts the StandaloneAppClient. Sections marked "omitted" are elided.
override def start() {

// ... omitted ...

// Describe the application using values taken from the SparkContext and local settings.
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
  appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
// Create the client, passing this backend as one of its arguments, and start it.
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()

// ... omitted ...

}

# 啟動StandaloneAppClient的時候,會構建通訊環境, 會註冊一個ClientEndpoint用於通訊,然後呼叫ClientEndpoint的onStart方法

/** Registers a new `ClientEndpoint` under the name "AppClient" and stores its reference. */
def start() {
  // Just launch an rpcEndpoint; it will call back into the listener.
  val clientEndpoint = new ClientEndpoint(rpcEnv)
  val clientRef = rpcEnv.setupEndpoint("AppClient", clientEndpoint)
  endpoint.set(clientRef)
}

# onStart方法會呼叫registerWithMaster方法,然後呼叫tryRegisterAllMasters方法向所有master傳送RegisterApplication訊息,註冊應用程式application

/**
 * Starts the first registration attempt. If it throws an `Exception`, the failure is logged,
 * the client is marked as disconnected and the endpoint is stopped.
 */
override def onStart(): Unit = {
  try {
    // First attempt; retries are counted from 1.
    registerWithMaster(1)
  }
  catch {
    case failure: Exception =>
      logWarning("Failed to connect to master", failure)
      markDisconnected()
      stop()
  }
}
/**
 * Tries to register with all masters, then schedules a one-shot check after
 * `REGISTRATION_TIMEOUT_SECONDS`.
 *
 * When the check runs: if already registered, outstanding attempts are cancelled and the
 * registration thread pool is shut down; if `nthRetry` has reached `REGISTRATION_RETRIES`,
 * the client is marked dead; otherwise outstanding attempts are cancelled and registration
 * is retried with `nthRetry + 1`.
 *
 * @param nthRetry number of the current registration attempt
 */
private def registerWithMaster(nthRetry: Int) {
  registerMasterFutures.set(tryRegisterAllMasters())

  val timeoutCheck = new Runnable {
    // Cancels every registration attempt that is still outstanding.
    private def cancelOutstandingAttempts(): Unit =
      registerMasterFutures.get.foreach(_.cancel(true))

    override def run(): Unit = {
      if (registered.get) {
        cancelOutstandingAttempts()
        registerMasterThreadPool.shutdownNow()
      } else if (nthRetry < REGISTRATION_RETRIES) {
        cancelOutstandingAttempts()
        registerWithMaster(nthRetry + 1)
      } else {
        markDead("All masters are unresponsive! Giving up.")
      }
    }
  }

  val pendingCheck =
    registrationRetryThread.schedule(timeoutCheck, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
  registrationRetryTimer.set(pendingCheck)
}
/**
 * Submits one task per master address to `registerMasterThreadPool`; each task sends a
 * `RegisterApplication` message to that master unless the client is already registered.
 *
 * @return the futures returned by `submit`, one per master address
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    val attempt = new Runnable {
      override def run(): Unit = try {
        // Nothing to do once registration has already succeeded.
        if (!registered.get) {
          logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
          val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          masterRef.send(RegisterApplication(appDescription, self))
        }
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    }
    registerMasterThreadPool.submit(attempt)
  }
}

3.2Master開始註冊應用程式

# 建立應用程式

# 如果該應用程式已經註冊過,則直接返回

# 把該應用程式註冊到master,即新增到master所維護與application相關集合或者列表,放入等待佇列

# 持久化引擎新增該應用程式

# 向driver傳送RegisteredApplication訊息,表示註冊已完成

# 呼叫schedule方法,開始排程

// Handles an application registration request.
case RegisterApplication(description, driver) =>
  // A master in STANDBY state does not create or register applications.
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // Create the application from its description and the driver's endpoint reference.
    val app = createApplication(description, driver)
    // Register the application with this master.
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Record the application in the persistence engine.
    persistenceEngine.addApplication(app)
    // Tell the driver that registration is complete.
    driver.send(RegisteredApplication(app.id, self))
    schedule()
  }
/**
 * Adds the application to the collections this master maintains, unless an application is
 * already registered at the same driver address (in which case it only logs and returns).
 *
 * @param app the application to register
 */
private def registerApplication(app: ApplicationInfo): Unit = {
  // Address of the application's driver endpoint.
  val appAddress = app.driver.address
  // An application is already registered at this address: nothing more to do.
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }

  applicationMetricsSystem.registerSource(app.appSource)
  // Add the app to the set of applications maintained by this master...
  apps += app
  // ...and index it by id, by driver endpoint and by driver address.
  idToApp(app.id) = app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  // Queue the app as waiting.
  waitingApps += app
  if (reverseProxy) {
    webUi.addProxyTargets(app.id, app.desc.appUiUrl)
  }
}

3.3StandaloneAppClient接收到RegisteredApplication訊息

# 為應用程式設定id

# 註冊狀態設定為true

# 設定master

# StandaloneAppClientListener開始監聽應用程式

// Handles the master's confirmation that the application has been registered.
case RegisteredApplication(appId_, masterRef) =>
  // Store the application id carried by the message.
  appId.set(appId_)
  // Mark registration as complete.
  registered.set(true)
  // Remember the master that sent the confirmation.
  master = Some(masterRef)
  // Notify the listener that the client is connected, passing the stored application id.
  listener.connected(appId.get)