1. 程式人生 > >Spark運算元:transformation之map、flatMap和distinct

Spark運算元:transformation之map、flatMap和distinct

1、map

將RDD中的每個元素通過map中的函式對映為一個新的元素,並返回一個新型別的RDD。輸入時的分割槽數與輸出時的分割槽數保持一致。

//HDFS上的txt檔案
hadoop fs -cat /tmp/1.txt
hello world
hello spark
hello hive
 
//讀取HDFS檔案到RDD
scala> var data = sc.textFile("/tmp/1.txt")
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21
 
//使用map運算元
scala> var mapresult = data.map(line => line.split("\\s+"))
mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23
其中Array[String]=Array(hello, world), Array(hello, spark), Array(hello, hive)

//運算map運算元結果
scala> mapresult.collect
res0: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive))

2、flatMap

(1)分兩步:第一步map,第二步flat即將所有的輸出分割槽結果合併到一個分割槽。

/使用flatMap運算元
scala> var flatmapresult = data.flatMap(line => line.split("\\s+"))
flatmapresult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at :23
 
//運算flagMap運算元結果
scala> flatmapresult.collect
res1: Array[String] = Array(hello, world, hello, spark, hello, hive)

(2)flatMap會將字串當做一個數組

scala> data.map(_.toUpperCase).collect
res32: Array[String] = Array(HELLO WORLD, HELLO SPARK, HELLO HIVE, HI SPARK)
scala> data.flatMap(_.toUpperCase).collect
res33: Array[Char] = Array(H, E, L, L, O,  , W, O, R, L, D, H, E, L, L, O,  , S, P, A, R, K, H, E, L, L, O,  , H, I, V, E, H, I,  , S, P, A, R, K)

(3)map與flatMap的處理結果對比

scala> data.map(x => x.split("\\s+")).collect
res34: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive), Array(hi, spark))
 
scala> data.flatMap(x => x.split("\\s+")).collect
res35: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark)

flatMap只會將String扁平化成字元陣列,並不會把Array[String]也扁平化成字元陣列

3、distinct

對RDD中的元素去重。

scala> data.flatMap(line => line.split("\\s+")).collect
res61: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark)
 
scala> data.flatMap(line => line.split("\\s+")).distinct.collect
res62: Array[String] = Array(hive, hello, world, spark, hi)