1. 程式人生 > >《深入理解Spark:核心思想與原始碼分析》——SparkContext的初始化(伯篇)——執行環境與元資料清理器

《深入理解Spark:核心思想與原始碼分析》——SparkContext的初始化(伯篇)——執行環境與元資料清理器

《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》

《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》

由於本書的第3章內容較多,所以打算分別開闢四篇隨筆分別展現。本文展現第3章第一部分的內容:

第3章 SparkContext的初始化

“道生一,一生二,二生三,三生萬物。”

——《道德經》

n 本章導讀:

  SparkContext的初始化是Driver應用程式提交執行的前提,本章內容以local模式為主,並按照程式碼執行順序講解,這將有助於首次接觸Spark的讀者理解原始碼。讀者朋友如果能邊跟蹤程式碼,邊學習本章內容,也許是快速理解SparkContext初始化過程的便捷途徑。已經熟練使用Spark的開發人員可以選擇跳過本章內容。

  本章將在介紹SparkContext初始化過程的同時,向讀者介紹各個元件的作用,為閱讀後面的章節打好基礎。Spark中的元件很多,就其功能而言涉及到網路通訊、分散式、訊息、儲存、計算、快取、測量、清理、檔案服務、Web UI的方方面面。

3.1 SparkContext概述

  Spark Driver用於提交使用者應用程式,實際可以看作Spark的客戶端。瞭解Spark Driver的初始化,有助於讀者理解使用者應用程式在客戶端的處理過程。

  Spark Driver的初始化始終圍繞著SparkContext的初始化。SparkContext可以算得上是所有Spark應用程式的發動機引擎,轎車要想跑起來,發動機首先要啟動。SparkContext初始化完畢,才能向Spark叢集提交任務。在平坦的公路上,發動機只需以較低的轉速,較低的功率就可以遊刃有餘;在山區,你可能需要一臺能夠提供大功率的發動機,這樣才能滿足你轉山的體驗。這些引數都是通過駕駛員操作油門、檔位等傳送給發動機的,而SparkContext的配置引數則由SparkConf負責,SparkConf就是你的操作面板。

SparkConf的構造很簡單,主要是通過ConcurrentHashMap來維護各種Spark的配置屬性。SparkConf程式碼結構見程式碼清單3-1。Spark的配置屬性都是以“spark.”開頭的字串。

程式碼清單3-1  SparkConf程式碼結構

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
  import SparkConf._
  def this() = this(true)
  private val settings = new ConcurrentHashMap[String, String]()
  if (loadDefaults) {
    // 載入任何以spark.開頭的系統屬性
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value)
    }
  }
//其餘程式碼省略

現在開始介紹SparkContext,SparkContext的初始化步驟如下:

1)         建立Spark執行環境SparkEnv;

2)         建立RDD清理器metadataCleaner;

3)         建立並初始化Spark UI;

4)         Hadoop相關配置及Executor環境變數的設定

5)         建立任務排程TaskScheduler;

6)         建立和啟動DAGScheduler;

7)         TaskScheduler的啟動;

8)         初始化塊管理器BlockManager(BlockManager是儲存體系的主要元件之一,將在第4章介紹);

9)         啟動測量系統MetricsSystem;

10)     建立和啟動Executor分配管理器ExecutorAllocationManager;

11)     ContextCleaner的建立與啟動;

12)     Spark環境更新;

13)     建立DAGSchedulerSource和BlockManagerSource;

14)     將SparkContext標記為啟用。

SparkContext的主構造器引數為SparkConf,其實現如下。

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
private val creationSite: CallSite = Utils.getCallSite()
  private val allowMultipleContexts: Boolean =
    config.getBoolean("spark.driver.allowMultipleContexts", false)
  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)

上面程式碼中的CallSite儲存了執行緒棧中最靠近棧頂的使用者類及最靠近棧底的Scala或者Spark核心類資訊。Utils.getCallSite的詳細資訊見附錄A。SparkContext預設只有一個例項(由屬性spark.driver.allowMultipleContexts來控制,使用者需要多個SparkContext例項時,可以將其設定為true),方法markPartiallyConstructed用來確保例項的唯一性,並將當前SparkContext標記為正在構建中。

  接下來會對SparkConf進行拷貝,然後對各種配置資訊進行校驗,程式碼如下。

  private[spark] val conf = config.clone()
  conf.validateSettings()

  if (!conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }

從上面校驗的程式碼看到必須指定屬性spark.master 和spark.app.name,否則會丟擲異常,結束初始化過程。spark.master用於設定部署模式,spark.app.name指定應用程式名稱。

3.2 建立執行環境SparkEnv

SparkEnv.createDriverEnv方法有三個引數,conf、isLocal和 listenerBus。

  val isLocal = (master == "local" || master.startsWith("local["))
  private[spark] val listenerBus = new LiveListenerBus
  conf.set("spark.executor.id", "driver")

  private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  SparkEnv.set(env)

上面程式碼中的conf是對SparkConf的拷貝,isLocal標識是否是單機模式,listenerBus採用監聽器模式維護各類事件的處理,在3.14節會詳細介紹。

SparkEnv的方法createDriverEnv最終呼叫create建立SparkEnv。SparkEnv的構造步驟如下:

1)         建立安全管理器SecurityManager;

2)         建立基於Akka的分散式訊息系統ActorSystem;

3)         建立Map任務輸出跟蹤器mapOutputTracker;

4)         例項化ShuffleManager;

5)         建立ShuffleMemoryManager;

6)         建立塊傳輸服務BlockTransferService;

7)         建立BlockManagerMaster;

8)         建立塊管理器BlockManager;

9)         建立廣播管理器BroadcastManager;

10)     建立快取管理器CacheManager;

11)     建立HTTP檔案伺服器HttpFileServer;

12)     建立測量系統MetricsSystem;

13)     建立SparkEnv;

3.2.1 安全管理器SecurityManager

  SecurityManager主要對許可權、賬號進行設定,如果使用Hadoop YARN作為叢集管理器,則需要使用證書生成 secret key登入,最後給當前系統設定預設的口令認證例項,此例項採用匿名內部類實現,參見程式碼清單3-2。

程式碼清單3-2  SecurityManager的實現

  private val secretKey = generateSecretKey()

  // 使用HTTP連線設定口令認證
  if (authOn) {
    Authenticator.setDefault(
      new Authenticator() {
        override def getPasswordAuthentication(): PasswordAuthentication = {
          var passAuth: PasswordAuthentication = null
          val userInfo = getRequestingURL().getUserInfo()
          if (userInfo != null) {
            val  parts = userInfo.split(":", 2)
            passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
          }
          return passAuth
        }
      }
    )
  }

3.2.2 基於Akka的分散式訊息系統ActorSystem

  ActorSystem是Spark中最基礎的設施,Spark既使用它傳送分散式訊息,又用它實現併發程式設計。怎麼,訊息系統可以實現併發?要解釋清楚這個問題,首先應該簡單的介紹下Scala語言的Actor併發程式設計模型:Scala認為Java執行緒通過共享資料以及通過鎖來維護共享資料的一致性是糟糕的做法,容易引起鎖的爭用,降低併發程式的效能,甚至會引入死鎖的問題。在Scala中只需要自定義型別繼承Actor,並且提供act方法,就如同Java裡實現Runnable介面,需要實現run方法一樣。但是不能直接呼叫act方法,而是通過傳送訊息的方式(Scala傳送訊息是非同步的),傳遞資料。如:

  Actor ! message

  Akka是Actor程式設計模型的高階類庫,類似於JDK 1.5之後越來越豐富的併發工具包,簡化了程式設計師併發程式設計的難度。ActorSystem便是Akka提供的用於建立分散式訊息通訊系統的基礎類。Akka的具體資訊見附錄B。

  正式因為Actor輕量級的併發程式設計、訊息傳送以及ActorSystem支援分散式訊息傳送等特點,Spark選擇了ActorSystem。

  SparkEnv中建立ActorSystem時用到了AkkaUtils工具類,見程式碼清單3-3。AkkaUtils.createActorSystem方法用於啟動ActorSystem,見程式碼清單3-4。AkkaUtils使用了Utils的靜態方法startServiceOnPort, startServiceOnPort最終會回撥方法startService: Int => (T, Int),此處的startService實際是方法doCreateActorSystem。真正啟動ActorSystem是由doCreateActorSystem方法完成的,doCreateActorSystem的具體實現細節請見附錄B。Spark的Driver中Akka的預設訪問地址是akka://sparkDriver,Spark的Executor中Akka的預設訪問地址是akka:// sparkExecutor。如果不指定ActorSystem的埠,那麼所有節點的ActorSystem埠在每次啟動時隨機產生。關於startServiceOnPort的實現,請見附錄A。

程式碼清單3-3  ActorSystem的建立和啟動

    val (actorSystem, boundPort) =
      Option(defaultActorSystem) match {
        case Some(as) => (as, port)
        case None =>
          val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
          AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
      }

程式碼清單3-4  ActorSystem的建立和啟動

def createActorSystem(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager): (ActorSystem, Int) = {
    val startService: Int => (ActorSystem, Int) = { actualPort =>
      doCreateActorSystem(name, host, actualPort, conf, securityManager)
    }
    Utils.startServiceOnPort(port, startService, conf, name)
  }

3.2.3 map任務輸出跟蹤器mapOutputTracker

  mapOutputTracker用於跟蹤map階段任務的輸出狀態,此狀態便於reduce階段任務獲取地址及中間輸出結果。每個map任務或者reduce任務都會有其唯一標識,分別為mapId和reduceId。每個reduce任務的輸入可能是多個map任務的輸出,reduce會到各個map任務的所在節點上拉取Block,這一過程叫做shuffle。每批shuffle過程都有唯一的標識shuffleId。

  這裡先介紹下MapOutputTrackerMaster。MapOutputTrackerMaster內部使用mapStatuses:TimeStampedHashMap[Int, Array[MapStatus]]來維護跟蹤各個map任務的輸出狀態。其中key對應shuffleId,Array儲存各個map任務對應的狀態資訊MapStatus。由於MapStatus維護了map輸出Block的地址BlockManagerId,所以reduce任務知道從何處獲取map任務的中間輸出。MapOutputTrackerMaster還使用cachedSerializedStatuses:TimeStampedHashMap[Int, Array[Byte]]維護序列化後的各個map任務的輸出狀態。其中key對應shuffleId,Array儲存各個序列化MapStatus生成的位元組陣列。

  Driver和Executor處理MapOutputTrackerMaster的方式有所不同:

如果當前應用程式是Driver,則建立MapOutputTrackerMaster,然後建立MapOutputTrackerMasterActor,並且註冊到ActorSystem中。

如果當前應用程式是Executor,則建立MapOutputTrackerWorker,並從ActorSystem中找到MapOutputTrackerMasterActor。

無論是Driver還是Executor,最後都由mapOutputTracker的屬性trackerActor持有MapOutputTrackerMasterActor的引用,參見程式碼清單3-5。

程式碼清單3-5  registerOrLookup方法用於查詢或者註冊Actor的實現

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        actorSystem.actorOf(Props(newActor), name = name)
      } else {
        AkkaUtils.makeDriverRef(name, conf, actorSystem)
      }
    }

    val mapOutputTracker =  if (isDriver) {
      new MapOutputTrackerMaster(conf)
    } else {
      new MapOutputTrackerWorker(conf)
}

    mapOutputTracker.trackerActor = registerOrLookup(
      "MapOutputTracker",
      new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

在後面章節大家會知道map任務的狀態正是由Executor向持有的MapOutputTrackerMasterActor傳送訊息,將map任務狀態同步到mapOutputTracker的mapStatuses和cachedSerializedStatuses的。Executor究竟是如何找到MapOutputTrackerMasterActor的?registerOrLookup方法通過呼叫AkkaUtils.makeDriverRef找到MapOutputTrackerMasterActor,實際正是利用ActorSystem提供的分散式訊息機制實現的,具體細節參見附錄B。這裡第一次使用到了Akka提供的功能,以後大家會漸漸感覺到使用Akka的便捷。

3.2.4 例項化ShuffleManager

  ShuffleManager負責管理本地及遠端的block資料的shuffle操作。ShuffleManager預設為通過反射方式生成的SortShuffleManager的例項,可以指定屬性spark.shuffle.manager來顯示控制使用HashShuffleManager。SortShuffleManager通過持有的IndexShuffleBlockManager間接操作BlockManager中的DiskBlockManager將map結果寫入本地,並根據shuffleId、mapId寫入索引檔案,也能通過MapOutputTrackerMaster中維護的mapStatuses從本地或者其他遠端節點讀取檔案。有讀者可能會問,為什麼需要shuffle?Spark作為平行計算框架,同一個作業會被劃分為多個任務在多個節點上並行執行,reduce的輸入可能存在於多個節點上,因此需要通過“洗牌”將所有reduce的輸入彙總起來,這個過程就是shuffle。這個問題以及對ShuffleManager的具體使用會在第5章和第6章詳述。ShuffleManager的例項化見程式碼清單3-6。程式碼清單3-6最後建立的ShuffleMemoryManager,將在3.2.5節介紹。

程式碼清單3-6  ShuffleManager的例項化及ShuffleMemoryManager的建立

    val shortShuffleMgrNames = Map(
      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass = shortShuffleMgrNames.get
OrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

    val shuffleMemoryManager = new ShuffleMemoryManager(conf)

3.2.5 shuffle執行緒記憶體管理器ShuffleMemoryManager

  ShuffleMemoryManager負責管理shuffle執行緒佔有記憶體的分配與釋放,並通過threadMemory:mutable.HashMap[Long, Long]快取每個執行緒的記憶體位元組數,見程式碼清單3-7。

程式碼清單3-7  ShuffleMemoryManager的資料結構

private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
  private val threadMemory = new mutable.HashMap[Long, Long]()  // threadId -> memory bytes
  def this(conf: SparkConf) = this(ShuffleMemoryManager.getMaxMemory(conf))

getMaxMemory方法用於獲取shuffle所有執行緒佔用的最大記憶體,實現如下。

def getMaxMemory(conf: SparkConf): Long = {
    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
  }

從上面程式碼可以看出,shuffle所有執行緒佔用的最大記憶體的計算公式為:

Java執行時最大記憶體 * Spark的shuffle最大記憶體佔比 * Spark的安全記憶體佔比

可以配置屬性spark.shuffle.memoryFraction修改Spark的shuffle最大記憶體佔比,配置屬性spark.shuffle.safetyFraction修改Spark的安全記憶體佔比。

注意:ShuffleMemoryManager通常執行在Executor中, Driver中的ShuffleMemoryManager 只有在local模式下才起作用。

3.2.6 塊傳輸服務BlockTransferService

  BlockTransferService預設為NettyBlockTransferService(可以配置屬性spark.shuffle.blockTransferService使用NioBlockTransferService),它使用Netty提供的非同步事件驅動的網路應用框架,提供web服務及客戶端,獲取遠端節點上Block的集合。

val blockTransferService =
      conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
        case "netty" =>
          new NettyBlockTransferService(conf, securityManager, numUsableCores)
        case "nio" =>
          new NioBlockTransferService(conf, securityManager)
      }

NettyBlockTransferService的具體實現將在第4章詳細介紹。這裡大家可能覺得奇怪,這樣的網路應用為何也要放在儲存體系?大家不妨先帶著疑問,直到你真正瞭解儲存體系。

3.2.7 BlockManagerMaster介紹

BlockManagerMaster負責對Block的管理和協調,具體操作依賴於BlockManagerMasterActor。Driver和Executor處理BlockManagerMaster的方式不同:

如果當前應用程式是Driver,則建立BlockManagerMasterActor,並且註冊到ActorSystem中。

如果當前應用程式是Executor,則從ActorSystem中找到BlockManagerMasterActor。

無論是Driver還是Executor,最後BlockManagerMaster的屬性driverActor將持有對BlockManagerMasterActor的引用。BlockManagerMaster的建立程式碼如下。

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
      "BlockManagerMaster",
      new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

registerOrLookup已在3.2.3節介紹過了,不再贅述。BlockManagerMaster及BlockManagerMasterActor的具體實現將在第4章詳細介紹。

3.2.8 建立塊管理器BlockManager

  BlockManager負責對Block的管理,只有在BlockManager的初始化方法initialize被呼叫後,它才是有效的。BlockManager作為儲存系統的一部分,具體實現見第4章。BlockManager的建立程式碼如下。

val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
      numUsableCores)

3.2.9 建立廣播管理器BroadcastManager

  BroadcastManager用於將配置資訊和序列化後的RDD、Job以及ShuffleDependency等資訊在本地儲存。如果為了容災,也會複製到其他節點上。建立BroadcastManager的程式碼實現如下。

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

BroadcastManager必須在其初始化方法initialize被呼叫後,才能生效。Initialize方法實際利用反射生成廣播工廠例項broadcastFactory(可以配置屬性spark.broadcast.factory指定,預設為org.apache.spark.broadcast.TorrentBroadcastFactory)。BroadcastManager的廣播方法newBroadcast實際代理了工廠broadcastFactory的newBroadcast方法來生成廣播或者非廣播物件。BroadcastManager的Initialize及newBroadcast方法見程式碼清單3-8。

程式碼清單3-8  BroadcastManager的實現

  private def initialize() {
    synchronized {
      if (!initialized) {
        val broadcastFactoryClass = conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
        broadcastFactory =
          Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
        broadcastFactory.initialize(isDriver, conf, securityManager)
        initialized = true
      }
    }
  }

  private val nextBroadcastId = new AtomicLong(0)

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }

  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
    broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
  }
}

3.2.10 建立快取管理器CacheManager

  CacheManager用於快取RDD某個分割槽計算後中間結果,快取計算結果發生在迭代計算的時候,將在6.1節講到。而CacheManager將在4.14節詳細描述。建立CacheManager的程式碼如下。

val cacheManager = new CacheManager(blockManager)

3.2.11 HTTP檔案伺服器HttpFileServer

參見程式碼清單3-9。HttpFileServer主要提供對jar及其他檔案的http訪問,這些jar包包括使用者上傳的jar包。埠由屬性spark.fileserver.port配置,預設為0,表示隨機生成埠號。

程式碼清單3-9  HttpFileServer的建立

  val httpFileServer =
      if (isDriver) {
        val fileServerPort = conf.getInt("spark.fileserver.port", 0)
        val server = new HttpFileServer(conf, securityManager, fileServerPort)
        server.initialize()
        conf.set("spark.fileserver.uri",  server.serverUri)
        server
      } else {
        null
      }

HttpFileServer的初始化過程,見程式碼清單3-10,主要包括以下步驟:

1)         使用Utils工具類建立檔案伺服器的根目錄及臨時目錄(臨時目錄在執行時環境關閉時會刪除)。Utils工具的詳細介紹,見附錄A。

2)         建立存放jar包及其他檔案的檔案目錄。

3)         建立並啟動HTTP服務。

程式碼清單3-10         HttpFileServer的初始化

  def initialize() {
    baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
    fileDir = new File(baseDir, "files")
    jarDir = new File(baseDir, "jars")
    fileDir.mkdir()
    jarDir.mkdir()
    logInfo("HTTP File server directory is " + baseDir)
    httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
    httpServer.start()
    serverUri = httpServer.uri
    logDebug("HTTP file server started at: " + serverUri)
  }

HttpServer的構造和start方法的實現中,再次使用了Utils的靜態方法startServiceOnPort,因此會回撥doStart方法,見程式碼清單3-11。有關jetty的API使用參見附錄C。

程式碼清單3-11         HttpServer的啟動

  def start() {
    if (server != null) {
      throw new ServerStateException("Server is already started")
    } else {
      logInfo("Starting HTTP Server")
      val (actualServer, actualPort) =
        Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
      server = actualServer
      port = actualPort
    }
  }

doStart方法中啟動內嵌的jetty所提供的HTTP服務,見程式碼清單3-12。

程式碼清單3-12         HttpServer的啟動功能實現

  private def doStart(startPort: Int): (Server, Int) = {
    val server = new Server()
    val connector = new SocketConnector
    connector.setMaxIdleTime(60 * 1000)
    connector.setSoLingerTime(-1)
    connector.setPort(startPort)
    server.addConnector(connector)

    val threadPool = new QueuedThreadPool
    threadPool.setDaemon(true)
    server.setThreadPool(threadPool)
    val resHandler = new ResourceHandler
    resHandler.setResourceBase(resourceBase.getAbsolutePath)

    val handlerList = new HandlerList
    handlerList.setHandlers(Array(resHandler, new DefaultHandler))

    if (securityManager.isAuthenticationEnabled()) {
      logDebug("HttpServer is using security")
      val sh = setupSecurityHandler(securityManager)
      // make sure we go through security handler to get resources
      sh.setHandler(handlerList)
      server.setHandler(sh)
    } else {
      logDebug("HttpServer is not using security")
      server.setHandler(handlerList)
    }

    server.start()
    val actualPort = server.getConnectors()(0).getLocalPort

    (server, actualPort)
  }

3.2.12 建立測量系統MetricsSystem

  MetricsSystem是Spark的測量系統,建立MetricsSystem的程式碼如下。

  val metricsSystem = if (isDriver) {
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }

上面呼叫的createMetricsSystem方法實際建立了MetricsSystem,程式碼如下。

  def createMetricsSystem(
      instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
    new MetricsSystem(instance, conf, securityMgr)
  }

構造MetricsSystem的過程最重要的是呼叫了MetricsConfig的initialize方法,見程式碼清單3-13。

程式碼清單3-13         MetricsConfig的初始化

def initialize() {
    setDefaultProperties(properties)

    var is: InputStream = null
    try {
      is = configFile match {
        case Some(f) => new FileInputStream(f)
        case None => Utils.getSparkClassLoader.getResourceAsStream(METRICS_CONF)
      }

      if (is != null) {
        properties.load(is)
      }
    } catch {
      case e: Exception => logError("Error loading configure file", e)
    } finally {
      if (is != null) is.close()
    }

    propertyCategories = subProperties(properties, INSTANCE_REGEX)
    if (propertyCategories.contains(DEFAULT_PREFIX)) {
      import scala.collection.JavaConversions._

      val defaultProperty = propertyCategories(DEFAULT_PREFIX)
      for { (inst, prop) <- propertyCategories
            if (inst != DEFAULT_PREFIX)
            (k, v) <- defaultProperty
            if (prop.getProperty(k) == null) } {
        prop.setProperty(k, v)
      }
    }
  }

從以上實現可以看出,MetricsConfig的initialize方法主要負責載入metrics.properties檔案中的屬性配置,並對屬性進行初始化轉換。

例如:將屬性

{*.sink.servlet.path=/metrics/json, applications.sink.servlet.path=/metrics/applications/json, *.sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, master.sink.servlet.path=/metrics/master/json}

轉換為

Map(applications -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/applications/json}, master -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/master/json}, * -> {sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json})

3.2.13 建立SparkEnv

  當所有的基礎元件準備好後,最終使用下面的程式碼建立執行環境SparkEnv。

new SparkEnv(executorId, actorSystem, serializer, closureSerializer, cacheManager,
      mapOutputTracker, shuffleManager, broadcastManager, blockTransferService,
 blockManager, securityManager, httpFileServer, sparkFilesDir, 
metricsSystem, shuffleMemoryManager, conf)

注意:serializer和closureSerializer都是使用Class.forName反射生成的org.apache.spark.serializer.JavaSerializer類的例項,其中closureSerializer例項特別用來對Scala中的閉包進行序列化。

3.3 建立metadataCleaner

  SparkContext為了保持對所有持久化的RDD的跟蹤,使用型別是TimeStampedWeakValueHashMap的persistentRdds快取。metadataCleaner的功能是清除過期的持久化RDD。建立metadataCleaner的程式碼如下。

  private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
  private[spark] val metadataCleaner =
    new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)

我們仔細看看MetadataCleaner的實現,見程式碼清單3-14。

程式碼清單3-14         MetadataCleaner的實現

private[spark] class MetadataCleaner(
    cleanerType: MetadataCleanerType.MetadataCleanerType,
    cleanupFunc: (Long) => Unit,
    conf: SparkConf)
  extends Logging
{
  val name = cleanerType.toString

  private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
  private val periodSeconds = math.max(10, delaySeconds / 10)
  private val timer = new Timer(name + " cleanup timer", true)

  private val task = new TimerTask {
    override def run() {
      try {
        cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
        logInfo("Ran metadata cleaner for " + name)
      } catch {
        case e: Exception => logError("Error running cleanup task for " + name, e)
      }
    }
  }

  if (delaySeconds > 0) {
    timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
  }

  def cancel() {
    timer.cancel()
  }
}

從MetadataCleaner的實現可以看出其實質是一個用TimerTask實現的定時器,不斷呼叫cleanupFunc: (Long) => Unit這樣的函式引數。構造metadataCleaner時的函式引數是cleanup,用於清理persistentRdds中的過期內容,程式碼如下。

  private[spark] def cleanup(cleanupTime: Long) {
    persistentRdds.clearOldValues(cleanupTime)
  }

 未完待續。。。

後記:自己犧牲了7個月的週末和下班空閒時間,通過研究Spark原始碼和原理,總結整理的《深入理解Spark:核心思想與原始碼分析》一書現在已經正式出版上市,目前亞馬遜、京東、噹噹、天貓等網站均有銷售,歡迎感興趣的同學購買。我開始研究原始碼時的Spark版本是1.2.0,經過7個多月的研究和出版社近4個月的流程,Spark自身的版本迭代也很快,如今最新已經是1.6.0。目前市面上另外2本原始碼研究的Spark書籍的版本分別是0.9.0版本和1.2.0版本,看來這些書的作者都與我一樣,遇到了這種問題。由於研究和出版都需要時間,所以不能及時跟上Spark的腳步,還請大家見諒。但是Spark核心部分的變化相對還是很少的,如果對版本不是過於追求,依然可以選擇本書。

相關推薦

深入理解Spark核心思想原始碼分析》——SparkContext初始——執行環境資料清理

《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》 《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》 由於本書的第3章內容較多,所以打算分別開闢四篇隨筆分別展現。本文展現第3章第一部分的內容: 第3章

深入理解Spark核心思想原始碼分析第2章

《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》 本文主要展示本書的第2章內容: Spark設計理念與基本架構 “若夫乘天地之正,而御六氣之辯,以遊無窮者,彼且惡乎待哉?” ——《莊子·逍遙遊》 n本章導讀: 上一章,介紹了Spark環境的搭建,為方便讀

深入理解SPARK核心思想原始碼分析》一書正式出版上市

自己犧牲了7個月的週末和下班空閒時間,通過研究Spark原始碼和原理,總結整理的《深入理解Spark:核心思想與原始碼分析》一書現在已經正式出版上市,目前京東、噹噹、天貓等網站均有銷售,歡迎感興趣的同學購買。我開始研究原始碼時的Spark版本是1.2.0,經過7個多月的研

深入理解Spark核心思想原始碼分析前言及第1章

  自己犧牲了7個月的週末和下班空閒時間,通過研究Spark原始碼和原理,總結整理的《深入理解Spark:核心思想與原始碼分析》一書現在已經正式出版上市,目前亞馬遜、京東、噹噹、天貓等網站均有銷售,歡迎感興趣的同學購買。我開始研究原始碼時的Spark版本是1.2.0,經過7個多月的研究和出版社近4個月的流

深入理解SPARK核心思想原始碼分析》——SparkContext初始——SparkUI、環境變數及排程

《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》 《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》 由於本書的第3章內容較多,所以打算分別開闢四篇隨筆分別展現。 本文展現第3章第二部分的內容:

精盡MyBatis原始碼分析 - MyBatis初始之載入 Mapper 介面 XML 對映檔案

> 該系列文件是本人在學習 Mybatis 的原始碼過程中總結下來的,可能對讀者不太友好,請結合我的原始碼註釋([Mybatis原始碼分析 GitHub 地址](https://github.com/liu844869663/mybatis-3)、[Mybatis-Spring 原始碼分析 GitHub 地址

SpringMVC原始碼分析--容器初始FrameworkServlet

一下SpringMVC配置檔案的地址contextConfigLocation的配置屬性,然後其呼叫的子類FrameworkServlet的initServletBean方法。 其實FrameworkServlet是springMVC初始化IOC容器的核心,通過讀取配置的c

springMVC原始碼分析--容器初始ContextLoaderListener

在spring Web中,需要初始化IOC容器,用於存放我們注入的各種物件。當tomcat啟動時首先會初始化一個web對應的IOC容器,用於初始化和注入各種我們在web執行過程中需要的物件。當tomcat啟動的時候是如何初始化IOC容器的,我們先看一下在web.xml中經常看

SpringMVC原始碼分析--容器初始DispatcherServlet

上一篇部落格SpringMVC原始碼分析--容器初始化(四)FrameworkServlet我們已經瞭解到了SpringMVC容器的初始化,SpringMVC對容器初始化後會進行一系列的其他屬性的初始化操作,在SpringMVC初始化完成之後會呼叫onRefresh(wac

SpringMVC原始碼分析--容器初始HttpServletBean

在上一篇部落格 springMVC原始碼分析--容器初始化(二)DispatcherServlet中,我們隊SpringMVC整體生命週期有一個簡單的說明,並沒有進行詳細的原始碼分析,接下來我們會根據部落格中提供的springMVC的生命週期圖來詳細的對SpringMVC的

精盡 MyBatis 原始碼分析 - MyBatis 初始之載入 mybatis-config.xml

> 該系列文件是本人在學習 Mybatis 的原始碼過程中總結下來的,可能對讀者不太友好,請結合我的原始碼註釋([Mybatis原始碼分析 GitHub 地址](https://github.com/liu844869663/mybatis-3)、[Mybatis-Spring 原始碼分析 GitHub 地址

精盡MyBatis原始碼分析 - MyBatis初始之 SQL 初始

> 該系列文件是本人在學習 Mybatis 的原始碼過程中總結下來的,可能對讀者不太友好,請結合我的原始碼註釋([Mybatis原始碼分析 GitHub 地址](https://github.com/liu844869663/mybatis-3)、[Mybatis-Spring 原始碼分析 GitHub 地址

深入理解Spark-核心思想原始碼分析》讀書筆記1

前兩章 第一章主要是講如何安裝和配置spark,以及如何匯入spark原始碼除錯執行;第二章主要講的是上次那本書《Spark快速大資料分析》的內容,科普一下spark的知識。 第三章 SparkContext的初始化 1. 概述 這章的主要內容就

Linux核心原始碼分析--系統時間初始kernel_mktime()函式

        從boot檔案中的幾個彙編程式執行後跳轉到init檔案中的main.c程式開始繼續執行,該main.c函式式為系統執行的環境進行初始化的。首先來看系統時間的初始化(因為系統時間的初始化開始程式就在init檔案中),其中主要還是由kernel中的mktime.

資料結構演算法分析c語言描述Mark Allen--佇列ADT連結串列實現

佇列ADT連結串列實現 使用連結串列儲存 操作集合 入隊 出隊 初始化 返回隊前元素 列印 #include <stdio.h> #includ

資料結構演算法分析c語言描述Mark Allen--迴圈佇列ADT陣列實現

迴圈佇列ADT陣列實現 使用陣列儲存 操作集合 入隊 出隊 清空 初始化 返回隊前元素 列印 重點注意! 對於一個迴圈佇列 front == rear時候佇列

Java靜態變數初始及建構函式的執行順序執行時機分析

    對於Java初學者來說,關於靜態變數、非靜態變數的初始化時機與順序,以及建構函式的執行時機與順序都會感覺有點理不清頭緒,下面文章使用例項程式幫大家解決這方面的疑惑。雖然簡單,但是對Java入門者來說還是有一定的幫助作用。    

Uboot啟動過程原始碼分析之第一階段硬體相關

從上一個部落格知道uboot的入口點在 cpu/arm920t/start.s 開啟cpu/arm920t/start.s 跳轉到reset reset: /* * set the cpu to SVC32 mode// CUP設定為管理模式 */ mrs r0,cps

資料結構演算法分析c語言描述Mark Allen--線性錶鏈表方法實現

線性表--連結串列實現 標頭檔案 #define ElementType int #define INF INT_MAX #ifndef _List_H struct Node; typedef struct Node *PtrToNode; typedef PtrToN

資料結構演算法分析c語言描述Mark Allen--多項式ADT陣列實現

多項式ADT陣列實現 使用陣列進行儲存 操作集合 乘法 加法 標頭檔案 //cpp head file PloynomialADTarray.h #define MaxDegree 1000 typedef struct Pol { int C