A Thorough Walkthrough of aggregate vs treeAggregate, Based on the Source Code
Aggregate
This article explains two common aggregation operations: aggregate and treeAggregate.
We start with aggregate, whose full signature and body are as follows:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}
Parameter definitions:
First, notice a parameter of type U called zeroValue, plus two function parameters: seqOp: (U, T) => U combines a U and a T into a new U, and combOp: (U, U) => U merges two U values into one U. The method returns a value of type U.
Parameter roles:
zeroValue is the given initial value; it is used by both the seqOp and combOp steps.
seqOp runs on the Executor side against each partition and uses the initial value zeroValue.
combOp runs on the driver side and also uses the initial value.
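The flow just described can be simulated with plain Scala collections, no SparkContext required. This is only a sketch of the semantics (the `simulateAggregate` helper and the partition split are illustrative, not Spark code):

```scala
// Simulate aggregate's semantics without Spark: each "partition" folds with
// seqOp starting from zeroValue (Executor side), then the driver folds the
// partial results with combOp, again starting from zeroValue.
def simulateAggregate[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
    seqOp: (U, T) => U, combOp: (U, U) => U): U = {
  val partials = partitions.map(_.foldLeft(zeroValue)(seqOp)) // per-partition seqOp
  partials.foldLeft(zeroValue)(combOp)                        // driver-side combOp
}

// zeroValue = 10 is counted once per partition and once more on the driver:
val result = simulateAggregate(Seq(Seq(1, 2, 3), Seq(4)), 10)(_ + _, _ + _)
// result == 40: 10 (element sum) + 2 * 10 (partition zeros) + 10 (driver zero)
```

Note how zeroValue enters the computation numPartitions + 1 times, which matters whenever combOp is not idempotent with respect to it.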
Source walkthrough:
Snippet one:
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
This is the function run against each RDD partition: every partition is ultimately an Iterator, and the iterator's aggregate is called with the arguments we supplied. Note that the combOp argument passed to Iterator's aggregate is effectively unused; within a single partition only seqOp does any work.
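This is easy to confirm with plain Scala: in Scala 2.12, TraversableOnce.aggregate is simply defined as foldLeft(z)(seqop), silently ignoring combop. The sketch below mirrors that definition (it is not Spark's code) and counts combOp invocations:

```scala
// Within one partition, aggregate degenerates to foldLeft over seqOp.
// A combOp that counts its own invocations shows it is never called.
var combCalls = 0
val seqOp: (Int, Int) => Int = (u, t) => u + t
val combOp: (Int, Int) => Int = (u1, u2) => { combCalls += 1; u1 + u2 }

// Mirrors it.aggregate(zeroValue)(seqOp, combOp) for one partition's iterator:
def aggregatePartition(it: Iterator[Int], zeroValue: Int): Int =
  it.foldLeft(zeroValue)(seqOp) // combOp is ignored, as in 2.12's aggregate

val partial = aggregatePartition(Iterator(1, 2, 3), 0)
// partial == 6, combCalls == 0
```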
Snippet two:
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
This code performs the combOp step on the Driver side.
The detailed execution flow is not the focus of this article; we will analyze it step by step in a later post.
From the above we can summarize aggregate's execution structure in the following diagram:
This kind of aggregation has a flaw: the results of every per-partition seqOp run are all sent back to the Driver, where combOp is then applied to them and the initial value. With many partitions this can easily overflow Driver memory, which is why the optimized function treeAggregate exists.
treeAggregate
The treeAggregate function looks like this:
def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.
    // Don't trigger TreeAggregation when it doesn't save wall-clock time
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }
    partiallyAggregated.reduce(cleanCombOp)
  }
}
Parameter definitions:
As before, there is a U-typed parameter called zeroValue, plus two function parameters: seqOp: (U, T) => U combines a U and a T into a new U, and combOp: (U, U) => U merges two U values into one U; the method returns a U. There is also an extra parameter, depth (default 2), which controls the depth of the aggregation tree.
Parameter roles:
zeroValue is the given initial value; here it is used only within each partition's seqOp pass.
seqOp runs on the Executor side against each partition and uses the initial value zeroValue.
combOp runs on both the Executor side and the driver side, and does not use the initial value.
Source walkthrough:
Snippet one:
val aggregatePartition =
  (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
This is the first layer of tasks, executed on the Executor side; it mainly applies seqOp to the source data together with the initial value zeroValue.
Snippet two:
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
  numPartitions /= scale
  val curNumPartitions = numPartitions
  partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
    (i, iter) => iter.map((i % curNumPartitions, _))
  }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
}
After the first layer of tasks completes, combOp is applied level by level while the number of partitions is gradually reduced. These levels run on the Executor side, and they do not use the initial value.
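The level-reduction loop can be mimicked on plain Scala collections. In this sketch, groupBy stands in for the HashPartitioner shuffle, and the loop condition is copied from the source above; the `treeCombine` helper is illustrative, not Spark code:

```scala
// Simulate treeAggregate's level-by-level combOp without Spark.
// Partial results are re-keyed by (index % curNumPartitions) and reduced,
// shrinking the number of partials by roughly `scale` per level.
def treeCombine(partials0: Seq[Int], depth: Int)(combOp: (Int, Int) => Int): Int = {
  var partials = partials0
  var numPartitions = partials.length
  val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
  while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
    numPartitions /= scale
    val cur = numPartitions
    partials = partials.zipWithIndex
      .groupBy { case (_, i) => i % cur }          // stand-in for HashPartitioner
      .toSeq.sortBy(_._1)
      .map { case (_, group) => group.map(_._1).reduce(combOp) }
  }
  partials.reduce(combOp) // final reduce on the remaining partials
}

// 16 partials of value 1 collapse through one tree level (scale = 4) to 4
// partials of 4, then the final reduce produces 16.
val total = treeCombine(Seq.fill(16)(1), depth = 2)(_ + _)
// total == 16: combOp never mixes in zeroValue
```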
Snippet three:
partiallyAggregated.reduce(cleanCombOp)
After the preliminary aggregation on the Executor side, a final reduce with combOp merges the remaining partial results.
From the above we can summarize treeAggregate's execution structure in the following diagram:
aggregate VS treeAggregate
1. aggregate and treeAggregate serve the same purpose; the difference in the final result is that treeAggregate does not use the initial value zeroValue when executing combOp.
2. treeAggregate runs n more tasks than aggregate, where n is computable (one extra job per tree level).
3. treeAggregate reduces aggregate's risk of Driver-side memory overflow.
Here is an example:
def seq(a: Int, b: Int): Int = {
  println("seq:" + a + ":" + b)
  a + b
}
def comb(a: Int, b: Int): Int = {
  println("comb:" + a + ":" + b)
  a + b
}
val res = sc.parallelize(List(1, 2, 4, 5, 8, 9, 7, 2), 3)
res.aggregate(1)(seq, comb)
res.treeAggregate(1)(seq, comb)
The aggregate result should be: 1+2+4+5+8+9+7+2 + 3*1 + 1 = 42 (one zeroValue per partition, plus one more when the driver merges).
The treeAggregate result should be: 1+2+4+5+8+9+7+2 + 3*1 = 41 (no extra zeroValue at the merge step).
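Both results can be checked without a cluster by mimicking the partition layout. The concrete split below is an assumption of this sketch; since addition is associative and commutative, the totals only depend on there being 3 partitions:

```scala
// Reproduce the 42 vs 41 arithmetic with plain Scala collections.
val partitions = Seq(Seq(1, 2), Seq(4, 5, 8), Seq(9, 7, 2)) // assumed split
val zero = 1
val partials = partitions.map(_.foldLeft(zero)(_ + _)) // seqOp per partition

val aggregateResult     = partials.foldLeft(zero)(_ + _) // driver combOp starts from zeroValue
val treeAggregateResult = partials.reduce(_ + _)         // no extra zeroValue in the tree
// aggregateResult == 42, treeAggregateResult == 41
```

The one-unit gap is exactly the driver-side zeroValue that aggregate folds in and treeAggregate omits.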