1. 程式人生 > >[Spark 進階]-- 優化Spark作業以獲得最佳效能

[Spark 進階]-- 優化Spark作業以獲得最佳效能

感謝原文作者:https://michalsenkyr.github.io/2018/01/spark-performance

Spark作業的開發在表面上看起來很容易,而且大部分都是如此。提供的 API設計精良且功能豐富,如果您熟悉Scala集合或Java流,您將立即完成實施。實際上,當在叢集上執行它們並且滿負載時,硬體部分實際上是因為並非所有作業在效能方面都是相同的。不幸的是,要以最佳方式實現您的工作,您必須瞭解Spark及其內部結構。

在本文中,我將討論在開發Spark應用程式時可能遇到的最常見的效能問題以及如何避免或減輕它們。

1.Transformations

使用RDD API時,最常見的效能問題是使用不適合特定用例的轉換。這可能源於許多使用者對SQL查詢語言的熟悉以及他們對查詢優化的依賴。重要的是要意識到RDD API不應用任何此類優化。

我們來看看同一計算的這兩個定義:

val input = sc.parallelize(1 to 10000000, 42).map(x => (x % 42, x))
val definition1 = input.groupByKey().mapValues(_.sum)
val definition2 = input.reduceByKey(_ + _)
RDD 平均時間 閔。時間 最大。時間
定義1 2646.3ms 1570ms 8444ms
定義2 270.7ms 96ms 1569ms

Lineage(定義1):

(42) MapPartitionsRDD[3] at mapValues at <console>:26 []
 |   ShuffledRDD[2] at groupByKey at <console>:26 []
 +-(42) MapPartitionsRDD[1] at map at <console>:24 []
    |   ParallelCollectionRDD[0] at parallelize at <console>:24 []

Lineage(定義2):

(42) ShuffledRDD[4] at reduceByKey at <console>:26 []
 +-(42) MapPartitionsRDD[1] at map at <console>:24 []
    |   ParallelCollectionRDD[0] at parallelize at <console>:24 []

第二個定義比第一個定義快得多,因為它在我們的用例上下文中更有效地處理資料,而不是不必要地收集所有元素。

在進行笛卡爾連線並稍後對結果資料進行過濾而不是轉換為RDD並使用內部連線時,我們可以觀察到類似的效能問題:

val input1 = sc.parallelize(1 to 10000, 42)
val input2 = sc.parallelize(1.to(100000, 17), 42)
val definition1 = input1.cartesian(input2).filter { case (x1, x2) => x1 % 42 == x2 % 42 }
val definition2 = input1.map(x => (x % 42, x)).join(input2.map(x => (x % 42, x))).map(_._2)
RDD 平均時間 閔。時間 最大。時間
定義1 9255.3ms 3750ms 12077ms
定義2 1525ms 623ms 2759ms

Lineage(定義1):

(1764) MapPartitionsRDD[34] at filter at <console>:30 []
  |    CartesianRDD[33] at cartesian at <console>:30 []
  |    ParallelCollectionRDD[0] at parallelize at <console>:24 []
  |    ParallelCollectionRDD[1] at parallelize at <console>:24 []

Lineage(定義2):

(42) MapPartitionsRDD[40] at map at <console>:30 []
 |   MapPartitionsRDD[39] at join at <console>:30 []
 |   MapPartitionsRDD[38] at join at <console>:30 []
 |   CoGroupedRDD[37] at join at <console>:30 []
 +-(42) MapPartitionsRDD[35] at map at <console>:30 []
 |  |   ParallelCollectionRDD[0] at parallelize at <console>:24 []
 +-(42) MapPartitionsRDD[36] at map at <console>:30 []
    |   ParallelCollectionRDD[1] at parallelize at <console>:24 []

這裡的經驗法則是始終使用轉換邊界處的最小資料量。RDD API盡最大努力優化任務排程,基於資料區域性性的首選位置等背景內容。但它並不優化計算本身。事實上,它實際上是不可能的,因為每個轉換都是由不透明的函式定義的,而Spark無法檢視我們正在使用的資料以及如何處理。

還有另一條經驗法則可以從中得出:使用豐富的變換,即在單個變換的上下文中儘可能多地進行變換。一個有用的工具是combineByKeyWithClassTag方法:

val input = sc.parallelize(1 to 1000000, 42).keyBy(_ % 1000)
val combined = input.combineByKeyWithClassTag((x: Int) => Set(x / 1000), (s: Set[Int], x: Int) => s + x / 1000, (s1: Set[Int], s2: Set[Int]) => s1 ++ s2)

Lineage:

(42) ShuffledRDD[61] at combineByKeyWithClassTag at <console>:28 []
 +-(42) MapPartitionsRDD[57] at keyBy at <console>:25 []
    |   ParallelCollectionRDD[56] at parallelize at <console>:25 []

DataFrames and Datasets

Spark社群實際上認識到了這些問題,並開發了兩套高階API來解決這個問題:DataFrame和Dataset。這些API帶有關於資料的附加資訊,並定義了整個框架中可識別的特定轉換。在呼叫動作時,計算圖被大量優化並轉換為相應的RDD圖,並執行該圖。

為了演示,我們可以嘗試兩種等效的計算,以一種非常不同的方式定義,並比較它們的執行時間和作業圖:

val input1 = sc.parallelize(1 to 10000, 42).toDF("value1")
val input2 = sc.parallelize(1.to(100000, 17), 42).toDF("value2")
val definition1 = input1.crossJoin(input2).where('value1 % 42 === 'value2 % 42)
val definition2 = input1.join(input2, 'value1 % 42 === 'value2 % 42)
資料幀 平均時間 閔。時間 最大。時間
定義1 1598.3ms 929ms 2765ms
定義2 1770.9ms 744ms 2954ms

解析邏輯計劃(定義1):

'Filter (('value1 % 42) = ('value2 % 42))
+- Join Cross
   :- Project [value#2 AS value1#4]
   :  +- SerializeFromObject [input[0, int, false] AS value#2]
   :     +- ExternalRDD [obj#1]
   +- Project [value#9 AS value2#11]
      +- SerializeFromObject [input[0, int, false] AS value#9]
         +- ExternalRDD [obj#8]

解析邏輯計劃(定義2):

Join Inner, ((value1#4 % 42) = (value2#11 % 42))
:- Project [value#2 AS value1#4]
:  +- SerializeFromObject [input[0, int, false] AS value#2]
:     +- ExternalRDD [obj#1]
+- Project [value#9 AS value2#11]
   +- SerializeFromObject [input[0, int, false] AS value#9]
      +- ExternalRDD [obj#8]

物理計劃(定義1):

*SortMergeJoin [(value1#4 % 42)], [(value2#11 % 42)], Cross
:- *Sort [(value1#4 % 42) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning((value1#4 % 42), 200)
:     +- *Project [value#2 AS value1#4]
:        +- *SerializeFromObject [input[0, int, false] AS value#2]
:           +- Scan ExternalRDDScan[obj#1]
+- *Sort [(value2#11 % 42) ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning((value2#11 % 42), 200)
      +- *Project [value#9 AS value2#11]
         +- *SerializeFromObject [input[0, int, false] AS value#9]
            +- Scan ExternalRDDScan[obj#8]

物理計劃(定義2):

*SortMergeJoin [(value1#4 % 42)], [(value2#11 % 42)], Inner
:- *Sort [(value1#4 % 42) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning((value1#4 % 42), 200)
:     +- *Project [value#2 AS value1#4]
:        +- *SerializeFromObject [input[0, int, false] AS value#2]
:           +- Scan ExternalRDDScan[obj#1]
+- *Sort [(value2#11 % 42) ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning((value2#11 % 42), 200)
      +- *Project [value#9 AS value2#11]
         +- *SerializeFromObject [input[0, int, false] AS value#9]
            +- Scan ExternalRDDScan[obj#8]

優化之後,原始型別和轉換順序無關緊要,這要歸功於一種稱為基於規則的查詢優化的功能。由於基於成本的查詢優化,資料大小也被考慮在內以正確的方式重新排序作業。最後,DataFrame API還將有關作業實際所需的列的資訊推送到資料來源讀取器以限制輸入讀取(這稱為謂詞下推)。編寫RDD作業實際上非常難以與DataFrame API提供的內容相提並論。

但是,有一個方面,DataFrames並不出色,並且促使建立另一種,第三種方式來表示Spark計算:型別安全性。由於資料列僅出於轉換定義的目​​的而由名稱表示,並且僅在執行時檢查它們對實際資料型別的有效使用,這往往會導致繁瑣的開發過程,我們需要跟蹤所有正確的型別或我們最終在執行過程中出錯。資料集API是作為此解決方案建立的。

Dataset API使用Scala的型別推斷和基於implicits的技術來傳遞Encoders,這是描述Spark優化器資料型別的特殊類,就像DataFrames一樣,同時保留編譯時鍵入以進行型別檢查和寫入轉換自然。如果這聽起來很複雜,這是一個例子:

val input = sc.parallelize(1 to 10000000, 42)
val definition = input.toDS.groupByKey(_ % 42).reduceGroups(_ + _)
資料集 平均時間 閔。時間 最大。時間
定義 544.9ms 472ms 728ms

解析的邏輯計劃:

'Aggregate [value#301], [value#301, unresolvedalias(reduceaggregator([email protected], Some(unresolveddeserializer(upcast(getcolumnbyordinal(0, IntegerType), IntegerType, - root class: "scala.Int"), value#298)), Some(int), Some(StructType(StructField(value,IntegerType,false))), input[0, scala.Tuple2, true]._1 AS value#303, input[0, scala.Tuple2, true]._2 AS value#304, newInstance(class scala.Tuple2), input[0, int, false] AS value#296, IntegerType, false, 0, 0), Some(<function1>))]
+- AppendColumns <function1>, int, [StructField(value,IntegerType,false)], cast(value#298 as int), [input[0, int, false] AS value#301]
   +- SerializeFromObject [input[0, int, false] AS value#298]
      +- ExternalRDD [obj#297]

實體計劃:

ObjectHashAggregate(keys=[value#301], functions=[reduceaggregator([email protected], Some(value#298), Some(int), Some(StructType(StructField(value,IntegerType,false))), input[0, scala.Tuple2, true]._1 AS value#303, input[0, scala.Tuple2, true]._2 AS value#304, newInstance(class scala.Tuple2), input[0, int, false] AS value#296, IntegerType, false, 0, 0)], output=[value#301, ReduceAggregator(int)#309])
+- Exchange hashpartitioning(value#301, 200)
   +- ObjectHashAggregate(keys=[value#301], functions=[partial_reduceaggregator([email protected], Some(value#298), Some(int), Some(StructType(StructField(value,IntegerType,false))), input[0, scala.Tuple2, true]._1 AS value#303, input[0, scala.Tuple2, true]._2 AS value#304, newInstance(class scala.Tuple2), input[0, int, false] AS value#296, IntegerType, false, 0, 0)], output=[value#301, buf#383])
      +- AppendColumnsWithObject <function1>, [input[0, int, false] AS value#298], [input[0, int, false] AS value#301]
         +- Scan ExternalRDDScan[obj#297]

後來人們意識到DataFrames可以被認為只是這些資料集的一個特例,並且API是統一的(使用一個名為Row的特殊優化類作為DataFrame的資料型別)。

但是,在涉及資料集時,請記住一點需要注意。作為開發人員熟悉了採集樣RDD API,資料集API提供了自己的變異是其最流行的方法- filtermapreduce。這些工作(如預期的那樣)具有任意功能。因此,Spark無法理解這些函式的細節,並且其優化能力變得有些受損,因為它無法再正確傳播某些資訊(例如,用於謂詞下推)。這將在序列化一節中進一步解釋。

val input = spark.read.parquet("file:///tmp/test_data")
val dataframe = input.select('key).where('key === 1)
val dataset = input.as[(Int, Int)].map(_._1).filter(_ == 1)

解析的邏輯計劃(資料幀):

'Filter ('key = 1)
+- Project [key#43]
   +- Relation[key#43,value#44] parquet

解析邏輯計劃(資料集):

'TypedFilter <function1>, int, [StructField(value,IntegerType,false)], unresolveddeserializer(upcast(getcolumnbyordinal(0, IntegerType), IntegerType, - root class: "scala.Int"))
+- SerializeFromObject [input[0, int, false] AS value#57]
   +- MapElements <function1>, class scala.Tuple2, [StructField(_1,IntegerType,false), StructField(_2,IntegerType,false)], obj#56: int
      +- DeserializeToObject newInstance(class scala.Tuple2), obj#55: scala.Tuple2
         +- Relation[key#43,value#44] parquet

物理計劃(資料框):

*Project [key#43]
+- *Filter (isnotnull(key#43) && (key#43 = 1))
   +- *FileScan parquet [key#43] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/test_data], PartitionFilters: [], PushedFilters: [IsNotNull(key), EqualTo(key,1)], ReadSchema: struct<key:int>

物理計劃(資料集):

*SerializeFromObject [input[0, int, false] AS value#57]
+- *Filter <function1>.apply$mcZI$sp
   +- *MapElements <function1>, obj#56: int
      +- *DeserializeToObject newInstance(class scala.Tuple2), obj#55: scala.Tuple2
         +- *FileScan parquet [key#43,value#44] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/test_data], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:int,value:int>

Parallel transformations

Spark可以並行執行多個計算。這可以通過在驅動程式上啟動多個執行緒並在每個執行緒中發出一組轉換來輕鬆實現。然後,生成的任務將同時執行並共享應用程式的資源。這確保了資源永遠不會保持空閒(例如,在等待特定轉換的最後任務完成時)。預設情況下,任務以FIFO方式處理(在作業級別),但可以通過使用備用應用程式內排程程式來確保公平性(通過設定spark.scheduler.modeFAIR)。然後,期望執行緒通過將spark.scheduler.pool本地屬性(使用SparkContext.setLocalProperty)設定為適當的池名來設定其排程池。然後應在一箇中提供每池資源分配配置spark.scheduler.allocation.file設定定義的XML檔案(預設情況下,這是fairscheduler.xml在Spark的conf資料夾中)。

def input(i: Int) = sc.parallelize(1 to i*100000)
def serial = (1 to 10).map(i => input(i).reduce(_ + _)).reduce(_ + _)
def parallel = (1 to 10).map(i => Future(input(i).reduce(_ + _))).map(Await.result(_, 10.minutes)).reduce(_ + _)
計算 平均時間 閔。時間 最大。時間
序列 173.1ms 140ms的 336ms
平行 141ms 122ms 200毫秒

2.分割槽

大多數Spark作業遭遇的第二個問題是資料分割槽不足。為了使我們的計算有效,重要的是將我們的資料劃分為足夠大的分割槽,這些分割槽的大小盡可能接近(統一),以便Spark可以排程正在執行的各個任務。他們以不可知的方式仍然可以預測地執行。如果分割槽不統一,我們說分割槽是傾斜的。這可能由於多種原因以及我們計算的不同部分而發生。

分割槽偏斜的例子

從資料來源讀取時,我們的輸入可能已經傾斜。在RDD API中,這通常使用textFilewholeTextFiles方法完成,這些方法具有令人驚訝的不同分割槽行為。該textFile方法旨在從(通常較大的)檔案中讀取單獨的文字行,預設情況下將每個輸入檔案塊作為單獨的分割槽載入。它還提供了一個minPartitions引數,當大於塊數時,它會嘗試進一步拆分這些分割槽以滿足指定的值。另一方面,wholeTextFiles方法,用於讀取(通常較小的)檔案的全部內容,將相關檔案的塊按其在叢集內的實際位置組合到池中,預設情況下,為每個池建立一個分割槽(有關詳細資訊,請參閱Hadoop的CombineFileInputFormat,用於其實現)。minPartitions在這種情況下,引數控制這些池的最大大小(等於totalSize/minPartitions)。所有minPartitions引數的預設值為2.這意味著wholeTextFiles如果使用預設設定而不在叢集上明確管理資料位置,則更容易獲得非常少數量的分割槽。用於資料讀入RDDS其它方法包括其它格式,例如sequenceFilebinaryFilesbinaryRecords,以及通用的方法hadoopRDDnewAPIHadoopRDD採用自定義格式實現(允許自定義分割槽)。

在隨機邊界上,分割槽特徵經常發生變化。因此,暗示shuffle的操作提供了numPartitions指定新分割槽計數的引數(預設情況下,分割槽計數保持與原始RDD中的相同)。也可以通過shuffle引入Skew,尤其是在連線資料集時。

val input = sc.parallelize(1 to 1000, 42).keyBy(Math.min(_, 10))
val joined = input.cogroup(input)

加入分割槽大小

由於這些情況下的分割槽完全取決於所選鍵(特別是其Murmur3雜湊),因此必須注意避免為公共鍵建立異常大的分割槽(例如,空鍵是常見的特殊情況)。一種有效的解決方案是分離相關記錄,將鹽(隨機值)引入其鍵並在多個階段為它們執行後續操作(例如,減少)以獲得正確的結果。

val input1 = sc.parallelize(1 to 1000, 42).keyBy(Math.min(_, 10) + Random.nextInt(100) * 100)
val input2 = sc.parallelize(1 to 1000, 42).keyBy(Math.min(_, 10) + Random.nextInt(100) * 100)
val joined = input1.cogroup(input2)

鹽漬連線分割槽大小

有時甚至有更好的解決方案,例如,如果其中一個數據集足夠小,則使用地圖側連線。

val input = sc.parallelize(1 to 1000000, 42)
val lookup = Map(0 -> "a", 1 -> "b", 2 -> "c")
val joined = input.map(x => x -> lookup(x % 3))

DataFrames和Datasets

高階API共享一種分割槽資料的特殊方法。輸入檔案的所有資料塊都被新增到公共池中,就像在wholeTextFiles,但是根據兩個設定將池分成多個分割槽:spark.sql.files.maxPartitionBytes指定最大分割槽大小(預設為128MB),並spark.sql.files.openCostInBytes指定估計的成本以位元組為單位開啟一個可以讀取的新檔案(預設為4MB)。該框架將根據此資訊自動確定輸入資料的最佳分割槽。

在shuffle上進行分割槽時,遺憾的是,高階API非常缺乏(至少從Spark 2.2開始)。只能通過指定spark.sql.shuffle.partitions設定(預設為200)在作業級別上靜態指定分割槽數。

高階API可以自動將連線操作轉換為廣播連線。這是由控制的spark.sql.autoBroadcastJoinThreshold,它指定考慮廣播的表的最大大小(預設為10MB)spark.sql.broadcastTimeout,並控制執行者等待廣播表的時間(預設為5分鐘)。

重新分割槽

所有API還提供了兩種方法來操作分割槽數。第一個是repartition強制shuffle以便在指定數量的分割槽之間重新分配資料(通過前面提到的Murmur雜湊)。由於洗牌資料是一項代價高昂的操作,因此應儘可能避免重新分割槽。此操作還有更具體的變體:可排序對RDD repartitionAndSortWithinPartitions可以與自定義分割槽程式一起使用,而DataFrames和Datasets具有repartition列引數來控制分割槽特徵。

所有API提供的第二種方法coalescerepartition不重新整理資料更有效,但只指示Spark將幾個現有分割槽作為一個讀取。但是,這隻能用於減少分割槽數量,不能用於更改分割槽特徵。通常沒有理由使用它,因為Spark旨在利用大量的小分割槽,除了減少輸出檔案的數量或與一起使用時批量的數量foreachPartition(例如將結果傳送到資料庫) 。

3.序列化

正確處理的另一件事是序列化,它有兩種型別:資料序列化和閉包序列化。資料序列化是指對儲存在RDD中的實際資料進行編碼的過程,而閉包序列化是指相同的過程,但是對於外部引入計算的資料(如共享欄位或變數)。區分這兩者很重要,因為它們在Spark中的工作方式非常不同。

資料序列化

Spark支援兩種不同的序列化程式用於資料序列化。預設的是Java序列化,雖然它很容易使用(通過簡單地實現Serializable介面),效率非常低。這就是為什麼建議切換到第二個支援的序列化器Kryo,用於大多數生產用途。這是通過設定spark.serializerorg.apache.spark.serializer.KryoSerializer。Kryo效率更高,不需要實現類Serializable(因為它們是由Kryo的FieldSerializer序列化的)預設情況下)。但是,在非常罕見的情況下,Kryo可能無法序列化某些類,這是它仍然不是Spark的預設值的唯一原因。註冊所有預期要序列化的類也是一個好主意(Kryo將能夠使用索引而不是完整的類名來識別資料型別,減少序列化資料的大小,從而進一步提高效能)。

case class Test(a: Int = Random.nextInt(1000000),
                b: Double = Random.nextDouble,
                c: String = Random.nextString(1000),
                d: Seq[Int] = (1 to 100).map(_ => Random.nextInt(1000000))) extends Serializable

val input = sc.parallelize(1 to 1000000, 42).map(_ => Test()).persist(DISK_ONLY)
input.count() // Force initialization
val shuffled = input.repartition(43).count()
RDD 平均時間 閔。時間 最大。時間
java的 65990.9ms 64482ms 68148ms
KRYO 30196.5ms 28322ms 33012ms

Lineage(Java):

(42) MapPartitionsRDD[1] at map at <console>:25 [Disk Serialized 1x Replicated]
 |        CachedPartitions: 42; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 3.8 GB
 |   ParallelCollectionRDD[0] at parallelize at <console>:25 [Disk Serialized 1x Replicated]

Lineage(Kryo):

(42) MapPartitionsRDD[1] at map at <console>:25 [Disk Serialized 1x Replicated]
 |        CachedPartitions: 42; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 3.1 GB
 |   ParallelCollectionRDD[0] at parallelize at <console>:25 [Disk Serialized 1x Replicated]

DataFrames和Datasets

高階API在資料序列化方面效率更高,因為他們知道他們正在使用的實際資料型別。多虧了這一點,他們可以生成專門針對這些型別定製的優化序列化程式碼,以及Spark將在整個計算環境中使用它們的方式。對於某些轉換,它也可能只生成部分序列化程式碼(例如計數或陣列查詢)。此程式碼生成步驟是Project Tungsten的一個元件,它是使高階API具有高效能的重要組成部分。

值得注意的是,Spark可以在此過程中瞭解應用轉換的屬性,因為它可以傳播有關在整個作業圖中使用哪些列的資訊(謂詞下推)。在轉換中使用不透明函式(例如,資料集' mapfilter)時,此資訊將丟失。

val input = sc.parallelize(1 to 1000000, 42).map(_ => Test()).toDS.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
input.count() // Force initialization
val shuffled = input.repartition(43).count()
資料幀 平均時間 閔。時間 最大。時間
1102.9ms 912ms 1776ms

Lineage:

(42) MapPartitionsRDD[13] at rdd at <console>:30 []
 |   MapPartitionsRDD[12] at rdd at <console>:30 []
 |   MapPartitionsRDD[11] at rdd at <console>:30 []
 |   *SerializeFromObject [assertnotnull(input[0, $line16.$read$$iw$$iw$Test, true]).a AS a#5, assertnotnull(input[0, $line16.$read$$iw$$iw$Test, true]).b AS b#6, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, $line16.$read$$iw$$iw$Test, true]).c, true) AS c#7, newInstance(class org.apache.spark.sql.catalyst.util.GenericArrayData) AS d#8]
+- Scan ExternalRDDScan[obj#4]
 MapPartitionsRDD[4] at persist at <console>:27 []
 |       CachedPartitions: 42; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 3.2 GB
 |   MapPartitionsRDD[3] at persist at <console>:27 []
 |   MapPartitionsRDD[2] at persist at <console>:27 []
 |   MapPartitionsRDD[1] at map at <console>:27 []
 |   ParallelCollectionRDD[0] at parallelize at <console>:27 []

關閉序列化

在大多數Spark應用程式中,不僅需要序列化資料本身。還有在各個轉換中使用的外部欄位和變數。讓我們考慮以下程式碼片段:

val factor = config.multiplicationFactor
rdd.map(_ * factor)

這裡我們使用從應用程式配置載入的值作為計算本身的一部分。但是,由於轉換函式外部發生的所有事情都發生在驅動程式上,因此Spark必須將值傳輸到相關的執行程式。因此Spark計算所謂的函式閉包map包含它使用的所有外部值,序列化這些值並通過網路傳送它們。由於閉包可能非常複雜,因此決定僅在那裡支援Java序列化。因此,閉包的序列化比資料本身的序列化效率低,但是由於閉包僅針對每個轉換而不是每個轉換的每個執行器進行序列化,因此這通常不會導致效能問題。(然而,需要這些值來實現令人不快的副作用Serializable。)

閉包中的變數很容易跟蹤。使用欄位可能會有很多混亂。我們來看下面的例子:

class SomeClass(d: Int) extends Serializable {
  val c = 1
  val e = new SomeComplexClass

  def closure(rdd: RDD[Int], b: Int): RDD[Int] = {
    val a = 0
    rdd.map(_ + a + b + c + d)
  }
}

在這裡我們可以看到它a只是一個變數(就像factor之前一樣),因此被序列化為Intb是一個方法引數(也表現為變數),因此也被序列化為Int。但是c是一個類欄位,因此無法單獨序列化。這意味著為了序列化它,Spark需要SomeClass用它來序列化整個例項(所以它必須擴充套件Serializable,否則我們會得到一個執行時異常)。d由於建構函式引數在內部轉換為欄位,因此也是如此。因此,在這兩種情況下,星火也必須傳送的值cde為遺囑執行人。如e序列化的成本可能非常高,這絕對不是一個好的解決方案。我們可以通過避免閉包中的類欄位來解決這個問題:

class SomeClass(d: Int) {
  val c = 1
  val e = new SomeComplexClass

  def closure(rdd: RDD[Int], b: Int): RDD[Int] = {
    val a = 0
    val sum = a + b + c + d
    rdd.map(_ + sum)
  }
}

這裡我們通過將值儲存在區域性變數中來準備值sum。然後將其序列化為一個簡單的Int並且不會拖動整個例項SomeClass(因此它不再需要擴充套件Serializable)。

Spark還定義了一個特殊的構造,以便在我們需要為多個轉換序列化相同的值時提高效能。它被稱為廣播變數,並且在計算之前被序列化並僅傳送給所有執行器一次。這對於查詢表等大變數特別有用。

val broadcastMap = sc.broadcast(Map(0 -> "a", 1 -> "b", 2 -> "c"))
val input = sc.parallelize(1 to 1000000, 42)
val joined = input.map(x => x -> broadcastMap.value(x % 3))

Spark提供了一個有用的工具來確定名為SizeEstimator的記憶體中物件的實際大小,這可以幫助我們確定特定物件是否是廣播變數的良好候選物件。

4.記憶體管理

應用程式以有效的方式使用其記憶體空間非常重要。由於每個應用程式的記憶體要求不同,Spark將應用程式驅動程式和執行程式的記憶體劃分為多個部分,這些部分由適當的規則管理,並通過應用程式設定將其大小規範留給使用者。

司機記憶

Spark驅動程式記憶體圖

驅動程式的記憶體結構非常簡單。它僅使用其配置的所有記憶體(由spark.driver.memory設定控制,預設為1GB)作為其共享堆空間。在群集部署設定中,還添加了一個開銷,以防止YARN過早地使用過多資源來殺死驅動程式容器。

執行者記憶

Spark執行器記憶體圖

執行者需要將他們的記憶體用於幾個主要目的:當前轉換的中間資料(執行記憶體),快取的持久資料(儲存記憶體)和轉換中使用的自定義資料結構(使用者記憶體)。由於Spark可以計算每個儲存記錄的實際大小,因此它能夠監視執行和儲存部分並做出相應的反應。執行記憶體的大小通常非常不穩定,需要立即執行,而儲存記憶體使用壽命更長,更穩定,通常可以逐出磁碟,應用程式通常只需要整個計算的某些部分(有時根本不需要) )。因此,Spark為兩者定義了共享空間,優先考慮執行記憶體。所有這些都由幾個設定控制:spark.executor.memory(預設為1GB)定義可用堆空間的總大小,spark.memory.fraction設定(預設為0.6)定義執行和儲存共享的記憶體的一小部分堆(減去300MB緩衝區)spark.memory.storageFraction(預設為0.5)定義了執行不可儲存的儲存記憶體部分。以最適合您的應用的方式定義它們很有用。例如,如果應用程式大量使用快取資料並且不使用過多的聚合,則可以增加儲存記憶體的比例以適應將所有快取資料儲存在RAM中,從而加快資料的讀取速度。另一方面,如果應用程式使用昂貴的聚合並且不太依賴於快取,則增加執行記憶體可以通過逐出不需要的快取資料來改進計算本身。此外,請記住,您的自定義物件必須適合使用者記憶體。

Spark還可以使用堆外記憶體進行儲存和部分執行,這由設定spark.memory.offHeap.enabled(預設為false)和spark.memory.offHeap.size(預設為0)和OFF_HEAP永續性級別控制。這可以減輕垃圾收集暫停。

DataFrames和資料集

作為Project Tungsten的一部分,高階API使用自己的記憶體管理方式。由於資料型別是框架已知的,並且它們的生命週期定義得非常好,因此可以通過預先分配記憶體塊並明確地對這些塊進行微管理來完全避免垃圾收集。這樣可以很好地重用已分配的記憶體,從而有效地消除了執行記憶體中垃圾收集的需要。這種優化實際上執行良好,使得堆外記憶體幾乎沒有額外的好處(儘管仍有一些)。

5.叢集資源

通常導致效能降低的最後一個重點是群集資源分配不足。這需要多種形式,從低效使用資料區域性性,處理分散執行程式到防止在不需要時佔用叢集資源。

Data locality

為了獲得良好的效能,我們的應用程式的計算應儘可能接近實際資料,以避免不必要的傳輸。這意味著在同樣儲存資料本身的機器上執行執行程式是一個非常好的主意。使用HDFS時,Spark可以以最大化此概率的方式優化執行程式的分配。但是,我們可以通過良好的設計進一步提高這一點。

我們可以通過增加單個執行器的資源來減少所需的節點間通訊量,同時減少執行器的總數,從而基本上強制任務由有限數量的節點處理。採用以下示例資源分配:

num_executors executor_cores executor_memory
15 1 1克
3 3克
3 5克

在所有情況下,我們將使用相同數量的資源(15核和15GB記憶體)。但是,隨著我們減少執行程式的總數,我們也減少了在它們之間傳輸資料的需要。制定第三種選擇通常是最快的。另一方面,節點級別的I / O吞吐量可能存在限制,具體取決於所請求的操作,因此我們無法無限期地增加它。例如,對於HDFS I / O,每個執行器的核心數量被認為在效能上達到峰值,大約為5。

我們還可以使用spark.locality.wait設定(預設為3秒)及其子部分(spark.locality.wait預設情況下相同)從群集中讀取資料時調整Spark的區域性性配置。這些定義了基於位置的排程的超時(在到達時降低了位置限制)。

Dynamic allocation

顯式應用程式範圍的執行程式分配可能有其缺點。在某些情況下,我們可能不希望在整個計算期間擁有統一數量的執行程式,而是希望進行一些擴充套件。在給定時間叢集上可用的資源不足,但是我們想要執行我們的計算,我們可能正在處理需要更少資源並且不想比我們需要的更多的轉換,等等。這是其中,動態分配的用武之地。

通過動態分配(通過設定spark.dynamicAllocation.enabled為true 啟用)Spark通過嘗試分配儘可能多的執行程式(最多為給定階段的最大並行度或spark.dynamicAllocation.maxExecutors預設為無窮大)來開始每個階段,其中第一階段必須至少得到spark.dynamicAllocation.initialExecutors(相同於spark.dynamicAllocation.minExecutorsspark.executor.instances預設情況下)。

在計算過程中,如果執行程式空閒超過spark.dynamicAllocation.executorIdleTimeout(預設為60秒),它將被刪除(除非它會使執行程式的數量低於spark.dynamicAllocation.minExecutors(預設為0)。這可確保我們的應用程式在執行時不會不必要地佔用叢集資源更便宜的轉型。

為了能夠啟用動態分配,我們還必須啟用Spark的外部shuffle服務。它充當在群集中的每臺計算機上執行的單獨伺服器,當適當的執行程式不再存在(已被刪除或丟失)時,該計算機能夠管理隨機檔案。這在丟失執行者的情況下也是有益的(例如由於先發制人),因為不必重新計算所討論的混洗資料。

Speculative execution

有時,即使我們正確地執行了所有操作,由於我們無法控制的情況(與Spark無關的重負載,硬體故障等),我們仍可能在特定計算機上的效能不佳。對於這些情況,我們可能會指示Spark在檢測到此類落後者後自動重新執行任務。為此,請啟用該spark.speculation設定。可以使用以下設定來配置檢測例程:spark.speculation.interval定義檢查落後者的頻率(預設為100毫秒),spark.speculation.multiplier定義落後者必須慢多少倍(預設為1.5)並spark.speculation.quantile定義必須執行的任務的分數。完成,直到檢測程式啟動(預設為0.75)。

結論

正如您所看到的,為效能設計Spark應用程式可能非常具有挑戰性,並且每一步都會增加複雜性,降低通用性或延長特定用例的分析。幸運的是,很少需要實現所有這些,因為無論如何典型的Spark應用程式都不是效能敏感的。此外,只需使用高階API(DataFrames或Datasets)即可實現很多功能。儘管在開發過程中必須儘早做出使用它們的決定,因為切換它們並非易事。

此外,還有許多其他技術可以幫助您進一步提高Spark作業的效能。即GC調整,適當的硬體配置和調整Spark的眾多配置選項