
The map, mapPartitions, foreach, and foreachPartition Operators in Spark

map and mapPartitions


  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   *
   */
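  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }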

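mapPartitions applies a function to each partition (the function receives the partition's entire iterator), whereas map applies a function to each individual record.
Under the hood, both build a MapPartitionsRDD; map simply wraps the user function as iter.map(f). mapPartitions can therefore be more efficient when the function has expensive setup work (such as opening a connection or constructing a heavy object), because that work runs once per partition instead of once per record. If the function materializes the whole partition in memory (for example with toList), however, large partitions can run out of memory.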
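To make the difference concrete without a cluster, here is a minimal plain-Scala sketch. It is not Spark code: a partitioned dataset is modeled as a `Seq[Seq[Int]]` (one inner `Seq` per partition), and `mapLike`, `mapPartitionsLike`, and `expensiveSetup` are hypothetical helpers that only mimic how `map` and `mapPartitions` hand data to the user function. The counter shows how often the setup runs in each style.

```scala
object MapVsMapPartitions {
  // Counts how many times the (hypothetical) expensive setup has run.
  var setupCount = 0

  // Hypothetical stand-in for costly setup, e.g. building a parser or a client.
  def expensiveSetup(): Int => Int = {
    setupCount += 1
    x => x * 10
  }

  // Mimics RDD.map: the user function sees one element at a time.
  def mapLike(partitions: Seq[Seq[Int]])(f: Int => Int): Seq[Seq[Int]] =
    partitions.map(p => p.iterator.map(f).toList)

  // Mimics RDD.mapPartitions: the user function receives a whole partition as an Iterator.
  def mapPartitionsLike(partitions: Seq[Seq[Int]])(
      f: Iterator[Int] => Iterator[Int]): Seq[Seq[Int]] =
    partitions.map(p => f(p.iterator).toList)

  // Returns (map result, setups with map, mapPartitions result, setups with mapPartitions).
  def compare(data: Seq[Seq[Int]]): (Seq[Seq[Int]], Int, Seq[Seq[Int]], Int) = {
    setupCount = 0
    val viaMap = mapLike(data)(x => expensiveSetup()(x)) // setup once per element
    val mapSetups = setupCount

    setupCount = 0
    val viaPartitions = mapPartitionsLike(data) { iter =>
      val g = expensiveSetup() // setup once per partition
      iter.map(g)
    }
    (viaMap, mapSetups, viaPartitions, setupCount)
  }

  def main(args: Array[String]): Unit = {
    // Three partitions holding six elements in total.
    val (a, mapSetups, b, partitionSetups) =
      compare(Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6)))
    println(s"same result: ${a == b}") // same result: true
    println(s"map setups: $mapSetups, mapPartitions setups: $partitionSetups")
    // map setups: 6, mapPartitions setups: 3
  }
}
```

Both styles produce the same values; only the number of setup calls differs (one per element versus one per partition).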

foreach and foreachPartition


  /**
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

  /**
   * Applies a function f to each partition of this RDD.
   */
  def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
  }

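foreach and foreachPartition mirror the two operators above, except that they are actions: each one calls sc.runJob and triggers a job immediately, whereas map and mapPartitions are lazy transformations that only return a new RDD.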
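The per-element versus per-partition difference matters most for side effects such as writing to an external store. The following plain-Scala sketch (again not Spark code) models partitions as a `Seq[Seq[Int]]`; `FakeConnection`, `foreachLike`, and `foreachPartitionLike` are hypothetical names, and no real database is involved. It counts how many connections each style opens.

```scala
object ForeachVsForeachPartition {
  // Hypothetical stand-in for a database connection; nothing is really written.
  final class FakeConnection {
    var rowsInserted = 0
    def insert(row: Int): Unit = rowsInserted += 1
    def close(): Unit = ()
  }

  // Mimics RDD.foreach: f is applied to every element.
  def foreachLike(partitions: Seq[Seq[Int]])(f: Int => Unit): Unit =
    partitions.foreach(p => p.iterator.foreach(f))

  // Mimics RDD.foreachPartition: f receives each partition's Iterator once.
  def foreachPartitionLike(partitions: Seq[Seq[Int]])(f: Iterator[Int] => Unit): Unit =
    partitions.foreach(p => f(p.iterator))

  // Returns (connections opened via foreach, connections opened via foreachPartition).
  def connectionsOpened(data: Seq[Seq[Int]]): (Int, Int) = {
    var opened = 0
    def open(): FakeConnection = { opened += 1; new FakeConnection }

    foreachLike(data) { row =>
      val conn = open() // one connection per row
      try conn.insert(row) finally conn.close()
    }
    val perRow = opened

    opened = 0
    foreachPartitionLike(data) { rows =>
      val conn = open() // one connection per partition, reused for all its rows
      try rows.foreach(conn.insert) finally conn.close()
    }
    (perRow, opened)
  }

  def main(args: Array[String]): Unit = {
    val (perRow, perPartition) = connectionsOpened(Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6)))
    println(s"foreach: $perRow connections, foreachPartition: $perPartition connections")
    // foreach: 6 connections, foreachPartition: 3 connections
  }
}
```

In real Spark code the function passed to foreachPartition runs on the executors, so the connection is typically created inside that function rather than on the driver, since connection objects are generally not serializable.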
When performing operations such as inserting records into a database, you should use foreachP