Spark源代碼分析之六:Task調度(二)
話說在《Spark源代碼分析之五:Task調度(一)》一文中,我們對Task調度分析到了DriverEndpoint的makeOffers()方法。這種方法針對接收到的ReviveOffers事件進行處理。代碼例如以下:
// Make fake resource offers on all executors // 在全部的executors上提供假的資源(抽象的資源。也就是資源的對象信息,我是這麽理解的) private def makeOffers() { // Filter out executors under killing // 過濾掉under killing的executors val activeExecutors = executorDataMap.filterKeys(executorIsAlive) // 利用activeExecutors中executorData的executorHost、freeCores,構造workOffers,即資源 val workOffers = activeExecutors.map { case (id, executorData) => // 創建WorkerOffer對象 new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toSeq // 調用scheduler的resourceOffers()方法,分配資源,並調用launchTasks()方法,啟動tasks // 這個scheduler就是TaskSchedulerImpl launchTasks(scheduler.resourceOffers(workOffers)) }
第一。從executorDataMap中過濾掉under killing的executors,得到activeExecutors。
第二。利用activeExecutors中executorData的executorHost、freeCores,獲取workOffers。即資源。
第三,調用scheduler的resourceOffers()方法,分配資源,並調用launchTasks()方法,啟動tasks:這個scheduler就是TaskSchedulerImpl。
我們逐個進行分析。首先看看這個executorDataMap,其定義例如以下:
private val executorDataMap = new HashMap[String, ExecutorData]它是CoarseGrainedSchedulerBackend掌握的集群中executor的數據集合,key為String類型的executorId,value為ExecutorData類型的executor具體信息。
ExecutorData包括的主要內容例如以下:
1、executorEndpoint:RpcEndpointRef類型。RPC終端的引用,用於數據通信。
2、executorAddress:RpcAddress類型。RPC地址。用於數據通信。
3、executorHost:String類型,executor的主機;
4、freeCores:Int類型,可用處理器cores。
5、totalCores:Int類型。處理器cores總數;
6、logUrlMap:Map[String, String]類型,日誌url映射集合。
這樣,通過executorDataMap這個集合我們就能知道集群當前executor的負載情況。方便資源分析並調度任務。那麽executorDataMap內的數據是何時及怎樣更新的呢?go on,繼續分析。
對於第一步中,過濾掉under killing的executors,事實上現是對executorDataMap中的全部executor調用executorIsAlive()方法中。推斷是否在executorsPendingToRemove和executorsPendingLossReason兩個數據結構中。這兩個數據結構中的executors。都是即將移除或者已丟失的executor。
第二步。在過濾掉已失效或者立即要失效的executor後,利用activeExecutors中executorData的executorHost、freeCores,構造workOffers,即資源。這個workOffers更簡單,是一個WorkerOffer對象,它代表了系統的可利用資源。
WorkerOffer代碼例如以下:
/** * Represents free resources available on an executor. */ private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)而最重要的第三步,先是調用scheduler.resourceOffers(workOffers),即TaskSchedulerImpl的resourceOffers()方法,然後再調用launchTasks()方法將tasks載入到executor上去運行。
我們先看下TaskSchedulerImpl的resourceOffers()方法。代碼例如以下:
/** * Called by cluster manager to offer resources on slaves. We respond by asking our active task * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so * that tasks are balanced across the cluster. * * 被集群manager調用以提供slaves上的資源。首先來看下它的主體流程。例如以下:我們通過依照優先順序詢問活動task集中的task來回應。
* 我們通過循環的方式將task調度到每一個節點上以便tasks在集群中能夠保持大致的均衡。 */ def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added // 標記每一個slave節點為alive活躍的,而且記住它的主機名 // 同一時候也追蹤是否有executor被加入 var newExecAvail = false // 循環offers。WorkerOffer為包括executorId、host、cores的結構體。代表集群中的可用executor資源 for (o <- offers) { // 利用HashMap存儲executorId->host映射的集合 executorIdToHost(o.executorId) = o.host // Number of tasks running on each executor // 每一個executor上執行的task的數目,這裏假設之前沒有的話,初始化為0 executorIdToTaskCount.getOrElseUpdate(o.executorId, 0) // 每一個host上executors的集合 // 這個executorsByHost被用來計算host活躍性。反過來我們用它來決定在給定的主機上何時實現數據本地性 if (!executorsByHost.contains(o.host)) {// 假設executorsByHost中不存在相應的host // executorsByHost中加入一條記錄。key為host,value為new HashSet[String]() executorsByHost(o.host) = new HashSet[String]() // 發送一個ExecutorAdded事件。並由DAGScheduler的handleExecutorAdded()方法處理 // eventProcessLoop.post(ExecutorAdded(execId, host)) // 調用DAGScheduler的executorAdded()方法處理 executorAdded(o.executorId, o.host) // 新的slave加入時,標誌位newExecAvail設置為true newExecAvail = true } // 更新hostsByRack for (rack <- getRackForHost(o.host)) { hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host } } // Randomly shuffle offers to avoid always placing tasks on the same set of workers. // 隨機shuffle offers以避免總是把任務放在同一組workers上執行 val shuffledOffers = Random.shuffle(offers) // Build a list of tasks to assign to each worker. // 構造一個task列表,以分配到每一個worker val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) // 能夠使用的cpu資源 val availableCpus = shuffledOffers.map(o => o.cores).toArray // 獲得排序好的task集合 // 先調用Pool.getSortedTaskSetQueue()方法 // 還記得這個Pool嗎,就是調度器中的調度池啊 val sortedTaskSets = rootPool.getSortedTaskSetQueue // 循環每一個taskSet for (taskSet <- sortedTaskSets) { // 記錄日誌 logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) // 假設存在新的活躍的executor(新的slave節點被加入時) if (newExecAvail) { // 調用executorAdded()方法 taskSet.executorAdded() } } // Take each TaskSet in our scheduling order, and then offer it each node in increasing order // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY var launchedTask = false // 依照位置本地性規則調度每一個TaskSet,最大化實現任務的本地性 // 位置本地性規則的順序是:PROCESS_LOCAL(同進程)、NODE_LOCAL(同節點)、NO_PREF、RACK_LOCAL(同機架)、ANY(不論什麽) for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) { do { // 調用resourceOfferSingleTaskSet()方法進行任務集調度 launchedTask = resourceOfferSingleTaskSet( taskSet, maxLocality, shuffledOffers, availableCpus, tasks) } while (launchedTask) } // 設置標誌位hasLaunchedTask if (tasks.size > 0) { hasLaunchedTask = true } return tasks }
1、設置標誌位newExecAvail為false。這個標誌位是在新的slave被加入時被設置的一個標誌。以下在計算任務的本地性規則時會用到;
2、循環offers,WorkerOffer為包括executorId、host、cores的結構體。代表集群中的可用executor資源:
2.1、更新executorIdToHost,executorIdToHost為利用HashMap存儲executorId->host映射的集合;
2.2、更新executorIdToTaskCount,executorIdToTaskCount為每一個executor上執行的task的數目集合,這裏假設之前沒有的話,初始化為0;
2.3、假設新的slave增加:
2.3.1、executorsByHost中加入一條記錄,key為host。value為new HashSet[String]()。
2.3.2、發送一個ExecutorAdded事件。並由DAGScheduler的handleExecutorAdded()方法處理;
2.3.3、新的slave增加時,標誌位newExecAvail設置為true;
2.4、更新hostsByRack;
3、隨機shuffle offers(集群中可用executor資源)以避免總是把任務放在同一組workers上運行;
4、構造一個task列表。以分配到每一個worker,針對每一個executor依照其上的cores數目構造一個cores數目大小的ArrayBuffer,實現最大程度並行化;
5、獲取能夠使用的cpu資源availableCpus;
6、調用Pool.getSortedTaskSetQueue()方法獲得排序好的task集合,即sortedTaskSets;
7、循環sortedTaskSets中每一個taskSet:
7.1、假設存在新增加的slave,則調用taskSet的executorAdded()方法,動態調整位置策略級別,這麽做非常easy理解,新的slave節點增加了,那麽隨之而來的是數據有可能存在於它上面。那麽這時我們就須要又一次調整任務本地性規則;
8、循環sortedTaskSets,依照位置本地性規則調度每一個TaskSet,最大化實現任務的本地性:
8.1、對每一個taskSet。調用resourceOfferSingleTaskSet()方法進行任務集調度。
9、設置標誌位hasLaunchedTask。並返回tasks。
接下來。我們詳解下當中的每一個步驟。
第1步不用講,僅僅是設置標誌位newExecAvail為false。而且記住這個標誌位是在新的slave被加入時被設置的一個標誌,以下在計算任務的本地性規則時會用到;
第2步是集群中的可用executor資源offers的循環處理。更新一些數據結構,而且,在新的slave增加時。標誌位newExecAvail設置為true,而且發送一個ExecutorAdded事件,交由DAGScheduler的handleExecutorAdded()方法處理。我們來看下DAGScheduler的這種方法:
private[scheduler] def handleExecutorAdded(execId: String, host: String) { // remove from failedEpoch(execId) ? if (failedEpoch.contains(execId)) { logInfo("Host added was in lost list earlier: " + host) failedEpoch -= execId } submitWaitingStages() }非常easy,先將相應host從failedEpoch中移除,failedEpoch存儲的是系統探測到的失效節點的集合,存儲的是execId->host的相應關系。接下來便是調用submitWaitingStages()方法提交等待的stages。
這種方法我們之前分析過。這裏不再贅述。可是存在一個疑點,之前stage都已提交了,這裏為什麽還要提交一遍呢?留待以後再尋找答案吧。
第3步隨機shuffle offers以避免總是把任務放在同一組workers上運行,這也沒什麽特別好講的,為了避免所謂的熱點問題而採取的一種隨機策略而已。
第4步也是。構造一個task列表。以分配到每一個worker,針對每一個executor,創建一個ArrayBuffer。存儲的類型為TaskDescription,大小為executor的cores。即最大程度並行化,充分利用executor的cores。
第5步就是獲取到上述executor集合中cores集合availableCpus,即能夠使用的cpu資源。
以下我們重點分析下第6步,它是調用Pool.getSortedTaskSetQueue()方法。獲得排序好的task集合。還記得這個Pool嗎?它就是上篇文章《Spark源代碼分析之五:Task調度(一)》裏講到的調度器的中的調度池啊,我們看下它的getSortedTaskSetQueue()方法。代碼例如以下:
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = { // 創建一個ArrayBuffer。存儲TaskSetManager var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] // schedulableQueue為Pool中的一個調度隊列。裏面存儲的是TaskSetManager // 在TaskScheduler的submitTasks()方法中,通過層層調用,終於通過Pool的addSchedulable()方法將之前生成的TaskSetManager增加到schedulableQueue中 // 而TaskSetManager包括詳細的tasks // taskSetSchedulingAlgorithm為調度算法,包括FIFO和FAIR兩種 // 這裏針對調度隊列,<span style="font-family: Arial, Helvetica, sans-serif;">依照調度算法對其排序,</span>生成一個序列sortedSchedulableQueue, val sortedSchedulableQueue = schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator) // 循環sortedSchedulableQueue中全部的TaskSetManager,通過其getSortedTaskSetQueue來填充sortedTaskSetQueue for (schedulable <- sortedSchedulableQueue) { sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue } // 返回sortedTaskSetQueue sortedTaskSetQueue }首先,創建一個ArrayBuffer,用來存儲TaskSetManager,然後。對Pool中已經存儲好的TaskSetManager,即schedulableQueue隊列,依照taskSetSchedulingAlgorithm調度規則或算法來排序,得到sortedSchedulableQueue,並循環其內的TaskSetManager。通過其getSortedTaskSetQueue()方法來填充sortedTaskSetQueue,最後返回。
TaskSetManager的getSortedTaskSetQueue()方法也非常easy,追加ArrayBuffer[TaskSetManager]就可以。例如以下:
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]() sortedTaskSetQueue += this sortedTaskSetQueue }我們著重來解說下這個調度準則或算法taskSetSchedulingAlgorithm。其定義例如以下:
// 調度準則。包含FAIR和FIFO兩種 var taskSetSchedulingAlgorithm: SchedulingAlgorithm = { schedulingMode match { case SchedulingMode.FAIR => new FairSchedulingAlgorithm() case SchedulingMode.FIFO => new FIFOSchedulingAlgorithm() } }它包含兩種。FAIR和FIFO。以下我們以FIFO為例來解說。
代碼在SchedulingAlgorithm.scala中。例如以下:
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { // 比較函數 override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { val priority1 = s1.priority val priority2 = s2.priority // 先比較priority,即優先級 // priority同樣的話,再比較stageId // 前者小於後者的話,返回true,否則為false var res = math.signum(priority1 - priority2) if (res == 0) { val stageId1 = s1.stageId val stageId2 = s2.stageId res = math.signum(stageId1 - stageId2) } if (res < 0) { true } else { false } } }非常easy,就是先比較兩個TaskSetManagerder的優先級priority。優先級同樣再比較stageId。而這個priority在TaskSet生成時。就是jobId,也就是FIFO是先依照Job的順序再依照Stage的順序進行順序調度。一個Job完了再調度還有一個Job,Job內是依照Stage的順序進行調度。關於priority生成的代碼例如以下所看到的:
// 利用taskScheduler.submitTasks()提交task // jobId即為TaskSet的priority taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
比較復雜的是FairSchedulingAlgorithm。代碼例如以下:
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { val minShare1 = s1.minShare val minShare2 = s2.minShare val runningTasks1 = s1.runningTasks val runningTasks2 = s2.runningTasks val s1Needy = runningTasks1 < minShare1 val s2Needy = runningTasks2 < minShare2 val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble var compare: Int = 0 // 前者的runningTasks<minShare而後者相反的的話,返回true。 // runningTasks為正在執行的tasks數目,minShare為最小共享cores數。 // 前面兩個if推斷的意思是兩個TaskSetManager中,假設當中一個正在執行的tasks數目小於最小共享cores數。則優先調度該TaskSetManager if (s1Needy && !s2Needy) { return true } else if (!s1Needy && s2Needy) {// 前者的runningTasks>=minShare而後者相反的的話。返回true return false } else if (s1Needy && s2Needy) { // 假設兩者的正在執行的tasks數目都比最小共享cores數小的話,再比較minShareRatio // minShareRatio為正在執行的tasks數目與最小共享cores數的比率 compare = minShareRatio1.compareTo(minShareRatio2) } else { // 最後比較taskToWeightRatio,即權重使用率,weight代表調度池對資源獲取的權重,越大須要越多的資源 compare = taskToWeightRatio1.compareTo(taskToWeightRatio2) } if (compare < 0) { true } else if (compare > 0) { false } else { s1.name < s2.name } } }它的調度邏輯主要例如以下:
1、優先看正在執行的tasks數目是否小於最小共享cores數,假設兩者僅僅有一個小於,則優先調度小於的那個。原因是既然正在執行的Tasks數目小於共享cores數,說明該節點資源比較充足,應該優先利用;
2、假設不是僅僅有一個的正在執行的tasks數目是否小於最小共享cores數的話,則再推斷正在執行的tasks數目與最小共享cores數的比率。
3、最後再比較權重使用率,即正在執行的tasks數目與該TaskSetManager的權重weight的比,weight代表調度池對資源獲取的權重,越大須要越多的資源。
到此為止,獲得了排序好的task集合,我們來到了第7步:假設存在新增加的slave。則調用taskSet的executorAdded()方法,即TaskSetManager的executorAdded()方法。代碼例如以下:
def executorAdded() { recomputeLocality() }
沒說的,繼續追蹤。看recomputeLocality()方法。代碼例如以下:
// 又一次計算位置 def recomputeLocality() { // 首先獲取之前的位置Level // currentLocalityIndex為有效位置策略級別中的索引,默覺得0 val previousLocalityLevel = myLocalityLevels(currentLocalityIndex) // 計算有效的位置Level myLocalityLevels = computeValidLocalityLevels() // 獲得位置策略級別的等待時間 localityWaits = myLocalityLevels.map(getLocalityWait) // 設置當前使用的位置策略級別的索引 currentLocalityIndex = getLocalityIndex(previousLocalityLevel) }首先說下這個currentLocalityIndex,它的定義為:
var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels它是有效位置策略級別中的索引,指示當前的位置信息。也就是我們上一個task被launched所使用的Locality Level。
接下來看下myLocalityLevels,它是任務集TaskSet中應該使用哪種位置Level的數組,在TaskSetManager對象實例化時即被初始化。變量定義例如以下:
// Figure out which locality levels we have in our TaskSet, so we can do delay scheduling // 確定在我們的任務集TaskSet中應該使用哪種位置Level,以便我們做延遲調度 var myLocalityLevels = computeValidLocalityLevels()computeValidLocalityLevels()方法為計算該TaskSet使用的位置策略的方法,代碼例如以下:
/** * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been * added to queues using addPendingTask. * 計算該TaskSet使用的位置策略。假設全部的任務已經通過addPendingTask()被加入入隊列 */ private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = { // 引入任務位置策略 import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY} // 創建ArrayBuffer類型的levels,存儲TaskLocality val levels = new ArrayBuffer[TaskLocality.TaskLocality] // 假設pendingTasksForExecutor不為空,且PROCESS_LOCAL級別中TaskSetManager等待分配下一個任務的時間不為零,且 // 假設pendingTasksForExecutor中每一個executorId在sched的executorIdToTaskCount中存在 // executorIdToTaskCount為每一個executor上執行的task的數目集合 if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 && pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) { levels += PROCESS_LOCAL } // 假設pendingTasksForHost不為空。且NODE_LOCAL級別中TaskSetManager等待分配下一個任務的時間不為零,且 // 假設pendingTasksForHost中每一個host在sched的executorsByHost中存在 // executorsByHost為每一個host上executors的集合 if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 && pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { levels += NODE_LOCAL } // 假設存在沒有位置信息的task。則加入NO_PREF級別 if (!pendingTasksWithNoPrefs.isEmpty) { levels += NO_PREF } // 相同處理RACK_LOCAL級別 if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 && pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) { levels += RACK_LOCAL } // 最後加上一個ANY級別 levels += ANY logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", ")) // 返回 levels.toArray }這裏,我們先看下當中幾個比較重要的數據結構。在TaskSetManager中,存在例如以下幾個數據結構:
// 每一個executor上即將被運行的tasks的映射集合 private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]] // 每一個host上即將被運行的tasks的映射集合 private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]] // 每一個rack上即將被運行的tasks的映射集合 private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]] // Set containing pending tasks with no locality preferences. // 存儲全部沒有位置信息的即將運行tasks的index索引的集合 var pendingTasksWithNoPrefs = new ArrayBuffer[Int] // Set containing all pending tasks (also used as a stack, as above). // 存儲全部即將運行tasks的index索引的集合 val allPendingTasks = new ArrayBuffer[Int]這些數據結構。存儲了task與不同位置的載體的相應關系。在TaskSetManager對象被構造時。有例如以下代碼被運行:
// Add all our tasks to the pending lists. We do this in reverse order // of task index so that tasks with low indices get launched first. // 將全部的tasks加入到pending列表。我們用倒序的任務索引一遍較低索引的任務能夠被優先載入 for (i <- (0 until numTasks).reverse) { addPendingTask(i) }它對TaskSetManager中的tasks的索引倒序處理。addPendingTask()方法例如以下:
/** Add a task to all the pending-task lists that it should be on. */ // 加入一個任務的索引到全部相關的pending-task索引列表 private def addPendingTask(index: Int) { // Utility method that adds `index` to a list only if it's not already there // 定義了一個假設索引不存在加入索引至列表的工具方法 def addTo(list: ArrayBuffer[Int]) { if (!list.contains(index)) { list += index } } // 遍歷task的優先位置 for (loc <- tasks(index).preferredLocations) { loc match { case e: ExecutorCacheTaskLocation => // 假設為ExecutorCacheTaskLocation // 加入任務索引index至pendingTasksForExecutor列表 addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer)) case e: HDFSCacheTaskLocation => {// 假設為HDFSCacheTaskLocation // 調用sched(即TaskSchedulerImpl)的getExecutorsAliveOnHost()方法。獲得指定Host上的Alive Executors val exe = sched.getExecutorsAliveOnHost(loc.host) exe match { case Some(set) => { // 循環host上的每一個Alive Executor。加入任務索引index至pendingTasksForExecutor列表 for (e <- set) { addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer)) } logInfo(s"Pending task $index has a cached location at ${e.host} " + ", where there are executors " + set.mkString(",")) } case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + ", but there are no executors alive there.") } } case _ => Unit } // 加入任務索引index至pendingTasksForHost列表 addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer)) // 依據獲得任務優先位置host獲得機架rack,循環。加入任務索引index至pendingTasksForRack列表 for (rack <- sched.getRackForHost(loc.host)) { addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer)) } } // 假設task沒有位置屬性,則將任務的索引index加入到pendingTasksWithNoPrefs,pendingTasksWithNoPrefs為存儲全部沒有位置信息的即將執行tasks的index索引的集合 if (tasks(index).preferredLocations == Nil) { addTo(pendingTasksWithNoPrefs) } // 將任務的索引index加入到allPendingTasks,allPendingTasks為存儲全部即將執行tasks的index索引的集合 allPendingTasks += index // No point scanning this whole list to find the old task there }鑒於上面凝視非常清晰,這裏,我們僅僅說下重點,它是依據task的preferredLocations。來決定該往哪個數據結構存儲的。終於,將task的位置信息,存儲到不同的數據結構中。方便興許任務調度的處理。
同一時候。在TaskSetManager中TaskSchedulerImpl類型的變量中,還存在著例如以下幾個數據結構:
// Number of tasks running on each executor // 每一個executor上正在執行的tasks的數目 private val executorIdToTaskCount = new HashMap[String, Int] // The set of executors we have on each host; this is used to compute hostsAlive, which // in turn is used to decide when we can attain data locality on a given host // 每一個host上executors的集合 // 這個executorsByHost被用來計算host活躍性,反過來我們用它來決定在給定的主機上何時實現數據本地性 protected val executorsByHost = new HashMap[String, HashSet[String]] // 每一個rack上hosts的映射關系 protected val hostsByRack = new HashMap[String, HashSet[String]]它反映了當前集群中executor、host、rack的相應關系。
而在computeValidLocalityLevels()方法中。依據task的位置屬性和當前集群中executor、host、rack的相應關系。依靠上面這兩組數據結構。就能非常方便的確定該TaskSet的TaskLocality Level,具體流程不再贅述,讀者可自行閱讀代碼。
這裏,我們僅僅說下getLocalityWait()方法,它是獲取Locality級別相應TaskSetManager等待分配下一個任務的時間,代碼例如以下:
// 獲取Locality級別相應TaskSetManager等待分配下一個任務的時間 private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { // 默認等待時間。取自參數spark.locality.wait,默覺得3s val defaultWait = conf.get("spark.locality.wait", "3s") // 依據不同的TaskLocality,取不同的參數,設置TaskLocality等待時間 // PROCESS_LOCAL取參數spark.locality.wait.process // NODE_LOCAL取參數spark.locality.wait.node // RACK_LOCAL取參數spark.locality.wait.rack val localityWaitKey = level match { case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process" case TaskLocality.NODE_LOCAL => "spark.locality.wait.node" case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack" case _ => null } if (localityWaitKey != null) { conf.getTimeAsMs(localityWaitKey, defaultWait) } else { 0L } }不同的Locality級別相應取不同的參數。
為什麽要有這個Locality級別相應TaskSetManager等待分配下一個任務的時間呢?我們先留個小小的疑問。
回到recomputeLocality()方法,接下來便是調用computeValidLocalityLevels()這種方法,計算當前最新的有效的位置策略Level。為什麽要再次計算呢?主要就是新的slave節點增加。我們須要又一次評估下集群中task位置偏好與當前集群executor、host、rack等總體資源的關系,起到了一個位置策略級別動態調整的一個效果。
然後,便是獲得位置策略級別的等待時間localityWaits、設置當前使用的位置策略級別的索引currentLocalityIndex,不再贅述。
好了。第7步就分析完了,有些細節留到以後再歸納整理吧。
接著分析第8步,循環sortedTaskSets,依照位置本地性規則調度每一個TaskSet。最大化實現任務的本地性。也就是對每一個taskSet。調用resourceOfferSingleTaskSet()方法進行任務集調度。顯然。我們須要首先看下resourceOfferSingleTaskSet()這種方法。代碼例如以下:
private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = { // 標誌位launchedTask初始化為false,用它來標記是否有task被成功分配或者launched var launchedTask = false // 循環shuffledOffers,即每一個可用executor for (i <- 0 until shuffledOffers.size) { // 獲取其executorId和host val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host // 假設executor上可利用cpu數目大於每一個task須要的數目,則繼續task分配 // CPUS_PER_TASK為參數spark.task.cpus配置的值,未配置的話默覺得1 if (availableCpus(i) >= CPUS_PER_TASK) { try { // 調用TaskSetManager的resourceOffer()方法。處理返回的每一個TaskDescription for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { // 分配task成功 // 將task增加到tasks相應位置 // 註意,tasks為一個空的。依據shuffledOffers和其可用cores生成的有一定結構的列表 tasks(i) += task // 更新taskIdToTaskSetManager、taskIdToExecutorId、executorIdToTaskCount、 // executorsByHost、availableCpus等數據結構 val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet // taskId與TaskSetManager的映射關系 taskIdToExecutorId(tid) = execId // taskId與ExecutorId的映射關系 executorIdToTaskCount(execId) += 1// executor上正在執行的task數目加1 executorsByHost(host) += execId// host上相應的executor的映射關系 availableCpus(i) -= CPUS_PER_TASK// 能夠Cpu cores降低相應數目 // 確保availableCpus(i)不小於0 assert(availableCpus(i) >= 0) // 標誌位launchedTask設置為true launchedTask = true } } catch { case e: TaskNotSerializableException => logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") // Do not offer resources for this task, but don't throw an error to allow other // task sets to be submitted. return launchedTask } } } return launchedTask }該方法的主體流程例如以下:
1、標誌位launchedTask初始化為false。用它來標記是否有task被成功分配或者launched;
2、循環shuffledOffers,即每一個可用executor:
2.1、獲取其executorId和host;
2.2、假設executor上可利用cpu數目大於每一個task須要的數目。則繼續task分配;
2.3、調用TaskSetManager的resourceOffer()方法,處理返回的每一個TaskDescription:
2.3.1、分配task成功。將task增加到tasks相應位置(註意,tasks為一個空的,依據shuffledOffers和其可用cores生成的有一定結構的列表);
2.3.2、更新taskIdToTaskSetManager、taskIdToExecutorId、executorIdToTaskCount、executorsByHost、availableCpus等數據結構;
2.3.3、確保availableCpus(i)不小於0;
2.3.4、標誌位launchedTask設置為true;
3、返回launchedTask。
其它都好說,我們僅僅看下TaskSetManager的resourceOffer()方法。
代碼例如以下:
/** * Respond to an offer of a single executor from the scheduler by finding a task * * NOTE: this function is either called with a maxLocality which * would be adjusted by delay scheduling algorithm or it will be with a special * NO_PREF locality which will be not modified * * @param execId the executor Id of the offered resource * @param host the host Id of the offered resource * @param maxLocality the maximum locality we want to schedule the tasks at */ @throws[TaskNotSerializableException] def resourceOffer( execId: String, host: String, maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] = { if (!isZombie) { // 當前時間 val curTime = clock.getTimeMillis() // 確定能夠被同意的位置策略:allowedLocality var allowedLocality = maxLocality // 假設maxLocality不為TaskLocality.NO_PREF if (maxLocality != TaskLocality.NO_PREF) { // 獲取被同意的Locality。主要是看等待時間 allowedLocality = getAllowedLocalityLevel(curTime) // 假設allowedLocality大於maxLocality。將maxLocality賦值給allowedLocality if (allowedLocality > maxLocality) { // We're not allowed to search for farther-away tasks allowedLocality = maxLocality } } // 出列task,即分配task dequeueTask(execId, host, allowedLocality) match { case Some((index, taskLocality, speculative)) => { // 找到相應的task // Found a task; do some bookkeeping and return a task description val task = tasks(index) val taskId = sched.newTaskId() // Do various bookkeeping // 更新copiesRunning copiesRunning(index) += 1 val attemptNum = taskAttempts(index).size // 創建TaskInfo val info = new TaskInfo(taskId, index, attemptNum, curTime, execId, host, taskLocality, speculative) // 更新taskInfos taskInfos(taskId) = info // 更新taskAttempts taskAttempts(index) = info :: taskAttempts(index) // Update our locality level for delay scheduling // NO_PREF will not affect the variables related to delay scheduling // 設置currentLocalityIndex、lastLaunchTime if (maxLocality != TaskLocality.NO_PREF) { currentLocalityIndex = getLocalityIndex(taskLocality) lastLaunchTime = curTime } // Serialize and return the task // 開始時間 val startTime = clock.getTimeMillis() // 序列化task,得到serializedTask val serializedTask: ByteBuffer = try { Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser) } catch { // If the task cannot be serialized, then there's no point to re-attempt the task, // as it will always fail. So just abort the whole task-set. case NonFatal(e) => val msg = s"Failed to serialize task $taskId, not attempting to retry it." logError(msg, e) abort(s"$msg Exception during serialization: $e") throw new TaskNotSerializableException(e) } if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && !emittedTaskSizeWarning) { emittedTaskSizeWarning = true logWarning(s"Stage ${task.stageId} contains a task of very large size " + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") } // 加入running task addRunningTask(taskId) // We used to log the time it takes to serialize the task, but task size is already // a good proxy to task serialization time. // val timeTaken = clock.getTime() - startTime val taskName = s"task ${info.id} in stage ${taskSet.id}" logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," + s"$taskLocality, ${serializedTask.limit} bytes)") // 調用DagScheduler的taskStarted()方法,標記Task已啟動 sched.dagScheduler.taskStarted(task, info) // 返回TaskDescription。當中包括taskId、attemptNumber、execId、index、serializedTask等重要信息 // attemptNumber是猜測運行原理必須使用的,即拖後腿的任務能夠運行多份,誰先完畢用誰的結果 return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId, taskName, index, serializedTask)) } case _ => } } None }resourceOffer()方法的處理流程大體例如以下:
1、記錄當前時間;
2、 確定能夠被同意的位置策略:allowedLocality;
3、出列task。即分配task;
3.1、假設找到相應的task,即task能夠被分配:
3.1.1、完畢獲得taskId、更新copiesRunning、獲得attemptNum、創建TaskInfo、更新taskInfos、更新taskAttempts、設置currentLocalityIndex、lastLaunchTime等基礎數據結構的更新;
3.1.2、序列化task,得到serializedTask;
3.1.3、加入running task;
3.1.4、調用DagScheduler的taskStarted()方法,標記Task已啟動。
3.1.5、返回TaskDescription,當中包括taskId、attemptNumber、execId、index、serializedTask等重要信息,attemptNumber是猜測運行原理必須使用的。即拖後腿的任務能夠運行多份,誰先完畢用誰的結果。
首先說下這個allowedLocality。假設maxLocality不為TaskLocality.NO_PREF。我們須要調用getAllowedLocalityLevel(),傳入當前時間,得到allowedLocality,getAllowedLocalityLevel()方法邏輯比較簡單,代碼例如以下:
/** * Get the level we can launch tasks according to delay scheduling, based on current wait time. * 基於當前的等待是時間,得到我們能夠調度task的級別 */ private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = { // Remove the scheduled or finished tasks lazily // 推斷task能否夠被調度 def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = { var indexOffset = pendingTaskIds.size // 循環 while (indexOffset > 0) { // 索引遞減 indexOffset -= 1 // 獲得task索引 val index = pendingTaskIds(indexOffset) // 假設相應task不存在不論什麽執行實例。且未執行成功,能夠調度。返回true if (copiesRunning(index) == 0 && !successful(index)) { return true } else { // 從pendingTaskIds中移除 pendingTaskIds.remove(indexOffset) } } false } // Walk through the list of tasks that can be scheduled at each location and returns true // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have // already been scheduled. def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = { val emptyKeys = new ArrayBuffer[String] // 循環pendingTasks val hasTasks = pendingTasks.exists { case (id: String, tasks: ArrayBuffer[Int]) => // 推斷task能否夠被調度 if (tasksNeedToBeScheduledFrom(tasks)) { true } else { emptyKeys += id false } } // The key could be executorId, host or rackId // 移除數據 emptyKeys.foreach(id => pendingTasks.remove(id)) hasTasks } // 從當前索引currentLocalityIndex開始,循環myLocalityLevels while (currentLocalityIndex < myLocalityLevels.length - 1) { // 是否存在待調度task,依據不同的Locality Level。調用moreTasksToRunIn()方法從不同的數據結構中獲取。 // NO_PREF直接看pendingTasksWithNoPrefs是否為空 val moreTasks = myLocalityLevels(currentLocalityIndex) match { case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor) case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost) case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack) } if (!moreTasks) {// 不存在能夠被調度的task // This is a performance optimization: if there are no more tasks that can // be scheduled at a particular locality level, there is no point in waiting // for the locality wait timeout (SPARK-4939). // 記錄lastLaunchTime lastLaunchTime = curTime logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " + s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}") // 位置策略索引加1 currentLocalityIndex += 1 } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) { // Jump to the next locality level, and reset lastLaunchTime so that the next locality // wait timer doesn't immediately expire // 更新localityWaits lastLaunchTime += localityWaits(currentLocalityIndex) // 位置策略索引加1 currentLocalityIndex += 1 logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " + s"${localityWaits(currentLocalityIndex)}ms") } else { // 返回當前位置策略級別 return myLocalityLevels(currentLocalityIndex) } } // 返回當前位置策略級別 myLocalityLevels(currentLocalityIndex) }在確定allowedLocality後,我們就須要調用dequeueTask()方法。出列task。進行調度。代碼例如以下:
/** * Dequeue a pending task for a given node and return its index and locality level. * Only search for tasks matching the given locality constraint. * * @return An option containing (task index within the task set, locality, is speculative?) */ private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value) : Option[(Int, TaskLocality.Value, Boolean)] = { // 首先調用dequeueTaskFromList()方法。對PROCESS_LOCAL級別的task進行調度 for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) { return Some((index, TaskLocality.PROCESS_LOCAL, false)) } // PROCESS_LOCAL未調度到task的話,再調度NODE_LOCAL級別 if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) { for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) { return Some((index, TaskLocality.NODE_LOCAL, false)) } } // NODE_LOCAL未調度到task的話,再調度NO_PREF級別 if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) { // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) { return Some((index, TaskLocality.PROCESS_LOCAL, false)) } } // NO_PREF未調度到task的話。再調度RACK_LOCAL級別 if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) { for { rack <- sched.getRackForHost(host) index <- dequeueTaskFromList(execId, getPendingTasksForRack(rack)) } { return Some((index, TaskLocality.RACK_LOCAL, false)) } } // 最好是ANY級別的調度 if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) { for (index <- dequeueTaskFromList(execId, allPendingTasks)) { return Some((index, TaskLocality.ANY, false)) } } // find a speculative task if all others tasks have been scheduled // 假設全部的class都被調度的話,尋找一個speculative task。同MapReduce的猜測運行原理的思想 dequeueSpeculativeTask(execId, host, maxLocality).map { case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)} }非常easy,依照PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY的順序進行調度。
最後,假設全部的class都被調度的話,尋找一個speculative task,同MapReduce的猜測運行原理的思想。
至此。我們得到了TaskDescription,也就知道了哪個Task須要在哪個節點上運行,而Task調度也就全講完了。
題外話:
要透徹的、清晰的解說一個復雜的流程。是非常費力的,短短幾篇文章也是遠遠不夠的。Task調度這兩篇文章。重在敘述一個完整的流程。同一時候解說部分細節。
在這兩篇文章的敘述中,肯定會有非常多細節沒講清晰、講透徹,甚至會有些理解錯誤的地方,希望高手指教,以免繼續誤導大家。
針對部分細節,和對流程的深入理解,我以後還會陸續推出博文。進行具體解說,並歸納總結。謝謝大家!
Spark源代碼分析之六:Task調度(二)