
Spark Operator Execution Flow Explained, Part 6

coalesce, as its name suggests, merges partitions: it turns an RDD with many partitions into an RDD with fewer partitions, which reduces task-scheduling overhead. Keep in mind, though, that the merge gives no guarantee that the record counts of the resulting RDD's partitions are balanced, because it never looks at how many records each input partition holds. coalesce only reduces the number of partitions of an RDD, so it cannot be used to fix data skew.
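As a quick illustration (a minimal sketch, assuming an existing SparkContext named sc), coalesce only concatenates parent partitions and does not rebalance their record counts:

val rdd = sc.parallelize(1 to 10, 4)      // parent RDD with 4 partitions
val merged = rdd.coalesce(2)              // shuffle = false: no data movement
println(merged.getNumPartitions)          // 2
// each output partition is simply the concatenation of some parent partitions,
// so the per-partition record counts need not be balanced
println(merged.glom().map(_.length).collect().mkString(","))

Now to the implementation of coalesce itself: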

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      // initialize a random starting position from each partition's index
      var position = (new Random(index)).nextInt(numPartitions)
      // map every record to a (K, record) pair, where K keeps incrementing from the random start
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    // hash-partition the (K, record) pairs
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
      new HashPartitioner(numPartitions)),
      numPartitions).values // the records are KV pairs, so take the values at the end
  } else {
    new CoalescedRDD(this, numPartitions)
  }
}

Look at the shuffle parameter first: if it is true, a ShuffledRDD is generated first and the CoalescedRDD is then built on top of it; if it is false, the CoalescedRDD is generated directly. So let's first look at how the ShuffledRDD is produced:


The above shows how the ShuffledRDD, i.e. the parent of the CoalescedRDD, is generated when 3 partitions are merged into 2 with shuffle set to true; when shuffle is false, the CoalescedRDD is built directly from the RDD itself.
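To make the key assignment concrete, here is a small standalone sketch (plain Scala with hypothetical data, not Spark source) that mimics what distributePartition does for a parent partition of index 0 when numPartitions = 2; the emitted keys are later taken modulo the partition count by HashPartitioner:

import scala.util.Random

val numPartitions = 2
val index = 0                                  // index of the parent partition
val items = Iterator("a", "b", "c", "d")       // records held by that partition

var position = new Random(index).nextInt(numPartitions)   // random starting key
val keyed = items.map { t => position += 1; (position, t) }
// records are spread round-robin, starting from a random offset per parent partition
keyed.foreach { case (k, v) => println(s"key=$k -> output partition ${k % numPartitions}: $v") }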

Now let's look at how the CoalescedRDD computes its data:

private[spark] class CoalescedRDD[T: ClassTag](
    @transient var prev: RDD[T],
    maxPartitions: Int,
    balanceSlack: Double = 0.10)
  extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependencies

  override def getPartitions: Array[Partition] = {
    val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
    pc.run().zipWithIndex.map {
      case (pg, i) =>
        val ids = pg.arr.map(_.index).toArray
        new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
    }
  }

  override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
    partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
      firstParent[T].iterator(parentPartition, context)
    }
  }
  ……
}
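To see what compute's flatMap amounts to, here is a rough illustration with plain Scala iterators (hypothetical data, not Spark's actual classes): the records of the parent partitions belonging to one coalesced partition are simply chained one after another:

// two hypothetical parent partitions that end up in the same coalesced partition
val parentPartitions: Seq[Iterator[Int]] = Seq(Iterator(1, 2, 3), Iterator(4, 5))
// conceptually what CoalescedRDD.compute does: flatMap over the parents' iterators
val coalescedPartition: Iterator[Int] = parentPartitions.iterator.flatMap(identity)
println(coalescedPartition.toList)   // List(1, 2, 3, 4, 5)

The CoalescedRDDPartition referenced above is defined as follows: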

/**
 * Class that captures a coalesced RDD by essentially keeping track of parent partitions
 * @param index of this coalesced partition
 * @param rdd which it belongs to
 * @param parentsIndices list of indices in the parent that have been coalesced into this partition
 *        (i.e. which partitions of the parent RDD make up the CoalescedRDD partition with this index)
 * @param preferredLocation the preferred location for this partition
 */
private[spark] case class CoalescedRDDPartition(
    index: Int,
    @transient rdd: RDD[_],
    parentsIndices: Array[Int],
    @transient preferredLocation: Option[String] = None) extends Partition {
  var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))

  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
    // Update the reference to parent partition at the time of task serialization
    parents = parentsIndices.map(rdd.partitions(_))
    oos.defaultWriteObject()
  }

  /**
   * Computes the fraction of the parents' partitions containing preferredLocation within
   * their getPreferredLocs.
   * @return locality of this coalesced partition between 0 and 1
   */
  def localFraction: Double = {
    val loc = parents.count { p =>
      val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
      preferredLocation.exists(parentPreferredLocations.contains)
    }
    if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
  }
}

The partitioning of the CoalescedRDD is determined by CoalescedRDDPartition, whose parentsIndices field records which partitions of the parent RDD make up the CoalescedRDD partition with a given index; compute then uses flatMap to chain those parent partitions together. The main question is therefore how the CoalescedRDD generates its CoalescedRDDPartitions, i.e.

override def getPartitions: Array[Partition] = {
  val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
  pc.run().zipWithIndex.map {
    case (pg, i) =>
      val ids = pg.arr.map(_.index).toArray
      new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
  }
}

The CoalescedRDDPartitions are computed and generated via a PartitionCoalescer:

/**
 * Runs the packing algorithm and returns an array of PartitionGroups that if possible are
 * load balanced and grouped by locality
 * @return array of partition groups
 */
def run(): Array[PartitionGroup] = {
  // set up the groups one by one
  setupGroups(math.min(prev.partitions.length, maxPartitions))   // setup the groups (bins)
  // then place the parent RDD's not-yet-assigned partitions into the groups
  throwBalls()   // assign partitions (balls) to each group (bins)
  getPartitions
}

First a number of PartitionGroups are created; the arr field of each group stores partitions of the parent RDD. The parent RDD's not-yet-assigned partitions are then thrown into those PartitionGroups. Look at setupGroups first: it creates targetLen PartitionGroups, each seeded with an initial partition of the parent RDD. Its flow is as follows:

/**
 * Initializes targetLen partition groups and assigns a preferredLocation
 * This uses coupon collector to estimate how many preferredLocations it must rotate through
 * until it has seen most of the preferred locations (2 * n log(n))
 * @param targetLen
 */
def setupGroups(targetLen: Int) {
  val rotIt = new LocationIterator(prev)

  // deal with empty case, just create targetLen partition groups with no preferred location
  // if the parent RDD's partitions carry no locality, just create targetLen PartitionGroups and return
  if (!rotIt.hasNext) {
    (1 to targetLen).foreach(x => groupArr += PartitionGroup())
    return
  }

  noLocality = false

  // number of iterations needed to be certain that we've seen most preferred locations
  val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt
  var numCreated = 0
  var tries = 0

  // rotate through until either targetLen unique/distinct preferred locations have been created
  // OR we've rotated expectedCoupons2, in which case we have likely seen all preferred locations,
  // i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
  // create one PartitionGroup per host first, so that later computation is spread out as widely as possible
  while (numCreated < targetLen && tries < expectedCoupons2) {
    tries += 1
    // rotIt.next() returns (String, Partition): nxt_replica is the host name, nxt_part the partition
    val (nxt_replica, nxt_part) = rotIt.next()
    if (!groupHash.contains(nxt_replica)) {
      val pgroup = PartitionGroup(nxt_replica)
      groupArr += pgroup
      addPartToPGroup(nxt_part, pgroup)   // add the partition to this PartitionGroup
      groupHash.put(nxt_replica, ArrayBuffer(pgroup))   // list in case we have multiple
      numCreated += 1
    }
  }

  // if there are still not enough PartitionGroups, allow several PartitionGroups for the same host
  while (numCreated < targetLen) {   // if we don't have enough partition groups, create duplicates
    // (String, Partition): host name, partition
    var (nxt_replica, nxt_part) = rotIt.next()
    val pgroup = PartitionGroup(nxt_replica)
    groupArr += pgroup
    // groupHash: mutable.Map[String, ArrayBuffer[PartitionGroup]]; the same host may own several PartitionGroups
    groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
    var tries = 0
    // add the partition to this PartitionGroup
    while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) {   // ensure at least one part
      nxt_part = rotIt.next()._2
      tries += 1
    }
    numCreated += 1
  }
}
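To get a feel for the coupon-collector bound used above, a quick check with hypothetical numbers: for targetLen = 10 the first loop gives up after at most 66 draws if it still has not found 10 distinct hosts:

val targetLen = 10
val expectedCoupons2 = 2 * (math.log(targetLen) * targetLen + targetLen + 0.5).toInt
println(expectedCoupons2)   // (2.3026 * 10 + 10 + 0.5).toInt = 33, so the bound is 66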

The parent RDD's remaining unassigned partitions are then distributed to the appropriate PartitionGroups:

def throwBalls() {
  if (noLocality) {   // no preferredLocations in parent RDD, no randomization needed
    // merging fewer partitions into more partitions: nothing can be merged, so keep them as they are
    if (maxPartitions > groupArr.size) {   // just return prev.partitions
      for ((p, i) <- prev.partitions.zipWithIndex) {
        groupArr(i).arr += p
      }
    } else {   // no locality available, then simply split partitions based on positions in array
      // without locality requirements, simply merge by contiguous index ranges
      for (i <- 0 until maxPartitions) {
        val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
        val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
        (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
      }
    }
  } else {
    // walk the parent RDD's partitions; those not assigned before get assigned now
    for (p <- prev.partitions if (!initialHash.contains(p))) {   // throw every partition into group
      // pick a PartitionGroup and add the partition to its arr
      pickBin(p).arr += p
    }
  }
}
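As a concrete check of the no-locality range split (hypothetical numbers, not Spark code): merging 5 parent partitions into 3 groups yields the following assignment:

val parentPartitions = 5
val maxPartitions = 3
for (i <- 0 until maxPartitions) {
  val rangeStart = ((i.toLong * parentPartitions) / maxPartitions).toInt
  val rangeEnd = (((i.toLong + 1) * parentPartitions) / maxPartitions).toInt
  // group 0 -> partition 0, group 1 -> partitions 1..2, group 2 -> partitions 3..4
  println(s"group $i gets parent partitions $rangeStart until $rangeEnd")
}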

So how does pickBin make its choice? Let's see:

/**
 * Takes a parent RDD partition and decides which of the partition groups to put it in
 * Takes locality into account, but also uses power of 2 choices to load balance
 * It strikes a balance between the two use the balanceSlack variable
 * @param p partition (ball to be thrown)
 * @return partition group (bin to be put in)
 */
def pickBin(p: Partition): PartitionGroup = {
  // get the hosts this parent partition is local to, sorted by how few partitions each host's group holds
  val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare)   // least loaded pref locs
  // if the list is empty return None, otherwise the PartitionGroup of the least loaded host
  val prefPart = if (pref == Nil) None else pref.head
  // pick two PartitionGroups at random and keep the one containing fewer partitions
  val r1 = rnd.nextInt(groupArr.size)
  val r2 = rnd.nextInt(groupArr.size)
  val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
  if (prefPart.isEmpty) {   // no locality preference, so return minPowerOfTwo
    // if no preferred locations, just use basic power of two
    return minPowerOfTwo
  }

  val prefPartActual = prefPart.get
  // otherwise choose according to the balance slack
  if (minPowerOfTwo.size + slack <= prefPartActual.size) {   // more imbalance than the slack allows
    minPowerOfTwo   // prefer balance over locality
  } else {
    prefPartActual   // prefer locality over balance
  }
}
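A quick hypothetical example of the final slack comparison: with balanceSlack = 0.1 and a parent RDD of 100 partitions, the slack is 10, so the locality-preferred group is abandoned only when it is already at least 10 partitions larger than the randomly picked smaller group:

val balanceSlack = 0.10
val parentPartitions = 100
val slack = (balanceSlack * parentPartitions).toInt   // 10
val minPowerOfTwoSize = 3                             // size of the randomly chosen smaller group
val prefPartSize = 14                                 // size of the locality-preferred group
println(minPowerOfTwoSize + slack <= prefPartSize)    // true: balance wins over locality here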
The merging therefore follows these principles:
1. Keep the number of parent partitions assigned to each partition of the CoalescedRDD as even as possible.
2. Give each partition of the CoalescedRDD the same locality as its parent RDD partitions whenever possible. For example, if partition 1 of the CoalescedRDD corresponds to partitions 1 to 10 of its parent RDD, and partitions 1 to 7 of those live on node 1.1.1.1, then partition 1 of the CoalescedRDD will be scheduled on node 1.1.1.1. The goal is to reduce data transfer between nodes and improve processing throughput.
3. Spread the CoalescedRDD's partitions across different nodes as much as possible.
For example:
1) 3 partitions merged into 2, shuffle = true; the ShuffledRDD's getPreferredLocations is Nil
2) 2 partitions merged into 3, shuffle = true; the ShuffledRDD's getPreferredLocations is Nil
3) 5 partitions merged into 3, shuffle = false, every partition of the parent RDD has locality
4) 5 partitions merged into 3, shuffle = false, the parent RDD's partitions have no locality
5) 3 partitions merged into 5, shuffle = false, every partition of the parent RDD has locality
6) 3 partitions merged into 5, shuffle = false, the parent RDD's partitions have no locality

27.repartition