1. 程式人生 > >spark2原理分析-RDD的Partitioner原理分析

spark2原理分析-RDD的Partitioner原理分析

概述

本文介紹了Spark分割槽的實現原理,並對其原始碼進行了分析。

分割槽器(Partitioner)的基本概念

關於Spark分割槽的基本概念和介紹,可以參考我的這篇文章:Spark2.0-RDD分割槽原理分析。這裡我們再回顧一下Spark分割槽的概念:

從概念上講,分割槽器(Partitioner)定義瞭如何分佈記錄,從而定義每個任務將處理哪些記錄。

從實現層面來說,Partitioner是一個帶有以下兩個方法的抽象類:numPartitions和getPartition。該類的定義如下:

abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}
  • numPartitions:定義分割槽後RDD中的分割槽數。
  • getPartition:定義從key到分割槽的整數索引的對映,其中應該傳送具有該key的記錄。

Spark提供的分割槽器(Partitioner)物件有兩種實現:HashPartitioner和RangePartitioner(在2.3中有更多的實現)。如果這些都不滿足需要,可以自定義分割槽程式。

HashPartitioner

當RDD沒有Partitioner時,會把HashPartitioner作為預設的Partitioner。它通過計算key的hashcode來對資料進行分割槽。該類的實現程式碼如下:

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  // 確定的分割槽的數量
  def numPartitions: Int = partitions

  // key到分割槽id的對映,這裡是通過取模的方式實現
  def getPartition(key: Any): Int = key match {
    case null => 0
    // 取模運算:hashcode%分割槽數
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  // 重新定義equal函式,若是HashPartitioner且分割槽數相等,返回true
  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }
  // 把HashPartitioner的hashCode設定為分割槽數
  override def hashCode: Int = numPartitions
}

注意:傳給HashPartitioner(partitions: Int)的引數partitions不能為負。

HashPartitioner具有以下特點:

  • HashPartitioner根據key的雜湊值(hashcode)確定子分割槽的索引位置。
  • HashPartitioner需要一個分割槽引數,該引數確定輸出RDD中的分割槽數和雜湊函式中使用的分割槽數。若沒有指定該引數,Spark則使用SparkConf中spark.default.parallelism值的值來確定分割槽數。
  • 若沒有設定預設並行度值(spark.default.parallelism引數的值),則Spark預設為RDD在其血緣(lineage)中具有的最大分割槽數。
  • 在使用HashPartitioner的寬轉換(wide transform)(例如aggregateByKey)中,可選的分割槽數引數用作雜湊分割槽程式的引數。

RangePartitioner

RangePartitioner(範圍分割槽)將其key位於相同範圍內的記錄分配給給定分割槽。排序需要RangePartitioner,因為RangePartitioner能夠確保:通過對給定分割槽內的記錄進行排序,完成整個RDD的排序。

RangePartitioner首先通過取樣確定每個分割槽的範圍邊界:優化跨分割槽的記錄進行均勻分佈。然後,RDD中的每個記錄將被shuffled到其範圍界限內包括該key的分割槽。

高度不平衡的資料(即,某些key的許多值而不是其他key,如果key的分佈不均勻)會使取樣不準確,不均勻的分割槽可能導致下游任務比其他任務更慢,而導致整個任務變慢。

如果與某個關鍵字相關聯的所有記錄的重複key太多而被分配到一個執行器(executor),則範圍分割槽(如雜湊分割槽)可能會導致記憶體錯誤。與排序相關的效能問題通常是由範圍分割槽步驟的這些問題引起的。

使用Spark建立RangePartitioner不僅需要分割槽數量的引數,還需要實際的RDD,用來獲取樣本。 RDD必須是元組,並且key必須具有已定義的順序。

實際上,取樣需要部分評估RDD,從而導致執行圖(graph)中斷。 因此,範圍分割槽實際上既是轉換(transformation)操作又是action(動作)操作。 在範圍分割槽中取樣需要消耗資源,有一定成本,通常,RangePartitioner(範圍分割槽)比HashPartitioner(雜湊分割槽)更耗效能。由於key要求被排序,這樣就無法在元組的所有RDD上進行範圍分割槽。

因此,鍵/值操作(例如聚合)需要使用HashPartitioner作為預設值,這些操作需要每個key都位於同一臺機器上但不以特定方式排序的記錄。但是,也可以使用自定義分割槽程式或範圍分割槽程式執行這些方法。

RangePartitioner的實現:

成員變數 說明
ascending 定義分割槽資料的排序方式,預設是升序。定義如下:private var ascending: Boolean = true
samplePointsPerPartitionHint 每個分割槽具有的樣本數目。
partitions 分割槽數,可以為0。當該引數為0時,表示對空RDD進行排序。
ordering 排序需要用到的工具類,定義了:大於,小於等操作
rangeBounds 儲存分割槽數的,帶上限的陣列
numPartitions 該RDD的分割槽數量。
binarySearch 獲取一個二分查詢的物件。由於key是可排序的,所以使用二分加快查詢效能。
getPartition 獲取分割槽對應的索引id

自定義分割槽器(Partitioner)

通過繼承Partitioner抽象類,可以定製自己的分割槽器。要定義自己的分割槽器可能需要實現以下函式:

成員名 說明
numPartitions 分割槽數
getPartition key作為引數(與被分割槽的RDD型別相同),返回表示分割槽索引的整數,該分割槽指定具有該key的記錄所在的位置。 返回的整數必須介於零和分割槽數之間(在numPartitions定義)。
equals 定義兩個分割槽是否相等的方法。
hashcode 僅當重寫了equals方法時才需要此方法。 HashPartitioner的hashcode就是它的分割槽數。 RangePartitioner的hashcode是從範圍邊界派生的雜湊函式。

總結

本文介紹了RDD的Partitioner原理,並對其實現進行了簡要分析。