1. 程式人生 > >《深入理解Spark-核心思想與原始碼分析》讀書筆記(1)

《深入理解Spark-核心思想與原始碼分析》讀書筆記(1)

前兩章

第一章主要是講如何安裝和配置spark,以及如何匯入spark原始碼除錯執行;第二章主要講的是上次那本書《Spark快速大資料分析》的內容,科普一下spark的知識。

第三章 SparkContext的初始化

1. 概述

這章的主要內容就是講解SparkContext的初始化。SparkContext就是所有Spark應用基礎環境而配置Spark任務則是由SparkConf來完成。SparkContext的初始化一共有以下幾步
1)建立 Spark 執行環境 SparkEnv;
2)建立 RDD 清理器 metadataCleaner;
3)建立並初始化 Spark UI;
4)Hadoop 相關配置及 Executor 環境變數的設定;
5)建立任務排程 TaskScheduler;
6)建立和啟動 DAGScheduler;
7)TaskScheduler 的啟動;
8)初始化塊管理器 BlockManager(BlockManager 是儲存體系的主要元件之一,將在第 4章介紹);
9)啟動測量系統 MetricsSystem;
10)建立和啟動 Executor 分配管理器 ExecutorAllocationManager;
11)ContextCleaner 的建立與啟動;
12)Spark 環境更新;
13)建立 DAGSchedulerSource 和 BlockManagerSource;
14)將 SparkContext 標記為啟用。
SparkContext構造器的引數就是SparkConf

    class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient

2. 建立執行環境SparkEnv

SparkEnv包含眾多和Executor(執行器)相關的物件。Executor就是Worker(工作節點)的一個程序。包括以下內容
1)建立安全管理器 SecurityManager;
2)建立基於 Akka 的分散式訊息系統 ActorSystem;
3)建立 Map 任務輸出跟蹤器 mapOutputTracker;
4)例項化 ShuffleManager;
5)建立 ShuffleMemoryManager;
6)建立塊傳輸服務 BlockTransferService;
7)建立 BlockManagerMaster;
8)建立塊管理器 BlockManager;
9)建立廣播管理器 BroadcastManager;
10)建立快取管理器 CacheManager;
11)建立 HTTP 檔案伺服器 HttpFileServer;
12)建立測量系統 MetricsSystem;
13)建立 SparkEnv。

2.1 安全管理器SecurityManager
用來管理系統的口令

            //Set our own authenticator鑑別器 to properly專有的 negotiate協商
            //userpassword for HTTP connections. This is needed by the HTTP client
            //fetching from the HttpServer. Put here so its only set once.
              if (authOn) {
                Authenticator.setDefault(
                  new Authenticator() {
                override def
getPasswordAuthentication():
PasswordAuthentication = { var passAuth: PasswordAuthentication = null val userInfo = getRequestingURL().getUserInfo() if (userInfo != null) { val parts = userInfo.split(":", 2) passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray()) } return passAuth } } ) }

2.2 基於Akka的分散式訊息系統ActorSystem
ActorSystem是Akka提供的用於建立分散式訊息通訊系統的基礎類。SparkEnv使用了AkkaUtils.createActorSystem方法完成,而createActorSystem實際上使用了doCreaterActorSystem來創造ActorSystem。

2.3 map任務輸出跟蹤器mapOutputTracker
跟蹤map階段任務的輸出狀態,便於reduce階段任務獲取地址和中間輸出結果。所以這個mapOutputTracker就是用來管下面這些map和shuffle的,比如知道map輸出block之類的,讓reduce能找得到map的結果。
mapOutputTracker
下面的程式碼是建立MapOutputTrackerMasterActor的。map任務的狀態是由Executor像持有的MapOutputTrackerMasterActor傳送訊息,講map任務狀態同步到mapOutputTracker的mapStatuses和cachedSerializedStatuses。

2.4 例項化ShuffleManager
這個就是用來管理上面那個圖裡的shuffle的

2.5 塊傳輸服務BlockTransferService
獲取遠端節點上的block的,第四章講

2.6 BlockManagerMaster介紹
這個負責對block進行管理,具體操作藉助BlockManagerMasterActor,在初始化之後,建立BlockManager

2.7 建立廣播管理器BroadcastManager
BroadcastManager是用於配置資訊和序列化後的RDD、Job以及ShuffleDEpendency等資訊在本地儲存。

2.8 建立快取管理器CacheManager
用於快取RDDM某個分割槽計算後的中間結果,第四章解釋。

2.9 HTTP檔案伺服器HttpFileServer
提供對檔案的HTTP訪問。開始時要初始化,建立檔案伺服器的根目錄和臨時目錄。建立jar包及其他檔案的檔案目錄。用start()方法啟動,而這個方法用了doStart方法,doStart方法就是各種配置server物件,然後啟動它。


def initialize() {
    baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
    fileDir = new File(baseDir, "files")
    jarDir = new File(baseDir, "jars")
    fileDir.mkdir()
    jarDir.mkdir()
    logInfo("HTTP File server directory is " + baseDir)
    httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
    httpServer.start()
    serverUri = httpServer.uri
    logDebug("HTTP file server started at: " + serverUri)
}

2.10 建立測量系統MetricsSystem
MetricsSystem是Spark的測量系統,其作用是定期將資料指標從資料來源(source)拉到資料匯(sink)。

2.11 建立SparkEnv
當所有基礎元件準備好後,使用new Spark(……)來建立執行環境SparkEnv。

3. 建立metadataCleaner

metadataCleaner的功能是清楚過期的持久化RDD。

            /**
             * Runs a timer task to periodically定期地 clean up metadata (e.g. old files or hashtable entries)
             */
            private[spark] class MetadataCleaner(
                cleanerType: MetadataCleanerType.MetadataCleanerType,
                cleanupFunc: (Long) => Unit,
                conf: SparkConf)
              extends Logging
            {
              val name = cleanerType.toString

              private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
              private val periodSeconds = math.max(10, delaySeconds / 10)
              private val timer = new Timer(name + " cleanup timer", true)


              private val task = new TimerTask {
                override def run() {
                  try {
                  //就這,定期清理,用cleanupFunc
                    cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
                    logInfo("Ran metadata cleaner for " + name)
                  } catch {
                    case e: Exception => logError("Error running cleanup task for " + name, e)
                  }
                }
              }

              if (delaySeconds > 0) {
                logDebug(
                  "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
                  "and period of " + periodSeconds + " secs")
                timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
              }

              def cancel() {
                timer.cancel()
              }
            }
而clean函式如下

            private[spark] def cleanup(cleanupTime:Long){                       persistentRdds.clearOldValues(cleanupTime)
            }

4. SparkUI詳解

SparkUI
DAGScheduler是主要的產生各種Event的源頭,它將各種SparkListenerEvent傳送到listenerBus的時間佇列中,然後BUS把事件和具體的sparklistener匹配,最終由sparkUI展示。

4.1 listenerBus詳解
由三個部分組成。

  • 事件阻塞佇列
  • 監聽器陣列
  • 事件匹配監聽器執行緒

事件阻塞佇列相當於排隊上車的人,而執行緒就是公交車,不停地拉去排事件阻塞佇列裡的事件與監聽器陣列匹配,然後對事件進行操作。

private val EVENT_QUEUE_CAPACITY = 10000
  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
  private var queueFullErrorMessageLogged = false
  private var started = false
  // A counter that represents the number of events produced and consumed in the queue
  private val eventLock = new Semaphore(0)

  private val listenerThread = new Thread("SparkListenerBus") {
    setDaemon(true)
    override def run(): Unit = Utils.logUncaughtExceptions {
      while (true) {
        eventLock.acquire()
        // Atomically remove and process this event
        LiveListenerBus.this.synchronized {
          val event = eventQueue.poll
          if (event == SparkListenerShutdown) {
            // Get out of the while loop and shutdown the daemon thread
            return
          }
          Option(event).foreach(postToAll)
        }
      }
    }
  }

  def start() {
    if (started) {
      throw new IllegalStateException("Listener bus already started!")
    }
    listenerThread.start()
    started = true
  }
def post(event: SparkListenerEvent) {
    val eventAdded = eventQueue.offer(event)
    if (eventAdded) {
      eventLock.release()
    } else {
      logQueueFullErrorMessage()
    }
  }

  def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }

  def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }

  def stop() {
    if (!started) {
      throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
    }
    post(SparkListenerShutdown)
    listenerThread.join()
  }

4.2 構造JobProgressListener
JobProgressListener用來統計Job的資訊和狀態。並在它們發生變化時進行反應。

4.3 SparkUI的建立與初始化
用create()方法加入一些listener,然後initialize()連線tab。再之後就是利用render()方法對頁面進佈局,實現顯示。

5.Hadoop相關配置及Executor環境變數

獲取Hadoop相關配置資訊和對Executor的環境變數進行配置

6.建立任務排程器TaskScheduler

z這個就是用來負責任務的提交,並且請求叢集管理器對任務排程。可以看做是任務排程的客戶端。首先createTaskScheduler要建立TaskSchedulerImpl。
6.1 建立TaskSchedulerImpl

  • 從SparkConf中讀取配置資訊,包括每個任務分配的CPU數,排程模式(分為FAIR和FIFO兩種,預設為FIFO)
  • 建立TaskResultGetter,它的作用是通過執行緒池對Worker上的Executor傳送的Task的執行結果進行處理。
    排程方式最終落實到介面SchedulerBackend上實現

6.2 TaskSchedulerImpl的初始化

1)使 TaskSchedulerImpl 持有 LocalBackend 的引用。
2)建立 Pool,Pool 中快取了排程佇列、排程演算法及 TaskSetManager 集合等資訊。
3)建立 FIFOSchedulableBuilder,FIFOSchedulableBuilder 用來操作 Pool 中的排程佇列。

7.建立和啟動DAGScheduler

DAGScheduler 主要用於在任務正式交給 TaskSchedulerImpl 提交之前做一些準備工作,包 括: 創 建 Job, 將 DAG 中 的 RDD 劃 分 到 不 同 的 Stage, 提 交 Stage, 等 等。
此節留以後詳細說明

8.TashScheduler的啟動

此節留以後詳細說明

9.啟動測量系統MetricsSystem

這個測量系統有三個概念。

  • Instance:指定了誰在使用測量系統
  • Source:指定了從哪裡收集測量資料
  • Sink:指定了往哪裡輸出測量資料

啟動過程包括 1)註冊Source 2)註冊Sinks 3)給Sinks增加Jetty的ServletContextHandler

  private def registerSources() {
    val instConfig = metricsConfig.getInstance(instance)
    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

    // Register all the sources related to instance
    sourceConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      try {
        val source = Utils.classForName(classPath).newInstance()
        registerSource(source.asInstanceOf[Source])
      } catch {
        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
      }
    }
  }

給Sinks增加Jetty的ServletContextHandler主要是為了和SparkUI同步,用到了getServletHandler方法,最終生成處理 /metrics/json請求的ServletContextHandler。

10.啟動和建立ExecutorAllocationManager

ExecutorAllocationManager用與對已經分配的Executor進行管理。

  def start(): Unit = {
    listenerBus.addListener(listener)

    val scheduleTask = new Runnable() {
      override def run(): Unit = {
        try {
          schedule()
        } catch {
          case ct: ControlThrowable =>
            throw ct
          case t: Throwable =>
            logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
        }
      }
    }
    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
  }

其中start方法就將listener加入bus中,通過監聽事件,動態新增、刪除Executor。

11.ContextCleaner的建立和啟動

用於清理那些超出應用範圍的RDD、ShuffleDepency和Broadcast物件。

/** Keep cleaning RDD, shuffle, and broadcast state. */
  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
    while (!stopped) {
      try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])

// Synchronize here to avoid being interrupted on stop()
        synchronized {
          reference.map(_.task).foreach { task =>
            logDebug("Got cleaning task " + task)
            referenceBuffer -= reference.get
            task match {
              case CleanRDD(rddId) =>
                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
              case CleanShuffle(shuffleId) =>
                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
              case CleanBroadcast(broadcastId) =>
                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
              case CleanAccum(accId) =>
                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
              case CleanCheckpoint(rddId) =>
                doCleanCheckpoint(rddId)
            }
          }
        }
      } catch {
        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
  }

12.Spark環境更新

在SparkContext的初始化過程中,可能對其環境造成影響,所以需要更新環境,程式碼如下。
postEnviromentUpdate()
postApplicationStart()

13.建立DAGSchedulerSource和BlockManagerSource

private def initDriveMetrics(){
 SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
 SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
}
initDriveMetrics()

14.將SparkContext標記為啟用

SparkContext.setActiveContext(this,allowMultipleContexts)