1. 程式人生 > >Spark運算元執行流程詳解之四

Spark運算元執行流程詳解之四

針對RDD的每個分割槽進行處理,返回一個新的RDD
/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
  
def mapPartitions[U: ClassTag](     f: Iterator[T] => Iterator[U],     preservesPartitioning: Boolean = false): RDD[U] = withScope {   val cleanedF = sc.clean(f)   new MapPartitionsRDD(     this,     (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),     preservesPartitioning)
}

注意preservesPartitioning引數,如果子RDD需要保留父RDD的分割槽資訊的話,則必須設定為true,否則經過轉化之後的RDD的partitioner為None了。

同時需要注意的是:map是對rdd中的每一個元素進行操作,而mapPartitions(foreachPartition)則是對rdd中的每個分割槽的迭代器進行操作。如果在map過程中需要頻繁建立額外的物件(例如將rdd中的資料通過jdbc寫入資料庫,map需要為每個元素建立一個連結而mapPartition為每個partition建立一個連結),則mapPartitions效率比map高的多。

SparkSql或DataFrame預設會對程式進行mapPartition的優化。

19.mapPartitionsWithIndex