1. 程式人生 > >spark運算元詳解------Action運算元介紹

spark運算元詳解------Action運算元介紹

本文首發自個人部落格:https://blog.smile13.com/articles/2018/11/30/1543589289882.html

一、無輸出的運算元

1.foreach運算元

功能:對 RDD 中的每個元素都應用 f 函式操作,無返回值。
原始碼:

/**
* Applies a function f to all elements of this RDD. 
*/
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
示例:

scala> val rdd1 = sc.parallelize(1 to 9)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24

scala>  rdd1.foreach(x => printf("%d ", x))
1 2 3 4 5 6 7 8 9 

2.foreachPartition運算元

功能:該函式和foreach類似,不同的是,foreach是直接在每個partition中直接對iterator執行foreach操作,傳入的function只是在foreach內部使用,
而foreachPartition是在每個partition中把iterator給傳入的function,讓function自己對iterator進行處理(可以避免記憶體溢位)。

簡單來說,foreach的iterator是針對的rdd中的元素,而foreachPartition的iterator是針對的分割槽本身。
原始碼:

/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}
示例:

scala> val rdd1 = sc.parallelize(1 to 9, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> rdd1.foreachPartition(x => printf("%s ", x.size))
4 5 

二、輸出到HDFS等檔案系統的運算元

1.saveAsTextFile運算元

功能:該函式將資料輸出,以文字檔案的形式寫入本地檔案系統或者HDFS等。Spark將對每個元素呼叫toString方法,將資料元素轉換為文字檔案中的一行記錄。若將檔案儲存到本地檔案系統,那麼只會儲存在executor所在機器的本地目錄。
原始碼:

/**
* Save this RDD as a text file, using string representations of elements. 
*/
def saveAsTextFile(path: String): Unit = withScope {
// https://issues.apache.org/jira/browse/SPARK-2075
// 
// NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit 
// Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]` 
// in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an 
// Ordering for `NullWritable`. That's why the compiler will generate different anonymous 
// classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. 
// 
// Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate 
// same bytecodes for `saveAsTextFile`.  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
val r = this.mapPartitions { iter =>
val text = new Text()
iter.map { x =>
text.set(x.toString)
(NullWritable.get(), text)
}
}  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
示例:

scala> val rdd1 = sc.parallelize(1 to 9, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24

scala> rdd1.saveAsTextFile("file:///opt/app/test/saveAsTextFileTest.txt")

2.saveAsObjectFile運算元

功能:該函式用於將RDD以ObjectFile形式寫入本地檔案系統或者HDFS等。
原始碼:

/**
* Save this RDD as a SequenceFile of serialized objects. 
*/
def saveAsObjectFile(path: String): Unit = withScope {
this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
.saveAsSequenceFile(path)
}
示例:

scala> val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("d", 5), ("a", 4)), 2)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[40] at parallelize at <console>:24

scala> rdd1.saveAsObjectFile("file:///opt/app/test/saveAsObejctFileTest.txt")

3.saveAsHadoopFile運算元

功能:該函式將RDD儲存在HDFS上的檔案中,可以指定outputKeyClass、outputValueClass以及壓縮格式,每個分割槽輸出一個檔案。
原始碼:

/**
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD. 
* 
* @note We should make sure our tasks are idempotent when speculation is enabled, i.e. do
* not use output committer that writes data directly. 
* There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad 
* result of using direct output committer with speculation enabled. */def saveAsHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf(self.context.hadoopConfiguration),
codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
hadoopConf.setOutputKeyClass(keyClass)
hadoopConf.setOutputValueClass(valueClass)
conf.setOutputFormat(outputFormatClass)
for (c <- codec) {
hadoopConf.setCompressMapOutput(true)
hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
hadoopConf.setMapOutputCompressorClass(c)
hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName)
hadoopConf.set("mapreduce.output.fileoutputformat.compress.type",
CompressionType.BLOCK.toString)
}

// Use configured output committer if already set
if (conf.getOutputCommitter == null) {
hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
}

// When speculation is on and output committer class name contains "Direct", we should warn
// users that they may loss data if they are using a direct output committer.  val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
val warningMessage =
s"$outputCommitterClass may be an output committer that writes data directly to " +
"the final location. Because speculation is enabled, this output committer may " +
"cause data loss (see the case in SPARK-10063). If possible, please use an output " +
"committer that does not have this behavior (e.g. FileOutputCommitter)."
logWarning(warningMessage)
}

FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriterUtils.createPathFromString(path, hadoopConf))
saveAsHadoopDataset(hadoopConf)
}
示例:

val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("d", 5), ("a", 4)), 2)
rdd1.saveAsHadoopFile("hdfs://192.168.199.201:8020/test",classOf[ClassTag[Text]],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])

4.saveAsSequenceFile運算元

功能:該函式用於將RDD以Hadoop SequenceFile的形式寫入本地檔案系統或者HDFS等。
原始碼:

/**
* Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key 
* and value types. If the key or value are Writable, then we use their classes directly; 
* otherwise we map primitive types such as Int and Double to IntWritable, DoubleWritable, etc, 
* byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported
* file system. 
*/
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
def anyToWritable[U <% Writable](u: U): Writable = u

// TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and
// valueWritableClass at the compile time. To implement that, we need to add type parameters to
// SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a 
// breaking change.  val convertKey = self.keyClass != _keyWritableClass
val convertValue = self.valueClass != _valueWritableClass

logInfo("Saving as sequence file of type " +
s"(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName})" )
val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
val jobConf = new JobConf(self.context.hadoopConfiguration)
if (!convertKey && !convertValue) {
self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
} else if (!convertKey && convertValue) {
self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
} else if (convertKey && !convertValue) {
self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
} else if (convertKey && convertValue) {
self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
}
}
示例:

scala> val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("d", 5), ("a", 4)), 2)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:24

scala> rdd1.saveAsSequenceFile("file:///opt/app/test/saveAsSequenceFileTest1.txt")

5.saveAsHadoopDataset運算元

功能:該函式使用舊的Hadoop API將RDD輸出到任何Hadoop支援的儲存系統,例如Hbase,為該儲存系統使用Hadoop JobConf 物件。
原始碼:

/**
* Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for 
* that storage system. The JobConf should set an OutputFormat and any output paths required 
* (e.g. a table name to write to) in the same way as it would be configured for a Hadoop 
* MapReduce job. 
*/
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
val config = new HadoopMapRedWriteConfigUtil[K, V](new SerializableJobConf(conf))
SparkHadoopWriter.write(
rdd = self,
config = config)
}
示例:

val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("d", 5), ("a", 4)), 2)
var jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.set("mapred.output.dir","/test/")
rdd1.saveAsHadoopDataset(jobConf)

6.saveAsNewAPIHadoopFile運算元

功能:該函式用於將RDD資料儲存到HDFS上,使用新版本Hadoop API。用法基本同saveAsHadoopFile。
原始碼:

/**
* Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
* (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
*/
def saveAsNewAPIHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
conf: Configuration = self.context.hadoopConfiguration): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
val job = NewAPIHadoopJob.getInstance(hadoopConf)
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
val jobConfiguration = job.getConfiguration
jobConfiguration.set("mapreduce.output.fileoutputformat.outputdir", path)
saveAsNewAPIHadoopDataset(jobConfiguration)
}
示例:

val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("d", 5), ("a", 4)), 2)
rdd1.saveAsNewAPIHadoopFile("hdfs://192.168.199.201:8020/test",classOf[Text],classOf[IntWritable],classOf[output.TextOutputFormat[Text,IntWritable]])

7.saveAsNewAPIHadoopDataset運算元

功能:使用新的Hadoop API將RDD輸出到任何Hadoop支援的儲存系統,例如Hbase,為該儲存系統使用Hadoop Configuration物件。Conf設定一個OutputFormat和任何需要的輸出路徑(如要寫入的表名),就像為Hadoop MapReduce作業配置的那樣。
原始碼:

/**
* Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop 
* Configuration object for that storage system. The Conf should set an OutputFormat and any 
* output paths required (e.g. a table name to write to) in the same way as it would be 
* configured for a Hadoop MapReduce job. 
* 
* @note We should make sure our tasks are idempotent when speculation is enabled, i.e. do
* not use output committer that writes data directly. 
* There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad 
* result of using direct output committer with speculation enabled. 
*/
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
val config = new HadoopMapReduceWriteConfigUtil[K, V](new SerializableConfiguration(conf))
SparkHadoopWriter.write(
rdd = self,
config = config)
}
示例:

val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("d", 5), ("a", 4)), 2)
var jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.set("mapred.output.dir","/test/")
rdd1.saveAsNewAPIHadoopDataset(jobConf)

三、輸出scala集合和資料型別的運算元

1.first運算元

功能:返回RDD中的第一個元素,不排序。
原始碼:

/**
* Return the first element in this RDD. 
*/
def first(): T = withScope {
take(1) match {
case Array(t) => t
case _ => throw new UnsupportedOperationException("empty collection")
}
}
示例:

scala> val rdd1 = sc.parallelize(1 to 9)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = rdd1.first()
rdd2: Int = 1

scala> print(rdd2)
1

2.count運算元

功能:返回RDD中的元素數量。
原始碼:

/**
* Return the number of elements in the RDD. 
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
示例:

scala> val rdd1 = sc.parallelize(1 to 9)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> println(rdd1.count())
9

3.reduce運算元

功能:將RDD中元素兩兩傳遞給輸入函式,同時產生一個新值,新值與RDD中下一個元素再被傳遞給輸入函式,直到最後只有一個值為止。
原始碼:

/**
* Reduces the elements of this RDD using the specified commutative and 
* associative binary operator. 
*/
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}  var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
} }  sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
示例:

scala> val rdd1 = sc.parallelize(1 to 9)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val rdd2 = rdd1.reduce((x,y) => x + y)
rdd2: Int = 45

4.collect運算元

功能:將一個RDD以一個Array陣列形式返回其中的所有元素。
原始碼:

/**
* Return an array that contains all of the elements in this RDD. 
* 
* @note This method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory. 
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
示例:

scala> val rdd1 = sc.parallelize(1 to 9)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd1.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

5.take運算元

功能:返回一個包含資料集前n個元素的陣列(從0下標到n-1下標的元素),不排序。
原始碼:

/**
* Take the first num elements of the RDD. It works by first scanning one partition, and use the 
* results from that partition to estimate the number of additional partitions needed to satisfy 
* the limit. 
* 
* @note This method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory. 
* 
* @note Due to complications in the internal implementation, this method will raise
* an exception if called on an RDD of `Nothing` or `Null`.
*/
def take(num: Int): Array[T] = withScope {
val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
if (num == 0) {
new Array[T](0)
} else {
val buf = new ArrayBuffer[T]
val totalParts = this.partitions.length
var partsScanned = 0
while (buf.size < num && partsScanned < totalParts) {
// The number of partitions to try in this iteration. It is ok for this number to be
// greater than totalParts because we actually cap it at totalParts in runJob.  var numPartsToTry = 1L
val left = num - buf.size
if (partsScanned > 0) {
// If we didn't find any rows after the previous iteration, quadruple and retry.
// Otherwise, interpolate the number of partitions we need to try, but overestimate // it by 50%. We also cap the estimation in the end.  if (buf.isEmpty) {
numPartsToTry = partsScanned * scaleUpFactor
} else {
// As left > 0, numPartsToTry is always >= 1
numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
}
}
val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

res.foreach(buf ++= _.take(num - buf.size))
partsScanned += p.size
}

buf.toArray
}
}
示例:

scala> val rdd1 = sc.parallelize(1 to 9)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> val rdd2 = rdd1.take(3)
rdd2: Array[Int] = Array(1, 2, 3)

6.top運算元

功能:從按降序排列的RDD中獲取前N個元素,或者有可選的key函式決定順序,返回一個數組。
原始碼:

/**
* Returns the top k (largest) elements from this RDD as defined by the specified 
* implicit Ordering[T] and maintains the ordering. This does the opposite of 
* [[takeOrdered]]. For example:
* {{{
*   sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
*   // returns Array(12) 
* 
*   sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2) 
*   // returns Array(6, 5) 
* }}}
*
* @note This method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory. 
* 
* @param num k, the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
takeOrdered(num)(ord.reverse)
}
示例:

scala> val rdd1 = sc.parallelize(1 to 9)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> val rdd2 = rdd1.top(3)
rdd2: Array[Int] = Array(9, 8, 7)

7.takeOrdered運算元

功能:返回RDD中前n個元素,並按預設順序排序(升序)或者按自定義比較器順序排序。
原始碼:

/**
* Returns the first k (smallest) elements from this RDD as defined by the specified 
* implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
* For example: 
* {{{
*   sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)
*   // returns Array(2) 
* 
*   sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2) 
*   // returns Array(2, 3) * }}}
*
* @note This method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory. 
* 
* @param num k, the number of elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
if (num == 0) {
Array.empty
} else {
val mapRDDs = mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= collectionUtils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}
if (mapRDDs.partitions.length == 0) {
Array.empty
} else {
mapRDDs.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}
}}
示例:

scala> val rdd1 = sc.makeRDD(Seq(5,4,2,1,3,6))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at <console>:24

scala> val rdd2 = rdd1.takeOrdered(3)
rdd2: Array[Int] = Array(1, 2, 3)

8.aggregate運算元

功能:aggregate函式將每個分割槽裡面的元素進行聚合(seqOp),然後用combine函式將每個分割槽的結果和初始值(zeroValue)進行combine操作。這個函式最終返回的型別不需要和RDD中元素型別一致。
原始碼:

/**
* Aggregate the elements of each partition, and then the results for all the partitions, using 
* given combine functions and a neutral "zero value". This function can return a different result 
* type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U 
* and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are 
* allowed to modify and return their first argument instead of creating a new U to avoid memory 
* allocation. 
* 
* @param zeroValue the initial value for the accumulated result of each partition for the
* `seqOp` operator, and also the initial value for the combine results from
*                  different partitions for the `combOp` operator - this will typically be the
*                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
* @param seqOp an operator used to accumulate results within a partition
* @param combOp an associative operator used to combine results from different partitions
*/def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}
示例:

scala> val rdd1 = sc.parallelize(1 to 9, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24
》
scala>  val rdd2 =  rdd1.aggregate((0,0))(
|       (acc,number) => (acc._1 + number, acc._2 + 1),
|       (par1,par2) => (par1._1 + par2._1, par1._2 + par2._2)
|     )
rdd2: (Int, Int) = (45,9)

9.fold運算元

功能:通過op函式聚合各分割槽中的元素及合併各分割槽的元素,op函式需要兩個引數,在開始時第一個傳入的引數為zeroValue,T為RDD資料集的資料型別,,其作用相當於SeqOp和comOp函式都相同的aggregate函式。
原始碼:

/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a 
* given associative function and a neutral "zero value". The function 
* op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object 
* allocation; however, it should not modify t2. 
* 
* This behaves somewhat differently from fold operations implemented for non-distributed 
* collections in functional languages like Scala. This fold operation may be applied to 
* partitions individually, and then fold those results into the final result, rather than 
* apply the fold to each element sequentially in some defined ordering. For functions 
* that are not commutative, the result may differ from that of a fold applied to a 
* non-distributed collection. 
* 
* @param zeroValue the initial value for the accumulated result of each partition for the `op`
*                  operator, and also the initial value for the combine results from different
*                  partitions for the `op` operator - this will typically be the neutral
*                  element (e.g. `Nil` for list concatenation or `0` for summation)
* @param op an operator used to both accumulate results within a partition and combine results
*                  from different partitions */def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
示例:

scala> val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("d", 5), ("a", 4)), 2)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> val rdd2 = rdd1.fold(("e", 0))((val1, val2) => { if (val1._2 >= val2._2) val1 else val2})
rdd2: (String, Int) = (d,5)

scala> println(rdd2)
(d,5)

10.lookup運算元

功能:該函式對(Key,Value)型的RDD操作,返回指定Key對應的元素形成的Seq。 這個函式處理優化的部分在於,如果這個RDD包含分割槽器,則只會對應處理K所在的分割槽,然後返回由(K,V)形成的Seq。 如果RDD不包含分割槽器,則需要對全RDD元素進行暴力掃描處理,搜尋指定K對應的元素
原始碼:

/**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to. 
*/
def lookup(key: K): Seq[V] = self.withScope {
self.partitioner match {
case Some(p) =>
val index = p.getPartition(key)
val process = (it: Iterator[(K, V)]) => {
val buf = new ArrayBuffer[V]
for (pair <- it if pair._1 == key) {
buf += pair._2
}
buf
} : Seq[V]
val res = self.context.runJob(self, process, Array(index))
res(0)
case None =>
self.filter(_._1 == key).map(_._2).collect()
}
}
示例:

scala> val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("d", 4), ("a", 5)), 2)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:24

scala> val rdd2 = rdd1.lookup("a")
rdd2: Seq[Int] = WrappedArray(1, 5)

11.countByKey運算元

功能:用於統計RDD[K,V]中每個K的數量,返回具有每個key的計數的(k,int)pairs的Map。
原始碼:

/**
* Count the number of elements for each key, collecting the results to a local Map. 
* 
* @note This method should only be used if the resulting map is expected to be small, as
* the whole thing is loaded into the driver's memory. 
* To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which * returns an RDD[T, Long] instead of a map. 
*/
def countByKey(): Map[K, Long] = self.withScope {
self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}
示例:

scala> val rdd1 = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3), ("d", 4), ("a", 5)), 2)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[17] at parallelize at <console>:24

scala> val rdd2 = rdd1.countByKey()
rdd2: scala.collection.Map[String,Long] = Map(d -> 1, b -> 1, a -> 2, c -> 1)

 

版權宣告:本文為博主原創文章,轉載請註明出處!