spark——sparkCore原始碼解析之RangePartitioner
HashPartitioner分割槽可能導致每個分割槽中資料量的不均勻。而RangePartitioner分割槽則儘量保證每個分割槽中資料量的均勻,將一定範圍內的數對映到某一個分割槽內。分割槽與分割槽之間資料是有序的,但分割槽內的元素是不能保證順序的。
RangePartitioner分割槽執行原理:
-
計算總體的資料抽樣大小sampleSize,計算規則是:至少每個分割槽抽取20個數據或者最多1M的資料量。
-
根據sampleSize和分割槽數量計算每個分割槽的資料抽樣樣本數量最大值sampleSizePrePartition
-
根據以上兩個值進行水塘抽樣,返回RDD的總資料量,分割槽ID和每個分割槽的取樣資料。
-
計算出資料量較大的分割槽通過RDD.sample進行重新抽樣。
-
通過抽樣陣列 candidates: ArrayBuffer[(K, wiegth)]計算出分割槽邊界的陣列BoundsArray
-
在取資料時,如果分割槽數小於128則直接獲取,如果大於128則通過二分法,獲取當前Key屬於那個區間,返回對應的BoundsArray下標即為partitionsID
1. 獲取區間陣列
1.1. 給定樣本總數
給定總的資料抽樣大小,最多1M的資料量(10^6),最少20倍的RDD分割槽數量,也就是每個RDD分割槽至少抽取20條資料
class RangePartitioner(partitions,rdd) { // 1. 計算樣本大小 val sampleSize = math.min(20.0 * partitions, 1e6) }
1.2. 計算樣本最大值
RDD各分割槽中的資料量可能會出現傾斜的情況,乘於3的目的就是保證資料量小的分割槽能夠取樣到足夠的資料,而對於資料量大的分割槽會進行第二次取樣
class RangePartitioner(partitions,rdd) { // 1. 計算樣本大小 val sampleSize = math.min(20.0 * partitions, 1e6) // 2. 計算樣本最大值 val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length ).toInt }
1.3. 水塘抽樣
根據以上兩個值進行水塘抽樣,返回RDD的總資料量,分割槽ID和每個分割槽的取樣資料。其中總資料量是估計值,不是通過rdd.count計算得到的
class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
val sampleSize = math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition =
math.ceil(
3.0 * sampleSize / rdd.partitions.length
).toInt
// 3. 進行抽樣,返回總資料量,分割槽ID和樣本資料
val (numItems, sketched) = RangePartitioner.sketch(
rdd.map(_._1), sampleSizePerPartition)
}
1.4. 是否需要二次取樣
如果有較大RDD存在,則按照平均值去取樣的話資料量太少,容易造成資料傾斜,所以需要進行二次取樣
判斷是否需要重新取樣方法:
樣本數量佔比乘以當前RDD的總行數大於預設的每個RDD最大抽取數量,說明這個RDD的資料量比較大,需要取樣更多的資料:eg: 0.2100=20<60;0.220000=2000>60
class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
val sampleSize = math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition =
math.ceil(
3.0 * sampleSize / rdd.partitions.length
).toInt
// 3. 進行抽樣,返回總資料量,分割槽ID和樣本資料
val (numItems, sketched) = RangePartitioner.sketch(
rdd.map(_._1), sampleSizePerPartition)
// 4. 是否需要二次取樣
val imbalancedPartitions = mutable.Set.empty[Int]
if (fraction * n > sampleSizePerPartition) {
// 記錄需要重新取樣的RDD的ID
imbalancedPartitions += idx
}
1.5. 計算樣本權重
計算每個取樣資料的權重佔比,根據取樣資料的ID和權重生成出RDD分割槽邊界陣列
權重計算方法:總資料量/當前RDD的取樣資料量
class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
val sampleSize = math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition =
math.ceil(
3.0 * sampleSize / rdd.partitions.length
).toInt
// 3. 進行抽樣,返回總資料量,分割槽ID和樣本資料
val (numItems, sketched) = RangePartitioner.sketch(
rdd.map(_._1), sampleSizePerPartition)
// 4. 是否需要二次取樣
val imbalancedPartitions = mutable.Set.empty[Int]
if (fraction * n > sampleSizePerPartition) {
// 記錄需要重新取樣的RDD的ID
imbalancedPartitions += idx
}else{
// 5. 計算樣本權重
val weight = (
// 取樣資料的佔比
n.toDouble / sample.length).toFloat
for (key <- sample) {
// 記錄取樣資料key和權重
candidates += ((key, weight))
}
}
}
1.6. 二次抽樣
對於資料分佈不均衡的RDD分割槽,重新進行二次抽樣。
二次抽樣採用的是RDD的取樣方法:RDD.sample
class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
val sampleSize = math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition =
math.ceil(
3.0 * sampleSize / rdd.partitions.length
).toInt
// 3. 進行抽樣,返回總資料量,分割槽ID和樣本資料
val (numItems, sketched) = RangePartitioner.sketch(
rdd.map(_._1), sampleSizePerPartition)
// 4. 是否需要二次取樣
val imbalancedPartitions = mutable.Set.empty[Int]
if (fraction * n > sampleSizePerPartition) {
// 記錄需要重新取樣的RDD的ID
imbalancedPartitions += idx
}else{
// 5. 計算樣本權重
val weight = (
// 取樣資料的佔比
n.toDouble / sample.length).toFloat
for (key <- sample) {
// 記錄取樣資料key和權重
candidates += ((key, weight))
}
}
// 6. 對於資料分佈不均衡的RDD分割槽,重新資料抽樣
if (imbalancedPartitions.nonEmpty) {
// 利用rdd的sample抽樣函式API進行資料抽樣
val reSampled = imbalanced.sample(
withReplacement = false, fraction, seed).collect()
}
1.7. 生成邊界陣列
將最終的抽樣資料計算出分割槽邊界陣列返回,邊界數組裡面存放的是RDD裡面資料的key值,
比如最終返回的陣列是:array[0,10,20,30..]
其中0,10,20,30是取樣資料中的key值,對於每一條資料都會判斷其在此陣列的那個區間中間,例如有一條資料key值是3則其在0到10之間,屬於第一個分割槽,同理Key值為15的資料在第二個分割槽
class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
val sampleSize = math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition =
math.ceil(
3.0 * sampleSize / rdd.partitions.length
).toInt
// 3. 進行抽樣,返回總資料量,分割槽ID和樣本資料
val (numItems, sketched) = RangePartitioner.sketch(
rdd.map(_._1), sampleSizePerPartition)
// 4. 是否需要二次取樣
val imbalancedPartitions = mutable.Set.empty[Int]
if (fraction * n > sampleSizePerPartition) {
// 記錄需要重新取樣的RDD的ID
imbalancedPartitions += idx
}else{
// 5. 計算樣本權重
val weight = (
// 取樣資料的佔比
n.toDouble / sample.length).toFloat
for (key <- sample) {
// 記錄取樣資料key和權重
candidates += ((key, weight))
}
}
// 6. 對於資料分佈不均衡的RDD分割槽,重新資料抽樣
if (imbalancedPartitions.nonEmpty) {
// 利用rdd的sample抽樣函式API進行資料抽樣
val reSampled = imbalanced.sample(
withReplacement = false, fraction, seed).collect()
}
// 7. 生成邊界陣列
RangePartitioner.determineBounds(candidates, partitions)
}
2. 水塘抽樣演算法
水塘抽樣概念:
它是一系列的隨機演算法,其目的在於從包含n個專案的集合S中選取k個樣本,使得每條資料抽中的概率是k/n。其中n為一很大或未知的數量,尤其適用於不能把所有n個專案都存放到主記憶體的情況
我們可以:定義取出的行號為choice,第一次直接以第一行作為取出行 choice ,而後第二次以二分之一概率決定是否用第二行替換 choice ,第三次以三分之一的概率決定是否以第三行替換 choice ……,以此類推。由上面的分析我們可以得出結論,在取第n個數據的時候,我們生成一個0到1的隨機數p,如果p小於1/n,保留第n個數。大於1/n,繼續保留前面的數。直到資料流結束,返回此數,演算法結束。
詳見:https://www.iteblog.com/archives/1525.html
3. 定位分割槽ID
如果分割槽邊界陣列的大小小於或等於128的時候直接變數陣列,否則採用二分查詢法確定key屬於某個分割槽。
3.1. 陣列直接獲取
遍歷陣列,判斷當前key值是否屬於當前區間
// 根據RDD的key值返回對應的分割槽id。從0開始
def getPartition(key: Any): Int = {
// 強制轉換key型別為RDD中原本的資料型別
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// 如果分割槽資料小於等於128個,那麼直接本地迴圈尋找當前k所屬的分割槽下標
// ordering.gt(x,y):如果x>y,則返回true
while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
3.2. 二分法查詢
對於分割槽數大於128的情況,取樣二分法查詢
// 根據RDD的key值返回對應的分割槽id。從0開始
def getPartition(key: Any): Int = {
// 如果分割槽數量大於128個,那麼使用二分查詢方法尋找對應k所屬的下標;
// 但是如果k在rangeBounds中沒有出現,實質上返回的是一個負數(範圍)或者是一個超過rangeBounds大小的數(最後一個分割槽,比所有資料都大)
// Determine which binary search method to use only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1
}
if (partition > rangeBounds.length) {
partition = rangeBounds.length
}