How Spark Works: Internals and APIs Explained

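This post follows up on my previous post about deploying a distributed Spark cluster. It starts by summarizing the APIs commonly used in RDD programming, then uses the source code and its comments to explain Spark's job submission flow, its scheduling mechanism and the shuffle process in detail. Without further ado, let's get started!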

1. A Walkthrough of Spark's Basic APIs

Let's first write a simple word count program, look at the APIs it uses, and then summarize them:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/*
    Word count implemented in Java
 */
public class WCJava1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("wcjava");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Load the file (each element is one line)
        JavaRDD<String> rdd1 = jsc.textFile("hdfs://mycluster/wc.txt");
        // Flatten: split every line into words
        JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        // Map: turn every word into a (word, 1) pair
        JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // Reduce: add up the counts of each word (this step causes a shuffle)
        JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // collect() is an action: it triggers the job and brings the results back to the driver
        List<Tuple2<String, Integer>> list = rdd4.collect();
        for (Tuple2<String, Integer> stringIntegerTuple2 : list) {
            System.out.println(stringIntegerTuple2);
        }
        jsc.stop();
    }
}

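Syntactically, Scala code is far more concise than Java. Java is more verbose, but both languages compile to JVM bytecode and drive the same Spark engine, so their performance is broadly comparable. The code above uses several APIs that come up constantly when transforming RDDs. They are documented in the official Apache Spark RDD programming guide: http://spark.apache.org/docs/latest/rdd-programming-guide.html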

1.1 Core Concepts of the Spark Core Module

RDD: Resilient Distributed Dataset

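The RDD is a core concept in Spark programming. A data set stored on Hadoop DataNodes holds real data, but an RDD is a logical concept and holds no actual data. Once an RDD object has been created, it is immutable. "Distributed" means that the data is split into ranges (partitions) that are computed in parallel. Below is an excerpt from the official documentation comment of the Scala RDD class, followed by an interpretation: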

Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for
an HDFS file)

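In other words, every RDD is defined by these five main properties:

1. A list of partitions: each partition object describes the range of data it covers.

2. A function for computing each split: this is the operator, i.e. the computation applied to the data.

3. A list of dependencies on other RDDs: these come in two main kinds, wide dependencies and narrow dependencies.

4. (Optional) a partitioner for key-value RDDs.

5. (Optional) a list of preferred locations on which to compute each split.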
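To make the five properties concrete, here is a minimal plain-Java model. The interface MiniRdd and the classes ParallelCollection and MappedRdd are hypothetical names invented for this sketch; they are not Spark's API. The point is that an RDD object stores only how to compute its partitions (and from which parents), not the data itself.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical, simplified model of the five RDD properties (not Spark's API).
interface MiniRdd<T> {
    int numPartitions();                    // 1. a list of partitions
    Iterator<T> compute(int partition);     // 2. a function for computing each split
    List<MiniRdd<?>> dependencies();        // 3. dependencies on parent RDDs

    // 4. optional partitioner (only meaningful for key-value RDDs)
    default Optional<String> partitioner() { return Optional.empty(); }

    // 5. optional preferred locations for each split (e.g. HDFS block hosts)
    default List<String> preferredLocations(int partition) { return List.of(); }

    // Helper: materialize one partition; this is the only point where work happens.
    default List<T> collectPartition(int partition) {
        List<T> out = new ArrayList<>();
        compute(partition).forEachRemaining(out::add);
        return out;
    }
}

// A source RDD whose partitions are in-memory slices; it has no parents.
final class ParallelCollection<T> implements MiniRdd<T> {
    private final List<List<T>> slices;
    ParallelCollection(List<List<T>> slices) { this.slices = slices; }
    public int numPartitions() { return slices.size(); }
    public Iterator<T> compute(int partition) { return slices.get(partition).iterator(); }
    public List<MiniRdd<?>> dependencies() { return List.of(); }
}

// A mapped RDD: same partitions as its parent, each computed lazily from the parent's split.
final class MappedRdd<T, U> implements MiniRdd<U> {
    private final MiniRdd<T> parent;
    private final Function<T, U> f;
    MappedRdd(MiniRdd<T> parent, Function<T, U> f) { this.parent = parent; this.f = f; }
    public int numPartitions() { return parent.numPartitions(); }
    public Iterator<U> compute(int partition) {
        Iterator<T> in = parent.compute(partition);
        return new Iterator<U>() {
            public boolean hasNext() { return in.hasNext(); }
            public U next() { return f.apply(in.next()); }
        };
    }
    public List<MiniRdd<?>> dependencies() { return List.of(parent); }
}
```

Nothing is computed when a MappedRdd is built. Work happens only when compute is called on a partition, which mirrors the laziness of RDD transformations.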

Task

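A task is the smallest unit of execution in Spark. Within a stage, each partition of the RDD corresponds to one task, and each task runs in a thread on a node (inside an executor process).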

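Dependency

As mentioned above, there are two kinds of dependencies: wide dependencies and narrow dependencies.

Narrow dependency: each partition of the child RDD depends on a small number of partitions of the parent RDD (not necessarily just one).

NarrowDependency: an abstract class with three concrete subclasses: OneToOneDependency, PruneDependency and RangeDependency.

Wide dependency: each partition of the child RDD may depend on all partitions of the parent RDD, so the data has to be shuffled.

ShuffleDependency: a concrete subclass of the abstract class Dependency. It is also called a shuffle dependency or wide dependency.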
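The difference between the two kinds of dependency comes down to which parent partitions a child partition reads. The sketch below restates that mapping in plain Java. It is modelled on the behavior of OneToOneDependency, RangeDependency (used by union) and a shuffle dependency. The class Deps and its method names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Which parent partitions does child partition p read? A plain-Java restatement of the
// mapping behind OneToOneDependency, RangeDependency and a shuffle dependency (names hypothetical).
final class Deps {
    // Narrow, one-to-one (e.g. map, filter): child partition p reads parent partition p only.
    static List<Integer> oneToOne(int p) {
        return List.of(p);
    }

    // Narrow, range (e.g. union): child partitions [outStart, outStart + length)
    // correspond one-to-one to parent partitions [inStart, inStart + length).
    static List<Integer> range(int p, int inStart, int outStart, int length) {
        if (p >= outStart && p < outStart + length) {
            return List.of(p - outStart + inStart);
        }
        return List.of();   // this parent contributes nothing to child partition p
    }

    // Wide (shuffle): any child partition may need records from every parent partition,
    // so the result does not depend on which child partition p is asked about.
    static List<Integer> shuffle(int p, int numParentPartitions) {
        return IntStream.range(0, numParentPartitions).boxed().collect(Collectors.toList());
    }
}
```

Consider the union of a 2-partition RDD and a 3-partition RDD. The second parent has inStart = 0, outStart = 2 and length = 3, so child partition 4 reads that parent's partition 2.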

1.2 Summary of Basic RDD API Features

Notes on aggregation methods such as groupByKey and reduceByKey:

Here is the Scala source of groupByKey:

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

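As we can see, many of the "ByKey" methods are implemented on top of combineByKeyWithClassTag. groupByKey sets mapSideCombine = false, so it performs no map-side aggregation. The comment explains why: for grouping, a map-side combine would not reduce the amount of data shuffled. It would also require inserting all map-side data into a hash table, which leaves more objects in the old generation.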
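The effect of map-side combining is easiest to see by counting how many records would leave the map side. The hypothetical helper below does this for word count on in-memory partitions. Without combining, every (word, 1) pair is shuffled. With combining (as in reduceByKey), each partition ships one record per distinct word. For groupByKey itself a combine would not help, because all values would still have to be shipped inside the grouped buffers.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Counts how many records would cross the network for a word count, with and without
// map-side combine. A plain-Java illustration, not Spark code.
final class MapSideCombine {
    // Without map-side combine: every (word, 1) pair is shuffled.
    static int shuffledWithoutCombine(List<List<String>> partitions) {
        int n = 0;
        for (List<String> p : partitions) n += p.size();
        return n;
    }

    // With map-side combine (reduceByKey): each map partition first sums its own counts,
    // so only one record per distinct key per partition is shuffled.
    static int shuffledWithCombine(List<List<String>> partitions) {
        int n = 0;
        for (List<String> p : partitions) {
            Map<String, Integer> local = new HashMap<>();
            for (String w : p) local.merge(w, 1, Integer::sum);
            n += local.size();
        }
        return n;
    }
}
```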

Notes on aggregateByKey, the basic aggregation method:

Its source code:

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
    val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

    // We will clean the combiner closure later in `combineByKey`
    val cleanedSeqOp = self.context.clean(seqOp)
    combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
      cleanedSeqOp, combOp, partitioner)
  }
def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

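This method is clearly curried. In the first parameter list, the user provides a zeroValue of type U, which can be any data type. seqOp has type (U, V) => U. Within a single partition, it folds each value into the zero value, which is how the value type can change from V to U. combOp has type (U, U) => U and merges the U values computed by different partitions into the final U. The method has the following advantages:

1. It uses map-side aggregation (combineByKeyWithClassTag is called with the default mapSideCombine = true). This reduces the amount of data shuffled and improves efficiency.

2. It can change the data type of the value, and the user defines the aggregation rules, which makes it very flexible.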
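These semantics can be simulated locally. The sketch below is a simplified stand-in for aggregateByKey, not Spark's implementation, and it makes several assumptions:

- Partitions are in-memory lists.
- There is no serialization and no shuffle.
- Per-partition results are merged in partition order, whereas a real reduce task may receive them in any order.
- A Supplier provides a fresh zero value for each key. It plays the role of the serialized copy of zeroValue that Spark deserializes per key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

// A plain-Java simulation of aggregateByKey's semantics over in-memory partitions.
// Not Spark's implementation: no serialization, no shuffle, deterministic merge order.
final class AggregateByKeySim {
    static <K, V, U> Map<K, U> aggregateByKey(
            List<List<Map.Entry<K, V>>> partitions,
            Supplier<U> zero,                 // fresh zero value per key
            BiFunction<U, V, U> seqOp,        // (U, V) => U, within a partition
            BinaryOperator<U> combOp) {       // (U, U) => U, across partitions
        // Map side: fold each partition's values per key, starting from a fresh zero value.
        List<Map<K, U>> perPartition = new ArrayList<>();
        for (List<Map.Entry<K, V>> part : partitions) {
            Map<K, U> acc = new LinkedHashMap<>();
            for (Map.Entry<K, V> e : part) {
                U current = acc.containsKey(e.getKey()) ? acc.get(e.getKey()) : zero.get();
                acc.put(e.getKey(), seqOp.apply(current, e.getValue()));
            }
            perPartition.add(acc);
        }
        // Reduce side: merge the per-partition results of the same key with combOp.
        Map<K, U> result = new LinkedHashMap<>();
        for (Map<K, U> acc : perPartition) {
            acc.forEach((k, u) -> result.merge(k, u, combOp));
        }
        return result;
    }
}
```

Take the string-building seqOp and combOp used in the demo below. A key that appears twice in one partition and once in another ends up as "1,1,:1,", which shows the value type changing from Int to String.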

Now let's put it into practice and write a demo that uses this method!

import org.apache.spark.{SparkConf, SparkContext}

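/*
    A word-count-style demo of the aggregateByKey method
 */
object AggScala {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local")
        conf.setAppName("aggscala")
        val sc = new SparkContext(conf)
        // Zero value: the initial U for every key in every partition
        val zeroValue = ""
        // seqOp (U, V) => U: within a partition, append each count to the string
        def seqOp(U:String, V:Int) : String = {
            U + V + ","
        }
        // combOp (U, U) => U: join the strings computed by different partitions
        def combOp(U1:String, U2:String) : String = {
            U1 + ":" + U2
        }
        // Ask for at least 2 partitions so that combOp actually has something to merge
        sc.textFile("hdfs://mycluster/wc.txt",2).flatMap(e => e.split(" "))
            .map(e => (e, 1))
            .aggregateByKey(zeroValue)(seqOp,combOp)
            .collect
            .foreach(println(_))
        sc.stop()
    }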
}

Why is countByKey an action?

Let's look at the source of countByKey:

def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
  }

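The source shows that countByKey maps every value to 1L, calls reduceByKey to sum the counts per key, and then calls collect() followed by toMap. collect() is an action, which is why countByKey is an action too.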
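The same three steps can be mirrored on a local list. In the hypothetical helper below, the HashMap plays the role of the map returned to the driver. In Spark, it is the collect() step that makes countByKey an action. Because the whole map is loaded into the driver's memory, countByKey should only be used when the number of distinct keys is small.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Mirrors countByKey's recipe on a local list: mapValues(_ => 1L), reduceByKey(_ + _),
// then collect().toMap. Plain-Java illustration only.
final class CountByKeySim {
    static <K, V> Map<K, Long> countByKey(List<Map.Entry<K, V>> pairs) {
        Map<K, Long> counts = new HashMap<>();
        for (Map.Entry<K, V> e : pairs) {
            counts.merge(e.getKey(), 1L, Long::sum);   // value replaced by 1L, then summed per key
        }
        return counts;
    }
}
```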

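2. Spark's Workflow in Detail

2.1 Spark Job Submission Flow Diagram

There are two deploy modes for submitting a job with the spark-submit command: client mode and cluster mode. The figure above shows the basic flow of submitting a job in cluster mode, which can be divided into 10 steps: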

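1. The client asks the Master node to register the app.

2. The Master registers the app and sends the client an acknowledgement that the app has been registered.

3. The Master contacts one of the three Worker processes (worker-3 in the figure above) and asks it to launch the driver.

4. worker-3 receives the Master's request and launches the driver, which starts a child process called DriverWrapper.

5. The Master then contacts the remaining two nodes, worker-1 and worker-2, and asks them to launch executors.

6. Both nodes call the startProcess method to start launching the executors.

7. The executors send registration requests to the driver.

8. The driver completes the registration and tells the executors that they can run tasks. At this point a coarse-grained executor backend process, CoarseGrainedExecutorBackend, appears on worker-1 and worker-2.

9. The executors receive messages from the driver and decode them into task descriptions (TaskDescription).

10. The executors start running the tasks.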

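2.2 The Three-Level Scheduling Framework

DAGScheduler
  Directed acyclic graph scheduler.
  Computes stages and submits them, ultimately turning each stage into a set of tasks (one task per partition),
  and hands that task set to the lower-level scheduler (TaskScheduler).


TaskScheduler
  Receives task sets from the DAGScheduler.
  Spark defines the TaskScheduler trait, which has a single implementation, TaskSchedulerImpl.


SchedulerBackend
  Handles low-level communication. Its implementations correspond to the different Spark cluster modes; Spark supports five cluster modes: local, standalone, YARN, Mesos and Kubernetes (k8s).
  LocalSchedulerBackend
  CoarseGrainedSchedulerBackend
            ^
            |---StandaloneSchedulerBackend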

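This post will not cover all three schedulers. It focuses on the first one, the DAGScheduler, and in particular on how it divides a job into stages and how it submits them.

As usual, let's start with the official documentation of the DAGScheduler class: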

The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
minimal schedule to run the job. It then submits stages as TaskSets to an underlying
TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent
tasks that can run right away based on the data that's already on the cluster (e.g. map output
files from previous stages), though it may fail if this data becomes unavailable.

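In short, the DAGScheduler is the top layer of the three-level scheduling framework, and it schedules by stage. What it submits first are stages, not individual tasks. For each job, it does three things:

- It builds a DAG of stages.
- It tracks which RDDs and stage outputs are already materialized. You can picture this as the chain of RDD transformations, or figuratively, the lineage.
- It finds a minimal schedule to run the job.

Each stage is then handed as a TaskSet to a TaskScheduler implementation that runs it on the cluster. The tasks in a TaskSet are independent and can run immediately on data already present in the cluster, such as the map output files of an earlier stage. They may fail if that data becomes unavailable.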

Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with
"narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks
in each stage, but operations with shuffle dependencies require multiple stages (one to write a
set of map output files, and another to read those files after a barrier). In the end, every
stage will have only shuffle dependencies on other stages, and may compute multiple operations
inside it. The actual pipelining of these operations happens in the RDD.compute() functions of
various RDDs

In short, stage boundaries are exactly the shuffle boundaries:

- Operators with narrow dependencies, such as map() and filter(), are pipelined inside one stage.
- A shuffle splits the work into two stages: a map-side stage that writes output files, and a reduce-side stage that reads them after a barrier.
- The pipelining itself happens in each RDD's compute() method.
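Pipelining narrow operators means that each record flows through map and then filter, one at a time, inside the same task, without building intermediate collections. The plain-Java sketch below imitates that with wrapped iterators. Pipeline.map and Pipeline.filter are illustrative helpers, not Spark classes, but the chaining is similar in spirit to each RDD's compute() pulling records from its parent's iterator.

```java
import java.util.*;
import java.util.function.*;

// Narrow transformations as iterator wrappers: nothing runs until the final iterator is consumed,
// and records pass through the whole chain one by one. Hypothetical helpers, not Spark code.
final class Pipeline {
    static <T, U> Iterator<U> map(Iterator<T> in, Function<T, U> f) {
        return new Iterator<U>() {
            public boolean hasNext() { return in.hasNext(); }
            public U next() { return f.apply(in.next()); }
        };
    }

    static <T> Iterator<T> filter(Iterator<T> in, Predicate<T> p) {
        return new Iterator<T>() {
            private T pending;          // next record that passed the predicate
            private boolean ready;      // whether `pending` holds an unconsumed record
            public boolean hasNext() {
                while (!ready && in.hasNext()) {
                    T candidate = in.next();
                    if (p.test(candidate)) { pending = candidate; ready = true; }
                }
                return ready;
            }
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                ready = false;
                return pending;
            }
        };
    }
}
```

If the functions log their calls, the log interleaves as "map 1, filter 10, map 2, filter 20, ...". This shows that each record passes through the whole chain before the next one is read.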

In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred
locations to run each task on, based on the current cache status, and passes these to the
low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
a small number of times before cancelling the whole stage.

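In short, the DAGScheduler has two more jobs besides building the stage DAG:

- It picks preferred locations for tasks based on the current cache status and passes them to the TaskScheduler.
- It resubmits earlier stages when shuffle output files are lost.

Other failures inside a stage are left to the TaskScheduler. It retries each task a few times and cancels the whole stage only if the task keeps failing. Roughly, the DAGScheduler does the overall planning while the TaskScheduler deals with individual failed tasks.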
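The retry behavior can be pictured with a toy loop. The sketch assumes a limit equal to spark.task.maxFailures (default 4), which in Spark is the number of failed attempts of a single task after which the task set is aborted. RetrySim is a hypothetical helper, and Spark's real TaskSetManager logic is considerably more involved.

```java
import java.util.function.IntPredicate;

// Toy model of "retry a task a few times before giving up on the stage" (not Spark code).
final class RetrySim {
    // attemptSucceeds.test(attempt) simulates running attempt number `attempt` of one task.
    // Returns the number of attempts used, or -1 if the task failed maxFailures times,
    // in which case the real scheduler would abort the task set (and the stage).
    static int runTask(IntPredicate attemptSucceeds, int maxFailures) {
        for (int attempt = 0; attempt < maxFailures; attempt++) {
            if (attemptSucceeds.test(attempt)) {
                return attempt + 1;
            }
        }
        return -1;
    }
}
```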

When looking through this code, there are several key concepts:
- Jobs (represented by [[ActiveJob]]) are the top-level work items submitted to the scheduler.
For example, when the user calls an action, like count(), a job will be submitted through
submitJob. Each Job may require the execution of multiple stages to build intermediate data.

- Stages ([[Stage]]) are sets of tasks that compute intermediate results in jobs, where each
task computes the same function on partitions of the same RDD. Stages are separated at shuffle
boundaries, which introduce a barrier (where we must wait for the previous stage to finish to
fetch outputs). There are two types of stages: [[ResultStage]], for the final stage that
executes an action, and [[ShuffleMapStage]], which writes map output files for a shuffle.
Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.

- Tasks are individual units of work, each sent to one machine.

- Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them
and likewise remembers which shuffle map stages have already produced output files to avoid
redoing the map side of a shuffle.

- Preferred locations: the DAGScheduler also computes where to run each task in a stage based
on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.

- Cleanup: all data structures are cleared when the running jobs that depend on them finish,
to prevent memory leaks in a long-running application.

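In short, the key concepts in this code are:
- Job (ActiveJob): the top-level unit of work. A job is submitted through submitJob when the user calls an action such as count(), and one job may need several stages to build intermediate data.

- Stage: a set of tasks that each compute the same function on different partitions of the same RDD. Stages are separated at shuffle boundaries, which act as a barrier: the next stage can fetch its input only after the previous stage has finished. A ResultStage executes the final action, while a ShuffleMapStage writes map output files for a shuffle. Jobs that reuse the same RDDs often share stages.

- Task: an individual unit of work, sent to one machine.

- Cache tracking: the DAGScheduler knows which RDDs are already cached, so it does not recompute them. It also knows which shuffle map stages have already produced output files, so it does not redo the map side of a shuffle.

- Preferred locations: each task's location is chosen from the preferred locations of the underlying RDDs, or from where cached or shuffle data lives.

- Cleanup: per-job data structures are cleared when the jobs that depend on them finish, to prevent memory leaks in long-running applications.

The most important takeaway from the documentation is that the DAGScheduler works in terms of stages. So let's first see what the submitStage method actually does!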

/** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }
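The recursion in submitStage can be traced with a toy model. The sketch makes these simplifications:

- StageSim and its nested Stage class are hypothetical.
- Parents are given as fixed lists instead of being computed by getMissingParentStages.
- "Submitting" a stage just records its name.

A stage with missing parents is parked in a waiting set and retried when a parent finishes, which mirrors the role of waitingStages.

```java
import java.util.*;

// Toy model of DAGScheduler.submitStage (hypothetical classes, not Spark's).
final class StageSim {
    static final class Stage {
        final String name;
        final List<Stage> parents;
        Stage(String name, List<Stage> parents) { this.name = name; this.parents = parents; }
    }

    final Set<Stage> finished = new HashSet<>();
    final Set<Stage> running = new HashSet<>();
    final Set<Stage> waiting = new LinkedHashSet<>();
    final List<String> submittedOrder = new ArrayList<>();

    // Submit a stage only when none of its parents is missing; otherwise submit the
    // missing parents first (recursively) and park the stage in `waiting`.
    void submitStage(Stage stage) {
        if (waiting.contains(stage) || running.contains(stage) || finished.contains(stage)) {
            return;
        }
        List<Stage> missing = new ArrayList<>();
        for (Stage parent : stage.parents) {
            if (!finished.contains(parent)) missing.add(parent);
        }
        if (missing.isEmpty()) {
            running.add(stage);                  // stands in for submitMissingTasks
            submittedOrder.add(stage.name);
        } else {
            for (Stage parent : missing) submitStage(parent);
            waiting.add(stage);
        }
    }

    // When a stage completes, retry the waiting stages
    // (roughly what Spark does when a shuffle map stage finishes).
    void stageFinished(Stage stage) {
        running.remove(stage);
        finished.add(stage);
        List<Stage> retry = new ArrayList<>(waiting);
        waiting.clear();
        for (Stage s : retry) submitStage(s);
    }
}
```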

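The comment on the first line says that before a stage is submitted, any missing parent stages are first submitted recursively. The for (parent <- missing) loop does exactly that, and the stage itself is added to waitingStages until its parents finish. So how is the missing list computed? For that, we need the source of getMissingParentStages: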

private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new ArrayStack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

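The method uses two kinds of data structures. missing and visited are HashSets, while waitingForVisit is a stack, maintained by hand to avoid a StackOverflowError from deep recursion. Every RDD has a list of dependencies recording how it was derived from its parents. For each RDD that is not fully cached, the method iterates over this list:

- If a dependency is a shuffle dependency, the corresponding ShuffleMapStage is obtained or created and, if its output is not yet available, added to missing.
- If a dependency is narrow, the parent RDD is pushed onto the stack.

Elements are popped until the stack is empty. Every RDD reachable through narrow dependencies is therefore visited, but the traversal never crosses a shuffle dependency. The returned missing list contains the not-yet-available parent stages one shuffle boundary away. These are the stages that the caller, submitStage, then submits, recursively handling their own parents in turn.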
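The traversal can be reproduced on a small lineage graph. The sketch below is a simplified model of getMissingParentStages. LineageSim.Rdd is a hypothetical node type, and the cache check and the isAvailable test are left out, so every shuffle parent is reported as missing. Like the original, it uses an explicit stack, follows narrow dependencies and stops at shuffle dependencies.

```java
import java.util.*;

// Simplified model of getMissingParentStages (hypothetical classes, not Spark's).
final class LineageSim {
    static final class Rdd {
        final String name;
        final List<Rdd> narrowParents = new ArrayList<>();   // NarrowDependency targets
        final List<Rdd> shuffleParents = new ArrayList<>();  // ShuffleDependency targets
        Rdd(String name) { this.name = name; }
    }

    // Returns the map-side RDDs of the shuffle dependencies reachable from stageRdd through
    // narrow dependencies only; each one is the last RDD of a parent (shuffle map) stage.
    static Set<String> missingParentStages(Rdd stageRdd) {
        Set<String> missing = new LinkedHashSet<>();
        Set<Rdd> visited = new HashSet<>();
        Deque<Rdd> waitingForVisit = new ArrayDeque<>();   // explicit stack, no recursion
        waitingForVisit.push(stageRdd);
        while (!waitingForVisit.isEmpty()) {
            Rdd rdd = waitingForVisit.pop();
            if (!visited.add(rdd)) continue;                          // already visited
            for (Rdd p : rdd.shuffleParents) missing.add(p.name);    // stage boundary: stop here
            for (Rdd p : rdd.narrowParents) waitingForVisit.push(p); // same stage: keep walking
        }
        return missing;
    }
}
```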

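From this analysis we can conclude that stages are split wherever a shuffle occurs, and that stages are submitted recursively. The stage division process can be illustrated with the figure below:

2.3 The Spark Shuffle Process in Detail

From the stage division flow above, a stage boundary is drawn whenever a shuffle dependency is found while traversing an RDD's dependency list. The stage before the shuffle, whose RDDs are connected only by narrow dependencies, writes its output to local disk, and that data becomes the input of the next stage. So how does Spark actually implement this write? To find out, we need to look at the documentation and source code of the ShuffleManager trait and of SortShuffleManager, its only built-in implementation.

ShuffleManager documentation comment: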

Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver
and on each executor, based on the spark.shuffle.manager setting. The driver registers shuffles
with it, and executors (or tasks running locally in the driver) can ask to read and write data.
NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
boolean isDriver as parameters.

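In short, the implementation is chosen by the spark.shuffle.manager setting and instantiated by SparkEnv on the driver and on every executor. The driver registers shuffles with it, and executors (or tasks running locally in the driver) use it to read and write shuffle data. Because SparkEnv creates it, its constructor can take a SparkConf and an isDriver boolean.

The source shows that the trait declares several abstract methods that implementations must provide, including registerShuffle, getWriter and getReader. Next, let's look closely at the documentation of its implementation, SortShuffleManager.

SortShuffleManager documentation comment: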

In sort-based shuffle, incoming records are sorted according to their target partition ids, then
written to a single map output file. Reducers fetch contiguous regions of this file in order to
read their portion of the map output. In cases where the map output data is too large to fit in
memory, sorted subsets of the output can be spilled to disk and those on-disk files are merged
to produce the final output file.

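In short, map output records are sorted by target partition id and written to a single file, and each reducer fetches the contiguous region that holds its partition. If the output does not fit in memory, sorted chunks are spilled to disk and merged into the final output file.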
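The layout described here can be modelled with lists: records are ordered by target partition id in one output, and each reducer has a way to find its contiguous region. The hypothetical class below counts records rather than bytes. The sketch assumes that Spark's index file holds cumulative byte offsets; the idea is the same either way: partition p occupies [offsets[p], offsets[p + 1]).

```java
import java.util.*;

// Sketch of the sort-based shuffle layout: one sorted map output plus cumulative offsets.
// Simplified model, not Spark's ExternalSorter or IndexShuffleBlockResolver.
final class SortShuffleLayout {
    // Orders (partitionId, record) pairs by partition id; List.sort is stable, so records
    // of the same partition keep their arrival order.
    static List<Map.Entry<Integer, String>> sortByPartition(List<Map.Entry<Integer, String>> in) {
        List<Map.Entry<Integer, String>> out = new ArrayList<>(in);
        out.sort(Map.Entry.comparingByKey());
        return out;
    }

    // offsets has numPartitions + 1 entries; partition p occupies [offsets[p], offsets[p + 1]).
    static long[] offsets(List<Map.Entry<Integer, String>> sorted, int numPartitions) {
        long[] offsets = new long[numPartitions + 1];
        for (Map.Entry<Integer, String> e : sorted) offsets[e.getKey() + 1]++;  // count per partition
        for (int p = 0; p < numPartitions; p++) offsets[p + 1] += offsets[p];   // prefix sums
        return offsets;
    }

    // What reducer p fetches: one contiguous region of the single map output.
    static List<String> fetch(List<Map.Entry<Integer, String>> sorted, long[] offsets, int p) {
        List<String> region = new ArrayList<>();
        for (int i = (int) offsets[p]; i < offsets[p + 1]; i++) region.add(sorted.get(i).getValue());
        return region;
    }
}
```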

Sort-based shuffle has two different write paths for producing its map output files:

In short, the two write paths are serialized sorting and deserialized sorting.

- Serialized sorting: used when all three of the following conditions hold:
1. The shuffle dependency specifies no aggregation or output ordering.
2. The shuffle serializer supports relocation of serialized values (this is currently
supported by KryoSerializer and Spark SQL's custom serializers).
3. The shuffle produces fewer than 16777216 output partitions.
- Deserialized sorting: used to handle all other cases.

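In short, the serialized path needs all of the following:

- no aggregation or output ordering;
- a serializer that can relocate serialized values (KryoSerializer or Spark SQL's custom serializers);
- fewer than 16777216 (2^24, roughly 16 million) output partitions.

Any other case takes the deserialized path.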

-----------------------
Serialized sorting mode
-----------------------
In the serialized sorting mode, incoming records are serialized as soon as they are passed to the
shuffle writer and are buffered in a serialized form during sorting. This write path implements
several optimizations:

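Serialized sorting mode
In short, in this mode a record is serialized as soon as it reaches the shuffle writer, and it stays serialized while it is buffered and sorted. This write path brings several optimizations: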

 - Its sort operates on serialized binary data rather than Java objects, which reduces memory
consumption and GC overheads. This optimization requires the record serializer to have certain
properties to allow serialized records to be re-ordered without requiring deserialization.
See SPARK-4550, where this optimization was first proposed and implemented, for more details.

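- In other words, sorting works on serialized bytes rather than Java objects, which greatly reduces memory consumption and GC overhead. This requires a serializer whose serialized records can be reordered without being deserialized.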

- It uses a specialized cache-efficient sorter ([[ShuffleExternalSorter]]) that sorts
arrays of compressed record pointers and partition ids. By using only 8 bytes of space per
record in the sorting array, this fits more of the array into cache.

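- In other words, ShuffleExternalSorter sorts an array of compact entries, each combining a record pointer and a partition id, instead of sorting the records themselves. Each entry takes only 8 bytes (one long), so more of the array fits in the CPU cache.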

- The spill merging procedure operates on blocks of serialized records that belong to the same
partition and does not need to deserialize records during the merge.

- In other words, spill files are merged as blocks of serialized records, partition by partition, without deserializing any record.

- When the spill compression codec supports concatenation of compressed data, the spill merge
simply concatenates the serialized and compressed spill partitions to produce the final output
partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used
and avoids the need to allocate decompression or copying buffers during the merge.

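- In other words, when the compression codec supports concatenating compressed streams, the merge simply appends the compressed spill segments of each partition. This allows zero-copy methods such as Java NIO's transferTo to be used, and no decompression or copy buffers need to be allocated during the merge.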
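The 8-byte entry mentioned in the optimizations above packs a partition id and a record address into a single long. The sketch below follows the layout documented for Spark's PackedRecordPointer, assumed here to be 24 bits of partition id, 13 bits of page number and 27 bits of in-page offset. It is a simplified model, not the real class. Because the partition id sits in the top bits, the sorter only needs those 24 bits to order records by partition. Those 24 bits also account for the 16777216 (2^24) partition limit of the serialized mode.

```java
// Simplified model of a packed record pointer: [24-bit partition][13-bit page][27-bit offset].
final class PackedPointer {
    static final int PARTITION_BITS = 24;
    static final int PAGE_BITS = 13;
    static final int OFFSET_BITS = 27;                                 // 24 + 13 + 27 = 64 bits
    static final long MAX_PARTITION_ID = (1L << PARTITION_BITS) - 1;   // 16777215

    static long pack(int partitionId, int pageNumber, long offsetInPage) {
        if (partitionId < 0 || partitionId > MAX_PARTITION_ID) {
            throw new IllegalArgumentException("partition id out of range: " + partitionId);
        }
        if (pageNumber < 0 || pageNumber >= (1 << PAGE_BITS)) {
            throw new IllegalArgumentException("page number out of range: " + pageNumber);
        }
        if (offsetInPage < 0 || offsetInPage >= (1L << OFFSET_BITS)) {
            throw new IllegalArgumentException("offset out of range: " + offsetInPage);
        }
        return ((long) partitionId << (PAGE_BITS + OFFSET_BITS))   // top 24 bits
                | ((long) pageNumber << OFFSET_BITS)               // middle 13 bits
                | offsetInPage;                                    // low 27 bits
    }

    static int partitionId(long packed) {
        return (int) (packed >>> (PAGE_BITS + OFFSET_BITS));      // unsigned shift: top bit may be set
    }

    static int pageNumber(long packed) {
        return (int) ((packed >>> OFFSET_BITS) & ((1L << PAGE_BITS) - 1));
    }

    static long offsetInPage(long packed) {
        return packed & ((1L << OFFSET_BITS) - 1);
    }
}
```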

Now that we have read the documentation, let's dig into the source of SortShuffleManager, starting with the registerShuffle method to see what it does!

override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }

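The override modifier shows that registerShuffle implements a method declared in the ShuffleManager trait. From the if/else branches, we can summarize the following.

There are three ways to handle the data written to disk before a shuffle:
- if merge sorting can be bypassed (BypassMergeSort, i.e. skipping the merge sort), a BypassMergeSortShuffleHandle is created;
- otherwise, if the serialized shuffle can be used, a SerializedShuffleHandle is created;
- in all other cases, a BaseShuffleHandle is used.

The branch order also shows that the priority of the three is bypass > serialized > base.

There is also a comment above the first branch; let's excerpt it and take a closer look: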

    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
   
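    In other words: if there are no more than spark.shuffle.sort.bypassMergeThreshold partitions and no
    map-side aggregation is needed, each partition is written directly to its own file, and the files
    are simply concatenated at the end. This avoids serializing and deserializing records twice when
    merging spill files, as the normal code path would. The price is having many files open at once,
    and therefore more memory allocated to their buffers.

This shows that shuffle registration is actually carried out through different handle objects. So what do these handle classes look like? Let's keep reading the source:

ShuffleHandle abstract class: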

abstract class ShuffleHandle(val shuffleId: Int) extends Serializable {}

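ShuffleHandle is an abstract class that does nothing except hold the shuffleId and extend Serializable. Three concrete handle classes derive from it, directly or indirectly.

BaseShuffleHandle class: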

private[spark] class BaseShuffleHandle[K, V, C](
    shuffleId: Int,
    val numMaps: Int,
    val dependency: ShuffleDependency[K, V, C])
  extends ShuffleHandle(shuffleId)

It extends the ShuffleHandle abstract class and in turn has the following two subclasses:

BypassMergeSortShuffleHandle class:

private[spark] class BypassMergeSortShuffleHandle[K, V](
  shuffleId: Int,
  numMaps: Int,
  dependency: ShuffleDependency[K, V, V])
  extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
}

SerializedShuffleHandle class:

private[spark] class SerializedShuffleHandle[K, V](
  shuffleId: Int,
  numMaps: Int,
  dependency: ShuffleDependency[K, V, V])
  extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
}

So the inheritance hierarchy of these handles is as follows. These classes are merely descriptive entities with no behavior of their own:

ShuffleHandle
    ^
    |--BaseShuffleHandle
        ^
        |---BypassMergeSortShuffleHandle
        |---SerializedShuffleHandle

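We now know the three ShuffleHandle implementations, but under what conditions is each one used? To answer that, we need to look at the source of the following methods:

shouldBypassMergeSort method: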

def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // We cannot bypass sorting if we need to do map-side aggregation.
    if (dep.mapSideCombine) {
      false
    } else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }

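The source shows that if map-side aggregation is needed, the bypass path cannot be used. Without map-side aggregation, bypass is used only when the number of partitions is at most spark.shuffle.sort.bypassMergeThreshold. This setting defaults to 200 and can be changed in the configuration.

canUseSerializedShuffle method: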

def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
    val shufId = dependency.shuffleId
    val numPartitions = dependency.partitioner.numPartitions
    if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
        s"${dependency.serializer.getClass.getName}, does not support object relocation")
      false
    } else if (dependency.mapSideCombine) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
        s"map-side aggregation")
      false
    } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
      log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
        s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
      false
    } else {
      log.debug(s"Can use serialized shuffle for shuffle $shufId")
      true
    }
  }

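The source shows that the serialized path can be used only when all three conditions hold:

- the serializer supports relocation of serialized objects;
- there is no map-side aggregation;
- the number of partitions does not exceed MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE, i.e. 16777216 (2^24, about 16 million).

Only then does Spark write to disk this way.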
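Putting registerShuffle, shouldBypassMergeSort and canUseSerializedShuffle together, the choice of handle can be restated as one function. The sketch below is a plain-Java paraphrase. The boolean and int parameters stand in for fields of ShuffleDependency and its serializer, and HandleChooser is a hypothetical name. The returned strings name the handle, and each comment names the writer that getWriter later picks for it.

```java
// Plain-Java paraphrase of SortShuffleManager's handle selection (not Spark code).
final class HandleChooser {
    static final int DEFAULT_BYPASS_MERGE_THRESHOLD = 200;       // spark.shuffle.sort.bypassMergeThreshold
    static final long MAX_SERIALIZED_MODE_PARTITIONS = 1L << 24; // 16777216

    static String choose(boolean mapSideCombine, int numPartitions,
                         boolean serializerSupportsRelocation, int bypassMergeThreshold) {
        // 1. shouldBypassMergeSort: no map-side combine and few enough partitions
        if (!mapSideCombine && numPartitions <= bypassMergeThreshold) {
            return "BypassMergeSortShuffleHandle";   // getWriter -> BypassMergeSortShuffleWriter
        }
        // 2. canUseSerializedShuffle: relocatable serializer, no map-side combine, <= 2^24 partitions
        if (serializerSupportsRelocation && !mapSideCombine
                && numPartitions <= MAX_SERIALIZED_MODE_PARTITIONS) {
            return "SerializedShuffleHandle";        // getWriter -> UnsafeShuffleWriter
        }
        // 3. everything else
        return "BaseShuffleHandle";                  // getWriter -> SortShuffleWriter
    }
}
```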

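We now know the different ways of writing to disk and when each one applies. So what does the actual write look like? Let's first take a quick look at SortShuffleManager's getWriter method to see which Writers are involved!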

override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V] = {
    numMapsForShuffle.putIfAbsent(
      handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
    val env = SparkEnv.get
    handle match {
      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        new UnsafeShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          context.taskMemoryManager(),
          unsafeShuffleHandle,
          mapId,
          context,
          env.conf)
      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          bypassMergeSortHandle,
          mapId,
          context,
          env.conf)
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
    }
  }

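The source shows that SortShuffleManager overrides getWriter, an abstract method of its parent trait ShuffleManager. The method picks a Writer based on the ShuffleHandle passed in:

- a SerializedShuffleHandle gets an UnsafeShuffleWriter;
- a BypassMergeSortShuffleHandle gets a BypassMergeSortShuffleWriter;
- any other BaseShuffleHandle gets a SortShuffleWriter.

Putting this together, we get the following sequence diagram showing which Writer is chosen for each Handle:



When analyzing registerShuffle, we saw that the priority of the three approaches is bypass > serialized > base. Let's now examine the three Writers in detail in that order!

BypassMergeSortShuffleWriter class:

The two most important methods in this class are write and writePartitionedFile. write first writes one file per partition to local disk, and writePartitionedFile then concatenates the per-partition files into one large file.

write method: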

public void write(Iterator<Product2<K, V>> records) throws IOException {
    assert (partitionWriters == null);
    if (!records.hasNext()) {
      partitionLengths = new long[numPartitions];
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
        blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      partitionWriters[i] =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so should be
    // included in the shuffle write time.
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }

    for (int i = 0; i < numPartitions; i++) {
      final DiskBlockObjectWriter writer = partitionWriters[i];
      partitionWriterSegments[i] = writer.commitAndGet();
      writer.close();
    }

    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    File tmp = Utils.tempFileWith(output);
    try {
      partitionLengths = writePartitionedFile(tmp);
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
      }
    }
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
  }

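The method maintains two arrays, partitionWriters and partitionWriterSegments. It proceeds in four steps:

1. partitionWriters holds one DiskBlockObjectWriter per partition.
2. Each record's key and value are written to the writer at index partitioner.getPartition(key), i.e. the partition index.
3. Every writer is committed with commitAndGet, and the resulting FileSegment is stored in the matching slot of partitionWriterSegments.
4. A temporary file tmp is created from the path of the final data file. Inside the try block, writePartitionedFile concatenates the partition files into tmp, and writeIndexFileAndCommit writes the index file and commits the output.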
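The first phase of write, which routes each record to its partition's writer, can be sketched with in-memory buffers. In the hypothetical BypassBuckets below, StringBuilders stand in for the per-partition DiskBlockObjectWriters. Math.floorMod(key.hashCode(), numPartitions) imitates HashPartitioner's non-negative modulo. Null keys, which HashPartitioner sends to partition 0, are not handled.

```java
import java.util.*;

// Sketch of BypassMergeSortShuffleWriter.write's routing phase: one "writer" per reduce
// partition, each record appended to the writer chosen by its key (not Spark code).
final class BypassBuckets {
    static int getPartition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);   // never negative
    }

    // Returns one in-memory "file" per partition, in partition-index order.
    static StringBuilder[] write(List<Map.Entry<String, Integer>> records, int numPartitions) {
        StringBuilder[] partitionWriters = new StringBuilder[numPartitions];
        for (int i = 0; i < numPartitions; i++) partitionWriters[i] = new StringBuilder();
        for (Map.Entry<String, Integer> r : records) {
            partitionWriters[getPartition(r.getKey(), numPartitions)]
                .append(r.getKey()).append('=').append(r.getValue()).append('\n');
        }
        return partitionWriters;
    }
}
```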

writePartitionedFile method:

private long[] writePartitionedFile(File outputFile) throws IOException {
    // Track location of the partition starts in the output file
    final long[] lengths = new long[numPartitions];
    if (partitionWriters == null) {
      // We were passed an empty iterator
      return lengths;
    }

    final FileOutputStream out = new FileOutputStream(outputFile, true);
    final long writeStartTime = System.nanoTime();
    boolean threwException = true;
    try {
      for (int i = 0; i < numPartitions; i++) {
        final File file = partitionWriterSegments[i].file();
        if (file.exists()) {
          final FileInputStream in = new FileInputStream(file);
          boolean copyThrewException = true;
          try {
            lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
            copyThrewException = false;
          } finally {
            Closeables.close(in, copyThrewException);
          }
          if (!file.delete()) {
            logger.error("Unable to delete file for partition {}", i);
          }
        }
      }
      threwException = false;
    } finally {
      Closeables.close(out, threwException);
      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
    }
    partitionWriters = null;
    return lengths;
  }

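The method first opens an output stream in append mode for the final large file. It then loops over the partitions: for each partition index, it takes that partition's file from partitionWriterSegments and appends it to the output stream with Utils.copyStream. It records the number of bytes copied in lengths and deletes the per-partition file afterwards.

Note that this approach is only chosen when the number of partitions is at most the threshold (200 by default). Every partition has its own open file and write buffer at the same time, so a large number of partitions would cost a lot of memory and file handles and hurt performance. For that reason the threshold should not be set too high.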
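The concatenation step can be reproduced with Java NIO. The sketch assumes that the zero-copy path corresponds to FileChannel.transferTo, which Utils.copyStream can use when transferToEnabled is true. ConcatPartitions is a hypothetical name. As in writePartitionedFile, the sketch appends the existing partition files in order, deletes them, and returns per-partition lengths, with 0 for a partition that has no file.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ConcatPartitions {
    // Appends every existing partition file to `output` in partition order, deletes it,
    // and returns the number of bytes contributed by each partition (0 if it had no file).
    static long[] concat(Path[] partitionFiles, Path output) throws IOException {
        long[] lengths = new long[partitionFiles.length];
        try (FileChannel out = FileChannel.open(output,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            for (int i = 0; i < partitionFiles.length; i++) {
                Path file = partitionFiles[i];
                if (!Files.exists(file)) {
                    continue;                                   // empty partition: length stays 0
                }
                try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
                    long size = in.size();
                    long copied = 0;
                    while (copied < size) {                     // transferTo may copy fewer bytes
                        copied += in.transferTo(copied, size - copied, out);
                    }
                    lengths[i] = copied;
                }
                Files.delete(file);                            // per-partition file no longer needed
            }
        }
        return lengths;
    }
}
```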

UnsafeShuffleWriter class:

Let's start with the overall framework, i.e. the write method:

write method:

public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
    // Keep track of success so we know if we encountered an exception
    // We do this rather than a standard try/catch/re-throw to handle
    // generic throwables.
    boolean success = false;
    try {
      while (records.hasNext()) {
        insertRecordIntoSorter(records.next());
      }
      closeAndWriteOutput();
      success = true;
    } finally {
      if (sorter != null) {
        try {
          sorter.cleanupResources();
        } catch (Exception e) {
          // Only throw this error if we won't be masking another
          // error.
          if (success) {
            throw e;
          } else {
            logger.error("In addition to a failure during writing, we failed during " +
                         "cleanup.", e);
          }
        }
      }
    }
  }

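The write method above is the skeleton of the whole spill-to-disk process. Inside the try block, each record is inserted into the sorter (insertRecordIntoSorter), and the result is then written to disk (closeAndWriteOutput). There is no catch block. Instead, the finally block releases the sorter's resources through cleanupResources. If that cleanup itself fails, the exception is rethrown only when writing succeeded; otherwise it is just logged, so it cannot mask the original error. Let's start with the source of insertRecordIntoSorter!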
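The "success flag instead of try/catch/rethrow" pattern used by write can be isolated into a small helper. This is a minimal sketch: SuccessFlagPattern and its Body interface are hypothetical names, and stderr stands in for Spark's logger.

```java
// Hypothetical sketch of the success-flag cleanup pattern used in UnsafeShuffleWriter.write.
final class SuccessFlagPattern {
  interface Body {
    void run() throws Exception;
  }

  static void runWithCleanup(Body body, Body cleanup) throws Exception {
    boolean success = false;
    try {
      body.run();
      success = true;
    } finally {
      try {
        cleanup.run();
      } catch (Exception e) {
        if (success) {
          // Nothing else failed, so the cleanup error is the one worth reporting.
          throw e;
        } else {
          // The body already failed; log this one so the original exception keeps propagating.
          System.err.println("In addition to a failure during the body, cleanup failed: " + e);
        }
      }
    }
  }
}
```

If the body throws, the finally block completes normally after logging the cleanup error, so the caller sees the body's exception rather than the cleanup's.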

The insertRecordIntoSorter method:

void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
    assert(sorter != null);
    final K key = record._1();
    final int partitionId = partitioner.getPartition(key);
    serBuffer.reset();
    serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
    serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
    serOutputStream.flush();

    final int serializedRecordSize = serBuffer.size();
    assert (serializedRecordSize > 0);

    sorter.insertRecord(
      serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
  }

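The first important point about this method is that every record is serialized! serBuffer is a serialization buffer that is reset and reused for each record. The key and value are written into it through serOutputStream, and the partition id is computed from the key by partitioner.getPartition. The method then calls the sorter's insertRecord method, passing the buffer's backing array, the array offset, the serialized size, and the partition id, so that each serialized record is inserted into the sorter. Next, let's see what insertRecord actually does. Ctrl+clicking the method to view its source takes us into a new class, ShuffleExternalSorter. Before examining insertRecord, let's briefly read the official documentation of ShuffleExternalSorter!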
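Serializing each record into one reusable buffer can be sketched with standard Java streams. This is an illustration under stated assumptions. RecordSerializer is a hypothetical class, DataOutputStream stands in for Spark's SerializationStream, and partitionFor mimics a hash partitioner (non-negative key.hashCode() modulo the number of partitions).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch: one reusable serialization buffer per writer, like serBuffer/serOutputStream.
final class RecordSerializer {
  private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  private final DataOutputStream out = new DataOutputStream(buffer);
  private final int numPartitions;

  RecordSerializer(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  // Hash partitioning with a non-negative modulo, so negative hash codes still map into range.
  int partitionFor(String key) {
    int mod = key.hashCode() % numPartitions;
    return mod < 0 ? mod + numPartitions : mod;
  }

  // Serializes one (key, value) record; the same buffer is reset and reused on every call.
  byte[] serialize(String key, int value) throws IOException {
    buffer.reset();       // like serBuffer.reset()
    out.writeUTF(key);    // stand-in for serOutputStream.writeKey
    out.writeInt(value);  // stand-in for serOutputStream.writeValue
    out.flush();
    // Spark hands the buffer's backing array plus a length to the sorter without copying;
    // toByteArray() copies here only to keep the sketch simple.
    return buffer.toByteArray();
  }
}
```

Reusing one buffer avoids allocating a new byte array per record. That is why Spark passes serBuffer.getBuf() together with serializedRecordSize rather than a fresh array.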

The ShuffleExternalSorter class:

An external sorter that is specialized for sort-based shuffle.
<p>
Incoming records are appended to data pages. When all records have been inserted (or when the
current thread's shuffle memory limit is reached), the in-memory records are sorted according to
their partition ids (using a {@link ShuffleInMemorySorter}). The sorted records are then
written to a single output file (or multiple files, if we've spilled). The format of the output
files is the same as the format of the final output file written by
{@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are
written as a single serialized, compressed stream that can be read with a new decompression and
deserialization stream.
<p>
Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its
spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a
specialized merge procedure that avoids extra serialization/deserialization.

In other words, ShuffleExternalSorter is an external sorter built specifically for sort-based shuffle. Records are appended to data pages, and they are sorted by partition id only when all records have been inserted or the thread's shuffle memory limit is reached. Each sorted batch is written to one file, so there may be several files if spilling occurred. The sorter itself never merges these spill files. The merge happens in UnsafeShuffleWriter, whose optimized procedure avoids extra serialization and deserialization.
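The life cycle described in the documentation can be modeled with a toy class. This is a deliberately simplified sketch: ToySpillingSorter is hypothetical, records live on the Java heap instead of in data pages, and a spill is just an in-memory list. It only shows the shape of the process: buffer records, sort a batch by partition id when a record-count threshold is hit, emit it as a run, and leave the runs unmerged.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy model (not Spark code) of ShuffleExternalSorter's buffer / sort / spill cycle.
final class ToySpillingSorter {
  static final class Rec {
    final int partitionId;
    final String payload;

    Rec(int partitionId, String payload) {
      this.partitionId = partitionId;
      this.payload = payload;
    }
  }

  private final int spillThreshold; // stand-in for numElementsForSpillThreshold
  private final List<Rec> inMemory = new ArrayList<>();
  private final List<List<Rec>> spills = new ArrayList<>();

  ToySpillingSorter(int spillThreshold) {
    this.spillThreshold = spillThreshold;
  }

  void insert(int partitionId, String payload) {
    // Same check order as insertRecord: spill first if the threshold has been reached.
    if (inMemory.size() >= spillThreshold) {
      spill();
    }
    inMemory.add(new Rec(partitionId, payload));
  }

  // Sorts the buffered records by partition id only (a stable sort) and emits them as one run.
  private void spill() {
    List<Rec> run = new ArrayList<>(inMemory);
    run.sort(Comparator.comparingInt(r -> r.partitionId));
    spills.add(run);
    inMemory.clear();
  }

  // Like closeAndGetSpills(): the remaining records become the last run; runs are not merged.
  List<List<Rec>> closeAndGetSpills() {
    if (!inMemory.isEmpty()) {
      spill();
    }
    return spills;
  }
}
```

Because each run is sorted only by partition id, the merger can later read partition 0 from every run, then partition 1, and so on, without deserializing the records.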

The ShuffleExternalSorter.insertRecord method:

/**
   * Write a record to the shuffle sorter.
   */
  public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {

    // for tests
    assert(inMemSorter != null);
    if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
      logger.info("Spilling data because number of spilledRecords crossed the threshold " +
        numElementsForSpillThreshold);
      spill();
    }

    growPointerArrayIfNecessary();
    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    // Need 4 or 8 bytes to store the record length.
    final int required = length + uaoSize;
    acquireNewPageIfNecessary(required);

    assert(currentPage != null);
    final Object base = currentPage.getBaseObject();
    final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
    UnsafeAlignedOffset.putSize(base, pageCursor, length);
    pageCursor += uaoSize;
    Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
    pageCursor += length;
    inMemSorter.insertRecord(recordAddress, partitionId);
  }

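Let's walk through the code step by step:

1. First, it checks whether the number of records in the in-memory sorter (inMemSorter.numRecords()) has reached the threshold numElementsForSpillThreshold. Once it has, spill() spills the data to disk. The method then makes sure there is room for one more pointer (growPointerArrayIfNecessary) and that the current page can hold the record plus its 4- or 8-byte length prefix (acquireNewPageIfNecessary).

2. Next comes an important technique: a direct memory copy. The record length is first written at pageCursor with UnsafeAlignedOffset.putSize. Then Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length) copies length bytes of the serialized record, starting at recordOffset in the serialization buffer (recordBase), into the current page (base) at pageCursor. This is not "zero-copy" in the operating-system sense. It is a bulk copy of raw bytes through sun.misc.Unsafe that works on both on-heap arrays and off-heap memory, which makes it cheaper than copying objects one by one.

3. Only after the copy does it call inMemSorter.insertRecord, passing the record address obtained earlier (recordAddress, which encodes the page number and the offset within the page) together with the partition id. So next, let's look at what inMemSorter's insertRecord method does.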
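The page layout produced by insertRecord can be sketched with a ByteBuffer. This is a simplified illustration, not Spark's memory manager. ToyDataPage is a hypothetical class, the length prefix is always 4 bytes here (Spark uses 4 or 8 depending on UnsafeAlignedOffset), and the returned int offset stands in for the encoded recordAddress.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a data page: [length prefix][record bytes][length prefix][record bytes]...
final class ToyDataPage {
  private final ByteBuffer page;

  ToyDataPage(int capacity) {
    page = ByteBuffer.allocate(capacity);
  }

  // Appends a record and returns the offset of its length prefix (the "record address"),
  // or -1 when the page is full (Spark would acquire a new page at that point).
  int append(byte[] record) {
    int required = Integer.BYTES + record.length; // like length + uaoSize
    if (page.remaining() < required) {
      return -1;
    }
    int recordAddress = page.position(); // like pageCursor before the write
    page.putInt(record.length);          // like UnsafeAlignedOffset.putSize
    page.put(record);                    // bulk copy, like Platform.copyMemory
    return recordAddress;
  }

  // Reads a record back from its address: the length prefix first, then the payload.
  byte[] read(int recordAddress) {
    int length = page.getInt(recordAddress);
    byte[] out = new byte[length];
    for (int i = 0; i < length; i++) {
      out[i] = page.get(recordAddress + Integer.BYTES + i);
    }
    return out;
  }
}
```

Storing the length in front of each record is what lets writeSortedFile later recover a record from nothing but its address.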

The inMemSorter.insertRecord method:

public void insertRecord(long recordPointer, int partitionId) {
    if (!hasSpaceForAnotherRecord()) {
      throw new IllegalStateException("There is no space for new record");
    }
    array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
    pos++;
  }

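This method gives us another important piece of information: the in-memory sorter's array does not store objects. Each entry (array.set) is a single long that packs the record pointer together with the partition id. So next, let's see how this packing is done!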

The PackedRecordPointer.packPointer method:

 public static long packPointer(long recordPointer, int partitionId) {
    assert (partitionId <= MAXIMUM_PARTITION_ID);
    // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
    // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
    final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
    final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
    return (((long) partitionId) << 40) | compressedAddress;
  }

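In short, the record pointer and the partition id are combined into a single long with the layout [24-bit partition id][13-bit page number][27-bit offset within the page]. The page number is shifted down from the top 13 bits of the TaskMemoryManager address, only the low 27 bits of the offset are kept (hence the 128 MB-per-page limit in the comment), and the partition id goes into the top 24 bits. Because of this layout, the partition id can be at most MAXIMUM_PARTITION_ID = 2^24 - 1, and sorting by partition only has to compare or radix-sort plain long values.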
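The bit packing can be reproduced and checked in isolation. PackedPointerSketch is a hypothetical class. It copies the packPointer arithmetic shown above and adds a helper that mirrors TaskMemoryManager's address layout (page number in the upper 13 bits, offset in the lower 51 bits) so that a test address can be built. The unpacking helpers follow from the layout and are written here for illustration.

```java
// Hypothetical sketch of PackedRecordPointer's layout:
// [24-bit partition id][13-bit page number][27-bit offset in page]
final class PackedPointerSketch {
  static final long MASK_LONG_UPPER_13_BITS = 0xFFF8000000000000L;
  static final long MASK_LONG_LOWER_27_BITS = 0x0000000007FFFFFFL;
  static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1;

  // Assumed address layout: page number in bits 51..63, offset in bits 0..50.
  static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
    return (((long) pageNumber) << 51) | (offsetInPage & ((1L << 51) - 1));
  }

  // Same arithmetic as packPointer in the source above.
  static long packPointer(long recordPointer, int partitionId) {
    if (partitionId < 0 || partitionId > MAXIMUM_PARTITION_ID) {
      throw new IllegalArgumentException("partition id out of range: " + partitionId);
    }
    // Move the page number from bits 51..63 down to bits 27..39.
    long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
    long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
    return (((long) partitionId) << 40) | compressedAddress;
  }

  static int partitionId(long packed) {
    return (int) (packed >>> 40);
  }

  static int pageNumber(long packed) {
    return (int) ((packed >>> 27) & 0x1FFF);
  }

  static long offsetInPage(long packed) {
    return packed & MASK_LONG_LOWER_27_BITS;
  }
}
```

Since the partition id occupies the most significant bits, sorting the packed longs as unsigned numbers automatically groups the records by partition.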

With that, we have roughly covered how records are inserted into the in-memory sorter one by one. Next, let's move on to the source of the closeAndWriteOutput method!

The closeAndWriteOutput method:

@VisibleForTesting
  void closeAndWriteOutput() throws IOException {
    assert(sorter != null);
    updatePeakMemoryUsed();
    serBuffer = null;
    serOutputStream = null;
    final SpillInfo[] spills = sorter.closeAndGetSpills();
    sorter = null;
    final long[] partitionLengths;
    final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    final File tmp = Utils.tempFileWith(output);
    try {
      try {
        partitionLengths = mergeSpills(spills, tmp);
      } finally {
        for (SpillInfo spill : spills) {
          if (spill.file.exists() && ! spill.file.delete()) {
            logger.error("Error while deleting spill file {}", spill.file.getPath());
          }
        }
      }
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
      }
    }
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
  }

Let's first look at the closeAndGetSpills method to see whether the data is actually written to disk inside it:

The sorter.closeAndGetSpills method:

public SpillInfo[] closeAndGetSpills() throws IOException {
    if (inMemSorter != null) {
      // Do not count the final file towards the spill count.
      writeSortedFile(true);
      freeMemory();
      inMemSorter.free();
      inMemSorter = null;
    }
    return spills.toArray(new SpillInfo[spills.size()]);
  }

As we can see, this method calls writeSortedFile(true) and finally returns an array of SpillInfo. So let's look at writeSortedFile:

The ShuffleExternalSorter.writeSortedFile method:

private void writeSortedFile(boolean isLastFile) {

    final ShuffleWriteMetrics writeMetricsToUse;

    if (isLastFile) {
      // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
      writeMetricsToUse = writeMetrics;
    } else {
      // We're spilling, so bytes written should be counted towards spill rather than write.
      // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
      // them towards shuffle bytes written.
      writeMetricsToUse = new ShuffleWriteMetrics();
    }

    // This call performs the actual sort.
    final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
      inMemSorter.getSortedIterator();

    // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
    // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
    // data through a byte array. This array does not need to be large enough to hold a single
    // record;
    final byte[] writeBuffer = new byte[diskWriteBufferSize];

    // Because this output will be read during shuffle, its compression codec must be controlled by
    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
    // createTempShuffleBlock here; see SPARK-3426 for more details.
    final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = spilledFileInfo._2();
    final TempShuffleBlockId blockId = spilledFileInfo._1();
    final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

    // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
    // Our write path doesn't actually use this serializer (since we end up calling the `write()`
    // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
    // around this, we pass a dummy no-op serializer.
    final SerializerInstance ser = DummySerializerInstance.INSTANCE;

    final DiskBlockObjectWriter writer =
      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);

    int currentPartition = -1;
    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    while (sortedRecords.hasNext()) {
      sortedRecords.loadNext();
      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
      assert (partition >= currentPartition);
      if (partition != currentPartition) {
        // Switch to the new partition
        if (currentPartition != -1) {
          final FileSegment fileSegment = writer.commitAndGet();
          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
        }
        currentPartition = partition;
      }

      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
      final Object recordPage = taskMemoryManager.getPage(recordPointer);
      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
      long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
      while (dataRemaining > 0) {
        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
        Platform.copyMemory(
          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
        writer.write(writeBuffer, 0, toTransfer);
        recordReadPosition += toTransfer;
        dataRemaining -= toTransfer;
      }
      writer.recordWritten();
    }

    final FileSegment committedSegment = writer.commitAndGet();
    writer.close();
    // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
    // then the file might be empty. Note that it might be better to avoid calling
    // writeSortedFile() in that case.
    if (currentPartition != -1) {
      spillInfo.partitionLengths[currentPartition] = committedSegment.length();
      spills.add(spillInfo);
    }

    if (!isLastFile) {  // i.e. this is a spill file
      // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
      // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
      // relies on its `recordWritten()` method being called in order to trigger periodic updates to
      // `shuffleBytesWritten`. If we were to remove the `recordWritten()` call and increment that
      // counter at a higher-level, then the in-progress metrics for records written and bytes
      // written would get out of sync.
      //
      // When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter;
      // in all other cases, we pass in a dummy write metrics to capture metrics, then copy those
      // metrics to the true write metrics here. The reason for performing this copying is so that
      // we can avoid reporting spilled bytes as shuffle write bytes.
      //
      // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
      // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
      // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
      writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
      taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
    }
  }

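At last we have found the key code that sorts the records and writes them to disk. Examining it, we can draw the following conclusions:

1. The parameter isLastFile tells whether this call is the final write made from closeAndGetSpills (true) or a spill triggered while records are still being inserted (false). In both cases the sorted in-memory records are written to a temporary shuffle file and added to spills. The flag mainly decides how the written bytes are counted: as shuffle write bytes for the last file, or as disk bytes spilled otherwise.

2. Important: right after that, we finally see the sorting code! The comment reads "This call performs the actual sort", and we will look at this method separately later.

3. Once the pointer array is sorted, we see another memory copy. For each entry in sortedRecords, the method takes the record pointer, uses taskMemoryManager to resolve it into a page and an offset within that page, and reads the record's length prefix. It then copies the record in chunks of at most diskWriteBufferSize bytes into writeBuffer, and each chunk is written to disk through the DiskBlockObjectWriter. Whenever the partition id changes, writer.commitAndGet() closes the current file segment, and its length is stored in spillInfo.partitionLengths.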
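The main loop of writeSortedFile can be sketched without Spark's memory manager. This is a simplified illustration under stated assumptions. SortedRunWriter is hypothetical, records are already-separated byte arrays sorted by partition id, a ByteArrayOutputStream replaces the DiskBlockObjectWriter, and measuring out.size() replaces writer.commitAndGet().

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch of writeSortedFile's loop: chunked copies through a small buffer,
// plus per-partition lengths recorded whenever the partition id changes.
final class SortedRunWriter {
  static long[] write(int[] partitionIds, byte[][] records, int numPartitions, int bufferSize,
                      ByteArrayOutputStream out) {
    long[] partitionLengths = new long[numPartitions];
    byte[] writeBuffer = new byte[bufferSize]; // need not be large enough for a whole record
    int currentPartition = -1;
    long segmentStart = 0;
    for (int r = 0; r < records.length; r++) {
      int partition = partitionIds[r];
      if (partition < currentPartition) {
        throw new IllegalArgumentException("records must be sorted by partition id");
      }
      if (partition != currentPartition) {
        // Close the previous partition's segment (Spark: writer.commitAndGet()).
        if (currentPartition != -1) {
          partitionLengths[currentPartition] = out.size() - segmentStart;
        }
        currentPartition = partition;
        segmentStart = out.size();
      }
      int dataRemaining = records[r].length;
      int readPosition = 0;
      while (dataRemaining > 0) {
        int toTransfer = Math.min(bufferSize, dataRemaining);
        System.arraycopy(records[r], readPosition, writeBuffer, 0, toTransfer);
        out.write(writeBuffer, 0, toTransfer);
        readPosition += toTransfer;
        dataRemaining -= toTransfer;
      }
    }
    // The last partition's segment is closed after the loop, as in the Spark code.
    if (currentPartition != -1) {
      partitionLengths[currentPartition] = out.size() - segmentStart;
    }
    return partitionLengths;
  }
}
```

Partitions that receive no records keep a length of 0, which is how empty partitions show up in partitionLengths.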

Now let's look at the sorting method, ShuffleInMemorySorter.getSortedIterator, on its own:

public ShuffleSorterIterator getSortedIterator() {
    int offset = 0;
    if (useRadixSort) {
      offset = RadixSort.sort(
        array, pos,
        PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
        PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
    } else {
      MemoryBlock unused = new MemoryBlock(
        array.getBaseObject(),
        array.getBaseOffset() + pos * 8L,
        (array.size() - pos) * 8L);
      LongArray buffer = new LongArray(unused);
      Sorter<PackedRecordPointer, LongArray> sorter =
        new Sorter<>(new ShuffleSortDataFormat(buffer));

      sorter.sort(array, 0, pos, SORT_COMPARATOR);
    }
    return new ShuffleSorterIterator(pos, array, offset);
  }
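When useRadixSort is true, the method above calls RadixSort.sort with PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX and PARTITION_ID_END_BYTE_INDEX, so only the partition-id bytes of each packed long are examined. The sketch below is a simplified least-significant-digit radix sort over bytes 5 to 7 (bits 40 to 63), treated as unsigned. It is not Spark's RadixSort, which works in place using the spare part of the same LongArray. This version uses a separate scratch array, and the class name is hypothetical.

```java
// Hypothetical sketch: LSD radix sort of packed pointers by their 24-bit partition id (bytes 5..7).
final class PartitionRadixSort {
  static long[] sortByPartition(long[] packed) {
    long[] src = packed.clone();
    long[] dst = new long[src.length];
    for (int byteIndex = 5; byteIndex <= 7; byteIndex++) {
      int shift = byteIndex * 8;
      // Count how many values have each possible byte value (unsigned 0..255).
      int[] counts = new int[256];
      for (long v : src) {
        counts[(int) ((v >>> shift) & 0xFF)]++;
      }
      // Prefix sums turn the counts into starting offsets for each bucket.
      int[] offsets = new int[256];
      for (int b = 1; b < 256; b++) {
        offsets[b] = offsets[b - 1] + counts[b - 1];
      }
      // Stable scatter: equal bytes keep their relative order, which LSD radix sort relies on.
      for (long v : src) {
        dst[offsets[(int) ((v >>> shift) & 0xFF)]++] = v;
      }
      long[] tmp = src;
      src = dst;
      dst = tmp;
    }
    return src;
  }
}
```

Only 3 passes are needed regardless of the record count, because the sort never looks at the page number or offset bits. Records within the same partition therefore keep their insertion order.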

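Following the sort calls down through Spark's Sorter class (a thin wrapper around TimSort), we eventually reach a very low-level sorting routine, binary insertion sort. This comparison-based path is only taken when useRadixSort is false; otherwise the pointers are sorted by RadixSort on the partition-id bytes, as the if branch above shows. The code is shown below. Explaining the algorithm is beyond the scope of this post, so we will just record it here and come back to it later: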

/**
   * Sorts the specified portion of the specified array using a binary
   * insertion sort.  This is the best method for sorting small numbers
   * of elements.  It requires O(n log n) compares, but O(n^2) data
   * movement (worst case).
   *
   * If the initial part of the specified range is already sorted,
   * this method can take advantage of it: the method assumes that the
   * elements from index {@code lo}, inclusive, to {@code start},
   * exclusive are already sorted.
   *
   * @param a the array in which a range is to be sorted
   * @param lo the index of the first element in the range to be sorted
   * @param hi the index after the last element in the range to be sorted
   * @param start the index of the first element in the range that is
   *        not already known to be sorted ({@cod