1. 程式人生 > >Spark源代碼分析之六:Task調度(二)

Spark源代碼分析之六:Task調度(二)

oge 3.4 總結 utili filter 相關 .com ram 順序

話說在《Spark源代碼分析之五:Task調度(一)》一文中,我們對Task調度分析到了DriverEndpoint的makeOffers()方法。這種方法針對接收到的ReviveOffers事件進行處理。代碼例如以下:

// Make fake resource offers on all executors
    // 在全部的executors上提供假的資源(抽象的資源。也就是資源的對象信息,我是這麽理解的)
    private def makeOffers() {
      // Filter out executors under killing
      // 過濾掉under killing的executors
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      
      // 利用activeExecutors中executorData的executorHost、freeCores,構造workOffers,即資源
      val workOffers = activeExecutors.map { case (id, executorData) =>
        // 創建WorkerOffer對象
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      
      // 調用scheduler的resourceOffers()方法,分配資源,並調用launchTasks()方法,啟動tasks
      // 這個scheduler就是TaskSchedulerImpl
      launchTasks(scheduler.resourceOffers(workOffers))
    }
代碼邏輯非常easy,一共分為三步:

第一。從executorDataMap中過濾掉under killing的executors,得到activeExecutors。

第二。利用activeExecutors中executorData的executorHost、freeCores,獲取workOffers。即資源。

第三,調用scheduler的resourceOffers()方法,分配資源,並調用launchTasks()方法,啟動tasks:這個scheduler就是TaskSchedulerImpl。

我們逐個進行分析。首先看看這個executorDataMap,其定義例如以下:

private val executorDataMap = new HashMap[String, ExecutorData]
它是CoarseGrainedSchedulerBackend掌握的集群中executor的數據集合,key為String類型的executorId,value為ExecutorData類型的executor具體信息。

ExecutorData包括的主要內容例如以下:

1、executorEndpoint:RpcEndpointRef類型。RPC終端的引用,用於數據通信。

2、executorAddress:RpcAddress類型。RPC地址。用於數據通信。

3、executorHost:String類型,executor的主機;

4、freeCores:Int類型,可用處理器cores。

5、totalCores:Int類型。處理器cores總數;

6、logUrlMap:Map[String, String]類型,日誌url映射集合。

這樣,通過executorDataMap這個集合我們就能知道集群當前executor的負載情況。方便資源分析並調度任務。那麽executorDataMap內的數據是何時及怎樣更新的呢?go on,繼續分析。
對於第一步中,過濾掉under killing的executors,事實上現是對executorDataMap中的全部executor調用executorIsAlive()方法中。推斷是否在executorsPendingToRemove和executorsPendingLossReason兩個數據結構中。這兩個數據結構中的executors。都是即將移除或者已丟失的executor。

第二步。在過濾掉已失效或者立即要失效的executor後,利用activeExecutors中executorData的executorHost、freeCores,構造workOffers,即資源。這個workOffers更簡單,是一個WorkerOffer對象,它代表了系統的可利用資源。

WorkerOffer代碼例如以下:

/**
 * Represents free resources available on an executor.
 */
private[spark]
case class WorkerOffer(executorId: String, host: String, cores: Int)
而最重要的第三步,先是調用scheduler.resourceOffers(workOffers),即TaskSchedulerImpl的resourceOffers()方法,然後再調用launchTasks()方法將tasks載入到executor上去運行。

我們先看下TaskSchedulerImpl的resourceOffers()方法。代碼例如以下:

/**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.
   *
   * 被集群manager調用以提供slaves上的資源。

我們通過依照優先順序詢問活動task集中的task來回應。

* 我們通過循環的方式將task調度到每一個節點上以便tasks在集群中能夠保持大致的均衡。 */ def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added // 標記每一個slave節點為alive活躍的,而且記住它的主機名 // 同一時候也追蹤是否有executor被加入 var newExecAvail = false // 循環offers。WorkerOffer為包括executorId、host、cores的結構體。代表集群中的可用executor資源 for (o <- offers) { // 利用HashMap存儲executorId->host映射的集合 executorIdToHost(o.executorId) = o.host // Number of tasks running on each executor // 每一個executor上執行的task的數目,這裏假設之前沒有的話,初始化為0 executorIdToTaskCount.getOrElseUpdate(o.executorId, 0) // 每一個host上executors的集合 // 這個executorsByHost被用來計算host活躍性。反過來我們用它來決定在給定的主機上何時實現數據本地性 if (!executorsByHost.contains(o.host)) {// 假設executorsByHost中不存在相應的host // executorsByHost中加入一條記錄。key為host,value為new HashSet[String]() executorsByHost(o.host) = new HashSet[String]() // 發送一個ExecutorAdded事件。並由DAGScheduler的handleExecutorAdded()方法處理 // eventProcessLoop.post(ExecutorAdded(execId, host)) // 調用DAGScheduler的executorAdded()方法處理 executorAdded(o.executorId, o.host) // 新的slave加入時,標誌位newExecAvail設置為true newExecAvail = true } // 更新hostsByRack for (rack <- getRackForHost(o.host)) { hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host } } // Randomly shuffle offers to avoid always placing tasks on the same set of workers. // 隨機shuffle offers以避免總是把任務放在同一組workers上執行 val shuffledOffers = Random.shuffle(offers) // Build a list of tasks to assign to each worker. // 構造一個task列表,以分配到每一個worker val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) // 能夠使用的cpu資源 val availableCpus = shuffledOffers.map(o => o.cores).toArray // 獲得排序好的task集合 // 先調用Pool.getSortedTaskSetQueue()方法 // 還記得這個Pool嗎,就是調度器中的調度池啊 val sortedTaskSets = rootPool.getSortedTaskSetQueue // 循環每一個taskSet for (taskSet <- sortedTaskSets) { // 記錄日誌 logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) // 假設存在新的活躍的executor(新的slave節點被加入時) if (newExecAvail) { // 調用executorAdded()方法 taskSet.executorAdded() } } // Take each TaskSet in our scheduling order, and then offer it each node in increasing order // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY var launchedTask = false // 依照位置本地性規則調度每一個TaskSet,最大化實現任務的本地性 // 位置本地性規則的順序是:PROCESS_LOCAL(同進程)、NODE_LOCAL(同節點)、NO_PREF、RACK_LOCAL(同機架)、ANY(不論什麽) for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) { do { // 調用resourceOfferSingleTaskSet()方法進行任務集調度 launchedTask = resourceOfferSingleTaskSet( taskSet, maxLocality, shuffledOffers, availableCpus, tasks) } while (launchedTask) } // 設置標誌位hasLaunchedTask if (tasks.size > 0) { hasLaunchedTask = true } return tasks }

首先來看下它的主體流程。例如以下:

1、設置標誌位newExecAvail為false。這個標誌位是在新的slave被加入時被設置的一個標誌。以下在計算任務的本地性規則時會用到;

2、循環offers,WorkerOffer為包括executorId、host、cores的結構體。代表集群中的可用executor資源:

2.1、更新executorIdToHost,executorIdToHost為利用HashMap存儲executorId->host映射的集合;

2.2、更新executorIdToTaskCount,executorIdToTaskCount為每一個executor上執行的task的數目集合,這裏假設之前沒有的話,初始化為0;

2.3、假設新的slave增加:

2.3.1、executorsByHost中加入一條記錄,key為host。value為new HashSet[String]()。

2.3.2、發送一個ExecutorAdded事件。並由DAGScheduler的handleExecutorAdded()方法處理;

2.3.3、新的slave增加時,標誌位newExecAvail設置為true;

2.4、更新hostsByRack;

3、隨機shuffle offers(集群中可用executor資源)以避免總是把任務放在同一組workers上運行;

4、構造一個task列表。以分配到每一個worker,針對每一個executor依照其上的cores數目構造一個cores數目大小的ArrayBuffer,實現最大程度並行化;

5、獲取能夠使用的cpu資源availableCpus;

6、調用Pool.getSortedTaskSetQueue()方法獲得排序好的task集合,即sortedTaskSets;

7、循環sortedTaskSets中每一個taskSet:

7.1、假設存在新增加的slave,則調用taskSet的executorAdded()方法,動態調整位置策略級別,這麽做非常easy理解,新的slave節點增加了,那麽隨之而來的是數據有可能存在於它上面。那麽這時我們就須要又一次調整任務本地性規則;

8、循環sortedTaskSets,依照位置本地性規則調度每一個TaskSet,最大化實現任務的本地性:

8.1、對每一個taskSet。調用resourceOfferSingleTaskSet()方法進行任務集調度。

9、設置標誌位hasLaunchedTask。並返回tasks。

接下來。我們詳解下當中的每一個步驟。

第1步不用講,僅僅是設置標誌位newExecAvail為false。而且記住這個標誌位是在新的slave被加入時被設置的一個標誌,以下在計算任務的本地性規則時會用到;

第2步是集群中的可用executor資源offers的循環處理。更新一些數據結構,而且,在新的slave增加時。標誌位newExecAvail設置為true,而且發送一個ExecutorAdded事件,交由DAGScheduler的handleExecutorAdded()方法處理。我們來看下DAGScheduler的這種方法:

private[scheduler] def handleExecutorAdded(execId: String, host: String) {
    // remove from failedEpoch(execId) ?
    if (failedEpoch.contains(execId)) {
      logInfo("Host added was in lost list earlier: " + host)
      failedEpoch -= execId
    }
    submitWaitingStages()
  }
非常easy,先將相應host從failedEpoch中移除,failedEpoch存儲的是系統探測到的失效節點的集合,存儲的是execId->host的相應關系。接下來便是調用submitWaitingStages()方法提交等待的stages。

這種方法我們之前分析過。這裏不再贅述。可是存在一個疑點,之前stage都已提交了,這裏為什麽還要提交一遍呢?留待以後再尋找答案吧。

第3步隨機shuffle offers以避免總是把任務放在同一組workers上運行,這也沒什麽特別好講的,為了避免所謂的熱點問題而採取的一種隨機策略而已。

第4步也是。構造一個task列表。以分配到每一個worker,針對每一個executor,創建一個ArrayBuffer。存儲的類型為TaskDescription,大小為executor的cores。即最大程度並行化,充分利用executor的cores。

第5步就是獲取到上述executor集合中cores集合availableCpus,即能夠使用的cpu資源。

以下我們重點分析下第6步,它是調用Pool.getSortedTaskSetQueue()方法。獲得排序好的task集合。還記得這個Pool嗎?它就是上篇文章《Spark源代碼分析之五:Task調度(一)》裏講到的調度器的中的調度池啊,我們看下它的getSortedTaskSetQueue()方法。代碼例如以下:

override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    
    // 創建一個ArrayBuffer。存儲TaskSetManager
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    
    // schedulableQueue為Pool中的一個調度隊列。裏面存儲的是TaskSetManager
    // 在TaskScheduler的submitTasks()方法中,通過層層調用,終於通過Pool的addSchedulable()方法將之前生成的TaskSetManager增加到schedulableQueue中
    // 而TaskSetManager包括詳細的tasks
    // taskSetSchedulingAlgorithm為調度算法,包括FIFO和FAIR兩種
    // 這裏針對調度隊列,<span style="font-family: Arial, Helvetica, sans-serif;">依照調度算法對其排序,</span>生成一個序列sortedSchedulableQueue,
    val sortedSchedulableQueue =
      schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    
    // 循環sortedSchedulableQueue中全部的TaskSetManager,通過其getSortedTaskSetQueue來填充sortedTaskSetQueue
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    
    // 返回sortedTaskSetQueue
    sortedTaskSetQueue
  }
首先,創建一個ArrayBuffer,用來存儲TaskSetManager,然後。對Pool中已經存儲好的TaskSetManager,即schedulableQueue隊列,依照taskSetSchedulingAlgorithm調度規則或算法來排序,得到sortedSchedulableQueue,並循環其內的TaskSetManager。通過其getSortedTaskSetQueue()方法來填充sortedTaskSetQueue,最後返回。

TaskSetManager的getSortedTaskSetQueue()方法也非常easy,追加ArrayBuffer[TaskSetManager]就可以。例如以下:

override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()
    sortedTaskSetQueue += this
    sortedTaskSetQueue
  }
我們著重來解說下這個調度準則或算法taskSetSchedulingAlgorithm。其定義例如以下:

// 調度準則。包含FAIR和FIFO兩種
  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
    }
  }
它包含兩種。FAIR和FIFO。以下我們以FIFO為例來解說。

代碼在SchedulingAlgorithm.scala中。例如以下:

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  // 比較函數
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    
    // 先比較priority,即優先級
    // priority同樣的話,再比較stageId
    // 前者小於後者的話,返回true,否則為false
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}
非常easy,就是先比較兩個TaskSetManagerder的優先級priority。優先級同樣再比較stageId。而這個priority在TaskSet生成時。就是jobId,也就是FIFO是先依照Job的順序再依照Stage的順序進行順序調度。一個Job完了再調度還有一個Job,Job內是依照Stage的順序進行調度。關於priority生成的代碼例如以下所看到的:

// 利用taskScheduler.submitTasks()提交task
      // jobId即為TaskSet的priority
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

比較復雜的是FairSchedulingAlgorithm。代碼例如以下:

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0

    // 前者的runningTasks<minShare而後者相反的的話,返回true。
    // runningTasks為正在執行的tasks數目,minShare為最小共享cores數。
    // 前面兩個if推斷的意思是兩個TaskSetManager中,假設當中一個正在執行的tasks數目小於最小共享cores數。則優先調度該TaskSetManager
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {// 前者的runningTasks>=minShare而後者相反的的話。返回true
      return false
    } else if (s1Needy && s2Needy) {
      // 假設兩者的正在執行的tasks數目都比最小共享cores數小的話,再比較minShareRatio
      // minShareRatio為正在執行的tasks數目與最小共享cores數的比率
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      // 最後比較taskToWeightRatio,即權重使用率,weight代表調度池對資源獲取的權重,越大須要越多的資源
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}
它的調度邏輯主要例如以下:

1、優先看正在執行的tasks數目是否小於最小共享cores數,假設兩者僅僅有一個小於,則優先調度小於的那個。原因是既然正在執行的Tasks數目小於共享cores數,說明該節點資源比較充足,應該優先利用;

2、假設不是僅僅有一個的正在執行的tasks數目是否小於最小共享cores數的話,則再推斷正在執行的tasks數目與最小共享cores數的比率。

3、最後再比較權重使用率,即正在執行的tasks數目與該TaskSetManager的權重weight的比,weight代表調度池對資源獲取的權重,越大須要越多的資源。

到此為止,獲得了排序好的task集合,我們來到了第7步:假設存在新增加的slave。則調用taskSet的executorAdded()方法,即TaskSetManager的executorAdded()方法。代碼例如以下:

def executorAdded() {
    recomputeLocality()
  }

沒說的,繼續追蹤。看recomputeLocality()方法。代碼例如以下:

// 又一次計算位置
  def recomputeLocality() {
    // 首先獲取之前的位置Level
    // currentLocalityIndex為有效位置策略級別中的索引,默覺得0
    val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
    
    // 計算有效的位置Level
    myLocalityLevels = computeValidLocalityLevels()
    
    // 獲得位置策略級別的等待時間
    localityWaits = myLocalityLevels.map(getLocalityWait)
    
    // 設置當前使用的位置策略級別的索引
    currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
  }
首先說下這個currentLocalityIndex,它的定義為:
var currentLocalityIndex = 0    // Index of our current locality level in validLocalityLevels
它是有效位置策略級別中的索引,指示當前的位置信息。也就是我們上一個task被launched所使用的Locality Level。

接下來看下myLocalityLevels,它是任務集TaskSet中應該使用哪種位置Level的數組,在TaskSetManager對象實例化時即被初始化。變量定義例如以下:

// Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
  // 確定在我們的任務集TaskSet中應該使用哪種位置Level,以便我們做延遲調度
  var myLocalityLevels = computeValidLocalityLevels()
computeValidLocalityLevels()方法為計算該TaskSet使用的位置策略的方法,代碼例如以下:

/**
   * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
   * added to queues using addPendingTask.
   * 計算該TaskSet使用的位置策略。假設全部的任務已經通過addPendingTask()被加入入隊列
   */
  private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
    // 引入任務位置策略
    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
    
    // 創建ArrayBuffer類型的levels,存儲TaskLocality
    val levels = new ArrayBuffer[TaskLocality.TaskLocality]
    
    // 假設pendingTasksForExecutor不為空,且PROCESS_LOCAL級別中TaskSetManager等待分配下一個任務的時間不為零,且
    // 假設pendingTasksForExecutor中每一個executorId在sched的executorIdToTaskCount中存在
    // executorIdToTaskCount為每一個executor上執行的task的數目集合
    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
        pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
      levels += PROCESS_LOCAL
    }
    
    // 假設pendingTasksForHost不為空。且NODE_LOCAL級別中TaskSetManager等待分配下一個任務的時間不為零,且
    // 假設pendingTasksForHost中每一個host在sched的executorsByHost中存在
    // executorsByHost為每一個host上executors的集合
    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
        pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
      levels += NODE_LOCAL
    }
    
    // 假設存在沒有位置信息的task。則加入NO_PREF級別
    if (!pendingTasksWithNoPrefs.isEmpty) {
      levels += NO_PREF
    }
    
    // 相同處理RACK_LOCAL級別
    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
        pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
      levels += RACK_LOCAL
    }
    
    // 最後加上一個ANY級別
    levels += ANY
    logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
    
    // 返回
    levels.toArray
  }
這裏,我們先看下當中幾個比較重要的數據結構。在TaskSetManager中,存在例如以下幾個數據結構:

// 每一個executor上即將被運行的tasks的映射集合
  private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

// 每一個host上即將被運行的tasks的映射集合
  private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]

// 每一個rack上即將被運行的tasks的映射集合
  private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

// Set containing pending tasks with no locality preferences.
  // 存儲全部沒有位置信息的即將運行tasks的index索引的集合
  var pendingTasksWithNoPrefs = new ArrayBuffer[Int]

// Set containing all pending tasks (also used as a stack, as above).
  // 存儲全部即將運行tasks的index索引的集合
  val allPendingTasks = new ArrayBuffer[Int]
這些數據結構。存儲了task與不同位置的載體的相應關系。在TaskSetManager對象被構造時。有例如以下代碼被運行:

// Add all our tasks to the pending lists. We do this in reverse order
  // of task index so that tasks with low indices get launched first.
  // 將全部的tasks加入到pending列表。我們用倒序的任務索引一遍較低索引的任務能夠被優先載入
  for (i <- (0 until numTasks).reverse) {
    addPendingTask(i)
  }
它對TaskSetManager中的tasks的索引倒序處理。addPendingTask()方法例如以下:

/** Add a task to all the pending-task lists that it should be on. */
  // 加入一個任務的索引到全部相關的pending-task索引列表
  private def addPendingTask(index: Int) {
    // Utility method that adds `index` to a list only if it's not already there
    // 定義了一個假設索引不存在加入索引至列表的工具方法
    def addTo(list: ArrayBuffer[Int]) {
      if (!list.contains(index)) {
        list += index
      }
    }

    // 遍歷task的優先位置
    for (loc <- tasks(index).preferredLocations) {
      loc match {
        case e: ExecutorCacheTaskLocation => // 假設為ExecutorCacheTaskLocation
          // 加入任務索引index至pendingTasksForExecutor列表
          addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
        case e: HDFSCacheTaskLocation => {// 假設為HDFSCacheTaskLocation
          
          // 調用sched(即TaskSchedulerImpl)的getExecutorsAliveOnHost()方法。獲得指定Host上的Alive Executors
          val exe = sched.getExecutorsAliveOnHost(loc.host)
          exe match {
            case Some(set) => {
              // 循環host上的每一個Alive Executor。加入任務索引index至pendingTasksForExecutor列表
              for (e <- set) {
                addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
              }
              logInfo(s"Pending task $index has a cached location at ${e.host} " +
                ", where there are executors " + set.mkString(","))
            }
            case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
                ", but there are no executors alive there.")
          }
        }
        case _ => Unit
      }
      
      // 加入任務索引index至pendingTasksForHost列表
      addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
      
      // 依據獲得任務優先位置host獲得機架rack,循環。加入任務索引index至pendingTasksForRack列表
      for (rack <- sched.getRackForHost(loc.host)) {
        addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
      }
    }

    // 假設task沒有位置屬性,則將任務的索引index加入到pendingTasksWithNoPrefs,pendingTasksWithNoPrefs為存儲全部沒有位置信息的即將執行tasks的index索引的集合
    if (tasks(index).preferredLocations == Nil) {
      addTo(pendingTasksWithNoPrefs)
    }

    // 將任務的索引index加入到allPendingTasks,allPendingTasks為存儲全部即將執行tasks的index索引的集合
    allPendingTasks += index  // No point scanning this whole list to find the old task there
  }
鑒於上面凝視非常清晰,這裏,我們僅僅說下重點,它是依據task的preferredLocations。來決定該往哪個數據結構存儲的。終於,將task的位置信息,存儲到不同的數據結構中。方便興許任務調度的處理。

同一時候。在TaskSetManager中TaskSchedulerImpl類型的變量中,還存在著例如以下幾個數據結構:

// Number of tasks running on each executor
  // 每一個executor上正在執行的tasks的數目
  private val executorIdToTaskCount = new HashMap[String, Int]

  // The set of executors we have on each host; this is used to compute hostsAlive, which
  // in turn is used to decide when we can attain data locality on a given host
  // 每一個host上executors的集合
  // 這個executorsByHost被用來計算host活躍性,反過來我們用它來決定在給定的主機上何時實現數據本地性
  protected val executorsByHost = new HashMap[String, HashSet[String]]

  // 每一個rack上hosts的映射關系
  protected val hostsByRack = new HashMap[String, HashSet[String]]
它反映了當前集群中executor、host、rack的相應關系。

而在computeValidLocalityLevels()方法中。依據task的位置屬性和當前集群中executor、host、rack的相應關系。依靠上面這兩組數據結構。就能非常方便的確定該TaskSet的TaskLocality Level,具體流程不再贅述,讀者可自行閱讀代碼。

這裏,我們僅僅說下getLocalityWait()方法,它是獲取Locality級別相應TaskSetManager等待分配下一個任務的時間,代碼例如以下:

// 獲取Locality級別相應TaskSetManager等待分配下一個任務的時間
  private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
    // 默認等待時間。取自參數spark.locality.wait,默覺得3s
    val defaultWait = conf.get("spark.locality.wait", "3s")
    
    // 依據不同的TaskLocality,取不同的參數,設置TaskLocality等待時間
    // PROCESS_LOCAL取參數spark.locality.wait.process
    // NODE_LOCAL取參數spark.locality.wait.node
    // RACK_LOCAL取參數spark.locality.wait.rack
    val localityWaitKey = level match {
      case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
      case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
      case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
      case _ => null
    }

    if (localityWaitKey != null) {
      conf.getTimeAsMs(localityWaitKey, defaultWait)
    } else {
      0L
    }
  }
不同的Locality級別相應取不同的參數。

為什麽要有這個Locality級別相應TaskSetManager等待分配下一個任務的時間呢?我們先留個小小的疑問。

回到recomputeLocality()方法,接下來便是調用computeValidLocalityLevels()這種方法,計算當前最新的有效的位置策略Level。為什麽要再次計算呢?主要就是新的slave節點增加。我們須要又一次評估下集群中task位置偏好與當前集群executor、host、rack等總體資源的關系,起到了一個位置策略級別動態調整的一個效果。

然後,便是獲得位置策略級別的等待時間localityWaits、設置當前使用的位置策略級別的索引currentLocalityIndex,不再贅述。

好了。第7步就分析完了,有些細節留到以後再歸納整理吧。

接著分析第8步,循環sortedTaskSets,依照位置本地性規則調度每一個TaskSet。最大化實現任務的本地性。也就是對每一個taskSet。調用resourceOfferSingleTaskSet()方法進行任務集調度。顯然。我們須要首先看下resourceOfferSingleTaskSet()這種方法。代碼例如以下:

private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
    
    // 標誌位launchedTask初始化為false,用它來標記是否有task被成功分配或者launched
    var launchedTask = false
    
    // 循環shuffledOffers,即每一個可用executor
    for (i <- 0 until shuffledOffers.size) {
      
      // 獲取其executorId和host
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
      
      // 假設executor上可利用cpu數目大於每一個task須要的數目,則繼續task分配
      // CPUS_PER_TASK為參數spark.task.cpus配置的值,未配置的話默覺得1
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
        
          // 調用TaskSetManager的resourceOffer()方法。處理返回的每一個TaskDescription
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            
            // 分配task成功
            // 將task增加到tasks相應位置
            // 註意,tasks為一個空的。依據shuffledOffers和其可用cores生成的有一定結構的列表
            tasks(i) += task
            
            // 更新taskIdToTaskSetManager、taskIdToExecutorId、executorIdToTaskCount、
            // executorsByHost、availableCpus等數據結構
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet // taskId與TaskSetManager的映射關系
            taskIdToExecutorId(tid) = execId // taskId與ExecutorId的映射關系
            executorIdToTaskCount(execId) += 1// executor上正在執行的task數目加1
            executorsByHost(host) += execId// host上相應的executor的映射關系
            availableCpus(i) -= CPUS_PER_TASK// 能夠Cpu cores降低相應數目
            
            // 確保availableCpus(i)不小於0
            assert(availableCpus(i) >= 0)
            
            // 標誌位launchedTask設置為true
            launchedTask = true
          }
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
    }
    return launchedTask
  }
該方法的主體流程例如以下:

1、標誌位launchedTask初始化為false。用它來標記是否有task被成功分配或者launched;

2、循環shuffledOffers,即每一個可用executor:

2.1、獲取其executorId和host;

2.2、假設executor上可利用cpu數目大於每一個task須要的數目。則繼續task分配;

2.3、調用TaskSetManager的resourceOffer()方法,處理返回的每一個TaskDescription:

2.3.1、分配task成功。將task增加到tasks相應位置(註意,tasks為一個空的,依據shuffledOffers和其可用cores生成的有一定結構的列表);

2.3.2、更新taskIdToTaskSetManager、taskIdToExecutorId、executorIdToTaskCount、executorsByHost、availableCpus等數據結構;

2.3.3、確保availableCpus(i)不小於0;

2.3.4、標誌位launchedTask設置為true;

3、返回launchedTask。

其它都好說,我們僅僅看下TaskSetManager的resourceOffer()方法。

代碼例如以下:

/**
   * Respond to an offer of a single executor from the scheduler by finding a task
   *
   * NOTE: this function is either called with a maxLocality which
   * would be adjusted by delay scheduling algorithm or it will be with a special
   * NO_PREF locality which will be not modified
   *
   * @param execId the executor Id of the offered resource
   * @param host  the host Id of the offered resource
   * @param maxLocality the maximum locality we want to schedule the tasks at
   */
  @throws[TaskNotSerializableException]
  def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    if (!isZombie) {
    
      // 當前時間
      val curTime = clock.getTimeMillis()

      // 確定能夠被同意的位置策略:allowedLocality
      var allowedLocality = maxLocality

      // 假設maxLocality不為TaskLocality.NO_PREF
      if (maxLocality != TaskLocality.NO_PREF) {
        // 獲取被同意的Locality。主要是看等待時間
        allowedLocality = getAllowedLocalityLevel(curTime)
        
        // 假設allowedLocality大於maxLocality。將maxLocality賦值給allowedLocality
        if (allowedLocality > maxLocality) {
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }

      // 出列task,即分配task
      dequeueTask(execId, host, allowedLocality) match {
        case Some((index, taskLocality, speculative)) => {
          
          // 找到相應的task
          // Found a task; do some bookkeeping and return a task description
          val task = tasks(index)
          val taskId = sched.newTaskId()
          // Do various bookkeeping
          // 更新copiesRunning
          copiesRunning(index) += 1
          val attemptNum = taskAttempts(index).size
          
          // 創建TaskInfo
          val info = new TaskInfo(taskId, index, attemptNum, curTime,
            execId, host, taskLocality, speculative)
          
          // 更新taskInfos
          taskInfos(taskId) = info
          
          // 更新taskAttempts
          taskAttempts(index) = info :: taskAttempts(index)
          // Update our locality level for delay scheduling
          // NO_PREF will not affect the variables related to delay scheduling
          
          // 設置currentLocalityIndex、lastLaunchTime
          if (maxLocality != TaskLocality.NO_PREF) {
            currentLocalityIndex = getLocalityIndex(taskLocality)
            lastLaunchTime = curTime
          }
          
          // Serialize and return the task
          // 開始時間
          val startTime = clock.getTimeMillis()
          
          // 序列化task,得到serializedTask
          val serializedTask: ByteBuffer = try {
            Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
          } catch {
            // If the task cannot be serialized, then there's no point to re-attempt the task,
            // as it will always fail. So just abort the whole task-set.
            case NonFatal(e) =>
              val msg = s"Failed to serialize task $taskId, not attempting to retry it."
              logError(msg, e)
              abort(s"$msg Exception during serialization: $e")
              throw new TaskNotSerializableException(e)
          }
          if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
              !emittedTaskSizeWarning) {
            emittedTaskSizeWarning = true
            logWarning(s"Stage ${task.stageId} contains a task of very large size " +
              s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
              s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
          }
          
          // 加入running task
          addRunningTask(taskId)

          // We used to log the time it takes to serialize the task, but task size is already
          // a good proxy to task serialization time.
          // val timeTaken = clock.getTime() - startTime
          val taskName = s"task ${info.id} in stage ${taskSet.id}"
          logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
            s"$taskLocality, ${serializedTask.limit} bytes)")

          // 調用DagScheduler的taskStarted()方法,標記Task已啟動
          sched.dagScheduler.taskStarted(task, info)
          
          // 返回TaskDescription。當中包括taskId、attemptNumber、execId、index、serializedTask等重要信息
          // attemptNumber是猜測運行原理必須使用的,即拖後腿的任務能夠運行多份,誰先完畢用誰的結果
          return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
            taskName, index, serializedTask))
        }
        case _ =>
      }
    }
    None
  }
resourceOffer()方法的處理流程大體例如以下:

1、記錄當前時間;

2、 確定能夠被同意的位置策略:allowedLocality;

3、出列task。即分配task;

3.1、假設找到相應的task,即task能夠被分配:

3.1.1、完畢獲得taskId、更新copiesRunning、獲得attemptNum、創建TaskInfo、更新taskInfos、更新taskAttempts、設置currentLocalityIndex、lastLaunchTime等基礎數據結構的更新;

3.1.2、序列化task,得到serializedTask;

3.1.3、加入running task;

3.1.4、調用DagScheduler的taskStarted()方法,標記Task已啟動。

3.1.5、返回TaskDescription,當中包括taskId、attemptNumber、execId、index、serializedTask等重要信息,attemptNumber是猜測運行原理必須使用的。即拖後腿的任務能夠運行多份,誰先完畢用誰的結果。

首先說下這個allowedLocality。假設maxLocality不為TaskLocality.NO_PREF。我們須要調用getAllowedLocalityLevel(),傳入當前時間,得到allowedLocality,getAllowedLocalityLevel()方法邏輯比較簡單,代碼例如以下:

/**
   * Get the level we can launch tasks according to delay scheduling, based on current wait time.
   * 基於當前的等待是時間,得到我們能夠調度task的級別
   */
  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    // Remove the scheduled or finished tasks lazily
    // 推斷task能否夠被調度
    def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
      var indexOffset = pendingTaskIds.size
      // 循環
      while (indexOffset > 0) {
        // 索引遞減
        indexOffset -= 1
        
        // 獲得task索引
        val index = pendingTaskIds(indexOffset)
        
        // 假設相應task不存在不論什麽執行實例。且未執行成功,能夠調度。返回true
        if (copiesRunning(index) == 0 && !successful(index)) {
          return true
        } else {
        
          // 從pendingTaskIds中移除
          pendingTaskIds.remove(indexOffset)
        }
      }
      false
    }
    // Walk through the list of tasks that can be scheduled at each location and returns true
    // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
    // already been scheduled.
    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
      val emptyKeys = new ArrayBuffer[String]
      
      // 循環pendingTasks
      val hasTasks = pendingTasks.exists {
        case (id: String, tasks: ArrayBuffer[Int]) =>
          
          // 推斷task能否夠被調度
          if (tasksNeedToBeScheduledFrom(tasks)) {
            true
          } else {
            emptyKeys += id
            false
          }
      }
      // The key could be executorId, host or rackId
      // 移除數據
      emptyKeys.foreach(id => pendingTasks.remove(id))
      hasTasks
    }
    
    // 從當前索引currentLocalityIndex開始,循環myLocalityLevels
    while (currentLocalityIndex < myLocalityLevels.length - 1) {
      
      // 是否存在待調度task,依據不同的Locality Level。調用moreTasksToRunIn()方法從不同的數據結構中獲取。
      // NO_PREF直接看pendingTasksWithNoPrefs是否為空
      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
        case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
      }
      if (!moreTasks) {// 不存在能夠被調度的task
        // This is a performance optimization: if there are no more tasks that can
        // be scheduled at a particular locality level, there is no point in waiting
        // for the locality wait timeout (SPARK-4939).
        // 記錄lastLaunchTime
        lastLaunchTime = curTime
        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
        
        // 位置策略索引加1
        currentLocalityIndex += 1
      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
        // Jump to the next locality level, and reset lastLaunchTime so that the next locality
        // wait timer doesn't immediately expire
        
        // 更新localityWaits
        lastLaunchTime += localityWaits(currentLocalityIndex)
        
        // 位置策略索引加1
        currentLocalityIndex += 1
        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
          s"${localityWaits(currentLocalityIndex)}ms")
      } else {
      
        // 返回當前位置策略級別
        return myLocalityLevels(currentLocalityIndex)
      }
    }
    
    // 返回當前位置策略級別
    myLocalityLevels(currentLocalityIndex)
  }
在確定allowedLocality後,我們就須要調用dequeueTask()方法。出列task。進行調度。代碼例如以下:

/**
   * Dequeue a pending task for a given node and return its index and locality level.
   * Only search for tasks matching the given locality constraint.
   *
   * @return An option containing (task index within the task set, locality, is speculative?)
   */
  private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] =
  {
    // 首先調用dequeueTaskFromList()方法。對PROCESS_LOCAL級別的task進行調度
    for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }

    // PROCESS_LOCAL未調度到task的話,再調度NODE_LOCAL級別
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL, false))
      }
    }

    // NODE_LOCAL未調度到task的話,再調度NO_PREF級別
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
      // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
      for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
        return Some((index, TaskLocality.PROCESS_LOCAL, false))
      }
    }

    // NO_PREF未調度到task的話。再調度RACK_LOCAL級別
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- dequeueTaskFromList(execId, getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL, false))
      }
    }

    // 最好是ANY級別的調度
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
      for (index <- dequeueTaskFromList(execId, allPendingTasks)) {
        return Some((index, TaskLocality.ANY, false))
      }
    }

    // find a speculative task if all others tasks have been scheduled
    // 假設全部的class都被調度的話,尋找一個speculative task。同MapReduce的猜測運行原理的思想
    dequeueSpeculativeTask(execId, host, maxLocality).map {
      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
  }
非常easy,依照PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY的順序進行調度。

最後,假設全部的class都被調度的話,尋找一個speculative task,同MapReduce的猜測運行原理的思想。

至此。我們得到了TaskDescription,也就知道了哪個Task須要在哪個節點上運行,而Task調度也就全講完了。

題外話:

要透徹的、清晰的解說一個復雜的流程。是非常費力的,短短幾篇文章也是遠遠不夠的。Task調度這兩篇文章。重在敘述一個完整的流程。同一時候解說部分細節。

在這兩篇文章的敘述中,肯定會有非常多細節沒講清晰、講透徹,甚至會有些理解錯誤的地方,希望高手指教,以免繼續誤導大家。

針對部分細節,和對流程的深入理解,我以後還會陸續推出博文。進行具體解說,並歸納總結。謝謝大家!






Spark源代碼分析之六:Task調度(二)