1. 程式人生 > >Spark學習(六)常用運算元整理

Spark學習(六)常用運算元整理

常用運算元

1、MapPartition

遍歷的單位是每一個partition。
遍歷原理:將每一個partition的資料先載入到記憶體,然後再一條一條遍歷。

rdd.mapPartitions((elems:Iterator[Int]) => {
      println("建立連線")
    while(elems.hasNext){
      println("拼接SQL語句 " + elems.next)
    }
    println("提交")
    elems
})

2、Map

遍歷單位是每一條記錄。

3、MapPartitionWithIndex

在遍歷每一個partition的時候能夠拿到每一個分割槽的ID號,這個運算元一般用於測試環境。

rdd.mapPartitionsWithIndex((index,iterator) =>{
   println("partitonId: " + index)
   while(iterator.hasNext){
      println(iterator.next)
   }
 }).count()

4、getNumPartitions

獲取RDD的分割槽數

val partitionNum1 = rdd.
val partitionNum2 = rdd.partitions.length

5、coalesce

coalesce(…,true)若引數為true,說明分割槽的時候需要產生shuffle,若引數為false說明不需要產生shuffle。
增加RDD的分割槽數使用coalesce(…,true)或者repartition

val coalesceRDD1 = facePowerRDD.coalesce(6, true)   
println("coalesceRDD1.getNumPartitions:" + coalesceRDD1.getNumPartitions)
coalesceRDD1.mapPartitionsWithIndex((index,iterator)=>{
    println("partitionId" + index)
   while(iterator.hasNext){
    println(iterator.next)
   }
    iterator
}).count()

減少RDD的分割槽數,使用coalesce(…,false),也可以使用coalesce(…,true)但是效率會降低。

facePowerRDD
    .coalesce(2, false)
    .mapPartitionsWithIndex((index,iterator)=>{
    println("partitionId" + index)
  while(iterator.hasNext){
   println(iterator.next)
 }
   iterator
}).count()

6、union

合併,他只是將rdd1與rdd2在邏輯上進行合併,並不會真正進行資料的合併以傳輸。

val rdd1 = sc.parallelize(1 to 10, 3)
val rdd2 = sc.makeRDD(11 to 20,3)
val unionRDD = rdd1.union(rdd2)
println(unionRDD.getNumPartitions)

7、zip

將兩個RDD進行橫向合併,但是zip是對應位置合併。
比如:非KV格式的RDD1、RDD2 zip KV格式的RDD

val zipRDD = rdd1.zip(rdd2)
zipRDD.foreach(println)

注意:

  1. 要進行zip的兩個RDD的元素數必須一致。
  2. 要進行zip的兩個RDD的分割槽數必須一致。

8、zipWitIndex

給RDD中的每一個元素加上一個唯一的索引號,非KV的RDD變成了KV格式的RDD。

val zipWithIndexRDD = rdd1.zipWithIndex()
zipWithIndexRDD.foreach(println)
zipWithIndexRDD.map(_.swap).lookup(2).foreach(println)

9、zipWithUniqueId

給RDD中的每一個元素加上一個唯一的索引號,非KV的RDD變成了KV格式的RDD。
每一個分割槽的第一個元素的索引號就是當前分割槽的分割槽號;
每一個分割槽的第二個元素的索引號就是第一個元素+分割槽數。

rdd1
   .zipWithUniqueId()
   .mapPartitions(iterator=>{
    while(iterator.hasNext){
     println(iterator.next)
    }
    iterator
}).count()

10、take(n)

取這個RDD中前n個元素,是action類運算元。

11、first

取這個RDD中第一個元素,與task(1)一樣,也是action類運算元。

rdd1.take(5).foreach(println)
//first = take(1)
println(rdd1.first())

12、combineByKey

rdd.combineByKey(初始化函式,combiner聚合函式,reduce大聚合函式)
combineByKey作用步驟:

  1. 分組完成後,初始化函式會作用到每組資料的第一個元素上。
  2. combiner聚合函式作用到每組資料上,得到最終的combiner小聚合結果。
  3. 將reduce大聚合函式作用在每組資料上。

總結:
val conf = new SparkConf().setMaster(“local”)

  1. local:使用1個執行緒來模擬。
  2. local[10]:程式碼在本機使用10個執行緒來模擬spark的執行。
  3. local[*]:電腦還剩下幾個core,那麼就啟動多少個執行緒來模擬。