Spark Operator Execution Flow Explained, Part 6
coalesce, as its name suggests, merges an RDD with many partitions into an RDD with fewer partitions, which reduces task-scheduling overhead. But keep in mind: there is no guarantee that the partitions of the resulting RDD hold balanced record counts, because the merge never looks at how many records each source partition contains. coalesce only reduces the number of partitions, so it cannot be used to fix data skew.
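For instance, a minimal usage sketch (the input RDD and its partition counts below are invented for illustration; sc is an existing SparkContext):

val rdd = sc.parallelize(1 to 100, 10) // 10 partitions
val merged = rdd.coalesce(2)           // 2 partitions, no shuffle: parent partitions are only glued together
// if the 10 input partitions were skewed, the 2 output partitions stay skewed;
// to actually rebalance the records, force a shuffle:
val balanced = rdd.coalesce(2, shuffle = true) // same as rdd.repartition(2)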
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T] = {
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = (new Random(index)).nextInt(numPartitions)
      // map each record to a (K, record) pair, where K keeps incrementing from a random start
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // hash-partition the (K, record) pairs once, then coalesce the shuffled output
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
      new HashPartitioner(numPartitions)),
      numPartitions).values
  } else {
    new CoalescedRDD(this, numPartitions)
  }
}
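To see what the (K, record) mapping achieves, here is a standalone simulation of distributePartition (plain Scala, no Spark needed; the three input partitions and their records are invented for illustration):

import scala.util.Random

val numPartitions = 2
// simulate 3 parent partitions, each holding a few records
val parent = Seq(Seq("a", "b"), Seq("c", "d", "e"), Seq("f"))
val keyed = parent.zipWithIndex.flatMap { case (items, index) =>
  var position = new Random(index).nextInt(numPartitions) // random start per source partition
  items.map { t =>
    position = position + 1        // K increments by one per record, so consecutive
    (position % numPartitions, t)  // records land in different buckets after the mod step
  }
}
// group by target bucket: records spread roughly evenly over the 2 output partitions
keyed.groupBy(_._1).foreach(println)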
As for the shuffle parameter: if it is true, a ShuffledRDD is generated first and the CoalescedRDD is then built on top of it; if it is false, the CoalescedRDD is generated directly. So let's first look at how the ShuffledRDD comes about:
The above illustrates, for merging 3 partitions into 2 with shuffle set to true, how the ShuffledRDD that becomes the CoalescedRDD's parent is generated; if shuffle is false, the CoalescedRDD is instead generated directly from the RDD itself.
Now let's look at how the CoalescedRDD performs its computation:
private[spark] case class CoalescedRDDPartition(
    index: Int,
    @transient rdd: RDD[_],
    parentsIndices: Array[Int],
    @transient preferredLocation: Option[String] = None) extends Partition {
  // parentsIndices records which partitions of the parent RDD make up the partition
  // at this index of the CoalescedRDD; preferredLocation is the preferred location
  // for this partition
  var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
}

private[spark] class CoalescedRDD[T: ClassTag](
    @transient var prev: RDD[T],
    maxPartitions: Int,
    balanceSlack: Double = 0.10)
  extends RDD[T](prev.context, Nil) {

  override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
    // chain the iterators of all the parent partitions behind this coalesced partition
    partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
      firstParent[T].iterator(parentPartition, context)
    }
  }
}
The partitioning of a CoalescedRDD is determined by CoalescedRDDPartition, whose parentsIndices field records which partitions of the parent RDD the partition at a given index is built from; compute then simply chains those parent partitions together with flatMap. The key question is therefore how CoalescedRDD generates its CoalescedRDDPartitions, i.e.
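The flatMap chaining itself is trivial; a tiny illustrative sketch of what compute does with the parent iterators (invented data, no Spark needed):

// three parent partitions feeding a single CoalescedRDD partition
val parentIters = Seq(Iterator(1, 2), Iterator(3), Iterator(4, 5))
// flatMap chains them into one logical partition, without copying or re-sorting
val merged = parentIters.iterator.flatMap(identity)
println(merged.toList) // List(1, 2, 3, 4, 5)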
override def getPartitions: Array[Partition] = {
  val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)

  pc.run().zipWithIndex.map {
    case (pg, i) =>
      val ids = pg.arr.map(_.index).toArray
      new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
  }
}
The CoalescedRDDPartitions are computed through a PartitionCoalescer:
/**
 * Runs the packing algorithm and returns an array of PartitionGroups that if possible are
 * load balanced and grouped by locality
 * @return array of partition groups
 */
def run(): Array[PartitionGroup] = {
  setupGroups(math.min(prev.partitions.length, maxPartitions)) // set up the groups (bins)
  throwBalls() // assign partitions (balls) to each group (bins)
  getPartitions
}
It first sets up the PartitionGroups, whose arr field stores partitions of the parent RDD, and then throws every parent partition that has not yet been assigned into one of these groups. Start with setupGroups: it creates targetLen PartitionGroups, each pre-seeded with an initial parent partition, as follows:
/**
 * Initializes targetLen partition groups and assigns a preferredLocation
 * This uses coupon collector to estimate how many preferredLocations it must rotate through
 * until it has seen most of the preferred locations (2 * n log(n))
 * @param targetLen
 */
def setupGroups(targetLen: Int) {
  val rotIt = new LocationIterator(prev)

  // deal with empty case, just create targetLen partition groups with no preferred location
  // i.e. if the parent RDD has no locality, generate targetLen PartitionGroups and return
  if (!rotIt.hasNext) {
    (1 to targetLen).foreach(x => groupArr += PartitionGroup())
    return
  }

  noLocality = false

  // number of iterations needed to be certain that we've seen most preferred locations
  val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt
  var numCreated = 0
  var tries = 0

  // rotate through until either targetLen unique/distinct preferred locations have been created
  // OR we've rotated expectedCoupons2, in which case we have likely seen all preferred locations,
  // i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
  // create one PartitionGroup per host first, so that the computation spreads across hosts
  while (numCreated < targetLen && tries < expectedCoupons2) {
    tries += 1
    // rotIt.next() returns (String, Partition): nxt_replica is the host name,
    // nxt_part is the partition
    val (nxt_replica, nxt_part) = rotIt.next()
    if (!groupHash.contains(nxt_replica)) {
      val pgroup = PartitionGroup(nxt_replica)
      groupArr += pgroup
      addPartToPGroup(nxt_part, pgroup) // seed the new PartitionGroup with this partition
      groupHash.put(nxt_replica, ArrayBuffer(pgroup)) // list in case we have multiple
      numCreated += 1
    }
  }

  // if there are still not enough PartitionGroups, allow several groups per host
  while (numCreated < targetLen) { // if we don't have enough partition groups, create duplicates
    var (nxt_replica, nxt_part) = rotIt.next() // (host name, partition)
    val pgroup = PartitionGroup(nxt_replica)
    groupArr += pgroup
    // groupHash: mutable.Map[String, ArrayBuffer[PartitionGroup]];
    // the same host name may now map to multiple PartitionGroups
    groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
    var tries = 0
    // seed this PartitionGroup with a partition
    while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
      nxt_part = rotIt.next()._2
      tries += 1
    }
    numCreated += 1
  }
}
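As a concrete number for the coupon-collector bound (targetLen chosen for illustration):

val targetLen = 100
val expectedCoupons2 = 2 * (math.log(targetLen) * targetLen + targetLen + 0.5).toInt
println(expectedCoupons2) // 1122: after at most ~1122 rotations the first loop gives up,
                          // assuming it has seen essentially every preferred location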
Then the parent-RDD partitions that are still unassigned are distributed to the appropriate PartitionGroups:
def throwBalls() {
  if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
    // no locality and fewer partitions merged into more: nothing to merge, keep as is
    if (maxPartitions > groupArr.size) { // just return prev.partitions
      for ((p, i) <- prev.partitions.zipWithIndex) {
        groupArr(i).arr += p
      }
    } else { // no locality available, then simply split partitions based on positions in array
      for (i <- 0 until maxPartitions) { // without locality, merge by simple index ranges
        val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
        val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
        (rangeStart until rangeEnd).foreach { j => groupArr(i).arr += prev.partitions(j) }
      }
    }
  } else {
    // walk the parent partitions not yet assigned during setupGroups and assign them
    for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group
      // pick a PartitionGroup and add the partition to its arr
      pickBin(p).arr += p
    }
  }
}
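For the no-locality shrink branch, the range arithmetic splits parent positions evenly. A quick check with 5 parent partitions and maxPartitions = 3 (numbers invented for illustration):

val prevLen = 5
val maxPartitions = 3
for (i <- 0 until maxPartitions) {
  val rangeStart = ((i.toLong * prevLen) / maxPartitions).toInt
  val rangeEnd = (((i.toLong + 1) * prevLen) / maxPartitions).toInt
  println(s"group $i <- parent partitions [$rangeStart, $rangeEnd)")
}
// group 0 <- [0, 1)   group 1 <- [1, 3)   group 2 <- [3, 5)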
So how does pickBin choose a group? Let's see:
/**
 * Takes a parent RDD partition and decides which of the partition groups to put it in
 * Takes locality into account, but also uses power of 2 choices to load balance
 * It strikes a balance between the two use the balanceSlack variable
 * @param p partition (ball to be thrown)
 * @return partition group (bin to be put in)
 */
def pickBin(p: Partition): PartitionGroup = {
  // the least-loaded PartitionGroup on each host holding this parent partition,
  // sorted ascending by how many partitions each group already contains
  val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
  // None if the partition has no preferred hosts, otherwise the group on the
  // least-loaded preferred host
  val prefPart = if (pref == Nil) None else pref.head

  // power of two choices: sample two random PartitionGroups and keep the smaller one
  val r1 = rnd.nextInt(groupArr.size)
  val r2 = rnd.nextInt(groupArr.size)
  val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
  if (prefPart.isEmpty) {
    // if no preferred locations, just use basic power of two
    return minPowerOfTwo
  }

  val prefPartActual = prefPart.get

  // otherwise use the balance slack to choose between balance and locality
  if (minPowerOfTwo.size + slack <= prefPartActual.size) { // more imbalance than the slack allows
    minPowerOfTwo // prefer balance over locality
  } else {
    prefPartActual // prefer locality over balance
  }
}
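The "power of two choices" part is a classic load-balancing trick: sampling two random bins and keeping the emptier one already keeps bins nearly balanced. A standalone sketch (plain Scala, invented ball/bin counts):

import scala.util.Random

val rnd = new Random(42)
val groupSizes = Array.fill(3)(0) // three bins, initially empty
for (_ <- 1 to 90) {              // throw 90 balls
  val r1 = rnd.nextInt(groupSizes.length)
  val r2 = rnd.nextInt(groupSizes.length)
  // keep the emptier of the two randomly sampled bins
  val target = if (groupSizes(r1) < groupSizes(r2)) r1 else r2
  groupSizes(target) += 1
}
println(groupSizes.mkString(", ")) // stays close to 30, 30, 30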
The coalescing principles are therefore:
1. Balance: each partition of the CoalescedRDD should be built from roughly the same number of parent partitions.
2. Each partition of the CoalescedRDD should share locality with its parent partitions as much as possible. For example, if partition 1 of the CoalescedRDD is built from parent partitions 1 through 10, and partitions 1 through 7 live on node 1.1.1.1, then partition 1 of the CoalescedRDD will prefer to run on 1.1.1.1. This reduces data transfer between nodes and improves throughput.
3. The partitions of the CoalescedRDD should be spread across different nodes as much as possible.
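Principles 2 and 3 can be observed through RDD.preferredLocations; a minimal sketch, assuming a SparkContext sc and an input rdd whose partitions actually report locality (e.g. one backed by HDFS blocks):

val merged = rdd.coalesce(3)
merged.partitions.foreach { p =>
  // each CoalescedRDDPartition advertises the host the coalescer chose for it
  println(s"partition ${p.index} -> ${merged.preferredLocations(p)}")
}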
For example:
1) Merging 3 partitions into 2, shuffle = true
The ShuffledRDD's getPreferredLocations is Nil, so the no-locality branch applies
2) Merging 2 partitions into 3, shuffle = true
The ShuffledRDD's getPreferredLocations is Nil, so the no-locality branch applies
3) Merging 5 partitions into 3, shuffle = false, every parent partition has locality
4) Merging 5 partitions into 3, shuffle = false, the parent partitions have no locality
5) Merging 3 partitions into 5, shuffle = false, every parent partition has locality
6) Merging 3 partitions into 5, shuffle = false, the parent partitions have no locality
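Cases 5) and 6) also show a property worth verifying: without a shuffle, coalesce can never increase the partition count, since targetLen is capped at min(prev.partitions.length, maxPartitions) and throwBalls then keeps the parent partitions as they are. A minimal sanity check, assuming a SparkContext sc:

val rdd = sc.parallelize(1 to 6, 3)
println(rdd.coalesce(5).partitions.length)                 // still 3: cannot grow without a shuffle
println(rdd.coalesce(5, shuffle = true).partitions.length) // 5: the shuffle redistributes records
println(rdd.coalesce(2).partitions.length)                 // 2: shrinking needs no shuffle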