1. 程式人生 > >spark運算元詳解------Transformation運算元介紹

spark運算元詳解------Transformation運算元介紹

本文首發自個人部落格:https://blog.smile13.com/articles/2018/12/02/1543738193387.html

一、Value資料型別的Transformation運算元 

1.輸入分割槽與輸出分割槽一對一型別的運算元

1.1.map運算元

功能:map是對RDD中的每個元素都執行一個指定的函式來產生一個新的RDD,任何原RDD中的元素在新RDD中都有且僅有一個元素與之對應。
原始碼:

/**
* Return a new RDD by applying a function to all elements of this RDD. 
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
示例:

scala> val a = sc.parallelize(1 to 10,2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val b = a.map(_ * 2)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25

scala> a.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> b.collect
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

1.2.flatMap運算元

功能:將RDD中的每個元素通過函式f轉換為新的元素,並將生成的RDD的每個集合中的元素合併為一個集合,生成MapPartitionsRDD。
原始碼:

/**
* Return a new RDD by first applying a function to all elements of this *  RDD, and then flattening the results. 
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
示例:

scala> val a = sc.parallelize(1 to 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val b = a.flatMap(x => 1 to x)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at flatMap at <console>:25

scala> b.collect
res2: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

1.3.mapPartitions運算元

功能:mapPartitions是map的一個變種。map的輸入函式是應用於RDD中每個元素,而mapPartitions的輸入函式是應用於每個分割槽。mapPartitions獲取麼個分割槽的迭代器,在函式中通過這個分割槽整體的迭代器對整個分割槽的元素進行操作。
原始碼:

/**
* Return a new RDD by applying a function to each partition of this RDD. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
示例:

scala> val a = sc.parallelize(1 to 6, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala>   def doubleFunc(iter: Iterator[Int]) : Iterator[(Int,Int)] = {
|     var res = List[(Int,Int)]()
|     while (iter.hasNext)
|     {
|       val cur = iter.next;
|       res .::= (cur,cur*2)
|     }
|     res.iterator
|   }
doubleFunc: (iter: Iterator[Int])Iterator[(Int, Int)]

scala> val result = a.mapPartitions(doubleFunc)
result: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at mapPartitions at <console>:27

scala> println(result.collect().mkString)
(2,4)(1,2)(4,8)(3,6)(6,12)(5,10)

1.4.mapPartitionsWithIndex運算元

功能:函式作用同mapPartitions,不過提供了兩個引數,第一個引數為分割槽的索引,第二個引數為輸入函式,即對每個分割槽操作的函式。
原始碼:

/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}
示例:

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> def mapPartIndexFunc(i1:Int,iter: Iterator[Int]):Iterator[(Int,Int)]={
|       val result = List[(Int, Int)]()
|       var i = 0
|       while(iter.hasNext){
|         i += iter.next()
|       }
|       result.::(i1, i).iterator
|     }
mapPartIndexFunc: (i1: Int, iter: Iterator[Int])Iterator[(Int, Int)]

scala> val b = a.mapPartitionsWithIndex(mapPartIndexFunc)
b: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:27

scala> b.foreach(println(_))
(0,6)
(1,15)
(2,24)

1.5.glom運算元

功能:將每個分割槽內的元素組成一個數組,分割槽數不變。
原始碼:

/**
* Return an RDD created by coalescing all elements within each partition into an array. 
*/
def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
示例:

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> a.collect
res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> val b = a.glom
b: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[3] at glom at <console>:25

scala> b.collect
res3: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))

1.6.randomSplit運算元

功能:根據weight(權重值)將一個RDD劃分成多個RDD,權重越高劃分得到的元素較多的機率就越大。
1.需要注意的是第一個引數weight陣列內資料的加和應為1。
2.第二個引數seed是可選引數 ,作為random的種子,如果每次隨機的種子相同,生成的隨機數序列總是相同的。
原始碼:

/**
* Randomly splits this RDD with the provided weights. * * @param weights weights for splits, will be normalized if they don't sum to 1
* @param seed random seed
* * @return split RDDs in an array
*/def randomSplit(
weights: Array[Double],
seed: Long = Utils.random.nextLong): Array[RDD[T]] = {
require(weights.forall(_ >= 0),
s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}")
require(weights.sum > 0,
s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}")

withScope {
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
randomSampleWithRange(x(0), x(1), seed)
}.toArray
}
}
示例:

scala> val a = sc.parallelize(1 to 9)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24

scala> val b= a.randomSplit(Array(0.2,0.3,0.5))
b: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[27] at randomSplit at <console>:25, MapPartitionsRDD[28] at randomSplit at <console>:25, MapPartitionsRDD[29] at randomSplit at <console>:25)

scala> b.size
res20: Int = 3

scala> b(0).collect
res21: Array[Int] = Array(2, 3, 8)

scala> b(1).collect
res22: Array[Int] = Array(1, 5, 9)

scala> b(2).collect
res23: Array[Int] = Array(4, 6, 7)

下面是測試相同的種子會生成相同的結果
scala> val c= a.randomSplit(Array(0.2,0.8), 2)
c: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[30] at randomSplit at <console>:25, MapPartitionsRDD[31] at randomSplit at <console>:25)

scala> c(0).collect
res25: Array[Int] = Array(2, 3, 7)

scala> c(1).collect
res26: Array[Int] = Array(1, 4, 5, 6, 8, 9)

scala> val d= a.randomSplit(Array(0.2,0.8), 2)
d: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[32] at randomSplit at <console>:25, MapPartitionsRDD[33] at randomSplit at <console>:25)

scala> d(0).collect
res27: Array[Int] = Array(2, 3, 7)

scala> d(1).collect
res28: Array[Int] = Array(1, 4, 5, 6, 8, 9)

scala> val e= a.randomSplit(Array(0.2,0.8), 3)
e: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[34] at randomSplit at <console>:25, MapPartitionsRDD[35] at randomSplit at <console>:25)

scala> e(0).collect
res29: Array[Int] = Array(1, 5, 9)

scala> e(1).collect
res30: Array[Int] = Array(2, 3, 4, 6, 7, 8)

2.輸入分割槽與輸出分割槽多對一型別的運算元

2.1.union運算元

功能:求兩個運算元的並集,並且不去重,需要保證兩個 RDD 元素的資料型別相同。
原始碼:

/**
* Return the union of this RDD and another one. Any identical elements will appear multiple 
* times (use `.distinct()` to eliminate them).
*/
def union(other: RDD[T]): RDD[T] = withScope {
sc.union(this, other)
}
示例:

scala> val a = sc.parallelize(1 to 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:24

scala> val b = sc.parallelize(3 to 8)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:24

scala> val c = a.union(b)
c: org.apache.spark.rdd.RDD[Int] = UnionRDD[38] at union at <console>:27

scala> c.collect
res31: Array[Int] = Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7, 8)

2.2.cartesian運算元

功能:對 兩 個 RDD 內 的 所 有 元 素進 行 笛 卡 爾 積 操 作。 操 作 後, 內 部 實 現 返 回CartesianRDD。
原始碼:

/**
* Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of 
* elements (a, b) where a is in `this` and b is in `other`.
*/
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
new CartesianRDD(sc, this, other)
}
示例:

scala> val rdd2 = sc.parallelize(5 to 9,1)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[37] at parallelize at <console>:24

scala> val rdd3 = rdd1.cartesian(rdd2)
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = CartesianRDD[38] at cartesian at <console>:27

scala> rdd3.collect
res15: Array[(Int, Int)] = Array((1,5), (1,6), (1,7), (1,8), (1,9), (2,5), (2,6), (2,7), (2,8), (2,9), (3,5), (3,6), (3,7), (3,8), (3,9))

3.輸入分割槽與輸出分割槽多對多型別的運算元

3.1.groupBy運算元

功能:將元素通過函式生成相應的 Key,資料就轉化為 Key-Value 格式,之後將 Key 相同的元素分為一組。
原始碼:

/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements 
* mapping to that key. The ordering of elements within each group is not guaranteed, and 
* may even differ each time the resulting RDD is evaluated. 
* 
* @note This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
* or `PairRDDFunctions.reduceByKey` will provide much better performance.
*/
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy[K](f, defaultPartitioner(this))
}
示例:

scala> val rdd1 = sc.parallelize(1 to 9, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[39] at parallelize at <console>:24

scala> val rdd2 = rdd1.groupBy(x => { if (x % 2 == 0) "even" else "odd" })
rdd2: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[41] at groupBy at <console>:25

scala> rdd2.collect
res17: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))

3.2.coalesce運算元

功能:該函式用於將RDD進行重分割槽,預設不進行shuffle。
1.如果分割槽數減少,預設不進行shuffle,此時父RDD和子RDD之間是窄依賴。比如:1000個分割槽被重新設定成10個分割槽,這樣不會發生shuffle。
2.如果分割槽數量增大時,比如Rdd的原分割槽數是100,想設定成1000,此時,需要把shuffle設定成true才行,因為如果設定成false,
不會進行shuffle操作,此時父RDD和子RDD之間是窄依賴,這時並不會增加RDD的分割槽。
原始碼:

/**
* Return a new RDD that is reduced into `numPartitions` partitions.
* 
* This results in a narrow dependency, e.g. if you go from 1000 partitions 
* to 100 partitions, there will not be a shuffle, instead each of the 100 
* new partitions will claim 10 of the current partitions. If a larger number 
* of partitions is requested, it will stay at the current number of partitions. 
* 
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, 
* this may result in your computation taking place on fewer nodes than 
* you like (e.g. one node in the case of numPartitions = 1). To avoid this, 
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever 
* the current partitioning is). 
* 
* @note With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions, 
* say 100, potentially with a few partitions being abnormally large. Calling 
* coalesce(1000, shuffle = true) will result in 1000 partitions with the 
* data distributed using a hash partitioner. The optional partition coalescer 
* passed in must be serializable. 
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.  position = position + 1
(position, t)
}
} : Iterator[(Int, T)]

// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
示例:

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> a.partitions.size
res11: Int = 3

scala> val b = a.coalesce(1)
b: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[9] at coalesce at <console>:25

scala> b.partitions.size
res12: Int = 1

scala> val c = a.coalesce(4)
c: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[10] at coalesce at <console>:25

scala> c.partitions.size
res13: Int = 3

scala> val d = a.coalesce(4, true)
d: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14] at coalesce at <console>:25

scala> d.partitions.size
res14: Int = 4

3.3.repartition運算元

功能:repartition方法其實就是呼叫了coalesce方法,shuffle設定為true的情況。
原始碼:

/**
* Return a new RDD that has exactly numPartitions partitions. * * Can increase or decrease the level of parallelism in this RDD. Internally, this uses * a shuffle to redistribute data. * * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle. * * TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207. 
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
示例:

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> val b = a.repartition(1)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at repartition at <console>:25

scala> b.partitions.size
res15: Int = 1

scala> val c = a.repartition(4)
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[23] at repartition at <console>:25

scala> c.partitions.size
res16: Int = 4

4.輸出分割槽為輸入分割槽子集型的運算元

4.1.filter運算元

功能:filter 是對RDD中的每個元素都執行一個指定的函式來過濾產生一個新的RDD。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。
原始碼:
/**
* Return a new RDD containing only the elements that satisfy a predicate. 
*/
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}
示例:

scala> val rdd1 = sc.parallelize(1 to 9, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[42] at parallelize at <console>:24

scala> val rdd2 = rdd1.filter(_ % 2 == 0)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[43] at filter at <console>:25

scala> rdd2.collect
res18: Array[Int] = Array(2, 4, 6, 8)

4.2.distinct運算元

功能:distinct將RDD中的元素進行去重操作。
原始碼:

/**
* Return a new RDD containing the distinct elements in this RDD. 
*/
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}
示例:

scala> c.collect
res31: Array[Int] = Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7, 8)

scala> val d = c.distinct()
d: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at distinct at <console>:25

scala> d.collect
res32: Array[Int] = Array(8, 1, 2, 3, 4, 5, 6, 7)

4.3.intersection運算元

功能:求兩個RDD的交集。
原始碼:

/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate 
* elements, even if the input RDDs did. 
* 
* @note This method performs a shuffle internally.
*/
def intersection(other: RDD[T]): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}
示例:

scala> val a = sc.parallelize(1 to 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[42] at parallelize at <console>:24

scala> val b = sc.parallelize(3 to 8)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[43] at parallelize at <console>:24

scala> val c = a.intersection(b)
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[49] at intersection at <console>:27

scala> c.collect
res33: Array[Int] = Array(4, 5, 3)

4.4.subtract運算元

功能:求兩個RDD的差集。
原始碼:

/**
* Return an RDD with the elements from `this` that are not in `other`.
* * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
def subtract(other: RDD[T]): RDD[T] = withScope {
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}
示例:

scala> val a = sc.parallelize(1 to 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[42] at parallelize at <console>:24

scala> val b = sc.parallelize(3 to 8)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[43] at parallelize at <console>:24

scala> val d = a.subtract(b)
d: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[53] at subtract at <console>:27

scala> d.collect
res34: Array[Int] = Array(1, 2)

4.5.sample運算元

功能:將 RDD 這個集合內的元素進行取樣,獲取所有元素的子集。使用者可以設定是否有放回的抽樣、百分比、隨機種子,進而決定取樣方式。
原始碼:

/**
* Return a sampled subset of this RDD. 
* 
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
*  without replacement: probability that each element is chosen; fraction must be [0, 1] 
*  with replacement: expected number of times each element is chosen; fraction must be greater 
*  than or equal to 0 * @param seed seed for the random number generator
* 
* @note This is NOT guaranteed to provide exactly the fraction of the count
* of the given [[RDD]].
*/
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T] = {
require(fraction >= 0,
s"Fraction must be nonnegative, but got ${fraction}")

withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
}
}
}
示例:

scala> val rdd1 = sc.parallelize(1 to 9, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[44] at parallelize at <console>:24

scala> val rdd2 = rdd1.sample(false, 0.3)
rdd2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[45] at sample at <console>:25

scala> rdd2.collect
res20: Array[Int] = Array(5, 8, 9)

4.6.takeSample運算元

功能:和sample函式是一個原理,但是不使用相對比例取樣,而是按設定的取樣個數進行取樣,同時返回結果不再是RDD,而是相當於對取樣後的資料進行
Collect(),返回結果的集合為單機的陣列。
原始碼:

/**
* Return a fixed-size sampled subset of this RDD in an array 
* 
* @param withReplacement whether sampling is done with replacement
* @param num size of the returned sample
* @param seed seed for the random number generator
* @return sample of specified size in an array
* 
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory. 
*/
def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T] = withScope {
val numStDev = 10.0

require(num >= 0, "Negative number of elements requested")
require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt),
"Cannot support a sample size > Int.MaxValue - " +
s"$numStDev * math.sqrt(Int.MaxValue)")

if (num == 0) {
new Array[T](0)
} else {
val initialCount = this.count()
if (initialCount == 0) {
new Array[T](0)
} else {
val rand = new Random(seed)
if (!withReplacement && num >= initialCount) {
Utils.randomizeInPlace(this.collect(), rand)
} else {
val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
withReplacement)
var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()

// If the first sample didn't turn out large enough, keep trying to take samples;
// this shouldn't happen often because we use a big multiplier for the initial size  var numIters = 0
while (samples.length < num) {
logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
numIters += 1
}
Utils.randomizeInPlace(samples, rand).take(num)
}
} }}
示例:

scala> val rdd1 = sc.parallelize(1 to 9, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[48] at parallelize at <console>:24

scala> val rdd2 = rdd1.takeSample(false, 4)
rdd2: Array[Int] = Array(3, 1, 2, 9)

5.Cache型的運算元

5.1.persist運算元

功能:對RDD 進行快取操作。資料快取在哪裡依據 StorageLevel 這個列舉型別進行確定。 可以快取到記憶體或者磁碟。
原始碼:

/**
* Set this RDD's storage level to persist its values across operations after the first time 
* it is computed. This can only be used to assign a new storage level if the RDD does not 
* have a storage level set yet. Local checkpointing is an exception. 
*/
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
// This means the user previously called localCheckpoint(), which should have already
// marked this RDD for persisting. Here we should override the old storage level with // one that is explicitly requested by the user (after adapting it to use disk).  persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
快取等級:

StorageLevel.DISK_ONLY
StorageLevel.DISK_ONLY_2
StorageLevel.MEMORY_ONLY
StorageLevel.MEMORY_ONLY_2
StorageLevel.MEMORY_AND_DISK
StorageLevel.MEMORY_AND_DISK_2
StorageLevel.OFF_HEAP

5.2.cache運算元

功能:將 RDD 元素從磁碟快取到記憶體。 相當於 persist(MEMORY_ONLY) 函式的功能。
原始碼:

/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()

二、Key-Value資料型別的Transformation運算元 

1.輸入分割槽與輸出分割槽一對一型別的運算元

1.1.mapValues運算元

功能:該函式用於處理key-value的Value,原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素。因此,該函式只適用於元素為key-value對的RDD。
原始碼:

/**
* Pass each value in the key-value pair RDD through a map function without changing the keys; 
* this also retains the original RDD's partitioning. 
*/
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
}
示例:

scala> val rdd1 = sc.parallelize(List(("A", 1), ("B", 2), ("C", 3), ("D", 4)), 2)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val rdd2 = rdd1.mapValues(10 + _)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at mapValues at <console>:25

scala> rdd2.collect
res4: Array[(String, Int)] = Array((A,11), (B,12), (C,13), (D,14))

1.2.flatMapValues運算元

功能:flatMapValues類似於mapValues,不同的在於flatMapValues應用於元素為KV對的RDD中Value。每個一元素的Value被輸入函式對映為一系列的值,然後這些值再與原RDD中的Key組成一系列新的KV對。
原始碼:

/**
* Pass each value in the key-value pair RDD through a flatMap function without changing the 
* keys; this also retains the original RDD's partitioning. 
*/
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.flatMap { case (k, v) =>
cleanF(v).map(x => (k, x))
},
preservesPartitioning = true)
}
示例:

scala> val rdd1 = sc.parallelize(List(("A", 1), ("B", 2), ("C", 3), ("D", 4)), 2)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> val rdd2 = rdd1.flatMapValues(1 to _)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at flatMapValues at <console>:25

scala> rdd2.collect
res5: Array[(String, Int)] = Array((A,1), (B,1), (B,2), (C,1), (C,2), (C,3), (D,1), (D,2), (D,3), (D,4))

1.3.sortByKey運算元

功能:該函式用於對Key-Value形式的RDD進行排序。
原始碼:

/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index 
* of the original partition. 
* 
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}
示例:

scala> val rdd1 = sc.parallelize(List(("A", 1), ("B", 3), ("C", 2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("B", 2), ("D", 1), ("E", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[7] at union at <console>:27                       ^

scala> val rdd5 = rdd3.sortByKey(true)
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at sortByKey at <console>:25

scala> rdd5.collect
res3: Array[(String, Int)] = Array((A,1), (B,3), (B,2), (C,2), (D,1), (E,2))

1.4.sortBy運算元

功能:sortBykey的升級版,可以指定按key或者value排序。
原始碼:

/**
* Return this RDD sorted by the given key function. 
*/
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
}
示例:

scala> val rdd1 = sc.parallelize(Array(("a",1),("b",2),("c",3),("a",4),("d",5),("b",6),("e",7),("c",8),("d",9)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> val rdd2 = rdd1.reduceByKey(_+_)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[13] at reduceByKey at <console>:25

scala> val rdd3 = rdd2.sortBy(_._2,false)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[18] at sortBy at <console>:25

scala> rdd3.collect
res4: Array[(String, Int)] = Array((d,14), (c,11), (b,8), (e,7), (a,5))

1.5.zip運算元

功能:zip函式用於將兩個非key-value的RDD,通過以一對應的關係壓縮為key-vale的RDD,兩個RDD的分割槽數需要相同,分割槽中的元素個數也要相等。
原始碼:

/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index 
* of the original partition. 
* 
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}
示例:

scala> val a = sc.makeRDD(List(1,2,3))
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at makeRDD at <console>:24

scala> val b = sc.makeRDD(List("a","b","c"))
b: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[20] at makeRDD at <console>:24

scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[21] at zip at <console>:27

scala> c.collect
res5: Array[(Int, String)] = Array((1,a), (2,b), (3,c))

1.6.zipPartitions運算元

功能:zipPartitions函式將多個RDD按照partition組合成為新的RDD,該函式需要組合的RDD具有相同的分割槽數,但對於每個分割槽內的元素數量沒有要求。
原始碼:

/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index 
* of the original partition. 
* 
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}
示例:

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> def mapPartIndexFunc(i1:Int,iter: Iterator[Int]):Iterator[(Int,Int)]={
|       val result = List[(Int, Int)]()
|       var i = 0
|       while(iter.hasNext){
|         i += iter.next()
|       }
|       result.::(i1, i).iterator
|     }
mapPartIndexFunc: (i1: Int, iter: Iterator[Int])Iterator[(Int, Int)]

scala> val b = a.mapPartitionsWithIndex(mapPartIndexFunc)
b: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:27

scala> b.foreach(println(_))
(0,6)
(1,15)
(2,24)

1.7.zipWithIndex運算元

功能:該函式將RDD中的元素和這個元素在RDD中的ID(索引號)組合成鍵/值對。
原始碼:

/**
* Zips this RDD with its element indices. The ordering is first based on the partition index 
* and then the ordering of items within each partition. So the first item in the first 
* partition gets index 0, and the last item in the last partition receives the largest index. 
* * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type. 
* This method needs to trigger a spark job when this RDD contains more than one partitions. 
* * @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
* elements in a partition. The index assigned to each element is therefore not guaranteed, 
* and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee 
* the same index assignments, you should sort the RDD with sortByKey() or save it to a file. 
*/
def zipWithIndex(): RDD[(T, Long)] = withScope {
new ZippedWithIndexRDD(this)
}
示例:

scala> val a = sc.parallelize(1 to 5,2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> val b = a.zipWith
zipWithIndex   zipWithUniqueId

scala> val b = a.zipWithIndex()
b: org.apache.spark.rdd.RDD[(Int, Long)] = ZippedWithIndexRDD[25] at zipWithIndex at <console>:25

scala> b.collect
res6: Array[(Int, Long)] = Array((1,0), (2,1), (3,2), (4,3), (5,4))

1.8.zipWithUniqueId運算元

功能:該函式將RDD中元素和一個唯一ID組合成鍵/值對,該唯一ID生成演算法如下:
每個分割槽中第一個元素的唯一ID值為:該分割槽索引號;
每個分割槽中第N個元素的唯一ID值為:(前一個元素的唯一ID值) + (該RDD總的分割槽數);
原始碼:

/**
* Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k, 
* 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method 
* won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
* 
* @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
* elements in a partition. The unique ID assigned to each element is therefore not guaranteed, 
* and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee 
* the same index assignments, you should sort the RDD with sortByKey() or save it to a file. 
*/
def zipWithUniqueId(): RDD[(T, Long)] = withScope {
val n = this.partitions.length.toLong
this.mapPartitionsWithIndex { case (k, iter) =>
Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
(item, i * n + k)
}
}}
示例:

scala> val a = sc.parallelize(1 to 5,2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val b = a.zipWithUniqueId()
b: org.apache.spark.rdd.RDD[(Int, Long)] = MapPartitionsRDD[1] at zipWithUniqueId at <console>:25

scala> b.collect
collect   collectAsMap   collectAsync

scala> b.collect
res0: Array[(Int, Long)] = Array((1,0), (2,2), (3,1), (4,3), (5,5))

//總分割槽數為2`
//第一個分割槽第一個元素ID為0,第二個分割槽第一個元素ID為1`
//第一個分割槽第二個元素ID為0+2=2,第一個分割槽第三個元素ID為2+2=4`
//第二個分割槽第二個元素ID為1+2=3,第二個分割槽第三個元素ID為3+2=5`

2.對單個RDD或兩個RDD聚集的運算元

2.1.combineByKey運算元

功能:該函式用於將RDD[K,V]轉換成RDD[K,C],這裡的V型別和C型別可以相同也可以不同。該函式有三個引數:
第一個引數:給定一個初始值,用函式生成初始值。
第二個引數:combinbe聚合邏輯。
第三個引數:reduce端聚合邏輯。
原始碼:

/**
* Generic function to combine the elements for each key using a custom set of aggregation 
* functions. This method is here for backward compatibility. It does not provide combiner 
* classtag information to the shuffle.
* 
* @see `combineByKeyWithClassTag`
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
partitioner, mapSideCombine, serializer)(null)
}

------------------引數說明:
createCombiner:組合器函式,用於將V型別轉換成C型別,輸入引數為RDD[K,V]中的V,輸出為C
mergeValue:合併值函式,將一個C型別和一個V型別值合併成一個C型別,輸入引數為(C,V),輸出為C
mergeCombiners:分割槽合併組合器函式,用於將兩個C型別值合併成一個C型別,輸入引數為(C,C),輸出為C
numPartitions:結果RDD分割槽數,預設保持原有的分割槽數
partitioner:分割槽函式,預設為HashPartitioner
mapSideCombine:是否需要在Map端進行combine操作,類似於MapReduce中的combine,預設為true
示例:

scala> val rdd1 = sc.parallelize(List(1,2,2,3,3,3,3,4,4,4,4,4), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val rdd2 = rdd1.map((_, 1))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[11] at map at <console>:25

scala> val rdd3 = rdd2.combineByKey(-_, (x:Int, y:Int) => x + y,(x:Int, y:Int) => x + y)
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[12] at combineByKey at <console>:25

scala> rdd2.collect
res6: Array[(Int, Int)] = Array((1,1), (2,1), (2,1), (3,1), (3,1), (3,1), (3,1), (4,1), (4,1), (4,1), (4,1), (4,1))

scala> rdd3.collect
res7: Array[(Int, Int)] = Array((4,3), (2,0), (1,-1), (3,0))

在上述程式碼中,(1,1), (2,1), (2,1), (3,1), (3,1), (3,1) 被劃分到第一個partition,(3,1), (4,1), (4,1), (4,1), (4,1), (4,1) 被劃分到第二個。於是有如下操作:
(1, 1):由於只有1個,所以在值取負的情況下,自然輸出(1, -1) 
(2, 1):由於有2個,第一個取負,第二個不變,因此combine後為(2, 0) 
(3, 1):partition1中有3個,參照上述規則,combine後為(3, 1),partition2中有1個,因此combine後為(3, -1)。在第二次combine時,不會有初始化操作,因此直接相加,結果為(3, 0) 
(4, 1):過程同上,結果為(4, 3)

2.2.reduceByKey運算元

功能:reduceByKey就是對元素為KV對的RDD中Key相同的元素的Value進行reduce,因此,Key相同的多個元素的值被reduce為一個值,然後與原RDD中的Key組成一個新的KV對。
原始碼:

/**
* Merge the values for each key using an associative and commutative reduce function. This will 
* also perform the merging locally on each mapper before sending results to a reducer, similarly 
* to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ 
* parallelism level. 
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
示例:

scala> val rdd1 = sc.parallelize(List(("A", 1), ("B", 3), ("C", 2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("B", 2), ("D", 1), ("E", 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[7] at union at <console>:27

scala> val rdd4 = rdd3.reduceByKey(_ + _)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:25

scala> rdd4.collect
res2: Array[(String, Int)] = Array((A,1), (B,5), (C,2), (D,1), (E,2))

2.3.partitionBy運算元

功能:該函式根據partitioner函式生成新的ShuffleRDD,將原RDD重新分割槽。
原始碼:

/**
* Return a copy of the RDD partitioned using the specified partitioner. 
*/
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V](self, partitioner)
}
}
示例:

scala> val rdd1 = sc.parallelize(List(("A", 1), ("B", 2), ("C", 3), ("D", 4)), 2)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val rdd2 = rdd1.glom()
rdd2: org.apache.spark.rdd.RDD[Array[(String, Int)]] = MapPartitionsRDD[3] at glom at <console>:25

scala> rdd2.collect
res1: Array[Array[(String, Int)]] = Array(Array((A,1), (B,2)), Array((C,3), (D,4)))

scala> val rdd3 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at partitionBy at <console>:25

scala> rdd3.partitions.size
res2: Int = 2

scala> val rdd4 = rdd3.glom
rdd4: org.apache.spark.rdd.RDD[Array[(String, Int)]] = MapPartitionsRDD[5] at glom at <console>:25

scala> rdd4.collect
res3: Array[Array[(String, Int)]] = Array(Array((B,2), (D,4)), Array((A,1), (C,3)))

2.4.groupByKey運算元

功能:根據key值進行分組,groupByKey()方法的資料本身就是一種key-value型別的。
原始碼:

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the 
* resulting RDD with the existing partitioner/parallelism level. The ordering of elements 
* within each group is not guaranteed, and may even differ each time the resulting RDD is * evaluated. 
* 
* @note This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
* or `PairRDDFunctions.reduceByKey` will provide much better performance.
*/
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}
示例:

scala> val a = sc.makeRDD(Array(("A",1),("B",2),("C",1),("A",3)))
a: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at makeRDD at <console>:24

scala> val b = a.groupByKey()
b: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[4] at groupByKey at <console>:25

scala> b.collect
res1: Array[(String, Iterable[Int])] = Array((A,CompactBuffer(1, 3)), (B,CompactBuffer(2)), (C,CompactBuffer(1)))

2.5.foldByKey運算元

功能:該函式用於RDD[K,V]根據K將V做摺疊、合併處理,其中的引數zeroValue表示先根據對映函式將zeroValue應用於V,進行初始化V,再將對映函式應用於初始化後的V。
原始碼:

/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index * of the original partition. * * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}
示例:

scala> val rdd1 = sc.parallelize(List(("A", 1), ("B", 2), ("C", 3), ("A", 4)), 2)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala>  val rdd2 = rdd1.foldByKey(10)(_ + _)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at foldByKey at <console>:25

scala> rdd2.collect
res8: Array[(String, Int)] = Array((B,12), (A,25), (C,13))

//將rdd1中每個key對應的V進行累加,注意zeroValue=10,需要先初始化V,對映函式為+操作,比如("A",1), ("A",4),先將zeroValue應用於每個V,得到:("A",1+10), ("A",4+10),即:("A",11), ("A",14),再將對映函式應用於初始化後的V,最後得到(A,11+14),即(A,25)

2.6.reduceByKeylocally運算元

功能:該函式將RDD[K,V]中每個K對應的V值根據對映函式來運算,運算結果對映到一個Map[K,V]中,而不是RDD[K,V]。
原始碼:

/**
* Merge the values for each key using an associative and commutative reduce function, but return 
* the results immediately to the master as a Map. This will also perform the merging locally on 
* each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. 
*/
def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
val cleanedF = self.sparkContext.clean(func)

if (keyClass.isArray) {
throw new SparkException("reduceByKeyLocally() does not support array keys")
}

val reducePartition = (iter: Iterator[(K, V)]) => {
val map = new JHashMap[K, V]
iter.foreach { pair =>
val old = map.get(pair._1)
map.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
}
Iterator(map)
} : Iterator[JHashMap[K, V]]

val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
m2.asScala.foreach { pair =>
val old = m1.get(pair._1)
m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
}
m1
} : JHashMap[K, V]

self.mapPartitions(reducePartition).reduce(mergeMaps).asScala
}
示例:

scala> val rdd1 = sc.parallelize(List(("A", 1), ("B", 2), ("C", 3), ("A", 4)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala>  val rdd2 = rdd1.reduceByKeyLocally((x,y) => x * y)
rdd2: scala.collection.Map[String,Int] = Map(A -> 4, B -> 2, C -> 3)

2.7.cogroup運算元

功能:該函式用於將多個RDD中的同一個key對應的不同的value組合到一起。返回一個結果RDD,包含了一個元組,元組裡面的每一個key,對應多個RDD中匹配的value。
原始碼:

/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values 
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
other2: RDD[(K, W2)],
other3: RDD[(K, W3)],
partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
(vs.asInstanceOf[Iterable[V]],
w1s.asInstanceOf[Iterable[W1]],
w2s.asInstanceOf[Iterable[W2]],
w3s.asInstanceOf[Iterable[W3]])
}
}
示例:

scala> val rdd1 = sc.parallelize(List(("A", 1), ("B", 2), ("C", 3), ("D", 4)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[17] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("A", 5), ("B", 6), ("E", 7), ("F", 8)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[20] at cogroup at <console>:27

scala> rdd3.collect
res10: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((D,(CompactBuffer(4),CompactBuffer())), (A,(CompactBuffer(1),CompactBuffer(5))), (E,(CompactBuffer(),CompactBuffer(7))), (B,(CompactBuffer(2),CompactBuffer(6))), (F,(CompactBuffer(),CompactBuffer(8))), (C,(CompactBuffer(3),CompactBuffer())))

2.8.subtractByKey運算元

功能:類似於subtract,刪掉 RDD 中鍵與 other RDD 中的鍵相同的元素。
原始碼:

/**
* Return an RDD with the pairs from `this` whose keys are not in `other`.
* * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be less than or equal to us. 
*/
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] = self.withScope {
subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.length)))
}
示例:

scala> val a = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")))
a: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> val b = sc.makeRDD(Array(("B","4"),("C","5"),("D","6")))
b: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[1] at makeRDD at <console>:24

scala> val c = a.subtractByKey(b)
c: org.apache.spark.rdd.RDD[(String, String)] = SubtractedRDD[2] at subtractByKey at <console>:27

scala> c.collect
res0: Array[(String, String)] = Array((A,1))

###3.連線型別的運算元

3.1.join運算元

功能:對兩個需要連線的 RDD 進行 cogroup函式操作,將相同 key 的資料能夠放到一個分割槽,在 cogroup 操作之後形成的新 RDD 對每個key 下的元素進行笛卡爾積的操作,返回的結果再展平,對應 key 下的所有元組形成一個集合。最後返回 RDD[(K, (V, W))]。
原始碼:

/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}
示例:

scala> val rdd1 = sc.parallelize(List(("A", 1), ("B", 2), ("C", 3), ("D", 4)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("A", 5), ("B", 6), ("E", 7), ("F", 8)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[22] at parallelize at <console>:24

scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[25] at join at <console>:27

scala> rdd3.collect
res12: Array[(String, (Int, Int))] = Array((A,(1,5)), (B,(2,6)))

3.2.leftOutJoin運算元

功能:leftOuterJoin類似於SQL中的左外關聯left outer join,返回結果以前面的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯。
原始碼:

/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
* pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
* partition the output RDD. 
*/
def leftOuterJoin[W](
other: RDD[(K, W)],
partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._2.isEmpty) {
pair._1.iterator.map(v => (v, None))
} else {
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
}
}
}
示例:

scala> val rdd1 = sc.parallelize(List(("A", 1), ("B", 2), ("C", 3), ("D", 4)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:24

scala>  val rdd2 = sc.parallelize(List(("A", 5), ("B", 6), ("E", 7), ("F", 8)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[27] at parallelize at <console>:24

scala> val rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Int, Option[Int]))] = MapPartitionsRDD[30] at leftOuterJoin at <console>:27

scala> rdd3.collect
res13: Array[(String, (Int, Option[Int]))] = Array((D,(4,None)), (A,(1,Some(5))), (B,(2,Some(6))), (C,(3,None)))

3.3.rightOutJoin運算元

功能:rightOuterJoin類似於SQL中的有外關聯right outer join,返回結果以引數中的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯。
原始碼:

/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
* resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
* pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
* partition the output RDD. 
*/
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._1.isEmpty) {
pair._2.iterator.map(w => (None, w))
} else {
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
}
}
}
示例:

scala> val rdd1 = sc.parallelize(List(("A", 1), ("B", 2), ("C", 3), ("D", 4)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[31] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("A", 5), ("B", 6), ("E", 7), ("F", 8)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:24

scala> val rdd3 = rdd1.rightOuterJoin(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Option[Int], Int))] = MapPartitionsRDD[35] at rightOuterJoin at <console>:27

scala> rdd3.collect
res14: Array[(String, (Option[Int], Int))] = Array((A,(Some(1),5)), (E,(None,7)), (B,(Some(2),6)), (F,(None,8)))

 

版權宣告:本文為博主原創文章,轉載請註明出處!