程式人生 > Key-Value型別的RDD的建立及基本轉換(1)

Key-Value型別的RDD的建立及基本轉換(1)

1. 建立一個基本的key-value RDD
// Create a key-value (pair) RDD by parallelizing a local Seq of (String, String) tuples;
// the REPL reports its type as RDD[(String, String)].
// `sc` is assumed to be the SparkContext predefined by spark-shell.
scala> val kvPairRDD =
     |   sc.parallelize(Seq(("key1", "value1"), ("key2", "value2"), ("key3", "value3")))
kvPairRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[17] at parallelize at <console>:25

// Use collect to retrieve all elements of the RDD as a local Array
scala> kvPairRDD.collect
res21: Array[(String, String)] = Array((key1,value1), (key2,value2), (key3,value3))
2. 也可以從自訂類別(case class)的物件建立RDD,再用keyBy、map或groupBy將它轉換為key-value型別的RDD
// A simple record type: each User carries a userId and an amount.
case class User(userId: String, amount: Int)
// Parallelize a Seq of User objects. The userIds repeat ("jeffy" and "kkk" each appear
// twice), so the same key occurs more than once once the RDD is keyed by userId.
val personSeqRDD =
  sc.parallelize(Seq(User("jeffy", 30), User("kkk", 20), User("jeffy", 30), User("kkk", 30)))
  // collect returns every User element as a local Array[User]
  scala> personSeqRDD.collect
  res22: Array[User] = Array(User(jeffy,30), User(kkk,20), User(jeffy,30), User(kkk,30))

// keyBy(f) pairs each element with the key f(element); here each User is keyed by its userId.
scala>     // Convert the RDD into an RDD of key-value pairs (2-tuples)
scala>     val keyByRDD = personSeqRDD.keyBy(x => x.userId)
keyByRDD: org.apache.spark.rdd.RDD[(String, User)] = MapPartitionsRDD[18] at keyBy at <console>:28

scala> keyByRDD.collect
res23: Array[(String, User)] = Array((jeffy,User(jeffy,30)), (kkk,User(kkk,20)), (jeffy,User(jeffy,30)), (kkk,User(kkk,30)))

// The same (userId, User) pairing written by hand with map; the collected output matches keyBy's.
val keyRDD2 = personSeqRDD.map(user=> (user.userId, user))
scala> keyRDD2.collect
res24: Array[(String, User)] = Array((jeffy,User(jeffy,30)), (kkk,User(kkk,20)), (jeffy,User(jeffy,30)), (kkk,User(kkk,30)))
// With map the value can be chosen freely: here (userId, amount) pairs, i.e. RDD[(String, Int)].
val keyRDD3 = personSeqRDD.map(user=> (user.userId, user.amount))
scala> keyRDD3.collect
res25: Array[(String, Int)] = Array((jeffy,30), (kkk,20), (jeffy,30), (kkk,30))

// NOTE(review): this line repeats the scala> input below (only the spacing differs);
// it looks like a stray copy rather than a separate step.
val groupByRDD = personSeqRDD.groupBy(user=> user.userId)

// groupBy(f) groups the Users by f(user) (here userId), giving RDD[(String, Iterable[User])];
// the REPL reports the result as a ShuffledRDD.
scala> val groupByRDD = personSeqRDD.groupBy(user => user.userId)

groupByRDD: org.apache.spark.rdd.RDD[(String, Iterable[User])] = ShuffledRDD[22] at groupBy at <console>:28

// A plain RDD of Strings (not yet key-value).
val rdd1 = sc.parallelize(Seq("test", "hell"))

// NOTE(review): the result of this map is not assigned to a val here; the next line
// repeats the same map and keeps the result in `a`.
rdd1.map(str=> (str, 1))
// Map each string to a (string, 1) pair, turning the String RDD into RDD[(String, Int)].
scala> val a =  rdd1.map(str => (str, 1))
a: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[25] at map at <console>:26
scala> a.collect
res27: Array[(String, Int)] = Array((test,1), (hell,1))