
[Practice] Spark RDD API in Action

  • map

Applies a transformation function on each item of the RDD and returns the result as a new RDD.

// 3 means three partitions
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
// build a new RDD from the lengths of the elements of a
val b = a.map(_.length)
// zip the two RDDs into a new RDD of pairs
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
  • zip

Joins two RDDs by combining the i-th elements of each partition with each other. The resulting RDD will consist of two-component tuples which are interpreted as key-value pairs by the methods provided by the PairRDDFunctions extension.

val a1 = sc.parallelize(1 to 10, 3)
val b1 = sc.parallelize(11 to 20, 3)
a1.zip(b1).collect
res1: Array[(Int, Int)] = Array((1,11), (2,12), (3,13), (4,14), (5,15), (6,16), (7,17), (8,18), (9,19), (10,20))

val a2 = sc.parallelize(1 to 10, 3)
val b2 = sc.parallelize(11 to 20, 3)
val c2 = sc.parallelize(21 to 30, 3)
a2.zip(b2).zip(c2).collect
res3: Array[((Int, Int), Int)] = Array(((1,11),21), ((2,12),22), ((3,13),23), ((4,14),24), ((5,15),25), ((6,16),26), ((7,17),27), ((8,18),28), ((9,19),29), ((10,20),30))

a2.zip(b2).zip(c2).map((x) => (x._1._1, x._1._2, x._2)).collect
res2: Array[(Int, Int, Int)] = Array((1,11,21), (2,12,22), (3,13,23), (4,14,24), (5,15,25), (6,16,26), (7,17,27), (8,18,28), (9,19,29), (10,20,30))
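
Because zip produces an RDD of pairs, the PairRDDFunctions methods become available on the result. A minimal sketch (the names words and lens are just for illustration):

val words = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val lens = words.map(_.length)
// the zipped RDD[(String, Int)] picks up PairRDDFunctions such as reduceByKey
words.zip(lens).reduceByKey(_ + _).collect
// expect (salmon,12) among the results, since both salmon entries are summed; ordering may vary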
  • filter

Evaluates a boolean function for each data item of the RDD and puts the items for which the function returned true into the resulting RDD.

val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
b.collect
res4: Array[Int] = Array(2, 4, 6, 8, 10)
  • flatMap
    Similar to map, but allows emitting more than one item in the map function. map turns one element into exactly one element; flatMap turns one element into one or more elements.
var a = sc.parallelize(1 to 10, 5)
a.flatMap(1 to _).collect
res8: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4,
5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8,
1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect
res9: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

var x  = sc.parallelize(1 to 5, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(5))(_)).collect
res10: Array[Int] = Array(1, 1, 2, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5)
  • mapPartitions
    This is a specialized map that is called only once for each partition. The entire content of the respective partition is available as a sequential stream of values via the input argument (Iterator[T]). The custom function must return yet another Iterator[U]. The combined result iterators are automatically converted into a new RDD. Please note that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose. In short: each partition's elements are processed with the given function, producing a new RDD.
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res ::= (pre, cur) // prepend the (previous, current) pair
    pre = cur
  }
  res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
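
Another minimal sketch of the once-per-partition behavior: emitting one sum per partition. With 1 to 9 split across 3 partitions, the expected partition sums are 6, 15 and 24.

val s = sc.parallelize(1 to 9, 3)
// the function runs once per partition and sees all of its elements
s.mapPartitions(iter => Iterator(iter.sum)).collect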
  • mapPartitionsWithIndex
    Similar to mapPartitions, but takes two parameters. The first parameter is the index of the partition and the second is an iterator through all the items within this partition. The output is an iterator containing the list of items after applying whatever transformation the function encodes.
val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {
  iter.toList.map(x => index + "," + x).iterator
}
x.mapPartitionsWithIndex(myfunc).collect()
res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)
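
The partition index is handy when one partition needs special treatment, for example dropping a header line known to sit in partition 0 (a hypothetical scenario for illustration):

val lines = sc.parallelize(List("header", "row1", "row2", "row3"), 2)
// drop the first element of partition 0 only; other partitions pass through unchanged
lines.mapPartitionsWithIndex((idx, it) => if (idx == 0) it.drop(1) else it).collect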
  • sample
    Randomly selects a fraction of the items of a RDD and returns them in a new RDD.
val a = sc.parallelize(1 to 10000, 3)
a.sample(false, 0.1, 0).count
res24: Long = 960

a.sample(true, 0.3, 0).count
res25: Long = 2888

a.sample(true, 0.3, 13).count
res26: Long = 2985
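
The arguments are withReplacement, fraction and seed. With a fixed seed the sample is deterministic, so repeating a call yields the same count:

// same RDD, same seed: returns the same count as res24 above
a.sample(false, 0.1, 0).count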
  • union, ++
    Performs the standard set operation: A union B. union/++ put the elements of both RDDs directly into the new RDD, whereas zip combines elements of the two RDDs into tuples and uses those tuples as the elements of the new RDD.
val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)
  • intersection
    Returns the elements in the two RDDs which are the same.
val x = sc.parallelize(1 to 20)
val y = sc.parallelize(10 to 30)
val z = x.intersection(y)
z.collect
res74: Array[Int] = Array(16, 12, 20, 13, 17, 14, 18, 10, 19, 15, 11)
  • distinct
    Returns a new RDD that contains each unique value only once.
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.distinct.collect
res6: Array[String] = Array(Dog, Gnu, Cat, Rat)

val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
a.distinct(2).partitions.length
res16: Int = 2

a.distinct(3).partitions.length
res17: Int = 3
  • groupBy
    Groups the elements of the RDD by the key returned by the supplied function.
val a = sc.parallelize(1 to 9, 3)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), 
(odd,ArrayBuffer(1, 3, 5, 7, 9)))

val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int =
{
  a % 2
}
a.groupBy(myfunc).collect
res3: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), 
(1,ArrayBuffer(1, 3, 5, 7, 9)))

val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int =
{
  a % 2
}
a.groupBy(x => myfunc(x), 3).collect
a.groupBy(myfunc(_), 1).collect
res7: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), 
(1,ArrayBuffer(1, 3, 5, 7, 9)))
  • keyBy
    Constructs two-component tuples (key-value pairs) by applying a function on each data item. The result of the function becomes the key and the original data item becomes the value of the newly created tuples.
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
b.collect
res26: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), 
(8,elephant))
  • groupByKey
    Very similar to groupBy, but instead of supplying a function, the key-component of each pair will automatically be presented to the partitioner.
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)
b.groupByKey.collect
res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), 
(6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), 
(5,ArrayBuffer(tiger, eagle)))
  • reduceByKey
    This function provides the well-known reduce functionality in Spark. Please note that any function f you provide should be commutative and associative in order to generate reproducible results.
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
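
The classic use is counting. A minimal word-count sketch (names are just for illustration):

val words = sc.parallelize(List("dog", "cat", "dog", "owl", "cat", "dog"), 2)
words.map((_, 1)).reduceByKey(_ + _).collect
// expect (dog,3), (cat,2), (owl,1) in some order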
  • aggregate
    Aggregates the elements of each partition with the first function (seqOp), starting from the zero value, then combines the per-partition results with the second function (combOp). Note that the zero value is applied once in every partition and once more when the partition results are combined, and that the order in which partition results are combined is not deterministic.
val z = sc.parallelize(List(1,2,3,4,5,6), 2)
z.aggregate(0)(math.max(_, _), _ + _)
res40: Int = 9
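
To see why the result is 9: seqOp computes max(0,1,2,3) = 3 in the first partition and max(0,4,5,6) = 6 in the second, then combOp sums 0 + 3 + 6 = 9. glom, which gathers each partition into an array, lets you inspect the split:

z.glom().collect
// expect Array(Array(1, 2, 3), Array(4, 5, 6))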

val z = sc.parallelize(List("a","b","c","d","e","f"),2)
z.aggregate("")(_ + _, _+_)
res115: String = abcdef

z.aggregate("x")(_ + _, _+_)
res116: String = xxdefxabc

val z = sc.parallelize(List("12","23","345","4567"),2)
z.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res141: String = 42

z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res142: String = 11

val z = sc.parallelize(List("12","23","345",""),2)
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res143: String = 10

  • sortByKey
This function sorts the input RDD’s data and stores it in a new RDD. The output RDD is a shuffled RDD because it stores data that is output by a reducer which has been shuffled. The implementation of this function is actually very clever. First, it uses a range partitioner to partition the data in ranges within the shuffled RDD. Then it sorts these ranges individually with mapPartitions using standard sort mechanisms.
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = sc.parallelize(1 to a.count.toInt, 2)
val c = a.zip(b)
c.sortByKey(true).collect
res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))
c.sortByKey(false).collect
res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))

val a = sc.parallelize(1 to 100, 5)
val b = a.cartesian(a)
val c = sc.parallelize(b.takeSample(true, 5, 13), 2)
val d = c.sortByKey(false)
d.collect
res56: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4))

  • cogroup
cogroup groups the data of the two RDDs by key, keeping each RDD's values in a separate group.
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
b.cogroup(c).collect
res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c))),
(3,(ArrayBuffer(b),ArrayBuffer(c))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c)))
)

val d = a.map((_, "d"))
b.cogroup(c, d).collect
res9: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(3,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c),ArrayBuffer(d, d)))
)

val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")), 2)
val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")), 2)
x.cogroup(y).collect
res23: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(4,(ArrayBuffer(kiwi),ArrayBuffer(iPad))),
(2,(ArrayBuffer(banana),ArrayBuffer())),
(3,(ArrayBuffer(orange),ArrayBuffer())),
(1,(ArrayBuffer(apple),ArrayBuffer(laptop, desktop))),
(5,(ArrayBuffer(),ArrayBuffer(computer))))

  • pipe
Applies the given shell command to the data of each partition: the partition's elements are written to the command's stdin, and each line of its stdout becomes an element of the resulting RDD:
val a = sc.parallelize(1 to 9, 3)
a.pipe("head -n 1").collect
res2: Array[String] = Array(1, 4, 7)
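
A sketch assuming a Unix-like environment where wc is available; each three-element partition produces its own line count:

a.pipe("wc -l").collect
// expect one count per partition, i.e. "3" three times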

  • coalesce,repartition
Adjusts the number of partitions of an RDD, producing a new RDD. repartition always performs a shuffle; coalesce lets you specify whether to shuffle.
val y = sc.parallelize(1 to 10, 10)
val z = y.coalesce(2, false)
z.partitions.length
res9: Int = 2
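
Without a shuffle, coalesce can only reduce the partition count; to grow it, pass shuffle = true, which is exactly what repartition does. A minimal sketch (y2 is just an illustrative name):

val y2 = sc.parallelize(1 to 10, 2)
y2.repartition(5).partitions.length
// expect 5: repartition always shuffles and can increase partitions
y2.coalesce(5, true).partitions.length
// expect 5 as well; without shuffle = true the count could not grow past 2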