1. 程式人生 > >Spark API 詳解/大白話解釋 之 map、mapPartitions、mapValues、mapWith、flatMap、flatMapWith、flatMapValues

Spark API 詳解/大白話解釋 之 map、mapPartitions、mapValues、mapWith、flatMap、flatMapWith、flatMapValues

map(function)
map是對RDD中的每個元素都執行一個指定的函式來產生一個新的RDD。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。

舉例:

val a = sc.parallelize(1 to 9, 3)
val b = a.map(x => x*2)//x => x*2是一個函式,x是傳入引數即RDD的每個元素,x*2是返回值
a.collect
//結果Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
b.collect
//結果Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

當然map也可以把Key變成Key-Value對

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
val b = a.map(x => (x, 1))
b.collect.foreach(println(_))
/*
(dog,1)
(tiger,1)
(lion,1)
(cat,1)
(panther,1)
( eagle,1)
*/

mapPartitions(function)
map()的輸入函式是應用於RDD中每個元素,而mapPartitions()的輸入函式是應用於每個分割槽


package test

import scala.Iterator

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TestRdd {
  def sumOfEveryPartition(input: Iterator[Int]): Int = {
    var total = 0
    input.foreach { elem =>
      total += elem
    }
    total
  }
  def main(args: Array[String]) {
    val
conf = new SparkConf().setAppName("Spark Rdd Test") val spark = new SparkContext(conf) val input = spark.parallelize(List(1, 2, 3, 4, 5, 6), 2)//RDD有6個元素,分成2個partition val result = input.mapPartitions( partition => Iterator(sumOfEveryPartition(partition)))//partition是傳入的引數,是個list,要求返回也是list,即Iterator(sumOfEveryPartition(partition)) result.collect().foreach { println(_)//6 15 } spark.stop() } }

mapValues(function)
原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素。因此,該函式只適用於元素為KV對的RDD。

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
val b = a.map(x => (x.length, x))
b.mapValues("x" + _ + "x").collect

//"x" + _ + "x"等同於everyInput =>"x" + everyInput + "x"
//結果
Array(
(3,xdogx),
(5,xtigerx),
(4,xlionx),
(3,xcatx),
(7,xpantherx),
(5,xeaglex)
)

flatMap(function)
與map類似,區別是原RDD中的元素經map處理後只能生成一個元素,而原RDD中的元素經flatmap處理後可生成多個元素

val a = sc.parallelize(1 to 4, 2)
val b = a.flatMap(x => 1 to x)//每個元素擴充套件
b.collect
/*
結果    Array[Int] = Array( 1, 
                           1, 2, 
                           1, 2, 3, 
                           1, 2, 3, 4)
*/

flatMapValues(function)

val a = sc.parallelize(List((1,2),(3,4),(5,6)))
val b = a.flatMapValues(x=>1 to x)
b.collect.foreach(println(_))
/*
(1,1)
(1,2)
(3,1)
(3,2)
(3,3)
(3,4)
(5,1)
(5,2)
(5,3)
(5,4)
(5,5)
(5,6)
*/