1. 程式人生 > >spark——sparkCore原始碼解析之RangePartitioner

spark——sparkCore原始碼解析之RangePartitioner

img

HashPartitioner分割槽可能導致每個分割槽中資料量的不均勻。而RangePartitioner分割槽則儘量保證每個分割槽中資料量的均勻,將一定範圍內的數對映到某一個分割槽內。分割槽與分割槽之間資料是有序的,但分割槽內的元素是不能保證順序的。

  RangePartitioner分割槽執行原理:

  1. 計算總體的資料抽樣大小sampleSize,計算規則是:至少每個分割槽抽取20個數據或者最多1M的資料量。

  2. 根據sampleSize和分割槽數量計算每個分割槽的資料抽樣樣本數量最大值sampleSizePrePartition

  3. 根據以上兩個值進行水塘抽樣,返回RDD的總資料量,分割槽ID和每個分割槽的取樣資料。

  4. 計算出資料量較大的分割槽通過RDD.sample進行重新抽樣。

  5. 通過抽樣陣列 candidates: ArrayBuffer[(K, wiegth)]計算出分割槽邊界的陣列BoundsArray

  6. 在取資料時,如果分割槽數小於128則直接獲取,如果大於128則通過二分法,獲取當前Key屬於那個區間,返回對應的BoundsArray下標即為partitionsID

1. 獲取區間陣列

1.1. 給定樣本總數

給定總的資料抽樣大小,最多1M的資料量(10^6),最少20倍的RDD分割槽數量,也就是每個RDD分割槽至少抽取20條資料


class RangePartitioner(partitions,rdd) {

// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6) 
}

1.2. 計算樣本最大值

RDD各分割槽中的資料量可能會出現傾斜的情況,乘於3的目的就是保證資料量小的分割槽能夠取樣到足夠的資料,而對於資料量大的分割槽會進行第二次取樣


class RangePartitioner(partitions,rdd) {
	

// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(

		3.0 * sampleSize / rdd.partitions.length

	).toInt

}

 

1.3. 水塘抽樣

根據以上兩個值進行水塘抽樣,返回RDD的總資料量,分割槽ID和每個分割槽的取樣資料。其中總資料量是估計值,不是通過rdd.count計算得到的


class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(

		3.0 * sampleSize / rdd.partitions.length

	).toInt

    
// 3. 進行抽樣,返回總資料量,分割槽ID和樣本資料
val (numItems, sketched) = RangePartitioner.sketch(
    rdd.map(_._1), sampleSizePerPartition)

}

1.4. 是否需要二次取樣

如果有較大RDD存在,則按照平均值去取樣的話資料量太少,容易造成資料傾斜,所以需要進行二次取樣

判斷是否需要重新取樣方法:

樣本數量佔比乘以當前RDD的總行數大於預設的每個RDD最大抽取數量,說明這個RDD的資料量比較大,需要取樣更多的資料:eg: 0.2100=20<60;0.220000=2000>60


class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(

		3.0 * sampleSize / rdd.partitions.length

	).toInt  
// 3. 進行抽樣,返回總資料量,分割槽ID和樣本資料
val (numItems, sketched) = RangePartitioner.sketch(
    rdd.map(_._1), sampleSizePerPartition)
    
    
// 4. 是否需要二次取樣
val imbalancedPartitions = 	mutable.Set.empty[Int]
 if (fraction * n > sampleSizePerPartition) {
	// 記錄需要重新取樣的RDD的ID
	imbalancedPartitions += idx 

}

1.5. 計算樣本權重

計算每個取樣資料的權重佔比,根據取樣資料的ID和權重生成出RDD分割槽邊界陣列

權重計算方法:總資料量/當前RDD的取樣資料量


class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(

		3.0 * sampleSize / rdd.partitions.length

	).toInt  
// 3. 進行抽樣,返回總資料量,分割槽ID和樣本資料
val (numItems, sketched) = RangePartitioner.sketch(
    rdd.map(_._1), sampleSizePerPartition) 
// 4. 是否需要二次取樣
val imbalancedPartitions = 	mutable.Set.empty[Int]
 if (fraction * n > sampleSizePerPartition) {
	// 記錄需要重新取樣的RDD的ID
	imbalancedPartitions += idx

}else{

     
// 5. 計算樣本權重
	val weight = (
	  // 取樣資料的佔比
		n.toDouble / sample.length).toFloat 
            for (key <- sample) {
			// 記錄取樣資料key和權重
              candidates += ((key, weight))
            }
	}
}

1.6. 二次抽樣

對於資料分佈不均衡的RDD分割槽,重新進行二次抽樣。

二次抽樣採用的是RDD的取樣方法:RDD.sample


class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(

		3.0 * sampleSize / rdd.partitions.length

	).toInt  
// 3. 進行抽樣,返回總資料量,分割槽ID和樣本資料
val (numItems, sketched) = RangePartitioner.sketch(
    rdd.map(_._1), sampleSizePerPartition) 
// 4. 是否需要二次取樣
val imbalancedPartitions = 	mutable.Set.empty[Int]
 if (fraction * n > sampleSizePerPartition) {
	// 記錄需要重新取樣的RDD的ID
	imbalancedPartitions += idx

}else{  
// 5. 計算樣本權重
	val weight = (
	  // 取樣資料的佔比
		n.toDouble / sample.length).toFloat 
            for (key <- sample) {
			// 記錄取樣資料key和權重
              candidates += ((key, weight))
            }
	}

    
// 6. 對於資料分佈不均衡的RDD分割槽,重新資料抽樣
if (imbalancedPartitions.nonEmpty) {
	// 利用rdd的sample抽樣函式API進行資料抽樣
   val reSampled = imbalanced.sample(
    withReplacement = false, fraction, seed).collect()
}


1.7. 生成邊界陣列

將最終的抽樣資料計算出分割槽邊界陣列返回,邊界數組裡面存放的是RDD裡面資料的key值,

比如最終返回的陣列是:array[0,10,20,30..]

其中0,10,20,30是取樣資料中的key值,對於每一條資料都會判斷其在此陣列的那個區間中間,例如有一條資料key值是3則其在0到10之間,屬於第一個分割槽,同理Key值為15的資料在第二個分割槽


class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
 val sampleSize =  math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition = 
	math.ceil(

		3.0 * sampleSize / rdd.partitions.length

	).toInt  
// 3. 進行抽樣,返回總資料量,分割槽ID和樣本資料
val (numItems, sketched) = RangePartitioner.sketch(
    rdd.map(_._1), sampleSizePerPartition) 
// 4. 是否需要二次取樣
val imbalancedPartitions = 	mutable.Set.empty[Int]
 if (fraction * n > sampleSizePerPartition) {
	// 記錄需要重新取樣的RDD的ID
	imbalancedPartitions += idx

}else{  
// 5. 計算樣本權重
	val weight = (
	  // 取樣資料的佔比
		n.toDouble / sample.length).toFloat 
            for (key <- sample) {
			// 記錄取樣資料key和權重
              candidates += ((key, weight))
            }
	}
// 6. 對於資料分佈不均衡的RDD分割槽,重新資料抽樣
if (imbalancedPartitions.nonEmpty) {
	// 利用rdd的sample抽樣函式API進行資料抽樣
   val reSampled = imbalanced.sample(
    withReplacement = false, fraction, seed).collect()
}

    
// 7. 生成邊界陣列
RangePartitioner.determineBounds(candidates, partitions)

}

2. 水塘抽樣演算法

水塘抽樣概念:

它是一系列的隨機演算法,其目的在於從包含n個專案的集合S中選取k個樣本,使得每條資料抽中的概率是k/n。其中n為一很大或未知的數量,尤其適用於不能把所有n個專案都存放到主記憶體的情況

我們可以:定義取出的行號為choice,第一次直接以第一行作為取出行 choice ,而後第二次以二分之一概率決定是否用第二行替換 choice ,第三次以三分之一的概率決定是否以第三行替換 choice ……,以此類推。由上面的分析我們可以得出結論,在取第n個數據的時候,我們生成一個0到1的隨機數p,如果p小於1/n,保留第n個數。大於1/n,繼續保留前面的數。直到資料流結束,返回此數,演算法結束。

詳見:https://www.iteblog.com/archives/1525.html

3. 定位分割槽ID

如果分割槽邊界陣列的大小小於或等於128的時候直接變數陣列,否則採用二分查詢法確定key屬於某個分割槽。

3.1. 陣列直接獲取

遍歷陣列,判斷當前key值是否屬於當前區間


// 根據RDD的key值返回對應的分割槽id。從0開始

def getPartition(key: Any): Int = {

​    // 強制轉換key型別為RDD中原本的資料型別

​    val k = key.asInstanceOf[K]

​    var partition = 0

​    if (rangeBounds.length <= 128) {

​      // 如果分割槽資料小於等於128個,那麼直接本地迴圈尋找當前k所屬的分割槽下標

​      // ordering.gt(x,y):如果x>y,則返回true

​      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {

​        partition += 1

​      }

 

3.2. 二分法查詢

對於分割槽數大於128的情況,取樣二分法查詢


// 根據RDD的key值返回對應的分割槽id。從0開始

 def getPartition(key: Any): Int = {

// 如果分割槽數量大於128個,那麼使用二分查詢方法尋找對應k所屬的下標;

     // 但是如果k在rangeBounds中沒有出現,實質上返回的是一個負數(範圍)或者是一個超過rangeBounds大小的數(最後一個分割槽,比所有資料都大)

     // Determine which binary search method to use only once.

     partition = binarySearch(rangeBounds, k)

     // binarySearch either returns the match location or -[insertion point]-1

     if (partition < 0) {

       partition = -partition-1

     }

     if (partition > rangeBounds.length) {

       partition = rangeBounds.length

     }

檢視完整原始碼...