1. 程式人生 > >【spark】Spark運算元:RDD基本轉換操作–map、flagMap、distinct

【spark】Spark運算元:RDD基本轉換操作–map、flagMap、distinct

  • map

將一個RDD中的每個資料項,通過map中的函式對映變為一個新的元素。

輸入分割槽與輸出分割槽一對一,即:有多少個輸入分割槽,就有多少個輸出分割槽。

  1. hadoop fs -cat /tmp/lxw1234/1.txt
  2. hello world
  3. hello spark
  4. hello hive
  5. //讀取HDFS檔案到RDD
  6. scala>var data = sc.textFile("/tmp/lxw1234/1.txt")
  7. data: org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[1] at textFile at :21
  8. //使用map運算元
  9. scala>var mapresult
    = data.map(line => line.split("\\s+"))
  10. mapresult: org.apache.spark.rdd.RDD[Array[String]]=MapPartitionsRDD[2] at map at :23
  11. //運算map運算元結果
  12. scala> mapresult.collect
  13. res0:Array[Array[String]]=Array(Array(hello, world),Array(hello, spark),Array(hello, hive))
  • flatMap

屬於Transformation運算元,第一步和map一樣,最後將所有的輸出分割槽合併成一個。

  1. /使用flatMap運算元
  2. scala> var flatmapresult = data.flatMap(line => line.split("\\s+"))
  3. flatmapresult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at :23
  4. //運算flagMap運算元結果
  5. scala> flatmapresult.collect
  6. res1:Array[String]=Array(hello, world, hello, spark, hello, hive)

使用flatMap時候需要注意:
flatMap會將字串看成是一個字元陣列。
看下面的例子:

  1. scala> data.map(_.toUpperCase).collect
  2. res32:Array[String]=Array(HELLO WORLD, HELLO SPARK, HELLO HIVE, HI SPARK)
  3. scala> data.flatMap(_.toUpperCase).collect
  4. res33:Array[Char]=Array(H, E, L, L, O,, W, O, R, L, D, H, E, L, L, O,, S, P, A, R, K, H, E, L, L, O,, H, I, V, E, H, I,, S, P, A, R, K)

再看:

  1. scala> data.map(x => x.split("\\s+")).collect
  2. res34:Array[Array[String]]=Array(Array(hello, world),Array(hello, spark),Array(hello, hive),Array(hi, spark))
  3. scala> data.flatMap(x => x.split("\\s+")).collect
  4. res35:Array[String]=Array(hello, world, hello, spark, hello, hive, hi, spark)

這次的結果好像是預期的,最終結果裡面並沒有把字串當成字元陣列。
這是因為這次map函式中返回的型別為Array[String],並不是String。
flatMap只會將String扁平化成字元陣列,並不會把Array[String]也扁平化成字元陣列。

  • distinct

對RDD中的元素進行去重操作。

  1. scala> data.flatMap(line => line.split("\\s+")).collect
  2. res61:Array[String]=Array(hello, world, hello, spark, hello, hive, hi, spark)
  3. scala> data.flatMap(line => line.split("\\s+")).distinct.collect
  4. res62:Array[String]=Array(hive, hello, world, spark, hi)

如果覺得本部落格對您有幫助,請 贊助作者