Spark 1.6: Source Code Walkthrough of BlockManager (Overview)

BlockManager implementation

BlockManager is the core component of Spark's storage system. Both the Driver and every Executor create a BlockManager.

SparkEnv creates the BlockManager at line 364:

    // NB: blockManager is not valid until initialize() is called later.
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)
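
The NB comment above describes a construct-then-initialize pattern: the object exists after `new`, but the state that depends on the network layer is only filled in by a later `initialize()` call. A minimal sketch of that pattern follows. `TwoPhaseComponent` and its members are hypothetical names used for illustration; they are not Spark classes.

```scala
// Hypothetical stand-in for the construct-then-initialize pattern; not Spark code.
final class TwoPhaseComponent(val executorId: String) {
  // Empty until initialize() runs, so early use can be detected.
  private var id: Option[String] = None

  // Second phase: fill in the state that needs host and port information.
  def initialize(host: String, port: Int): Unit = {
    id = Some(s"$executorId@$host:$port")
  }

  def isInitialized: Boolean = id.isDefined

  // Fails loudly if the component is used before initialize().
  def componentId: String =
    id.getOrElse(throw new IllegalStateException("initialize() has not been called yet"))
}
```

Calling `componentId` before `initialize` throws, which is one way to make the "not valid until initialize()" contract explicit instead of leaving a field unset.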

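The members of BlockManager are declared starting at line 74 of BlockManager:

DiskBlockManager, the disk block manager.

TimeStampedHashMap[BlockId, BlockInfo], which caches each BlockId together with its BlockInfo.

MemoryStore, the in-memory store.

DiskStore, the on-disk store.

ShuffleClient, the shuffle client. If the external shuffle service is enabled, an ExternalShuffleClient is created; otherwise the BlockTransferService is used. So it looks like this could be swapped for a custom implementation.

BlockManagerSlaveEndpoint, which communicates with BlockManagerMaster.

CompressionCodec, the compression codec.

metadataCleaner, the cleaner for non-broadcast blocks, and broadcastCleaner, the cleaner for broadcast blocks (not covered here).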

  val diskBlockManager = new DiskBlockManager(this, conf)

  private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]

  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))

  // Actual storage of where blocks are kept
  private var externalBlockStoreInitialized = false
  private[spark] val memoryStore = new MemoryStore(this, memoryManager)
  private[spark] val diskStore = new DiskStore(this, diskBlockManager)
  private[spark] lazy val externalBlockStore: ExternalBlockStore = {
    externalBlockStoreInitialized = true
    new ExternalBlockStore(this, executorId)
  }
  memoryManager.setMemoryStore(memoryStore)

  // Note: depending on the memory manager, `maxStorageMemory` may actually vary over time.
  // However, since we use this only for reporting and logging, what we actually want here is
  // the absolute maximum value that `maxStorageMemory` can ever possibly reach. We may need
  // to revisit whether reporting this value as the "max" is intuitive to the user.
  private val maxMemory = memoryManager.maxStorageMemory

  private[spark]
  val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)

  // Port used by the external shuffle service. In Yarn mode, this may be already be
  // set through the Hadoop configuration as the server is launched in the Yarn NM.
  private val externalShuffleServicePort = {
    val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
    if (tmpPort == 0) {
      // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
      // an open port.  But we still need to tell our spark apps the right port to use.  So
      // only if the yarn config has the port set to 0, we prefer the value in the spark config
      conf.get("spark.shuffle.service.port").toInt
    } else {
      tmpPort
    }
  }

  var blockManagerId: BlockManagerId = _

  // Address of the server that serves this executor's shuffle files. This is either an external
  // service, or just our own Executor's BlockManager.
  private[spark] var shuffleServerId: BlockManagerId = _

  // Client to read other executors' shuffle files. This is either an external service, or just the
  // standard BlockTransferService to directly connect to other Executors.
  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
      securityManager.isSaslEncryptionEnabled())
  } else {
    blockTransferService
  }

  // Whether to compress broadcast variables that are stored
  private val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
  // Whether to compress shuffle output that are stored
  private val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
  // Whether to compress RDD partitions that are stored serialized
  private val compressRdds = conf.getBoolean("spark.rdd.compress", false)
  // Whether to compress shuffle output temporarily spilled to disk
  private val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)

  private val slaveEndpoint = rpcEnv.setupEndpoint(
    "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
    new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))

  // Pending re-registration action being executed asynchronously or null if none is pending.
  // Accesses should synchronize on asyncReregisterLock.
  private var asyncReregisterTask: Future[Unit] = null
  private val asyncReregisterLock = new Object

  private val metadataCleaner = new MetadataCleaner(
    MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
  private val broadcastCleaner = new MetadataCleaner(
    MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)

  // Field related to peer block managers that are necessary for block replication
  @volatile private var cachedPeers: Seq[BlockManagerId] = _
  private val peerFetchLock = new Object
  private var lastPeerFetchTime = 0L

  /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
   * the initialization of the compression codec until it is first used. The reason is that a Spark
   * program could be using a user-defined codec in a third party jar, which is loaded in
   * Executor.updateDependencies. When the BlockManager is initialized, user level jars hasn't been
   * loaded yet. */
  private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
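
The comment on `compressionCodec` relies on Scala's `lazy val` semantics: the right-hand side runs on first access, not when the enclosing object is constructed. The sketch below illustrates only that language feature. `CodecHolder`, `loadCount`, and the string standing in for a codec are made up for the example and are not part of Spark.

```scala
// Hypothetical model: a lazy val is evaluated on first use, and only once.
final class CodecHolder(codecName: () => String) {
  // Counts how many times the "codec" was actually created.
  var loadCount: Int = 0

  // Not evaluated when CodecHolder is constructed; evaluated on first access.
  lazy val codec: String = {
    loadCount += 1
    codecName()
  }
}
```

Because the lookup happens on first access, a value that only becomes available after construction (standing in for a codec class in a jar loaded later) is still picked up.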

A BlockManager only becomes usable after it has been initialized.

Before looking at initialization, a few notes:

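① The Executor's BlockManager communicates with the Driver's BlockManager.

② When a block is stored through BlockManager and MemoryStore does not have enough memory, the block is written to DiskStore instead (provided its storage level allows disk).

③ A BlockManager downloads or uploads blocks by calling the RPC service exposed by the TransportServer in the BlockManager of a remote Executor.

④ Conversely, remote nodes call the RPC service of the TransportServer in the local Executor's BlockManager to download or upload blocks.

⑤ The ShuffleClient (in its BlockTransferService form) contains the TransportServer.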
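
Note ② can be sketched as a two-tier store. This is a simplified model under stated assumptions: `TieredStore`, `memoryCapacity`, and `allowDisk` are hypothetical names, nothing is ever evicted, each block id is assumed to be stored once, and the `allowDisk` flag stands in for a storage level that permits disk.

```scala
import scala.collection.mutable

// Hypothetical two-tier store: memory first, disk when memory is full.
final class TieredStore(memoryCapacity: Long, allowDisk: Boolean) {
  private val memory = mutable.Map.empty[String, Array[Byte]]
  private val disk = mutable.Map.empty[String, Array[Byte]] // stands in for files on disk
  private var memoryUsed = 0L

  /** Stores a block and returns the tier it landed in, or None if it could not be stored. */
  def put(blockId: String, bytes: Array[Byte]): Option[String] = {
    if (memoryUsed + bytes.length <= memoryCapacity) {
      memory(blockId) = bytes
      memoryUsed += bytes.length
      Some("memory")
    } else if (allowDisk) {
      disk(blockId) = bytes
      Some("disk")
    } else {
      None
    }
  }

  /** Looks in memory first, then on disk. */
  def get(blockId: String): Option[Array[Byte]] =
    memory.get(blockId).orElse(disk.get(blockId))
}
```

With a capacity of 10 bytes, a first 8-byte block lands in memory and a second one falls through to the disk tier; with `allowDisk = false` the second put simply fails.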

The initialization method is at line 174 of BlockManager:

  def initialize(appId: String): Unit = {
    // Initialize the components
    blockTransferService.init(this)
    shuffleClient.init(appId)
     
    blockManagerId = BlockManagerId(
      executorId, blockTransferService.hostName, blockTransferService.port)
    
    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }
    // Register this BlockManager with the master
    master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)

    // Register Executors' configuration with the local shuffle service, if one should exist.
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }
  }
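
The address selection in `initialize`, together with the port fallback quoted earlier, can be summarized as two pure functions. The sketch below assumes a simplified `Endpoint` case class in place of BlockManagerId and plain `Map`s in place of SparkConf and the Yarn configuration. `ShuffleAddressing`, `resolveShufflePort`, and `shuffleServer` are hypothetical names, and the precedence of the Yarn value over the Spark value is an assumption about how the lookup behaves on Yarn.

```scala
// Hypothetical, simplified stand-in for BlockManagerId.
final case class Endpoint(executorId: String, host: String, port: Int)

object ShuffleAddressing {
  val PortKey = "spark.shuffle.service.port"

  /** Assumption: a Yarn value wins when present; a Yarn value of 0 defers to the Spark conf. */
  def resolveShufflePort(sparkConf: Map[String, String], yarnConf: Map[String, String]): Int = {
    val sparkValue = sparkConf.getOrElse(PortKey, "7337")
    val tmpPort = yarnConf.getOrElse(PortKey, sparkValue).toInt
    // Port 0 means "pick any open port" on the Yarn side, so the real port
    // has to come from the Spark configuration.
    if (tmpPort == 0) sparkConf(PortKey).toInt else tmpPort
  }

  /** With the external service enabled, shuffle files are served from the service port on the same host. */
  def shuffleServer(own: Endpoint, externalEnabled: Boolean, servicePort: Int): Endpoint =
    if (externalEnabled) own.copy(port = servicePort) else own
}
```

For example, `ShuffleAddressing.shuffleServer(Endpoint("1", "host-a", 40000), externalEnabled = true, servicePort = 7337)` keeps the executor id and host but points at port 7337, which matches how `shuffleServerId` differs from `blockManagerId` in the method above.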