
Spark Programming Guide (Part 4)

Shuffle operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's mechanism for re-distributing data so that it's grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

Background

To understand what happens during the shuffle, consider the example of the reduceByKey operation. The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple - the key and the result of executing a reduce function against all values associated with that key. The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to compute the result.
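As a rough illustration of the semantics just described, here is a plain-Python sketch of what reduceByKey computes (the function name and sample data are made up for the example; this is not Spark's actual implementation):

```python
from operator import add

def reduce_by_key(pairs, func):
    """Combine all values that share a key, like RDD.reduceByKey."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return sorted(result.items())

# Word-count style example: values for the same key are merged with `add`.
pairs = [("a", 1), ("b", 1), ("a", 1), ("c", 1), ("a", 1)]
print(reduce_by_key(pairs, add))  # [('a', 3), ('b', 1), ('c', 1)]
```

In Spark, the difficulty is that these pairs start out scattered across partitions; the shuffle is what brings them together first.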

In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.
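The all-to-all exchange can be sketched in a few lines of plain Python (an illustrative simulation, not Spark's implementation; `simulate_shuffle` is an invented name, and integer keys are used so the hash-based routing is deterministic):

```python
def simulate_shuffle(map_side, num_reducers):
    """All-to-all exchange: every map-side partition routes each of its
    records to the reduce-side partition chosen by hashing the key."""
    reduce_side = [[] for _ in range(num_reducers)]
    for partition in map_side:            # one map task per input partition
        for key, value in partition:
            reduce_side[hash(key) % num_reducers].append((key, value))
    return reduce_side

# Records for the same key (here key 1) end up co-located on one reducer,
# no matter which map-side partition they started in.
map_side = [[(0, "x"), (1, "y")], [(2, "z"), (1, "w")]]
print(simulate_shuffle(map_side, 2))
# [[(0, 'x'), (2, 'z')], [(1, 'y'), (1, 'w')]]
```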

Although the set of elements in each partition of newly shuffled data will be deterministic, and so is the ordering of the partitions themselves, the ordering of the elements within a partition is not. If one desires predictably ordered data following a shuffle, it's possible to use:

mapPartitions to sort each partition using, for example, .sorted
repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning
sortBy to make a globally ordered RDD
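The difference between the last two options can be sketched in plain Python (illustrative only; the function names are invented, and integer keys keep the hash routing deterministic): repartitionAndSortWithinPartitions orders records inside each partition, while sortBy produces one globally ordered dataset.

```python
def repartition_and_sort(pairs, num_partitions):
    """Sketch of repartitionAndSortWithinPartitions: route each record to
    a partition by key hash, then sort inside each partition only."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        parts[hash(key) % num_partitions].append((key, value))
    return [sorted(p) for p in parts]

def sort_by(pairs, keyfunc):
    """Sketch of sortBy: a single, globally ordered dataset."""
    return sorted(pairs, key=keyfunc)

data = [(3, "c"), (1, "a"), (2, "b"), (5, "e"), (4, "d")]
print(repartition_and_sort(data, 2))
# [[(2, 'b'), (4, 'd')], [(1, 'a'), (3, 'c'), (5, 'e')]]
print(sort_by(data, lambda kv: kv[0]))
# [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
```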

Operations which can cause a shuffle include repartition operations like repartition and coalesce, 'ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

Performance Impact

The shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark's map and reduce operations.

Internally, results from individual map tasks are kept in memory until they can't fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.
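The write-one-file-plus-sorted-blocks idea can be sketched in plain Python (an invented, in-memory illustration of a sort-based shuffle write, not Spark's actual file format): the map output is sorted by target partition, written as one byte stream, and an offset index lets each reduce task read only its own block.

```python
import io
import pickle

def write_shuffle_file(records, num_reducers):
    """Buffer map output, sort it by target partition, then write one
    'file' plus an index of where each reduce partition's block starts."""
    tagged = sorted(records, key=lambda kv: hash(kv[0]) % num_reducers)
    buf, offsets, current = io.BytesIO(), [], -1
    for key, value in tagged:
        part = hash(key) % num_reducers
        while current < part:          # record each block's start offset
            offsets.append(buf.tell())
            current += 1
        buf.write(pickle.dumps((key, value)))
    while current < num_reducers - 1:  # offsets for trailing empty blocks
        offsets.append(buf.tell())
        current += 1
    return buf.getvalue(), offsets

def read_block(data, offsets, part):
    """A reduce task seeks straight to its own sorted block."""
    end = offsets[part + 1] if part + 1 < len(offsets) else len(data)
    block, stream = [], io.BytesIO(data[offsets[part]:end])
    while stream.tell() < end - offsets[part]:
        block.append(pickle.load(stream))
    return block

data, offsets = write_shuffle_file([(1, "a"), (0, "b"), (2, "c")], 2)
print(read_block(data, offsets, 0))  # [(0, 'b'), (2, 'c')]
print(read_block(data, offsets, 1))  # [(1, 'a')]
```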

Certain shuffle operations can consume significant amounts of heap memory, since they employ in-memory data structures to organize records before or after transferring them. Specifically, reduceByKey and aggregateByKey create these structures on the map side, and 'ByKey operations generate these on the reduce side. When data does not fit in memory, Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.
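The spilling behavior can be sketched as follows (a plain-Python illustration under invented names; Spark's real spill logic is size-based and far more sophisticated): values are combined in an in-memory table, and when the table grows past a threshold it is written to a temporary file on disk, at the cost of extra disk I/O.

```python
import os
import pickle
import tempfile
from operator import add

def aggregate_with_spill(pairs, func, max_keys):
    """Combine values per key in an in-memory table; when the table
    exceeds max_keys entries, spill it to a temporary file on disk."""
    table, spills = {}, []
    for key, value in pairs:
        table[key] = func(table[key], value) if key in table else value
        if len(table) > max_keys:              # table no longer "fits"
            fd, path = tempfile.mkstemp()
            with os.fdopen(fd, "wb") as f:
                pickle.dump(table, f)
            spills.append(path)
            table = {}
    # Merge the remaining in-memory table with every spill file.
    merged = dict(table)
    for path in spills:
        with open(path, "rb") as f:
            for k, v in pickle.load(f).items():
                merged[k] = func(merged[k], v) if k in merged else v
        os.remove(path)
    return merged

# Nine records over three keys, with room for only two keys in memory.
print(aggregate_with_spill([(i % 3, 1) for i in range(9)], add, 2))
# {0: 3, 1: 3, 2: 3}
```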

Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are preserved until the corresponding RDDs are no longer used and are garbage collected. This is done so the shuffle files don't need to be re-created if the lineage is re-computed. Garbage collection may happen only after a long period of time, if the application retains references to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may consume a large amount of disk space. The temporary storage directory is specified by the spark.local.dir configuration parameter when configuring the Spark context.
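For reference, spark.local.dir accepts a comma-separated list of directories, so the shuffle scratch space can be spread over several local disks (the paths below are placeholders). Note that on some cluster managers this setting is overridden by environment variables such as SPARK_LOCAL_DIRS (Standalone) or LOCAL_DIRS (YARN).

```
# spark-defaults.conf -- spread shuffle spill files over two local disks
spark.local.dir  /mnt/disk1/spark-tmp,/mnt/disk2/spark-tmp
```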

Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the "Shuffle Behavior" section within the Spark Configuration Guide.