1. 程式人生 > >資源排程機制原始碼分析(schedule方法,兩種排程演算法)

資源排程機制原始碼分析(schedule方法,兩種排程演算法)

sparkContext初始化後會註冊Application,然後會呼叫schedule方法,如何為Application在worker上啟動Executor,Executor啟動後,DAGScheduler和TaskScheduler才能分配task給Executor來進行計算。所以schedule是把整個流程竄起來的重點。

private def schedule(): Unit = {
  //standby master是不會進行Application等資源排程的
  if (state != RecoveryState.ALIVE) {
    return
}
  // Drivers take strict precedence over executors
  //第一行重要程式碼,Random.shuffle的原理,將集合的元素隨機大量
    取出workers中所有之前註冊上來的workers,進行過濾,必須是狀態ALIVE的worker
    對狀態為ALIVE的worker,呼叫Random的shuffle方法進行隨機打亂
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) val numWorkersAlive = shuffledAliveWorkers.size var curPos = 0 //首先排程Driver,什麼情況下會註冊Driver並且導致Driver被排程,其實只有用yarn-cluster模式提交     才會。因為standalone和yarn-client模式,都是在本地直接啟動Driver,而不會來註冊Driver,更不可能排程Driver     遍歷waitingDrivers ArrayBuffer
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers // We assign workers to each waiting driver in a round-robin fashion. For each driver, we // start from the last worker that was assigned a driver, and continue onwards until we have // explored all alive workers.
var launched = false var numWorkersVisited = 0     //只要有活著的worker沒有遍歷到,並且driver還沒有被啟動,也就是launched為false while (numWorkersVisited < numWorkersAlive && !launched) { val worker = shuffledAliveWorkers(curPos) numWorkersVisited += 1     //如果當前這個worker的空閒記憶體和cpu數量大於等於Driver需要的 if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {         //啟動Driver launchDriver(worker, driver)         //並且將driver從ArrayBuffer中移除 waitingDrivers -= driver launched = true }       //將指標指向下一個worker curPos = (curPos + 1) % numWorkersAlive } } startExecutorsOnWorkers() }
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  //將driver加入worker記憶體的快取結構
    將worker內使用的記憶體和cpu數量,都加上driver需要的記憶體和cpu數量
  worker.addDriver(driver)
  //同時把worker也加入到driver的快取結構中
  driver.worker = Some(worker)
  //然後呼叫worker的RpcEndpoint,給它傳送LaunchDriver訊息,讓worker來啟動Driver
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  //將driver的狀態設定為Running
  driver.state = DriverState.RUNNING
}

 Application的排程機制(核心之核心 )
 兩種演算法:一種是spreadOutApps(預設),另一種是非spreadOutApps

 通過spreadOutApps(預設)演算法,其實 會將每個application,要啟動的executor都平均分配 到每個worker上
 比如有20cpu core,有10個worker,那麼實際會遍歷兩遍,每次迴圈,每個worker分配一個core
 最後每個worker分配了兩個core

非spreadOutApps演算法與上面的正好相反,每個application,都儘可能少的分配到worker上去, 比如總共有10個worker,每個有10個core application總共要分配20個core,那麼只會分配到兩個worker上,每個worker都佔滿了這10個core那麼其它的application只能分配另外的worker上去了。 所以我們在spark-submit中配置了要10個executor,每個execuotr需要2個core 那麼共需要20個core,但這種演算法中,其實只會啟動兩個executor,每個executor有10個core
//這個方法就是真正啟動executor的方法,在執行這個方法之前,會呼叫一些其他的驗證方法,得到一個結果集合
//assignedCores,這個集合計算出了每個一個work上能分配幾個core。通過這個結果,就能知道啟動幾個executor
private def allocateWorkerResourceToExecutors(
        app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    //在迴圈的當前worker裡,要啟動exec的個數 (該worker的總數core / 每個exec需要的core = exec個數,如果沒配置每個exec所需core,則預設為1)
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    //如果沒配置每個exec所需core,直接在這個把分配給這個worker的所有core全部用來啟動這個exec,否者按照配置的來
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
        val exec = app.addExecutor(worker, coresToAssign)
        launchExecutor(worker, exec)
        app.state = ApplicationState.RUNNING
    }
}
/**
 總結:
 提交任務時指定每個exec分配2個core,啟動3個executor
 那麼在spark會用預設演算法spreadOutApps,平均給每個worker分配資源的情況下
 先計算出總core數  2*3 = 6
 然後給某三個worker一個分配1個exec(assignedCores集合裡存兩個Int:2,2,2 代表三個worker分別分配2個core)
 然後公式 assignedCores(該worker啟動exec所需core總數) / coresPerExecutor(配置的每個exec啟動core個數) = (2/2=1)(該worker啟動的exe個數)
 然後公式 coresPerExecutor.getOrElse(assignedCores) 到底要啟動幾個core(2)
 最後:得到了要啟動2個core,得到了要啟動exec的個數,就迴圈exec個數來分別啟動2個core

相關推薦

資源排程機制原始碼分析schedule方法排程演算法

sparkContext初始化後會註冊Application,然後會呼叫schedule方法,如何為Application在worker上啟動Executor,Executor啟動後,DAGScheduler和TaskScheduler才能分配task給Executor來進行

Master原理剖析與原始碼分析資源排程機制原始碼分析schedule()資源排程演算法

1、主備切換機制原理剖析與原始碼分析 2、註冊機制原理剖析與原始碼分析 3、狀態改變處理機制原始碼分析 4、資源排程機制原始碼分析(schedule(),兩種資源排程演算法) * Dri

Android App啟動時Apk資源載入機制原始碼分析

在Andorid開發中我們要設定文字或圖片顯示,都直接通過Api一步呼叫就完成了,不僅是我們工程下res資源以及系統自帶的framwork資源也可以,那這些資源打包成Apk之後是如何被系統載入從而顯示出來的呢。 這裡我要從Apk安裝之後啟動流程開始講起,在桌面

Android Apk資源載入機制原始碼分析以及資源動態載入實現系列文章

Android系統中執行Apk時是如何對包內的資源進行載入以及我們開發中設定相關資源後又是如何被加載出來,這個系列我們可以學習系統載入資源的機制原理,然後我們再巧妙的利用學習系統載入技巧來打造我們自己的動態資源載入機制實現。 這個系列主要分為如下3部分內容來講

Dubbo SPI 機制原始碼分析基於2.7.7

Dubbo SPI 機制涉及到 `@SPI`、`@Adaptive`、`@Activate` 三個註解,ExtensionLoader 作為 Dubbo SPI 機制的核心負責載入和管理擴充套件點及其實現。本文以 ExtensionLoader 的原始碼作為分析主線,進而引出三個註解的作用和工作機制。 Ex

聚類分析劃分方法層次方法、密度方法 ---機器學習

本節學習聚類分析,聚類屬於無監督學習,其中聚類的方法有很多種常見的有K-means、層次聚類(Hierarchical clustering)、譜聚類(Spectral Clustering)等,在這裡,上來不會直接介紹這些理論,需要一些基礎知識鋪墊,和前面一樣,一上來就直接介紹聚類演算法,顯得

Activiti原始碼分析框架、核心類。。。

http://jiangwenfeng762.iteye.com/blog/1338553 Activiti是業界很流行的java工作流引擎,關於Activiti與JBPM5的關係和如何選擇不是本文要討論的話題,相關內容可以baidu一下。Activiti從架構角度看是

機器學習--聚類分析劃分方法層次方法、密度方法

本節學習聚類分析,聚類屬於無監督學習,其中聚類的方法有很多種常見的有K-means、層次聚類(Hierarchical clustering)、譜聚類(Spectral Clustering)等,在這裡,上來不會直接介紹這些理論,需要一些基礎知識鋪墊,和前面一樣,一上來就直接

需求分析ER圖數據流圖

數據流圖 需求 images com 分析 blog src 需求分析 image 需求分析(ER圖,數據流圖)

Eclipse匯入git工程HTTP與SSH匯入方式

Eclipse匯入Git工程(HTTP與SSH兩種匯入方式) Eclipse git官方使用說明:http://wiki.eclipse.org/EGit/User_Guide#Eclipse_SSH_Configuration 一、使用HTTP方式匯入git工程

合併個陣列並去重ES5和ES6方式實現

 ES6實現方式 let arr1 = [1, 1, 2, 3, 6, 9, 5, 5, 4] let arr2 = [1, 2, 5, 4, 9, 7, 7, 8, 8] f

用Python3、NetCore、Shell分別開發一個Ubuntu版的定時提醒附NetCore跨平臺釋出方式

新增直接執行py檔案的補充:請在py前面加上:#!/usr/bin/env python3 然後再執行 sudo chmod +x ./task.py 下次執行直接 ./task.py 平時經常用定時提醒來提高工作效率,到了Linux。。。。蒙圈了,以前C#寫的不能跨平臺

快速排序:Java實現必須掌握的實現方式

第一種實現方式採用《演算法導論》(原書第3版)中的快速排序演算法,且參考了《劍指Offer》(第2版)中遞迴實現快速排序的程式碼,如下: public void quickSort_1(int[]

Glide原始碼分析——從用法來看之load&into方法

上一篇,我們分析了with方法,文章連結: https://blog.csdn.net/qq_36391075/article/details/82833260 在with方法中,進行了Glide的初始化,建立了RequesManger,並且綁定了生命週期,最終返回了一個Reques

Glide原始碼分析從用法來看之with方法

繼續啃原始碼,用過Glide的人,肯定都覺得它好好用,我們一般只需要幾行程式碼,就可以達到我們想要的效果,可以在這個背後是什麼呢?就需要我們來看了。 我一般看原始碼,我喜歡先從用法來看,然後一步一步的再細扣,所以就先從用法來看Glide的整體流程。 用過Glide的人,用下面這段

Dubbo原始碼分析Dubbo通訊的編碼解碼機制

Dubbo原始碼分析(一)Dubbo的擴充套件點機制 Dubbo原始碼分析(二)Dubbo服務釋出Export Dubbo原始碼分析(三)Dubbo的服務引用Refer Dubbo原始碼分析(四)Dubbo呼叫鏈-消費端(叢集容錯機制) Dubbo原始碼分析(五)Dubbo呼叫鏈-服務端

大資料之Spark--- Spark核心APISpark術語Spark三級排程流程原始碼分析

一、Spark核心API ----------------------------------------------- [SparkContext] 連線到spark叢集,入口點. [HadoopRDD] extends RDD 讀取hadoop

Java定時任務Timer排程器【一】 原始碼分析圖文詳解版

就以鬧鐘的例子開頭吧(後續小節皆以鬧鐘為例,所有原始碼只列關鍵部分)。 public class ScheduleDemo { public static void main(String[] args) throws InterruptedException {

Java定時任務Timer排程器【二】 多執行緒原始碼分析圖文版

  上一節通過一個小例子分析了Timer執行過程,牽涉的執行執行緒雖然只有兩個,但實際場景會比上面複雜一些。 首先通過一張簡單類圖(只列出簡單的依賴關係)看一下Timer暴露的介面。   為了演示Timer所暴露的介面,下面舉一個極端的例子(每一個介面方法面

Netflix Eureka原始碼分析13——eureka server的登錄檔多級快取過期機制:主動過期+定時過期+被動過期

(1)主動過期 readWriteCacheMap,讀寫快取 有新的服務例項發生註冊、下線、故障的時候,就會去重新整理readWriteCacheMap 比如說現在有一個服務A,ServiceA,有一個新的服務例項,Instance010來註冊了,註冊完了之後,其實必須