
Spark Core source code analysis: Spark cluster startup (Part 2)

2.2 Worker startup

org.apache.spark.deploy.worker

1 Entering through the main method of the Worker companion object

The main method first creates a SparkConf instance, conf, then wraps conf and the arguments passed when starting the Worker into a WorkerArguments instance, args. It then calls startSystemAndActor() to obtain an actorSystem instance; inside that method, the Worker's actor is also created and started. The code is as follows:
The main method

private[spark] object Worker extends Logging {
  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    // Prepare the parameters needed to start the Worker service
    val args = new WorkerArguments(argStrings, conf)
    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort,
      args.cores, args.memory, args.masters, args.workDir)
    actorSystem.awaitTermination()
  }

The startSystemAndActor() method. How actorSystem.actorOf() creates and starts an actor was analyzed in the previous article on the Master startup flow.

 def startSystemAndActor(
      host: String,
      port: Int,
      webUiPort: Int,
      cores: Int,
      memory: Int,
      masterUrls: Array[String],
      workDir: String,
      workerNumber: Option[Int] = None,
      conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

    // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
    val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
    val actorName = "Worker"
    val securityMgr = new SecurityManager(conf)
    // Create the ActorSystem instance, used to create actors
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
      conf = conf, securityManager = securityMgr)
    // Resolve the Akka URL of every master
    val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
    // Create the Worker actor and start it
    actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
      masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
    (actorSystem, boundPort)
  }

2 Creating and running the Worker's actor

When the Worker actor instance is created, the code in the Worker class's primary constructor runs first; when actor.start() is invoked, the actor's lifecycle methods begin to execute.
1) The preStart() method runs. In preStart(), registerWithMaster() is called to send registration information to the master. registerWithMaster() mainly invokes tryRegisterAllMasters() and starts a timer that periodically re-registers with the master. The code is as follows:
The preStart() method

// Lifecycle method
  override def preStart() {
    assert(!registered)
    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
      host, port, cores, Utils.megabytesToString(memory)))
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    logInfo("Spark home: " + sparkHome)
    createWorkDir()
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    shuffleService.startIfEnabled()
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    webUi.bind()
    // Register with the Master
    registerWithMaster()

    metricsSystem.registerSource(workerSource)
    metricsSystem.start()
    // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  }
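The ordering described above — constructor body first, then preStart() once before any message is handled — can be sketched with a minimal plain-Scala model. Akka is deliberately not used here, and the names LifecycleActor and events are illustrative, not from the Spark source:

```scala
import scala.collection.mutable.ArrayBuffer

// Minimal model of the Akka classic lifecycle order used by Worker:
// the primary constructor runs when the actor instance is created,
// preStart() runs once before the first message is delivered.
class LifecycleActor(events: ArrayBuffer[String]) {
  // constructor body, like the Worker class's primary constructor code
  events += "constructor"

  def preStart(): Unit = events += "preStart"

  def receive(msg: Any): Unit = events += s"receive:$msg"
}

val events = ArrayBuffer.empty[String]
val actor = new LifecycleActor(events) // constructor runs immediately
actor.preStart()                       // the framework calls this before delivering messages
actor.receive("RegisteredWorker")

println(events.mkString(","))
// constructor,preStart,receive:RegisteredWorker
```

This is why registerWithMaster() in preStart() is guaranteed to run before any RegisteredWorker reply can be processed.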

The registerWithMaster() method; see the code comments for a detailed analysis.

// Register with the Master
  def registerWithMaster() {
    // DisassociatedEvent may be triggered multiple times, so don't attempt registration
    // if there are outstanding registration attempts scheduled.
    // The reason for the pattern match: a DisassociatedEvent may be triggered
    // multiple times, and its handler calls registerWithMaster() again.
    // Matching on registrationRetryTimer ensures no new registration is
    // attempted while one is already outstanding.
    registrationRetryTimer match {
      case None =>
        registered = false
        // Try to register with every master; we contact all of them because
        // this may be a high-availability cluster
        tryRegisterAllMasters()
        // Reset the connection attempt count
        connectionAttemptCount = 0
        // Assign registrationRetryTimer so that when registerWithMaster() is
        // triggered again, tryRegisterAllMasters() is not re-executed
        registrationRetryTimer = Some {
          // Start a timer that periodically sends ReregisterWithMaster,
          // adding fault tolerance against registration failures caused by
          // network problems or a master fault
          context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
            INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
        }
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }
  }
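The Option-based guard above is a general idempotency pattern: the first call schedules the retry timer and stores it as Some(_), so any re-entrant call takes the Some branch and does nothing. A minimal sketch of the same pattern without Akka (the Registrar class and its fields are illustrative stand-ins, not Spark code):

```scala
// Idempotent "schedule once" guard, mirroring registrationRetryTimer.
class Registrar {
  private var retryTimer: Option[String] = None // stands in for the Cancellable
  var registrationAttempts = 0

  def registerWithMaster(): Unit = retryTimer match {
    case None =>
      registrationAttempts += 1      // tryRegisterAllMasters() would run here
      retryTimer = Some("scheduled") // later calls now take the Some branch
    case Some(_) =>
      () // an attempt is already scheduled; do not register again
  }
}

val r = new Registrar
r.registerWithMaster()
r.registerWithMaster() // triggered again, e.g. by a DisassociatedEvent
r.registerWithMaster()
println(r.registrationAttempts) // 1
```

However many times registerWithMaster() fires, tryRegisterAllMasters() runs only once until the guard is cleared.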

tryRegisterAllMasters()

// Try to send a registration request to every master
  private def tryRegisterAllMasters() {
    for (masterAkkaUrl <- masterAkkaUrls) {
      logInfo("Connecting to master " + masterAkkaUrl + "...")

      val actor = context.actorSelection(masterAkkaUrl)
      // Send the message to the actor; here the actor is the Master's actor
      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
    }
  }
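Broadcasting RegisterWorker to every configured master is safe in a high-availability setup because only the active master acknowledges with RegisteredWorker. A rough model of that interaction (MasterStub, isActive, and the reply strings are simplified stand-ins, not the real Master logic):

```scala
// Simplified model of registering with all masters in an HA cluster:
// only the active master acknowledges the registration.
case class RegisterWorker(workerId: String)

class MasterStub(val url: String, val isActive: Boolean) {
  def handle(msg: RegisterWorker): Option[String] =
    if (isActive) Some(s"RegisteredWorker from $url") else None
}

val masters = Seq(
  new MasterStub("akka://sparkMaster@host1:7077", isActive = false),
  new MasterStub("akka://sparkMaster@host2:7077", isActive = true)
)

// Like tryRegisterAllMasters(): fire the message at every master URL.
val replies = masters.flatMap(_.handle(RegisterWorker("worker-001")))
println(replies)
// List(RegisteredWorker from akka://sparkMaster@host2:7077)
```

The worker does not need to know in advance which master is active; whichever one replies becomes the master recorded by changeMaster.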

2) The receiveWithLogging method runs, beginning a loop that listens for and receives messages. The messages relevant to Worker startup are RegisteredWorker and SendHeartbeat; their roles are analyzed below in the context of the interaction with the Master.

2.3 The Master receives the Worker's registration

When the Master receives the RegisterWorker message sent by the Worker, it first checks whether that worker is already registered. If so, it returns the registration-failure message RegisterWorkerFailed("Duplicate worker ID") to the worker. Otherwise, it wraps the information sent by the worker into a WorkerInfo object, persists it in memory and on disk, and replies with the registration-success message RegisteredWorker(masterUrl, masterWebUiUrl), which carries the masterUrl and masterWebUiUrl. When the worker receives RegisteredWorker, it sets registered = true, calls changeMaster to record the valid masterUrl, and then starts a timer that periodically sends a message to itself to trigger heartbeats to the master. The code is as follows:

  override def receiveWithLogging = {
    // The MasterUrl sent from the Master to the Worker; receiving it means registration succeeded
    case RegisteredWorker(masterUrl, masterWebUiUrl) =>
      logInfo("Successfully registered with master " + masterUrl)
      registered = true
      // Update the master URL that was sent over
      changeMaster(masterUrl, masterWebUiUrl)
      // Start a timer that sends heartbeats to the Master.
      // The implementation: the Worker sends a SendHeartbeat message to
      // itself; its own receiveWithLogging receives that message and
      // triggers the handler, which first checks whether the connection to
      // the master is still alive and, if so, sends the heartbeat
      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
      if (CLEANUP_ENABLED) {
        logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
          CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
      }

    case SendHeartbeat =>
      // If still connected, send a heartbeat to the Master
      if (connected) { master ! Heartbeat(workerId) }