
Detailed Execution Flow of Spark Operators, Part 8

40.foreach

foreach applies the function f to each element of the RDD.

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

Note that foreach is an action: invoking it submits a job and triggers the execution of f.

Also note that when foreach is executed on an RDD, the function only takes effect on the Executor side, not on the Driver side.

For example, rdd.foreach(println) prints only to the Executors' stdout; nothing is visible on the Driver.
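The same reasoning applies to mutating an ordinary driver-side variable inside foreach: the closure is serialized to the tasks, so each task updates its own copy and the driver-side value is left untouched. A minimal sketch of the pitfall (variable names are illustrative only):

var localSum = 0                                       // lives on the driver
sc.parallelize(1 to 10).foreach(x => localSum += x)    // each task mutates its own deserialized copy
println(localSum)                                      // on a cluster this still prints 0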

To aggregate the values of an RDD with foreach, use an accumulator (a shared variable) instead:

scala> var cnt = sc.accumulator(0)
cnt: org.apache.spark.Accumulator[Int] = 0

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:21

scala> rdd1.foreach(x => cnt += x)

scala> cnt.value
res51: Int = 55

41.foreachPartition

foreachPartition is the per-partition counterpart of foreach; the difference between them is analogous to that between map and mapPartitions (see the usage sketch after the source below).

/**
 * Applies a function f to each partition of this RDD.
 */
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
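A minimal usage sketch, assuming a hypothetical SomeSink object with open()/write()/close(): with foreachPartition, per-partition resources such as a connection are created once per partition rather than once per element, which is exactly the map-versus-mapPartitions trade-off.

rdd.foreachPartition { iter =>
  val conn = SomeSink.open()                     // hypothetical sink: one connection per partition
  try {
    iter.foreach(record => conn.write(record))   // stream this partition's elements through it
  } finally {
    conn.close()                                 // always release the per-partition resource
  }
}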

42.subtract

subtract removes from this RDD the records that also appear in the other RDD and returns the remaining records of this RDD.
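A minimal usage sketch (the values are arbitrary):

val a = sc.parallelize(Seq(1, 2, 3, 4, 5))
val b = sc.parallelize(Seq(3, 4, 5, 6, 7))
a.subtract(b).collect()   // Array(1, 2), modulo partition ordering

The implementation is as follows: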
/**
 * Return an RDD with the elements from `this` that are not in `other`.
 *
 * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
 * RDD will be <= us.
 */
def subtract(other: RDD[T]): RDD[T] = withScope {
  // The subtraction needs to know how `this` RDD is distributed, i.e. its partitioner;
  // if it has none, fall back to a HashPartitioner with the current number of partitions.
  subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}
Continuing into the two-argument overload it delegates to:
/**
 * Return an RDD with the elements from `this` that are not in `other`.
 */
def subtract(
    other: RDD[T],
    p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  if (partitioner == Some(p)) {
    // The requested partitioner p equals this RDD's partitioner, yet a new p2 is built below.
    // Because p2 does not override equals, comparing other partitioners against it falls back
    // to reference equality, so both mapped RDDs still end up behind shuffle dependencies;
    // why it was designed this way is not entirely clear.
    // Our partitioner knows how to handle T (which, since we have a partitioner, is
    // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
    val p2 = new Partitioner() {
      override def numPartitions: Int = p.numPartitions
      override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
    }
    // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
    // anyway, and when calling .keys, will not have a partitioner set, even though
    // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
    // partitioned by the right/real keys (e.g. p).
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
  } else {
    // Otherwise the supplied partitioner p is used directly.
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
  }
}
Essentially, both this RDD and the other RDD are converted into key-value pairs whose values are null, and subtractByKey is then called. Its implementation:
/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = self.withScope {
  new SubtractedRDD[K, V, W](self, other, p)
}
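subtractByKey can also be called directly on a pair RDD; a minimal usage sketch (the values are arbitrary):

val left  = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val right = sc.parallelize(Seq(("b", 99), ("d", 100)))
left.subtractByKey(right).collect()   // Array((a,1), (c,3)) in some order: only keys absent from `right` survive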
It returns a SubtractedRDD; let's look at that class:
private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
    @transient var rdd1: RDD[_ <: Product2[K, V]],
    @transient var rdd2: RDD[_ <: Product2[K, W]],
    part: Partitioner)
  extends RDD[(K, V)](rdd1.context, Nil) {
  
  private var serializer: Option[Serializer] = None
  
  /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
  def setSerializer(serializer: Serializer): SubtractedRDD[K, V, W] = {
    this.serializer = Option(serializer)
    this
  }
  
  override def getDependencies: Seq[Dependency[_]] = {
    // Decide, per parent, how the result RDD depends on rdd1 and rdd2: one-to-one if the parent
    // is already partitioned by `part`, otherwise a shuffle dependency.
    Seq(rdd1, rdd2).map { rdd =>
      if (rdd.partitioner == Some(part)) {
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency(rdd, part, serializer)
      }
    }
  }
  
  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](part.numPartitions)
    for (i <- 0 until array.length) {
      // Each CoGroupPartition will depend on rdd1 and rdd2
      array(i) = new CoGroupPartition(i, Seq(rdd1, rdd2).zipWithIndex.map { case (rdd, j) =>
        dependencies(j) match {
          case s: ShuffleDependency[_, _, _] =>
            None
          case _ =>
            Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
        }
      }.toArray)
    }
    array
  }
  
  override val partitioner = Some(part)
  
  override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
    val partition = p.asInstanceOf[CoGroupPartition]
    // Buffers rdd1's values grouped by key.
    val map = new JHashMap[K, ArrayBuffer[V]]
    def getSeq(k: K): ArrayBuffer[V] = {
      val seq = map.get(k)
      if (seq != null) {
        seq
      } else {
        val seq = new ArrayBuffer[V]()
        map.put(k, seq)
        seq
      }
    }
    def integrate(depNum: Int, op: Product2[K, V] => Unit) = {
      dependencies(depNum) match {
        // Narrow (one-to-one) dependency: read the parent RDD's partition directly.
        case oneToOneDependency: OneToOneDependency[_] =>
          val dependencyPartition = partition.narrowDeps(depNum).get.split
          oneToOneDependency.rdd.iterator(dependencyPartition, context)
            .asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
        // Shuffle dependency: read this partition's shuffle output instead.
        case shuffleDependency: ShuffleDependency[_, _, _] =>
          val iter = SparkEnv.get.shuffleManager
            .getReader(
              shuffleDependency.shuffleHandle, partition.index, partition.index + 1, context)
            .read()
          iter.foreach(op)
      }
    }
  
    // the first dep is rdd1; add all values to the map
    // (cache this RDD's key-value pairs in the in-memory map)
    integrate(0, t => getSeq(t._1) += t._2)
    // the second dep is rdd2; remove all of its keys
    // (stream over the other RDD's data for this partition and drop every matching key)
    integrate(1, t => map.remove(t._1))
    // turn each (k, Seq(v)) entry back into a list of (k, v) pairs
    map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten
  }
}
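As getDependencies above shows, a parent that is already partitioned by `part` is wired in with a one-to-one dependency and avoids a shuffle. A minimal sketch (relying on HashPartitioner instances with the same partition count comparing equal):

val p = new org.apache.spark.HashPartitioner(4)
val left  = sc.parallelize(Seq(("a", 1), ("b", 2))).partitionBy(p)   // left.partitioner == Some(p)
val right = sc.parallelize(Seq(("b", 9)))                            // no partitioner set
left.subtractByKey(right, p)   // one-to-one dependency on left, shuffle dependency on right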
The implementation shows that subtractByKey is intended for cases where rdd1 is much smaller than rdd2: rdd1's data for a partition is held in memory, while rdd2 only needs to be streamed over. If rdd1 is large and the number of reduce partitions is small, an OOM can occur; if rdd1 is large, consider implementing the subtraction with cogroup instead, as sketched below.
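A minimal sketch of that cogroup-based alternative (the helper name is hypothetical): cogroup groups both sides by key using machinery that can spill to disk, instead of holding one side's partition in an in-memory hash map.

import scala.reflect.ClassTag
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Keep only the pairs of `left` whose keys never appear in `right`.
def subtractByKeyViaCogroup[K: ClassTag, V: ClassTag, W: ClassTag](
    left: RDD[(K, V)], right: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = {
  left.cogroup(right, p).flatMap { case (k, (vs, ws)) =>
    if (ws.isEmpty) vs.map(v => (k, v)) else Iterable.empty
  }
}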
(Figure: detailed execution flow of subtract)

43.keyBy

keyBy uses the function f to generate key-value pairs: each element x is keyed by f(x).
/**
 * Creates tuples of the elements in this RDD by applying `f`.
 */
def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
  val cleanedF = sc.clean(f)
  // Build the key-value pairs with a plain map operation.
  map(x => (cleanedF(x), x))
}
An example in Java:
List<Integer> data = Arrays.asList(1, 4, 3, 2, 5, 6);
JavaRDD<Integer> javaRDD = jsc.parallelize(data, 2);
JavaPairRDD<Integer, Integer> pairRDD = javaRDD.keyBy(new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer v1) throws Exception {
        return v1;
    }
});
for (Tuple2<Integer, Integer> tuple2 : pairRDD.collect()) {
    System.out.println(tuple2._1() + " " + tuple2._2());
}
The output is:
1 1
4 4
3 3
2 2
5 5
6 6
(Figure: execution flow of keyBy)