1. 程式人生 > >spark中shuffle的過程------不看你後悔

spark中shuffle的過程------不看你後悔

Spark大會上,所有的演講嘉賓都認為shuffle是最影響效能的地方,但是又無可奈何。之前去百度面試hadoop的時候,也被問到了這個問題,直接回答了不知道。

這篇文章主要是沿著下面幾個問題來開展:

1、shuffle過程的劃分?

2、shuffle的中間結果如何儲存?

3、shuffle的資料如何拉取過來?

Shuffle過程的劃分

Spark的操作模型是基於RDD的,當呼叫RDD的reduceByKey、groupByKey等類似的操作的時候,就需要有shuffle了。再拿出reduceByKey這個來講。

def reduceByKey(func: (V, V) => V, numPartitions: Int)
:
RDD[(K, V)] = { reduceByKey(new HashPartitioner(numPartitions), func) }

reduceByKey的時候,我們可以手動設定reduce的個數,如果不指定的話,就可能不受控制了。

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    for (r <- bySize if
r.partitioner.isDefined) { return r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism")) { new HashPartitioner(rdd.context.defaultParallelism) } else { new HashPartitioner(bySize.head.partitions.size) } }
View Code

如果不指定reduce個數的話,就按預設的走:

1、如果自定義了分割槽函式partitioner的話,就按你的分割槽函式來走。

2、如果沒有定義,那麼如果設定了 spark.default.parallelism ,就使用雜湊的分割槽方式,reduce個數就是設定的這個值。

3、如果這個也沒設定,那就按照輸入資料的分片的數量來設定。如果是hadoop的輸入資料的話,這個就多了。。。大家可要小心啊。

設定完之後,它會做三件事情,也就是之前講的3次RDD轉換。

//map端先按照key合併一次
val combined = self.mapPartitionsWithContext((context, iter) => {
aggregator.combineValuesByKey(iter, context)
 }, preservesPartitioning = true)
//reduce抓取資料
val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner).setSerializer(serializer)
//合併資料,執行reduce計算
partitioned.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
 }, preservesPartitioning = true)
View Code

1、在第一個MapPartitionsRDD這裡先做一次map端的聚合操作。

2、SHuffledRDD主要是做從這個抓取資料的工作。

3、第二個MapPartitionsRDD把抓取過來的資料再次進行聚合操作。

4、步驟1和步驟3都會涉及到spill的過程。

怎麼做的聚合操作,回去看RDD那章。

Shuffle的中間結果如何儲存

作業提交的時候,DAGScheduler會把Shuffle的過程切分成map和reduce兩個Stage(之前一直被我叫做shuffle前和shuffle後),具體的切分的位置在上圖的虛線處。

map端的任務會作為一個ShuffleMapTask提交,最後在TaskRunner裡面呼叫了它的runTask方法。

override def runTask(context: TaskContext): MapStatus = {
    val numOutputSplits = dep.partitioner.numPartitions
    metrics = Some(context.taskMetrics)

    val blockManager = SparkEnv.get.blockManager
    val shuffleBlockManager = blockManager.shuffleBlockManager
    var shuffle: ShuffleWriterGroup = null
    var success = false

    try {
      // serializer為空的情況呼叫預設的JavaSerializer,也可以通過spark.serializer來設定成別的
      val ser = Serializer.getSerializer(dep.serializer)
      // 例項化Writer,Writer的數量=numOutputSplits=前面我們說的那個reduce的數量
      shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)

      // 遍歷rdd的元素,按照key計算出來它所在的bucketId,然後通過bucketId找到相應的Writer寫入
      for (elem <- rdd.iterator(split, context)) {
        val pair = elem.asInstanceOf[Product2[Any, Any]]
        val bucketId = dep.partitioner.getPartition(pair._1)
        shuffle.writers(bucketId).write(pair)
      }

      // 提交寫入操作. 計算每個bucket block的大小
      var totalBytes = 0L
      var totalTime = 0L
      val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
        writer.commit()
        writer.close()
        val size = writer.fileSegment().length
        totalBytes += size
        totalTime += writer.timeWriting()
        MapOutputTracker.compressSize(size)
      }

      // 更新 shuffle 監控引數.
      val shuffleMetrics = new ShuffleWriteMetrics
      shuffleMetrics.shuffleBytesWritten = totalBytes
      shuffleMetrics.shuffleWriteTime = totalTime
      metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

      success = true
      new MapStatus(blockManager.blockManagerId, compressedSizes)
    } catch { case e: Exception =>
      // 出錯了,取消之前的操作,關閉writer
      if (shuffle != null && shuffle.writers != null) {
        for (writer <- shuffle.writers) {
          writer.revertPartialWrites()
          writer.close()
        }
      }
      throw e
    } finally {
      // 關閉writer
      if (shuffle != null && shuffle.writers != null) {
        try {
          shuffle.releaseWriters(success)
        } catch {
          case e: Exception => logError("Failed to release shuffle writers", e)
        }
      }
      // 執行註冊的回撥函式,一般是做清理工作
      context.executeOnCompleteCallbacks()
    }
  }
View Code

遍歷每一個記錄,通過它的key來確定它的bucketId,再通過這個bucket的writer寫入資料。

下面我們看看ShuffleBlockManager的forMapTask方法吧。

def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
    new ShuffleWriterGroup {
      shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
      private val shuffleState = shuffleStates(shuffleId)
      private var fileGroup: ShuffleFileGroup = null

      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
        fileGroup = getUnusedFileGroup()
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
      // 從已有的檔案組裡選檔案,一個bucket一個檔案,即要傳送到同一個reduce的資料寫入到同一個檔案
          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
        }
      } else {
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          // 按照blockId來生成檔案,檔案數為map數*reduce數
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          val blockFile = blockManager.diskBlockManager.getFile(blockId)
          if (blockFile.exists) {
            if (blockFile.delete()) {
              logInfo(s"Removed existing shuffle file $blockFile")
            } else {
              logWarning(s"Failed to remove existing shuffle file $blockFile")
            }
          }
          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
        }
      }
View Code

1、map的中間結果是寫入到本地硬碟的,而不是記憶體。

2、預設是一個map的中間結果檔案是M*R(M=map數量,R=reduce的數量),設定了spark.shuffle.consolidateFiles為true之後是R個檔案,根據bucketId把要分到同一個reduce的結果寫入到一個檔案中。

3、consolidateFiles採用的是一個reduce一個檔案,它還記錄了每個map的寫入起始位置,所以查詢的時候,先通過reduceId查詢到哪個檔案,再同坐mapId查詢索引當中的起始位置offset,長度length=(mapId + 1).offset -(mapId).offset,這樣就可以確定一個FileSegment(file, offset, length)。

4、Finally,儲存結束之後, 返回了一個new MapStatus(blockManager.blockManagerId, compressedSizes),把blockManagerId和block的大小都一起返回。

個人想法,shuffle這塊和hadoop的機制差別不大,tez這樣的引擎會趕上spark的速度呢?還是讓我們拭目以待吧!

Shuffle的資料如何拉取過來

ShuffleMapTask結束之後,最後在DAGScheduler的handleTaskCompletion方法當中。

case smt: ShuffleMapTask =>
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
} else {
stage.addOutputLoc(smt.partitionId, status)
}
if (runningStages.contains(stage) && pendingTasks(stage).isEmpty) {
markStageAsFinished(stage)
if (stage.shuffleDep.isDefined) {
 // 真的map過程才會有這個依賴,reduce過程None
 mapOutputTracker.registerMapOutputs(
     stage.shuffleDep.get.shuffleId,
 stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
 changeEpoch = true)
 }
  clearCacheLocs()
  if (stage.outputLocs.exists(_ == Nil)) {
  // 一些任務失敗了,需要重新提交stage
  submitStage(stage)
   } else {
  // 提交下一批任務			  
   }
}
View Code

1、把結果新增到Stage的outputLocs數組裡,它是按照資料的分割槽Id來儲存對映關係的partitionId->MapStaus。

2、stage結束之後,通過mapOutputTracker的registerMapOutputs方法,把此次shuffle的結果outputLocs記錄到mapOutputTracker裡面。

這個stage結束之後,就到ShuffleRDD運行了,我們看一下它的compute函式。

SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)

它是通過ShuffleFetch的fetch方法來抓取的,具體實現在BlockStoreShuffleFetcher裡面。

override def fetch[T](
      shuffleId: Int,
      reduceId: Int,
      context: TaskContext,
      serializer: Serializer)
    : Iterator[T] =
{
    val blockManager = SparkEnv.get.blockManager
    val startTime = System.currentTimeMillis
   // mapOutputTracker也分Master和Worker,Worker向Master請求獲取reduce相關的MapStatus,主要是(BlockManagerId和size)
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
    // 一個BlockManagerId對應多個檔案的大小
    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
    for (((address, size), index) <- statuses.zipWithIndex) {
      splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
    }
    // 構造BlockManagerId 和 BlockId的對映關係,想不到ShffleBlockId的mapId,居然是1,2,3,4的序列...
    val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
      case (address, splits) =>
        (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
    }
    // 名為updateBlock,實際是檢驗函式,每個Block都對應著一個Iterator介面,如果該介面為空,則應該報錯
    def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
      val blockId = blockPair._1
      val blockOption = blockPair._2
      blockOption match {
        case Some(block) => {
          block.asInstanceOf[Iterator[T]]
        }
        case None => {
          blockId match {
            case ShuffleBlockId(shufId, mapId, _) =>
              val address = statuses(mapId.toInt)._1
              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
            case _ =>
              throw new SparkException("Failed to get block " + blockId + ", which is not a shuffle block")
          }
        }
      }
    }
    // 從blockManager獲取reduce所需要的全部block,並新增校驗函式
    val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
    val itr = blockFetcherItr.flatMap(unpackBlock)
    
  val completionIter = CompletionIterator[T, Iterator[T]](itr, {
      // CompelteIterator迭代結束之後,會執行以下這部分程式碼,提交它記錄的各種引數
      val shuffleMetrics = new ShuffleReadMetrics
      shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
      shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
      shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
      shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
      shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
      shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
      context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics)
    })

    new InterruptibleIterator[T](context, completionIter)
  }
}
View Code

1、MapOutputTrackerWorker向MapOutputTrackerMaster獲取shuffle相關的map結果資訊。

2、把map結果資訊構造成BlockManagerId --> Array(BlockId, size)的對映關係。

3、通過BlockManager的getMultiple批量拉取block。

4、返回一個可遍歷的Iterator介面,並更新相關的監控引數。

我們繼續看getMultiple方法。

  def getMultiple(
      blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
      serializer: Serializer): BlockFetcherIterator = {
    val iter =
      if (conf.getBoolean("spark.shuffle.use.netty", false)) {
        new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
      } else {
        new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
      }

    iter.initialize()
    iter
  }
View Code

分兩種情況處理,分別是netty的和Basic的,Basic的就不講了,就是通過ConnectionManager去指定的BlockManager那裡獲取資料,上一章剛好說了。

我們講一下Netty的吧,這個是需要設定的才能啟用的,不知道效能會不會好一些呢?

看NettyBlockFetcherIterator的initialize方法,再看BasicBlockFetcherIterator的initialize方法,發現Basic的不能同時抓取超過48Mb的資料。

override def initialize() {
// 分開本地請求和遠端請求,返回遠端的FetchRequest
val remoteRequests = splitLocalRemoteBlocks()
// 抓取順序隨機
for (request <- Utils.randomize(remoteRequests)) {
  fetchRequestsSync.put(request)
}
// 預設是開6個執行緒去進行抓取
copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))// 讀取本地的block
getLocalBlocks()
   }
View Code

在NettyBlockFetcherIterator的sendRequest方法裡面,發現它是通過ShuffleCopier來試下的。

val cpier = new ShuffleCopier(blockManager.conf)
      cpier.getBlocks(cmId, req.blocks, putResult)

這塊接下來就是netty的客戶端呼叫的方法了,我對這個不瞭解。在服務端的處理是在DiskBlockManager內部啟動了一個ShuffleSender的服務,最終的業務處理邏輯是在FileServerHandler。

它是通過getBlockLocation返回一個FileSegment,下面這段程式碼是ShuffleBlockManager的getBlockLocation方法。

def getBlockLocation(id: ShuffleBlockId): FileSegment = {
// Search all file groups associated with this shuffle.
val shuffleState = shuffleStates(id.shuffleId)
for (fileGroup <- shuffleState.allFileGroups) {
  val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)
  if (segment.isDefined) { return segment.get }
}
throw new IllegalStateException("Failed to find shuffle block: " + id)
  }

獲取的方法前面說了,通過reduceId找到檔案,再通過mapId找到它的起始位置。但是這裡有個疑問了,如果啟用了consolidateFiles,一個reduce的所需資料都在一個檔案裡,是不是就可以把整個檔案一起返回呢,而不是通過N個map過不需要來分多次讀取?還是害怕一次傳送一個大檔案容易失敗?這就不得而知了。

到這裡整個過程就講完了。可以看得出來Shuffle這塊還是做了一些優化的,但是這些引數並沒有啟用,有需要的朋友可以自己啟用一下試試效果。

岑玉海

轉載請註明出處,謝謝!