今天抽空回顧了一下Spark相關的原始碼,本來想要了解一下Block的管理機制,但是看著看著就回到了SparkContext的建立與使用。正好之前沒有正式的整理過這部分的內容,這次就順帶著回顧一下。
更多內容參考:我的大資料之路
Spark作為目前最流行的大資料計算框架,已經發展了幾個年頭了。版本也從我剛接觸的1.6升級到了2.2.1。由於目前工作使用的是2.2.0,所以這次的分析也就從2.2.0版本入手了。
涉及的內容主要有:
- Standalone模式中的Master與Worker
- client、driver、excutor的關係
下面就按照順序依次介紹一下。
Master與Worker
在最開始程式設計的時候,很少會涉及分散式,因為資料量也不大。後來隨著硬體的發展cpu的瓶頸,開始流行多執行緒程式設計,基於多執行緒來加快處理速度;再後來,衍生出了網格計算、CPU與GPU的異構平行計算以及當時流行的mapreduce分散式計算。但是mapreduce由於儲存以及計算流程的限制,spark開始流行起來。Spark憑藉記憶體計算、強大的DAG回溯能力,快速的佔領平行計算的風口。
那麼平行計算肯定是需要分散式叢集的,常見的叢集管理方式,有Master-Slave模式、P2P模式等等。
比如Mysql的主從複製,就是Master-Slave模式;Elasticsearch的分片管理就是P2P模式。在Spark中有不同的部署方式,但是計算的模式都是Master-Slave模式,只不過Slave換了名字叫做worker而已。叢集的部署模式如下所示:
流程就是使用者以client的身份向master提交任務,master去worker上面建立執行任務的載體(driver和excutor)。
client、driver、excutor的關係
Master和Worker是伺服器的部署角色,程式從執行上,則分成了client、driver、excutor三種角色。按照模式的不同,client和driver可能是同一個。以2.2.0版本的standalone模式來說,他們三個是獨立的角色。client用於提交程式,初始化一些環境變數;driver用於生成task並追蹤管理task的執行;excutor負責最終task的執行。
原始碼探索
總的流程可以總結為下面的一張圖:
通過檢視原始碼,來看一下
1 SparkContext建立排程器
在建立SparkContext的時候會建立幾個核心的模組:
- DAGScheduler 面向job的排程器
- TaskScheduler 不同的叢集模式,有不同的實現方式,如standalone下的taskschedulerImpl
- SchedulerBackend 不同的叢集模式下,有不同的實現方式,如standalone下的StandaloneSchedulerBackend.負責向master發起註冊
// 建立並啟動排程器
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
...
// 啟動排程器
_taskScheduler.start()
在createTaskSchduler中,根據master的不同,選擇不同的實現方式,主要是在backend的實現上有差異:
master match {
case "local" =>
...
case LOCAL_N_REGEX(threads) =>
...
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
...
case SPARK_REGEX(sparkUrl) =>
// 建立排程器
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
// 建立backend
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
// 把backend注入到schduler中
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
...
case masterUrl =>
...
}
我們這裡只看一下standalone模式的建立,就是建立了TaskSchedulerImpl和StandaloneSchedulerBackend的物件,另外初始化了排程器,根據配置選擇排程模式,預設是FIFO:
def initialize(backend: SchedulerBackend) {
this.backend = backend
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
schedulableBuilder.buildPools()
}
2 TaskSchedulerImpl執行start方法
其實是執行了backend的start()方法
override def start() {
backend.start()
...
}
3 StandaloneSchedulerBackend執行start方法
這部分程式碼比較多,可以簡化的看:
- 封裝command物件
- 封裝appDesc物件
- 建立StandaloneAppClient物件
- 執行start()方法
其中command中包含的那個類,就是excutor的實現類。
override def start() {
//初始化引數
...
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
...
val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
// 注意前面建立了一大堆的配置物件,主要就是那個class等資訊
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
...
}
4 發起註冊
核心的程式碼在StanaloneAppClient中,並在start()方法中啟動了一個rpc的服務——ClientEndpoint
override def onStart(): Unit = {
try {
registerWithMaster(1)//發起註冊
} catch {
...
}
}
registerWithMaster採用了非同步傳送請求連線master,只要有一個註冊成功,其他的都會cancel。這裡有時間可以做個小hello world玩玩看。
private def registerWithMaster(nthRetry: Int) {
registerMasterFutures.set(tryRegisterAllMasters())
registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
override def run(): Unit = {
if (registered.get) {
registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
} else if (nthRetry >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
registerMasterFutures.get.foreach(_.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
//發起註冊
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
...
masterRef.send(RegisterApplication(appDescription, self))
...
}
5 Master接收到請求執行schedule方法
Master是一個常駐的程序,時刻監聽別人發過來的訊息。剛才client傳送了一個RegisterApplication訊息,忽略前面建立app的內容,直接執行了schedule方法:
case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
...
schedule()
}
6 Master傳送launchDriver
傳送lanunchDriver請求
private def schedule(): Unit = {
...
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
...
while (numWorkersVisited < numWorkersAlive && !launched) {
...
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
...
}
...
}
}
startExecutorsOnWorkers()
}
//向worker傳送launchDriver請求
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
...
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
...
}
7 Worker建立DriverRunner
case LaunchDriver(driverId, driverDesc) =>
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(
conf,
driverId,
workDir,
sparkHome,
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
workerUri,
securityMgr)
drivers(driverId) = driver
driver.start()
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
8 Master傳送launchExcutor
第6步中最後有一個startExecutorsOnWorkers方法。
private def startExecutorsOnWorkers(): Unit = {
...
for (app <- waitingApps if app.coresLeft > 0) {
...
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
...
for (i <- 1 to numExecutors) {
...
launchExecutor(worker, exec)
...
}
}
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
...
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
...
}
9 Worker建立ExcutorRunner
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
...
} else {
try {
...
val manager = new ExecutorRunner(
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.RUNNING)
...
} catch {
...
}
}
至此,Driver和Excutor就啟動起來了.....
之後程式碼是怎麼執行的,就且聽下回分解把!
參考
- SparkContext http://www.cnblogs.com/jcchoiling/p/6427406.html
- spark worker解密:http://www.cnblogs.com/jcchoiling/p/6433196.html
- 2.2.0原始碼
- 《Spark核心機制及效能調優》· 王家林