1. 程式人生 > >Spark從入門到精通六------RDD的運算元

Spark從入門到精通六------RDD的運算元

    1. RDD程式設計API
      1. RDD運算元

運算元是RDD中定義的方法,分為轉換(transformantion)和動作(action)。Tranformation運算元並不會觸發Spark提交作業,直至Action運算元才提交任務執行,這是一個延遲計算的設計技巧,可以避免記憶體過快被中間計算佔滿,從而提高記憶體的利用率。

RDD擁有的操作比MR豐富的多,不僅僅包括Map、Reduce操作,還包括filter、sort、join、save、count等操作,並且中間結果不需要儲存,所以Spark比MR更容易方便完成更復雜的任務。

RDD支援兩種型別的操作:

轉換(Transformation)

 現有的RDD通過轉換生成一個新的RDD。lazy模式,延遲執行。

轉換函式包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,union,join, coalesce等等。

動作(Action)  在RDD上執行計算,並返回結果給驅動程式(Driver)或寫入檔案系統。

動作操作包括:reduce,collect,count,first,take,countByKey以及foreach等等。

collect  該方法把資料收集到driver端   Array陣列型別

所有的transformation只有遇到action才能被執行。

當觸發執行action之後,資料型別不再是rdd了,資料就會儲存到指定檔案系統中,或者直接列印結果或者收集起來。

RDD操作流程示意:

RDD的轉換與操作

wordcount示例,檢視lazy特性。

只有在執行action時,才會真正開始運算,並得到結果或存入檔案中。

      1. Transformation

RDD中的所有轉換都是延遲載入的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎資料集(例如一個檔案)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行。這種設計讓Spark更加有效率地執行。

對RDD中的元素執行的操作,實際上就是對RDD中的每一個分割槽的資料進行操作,不需要關注資料在哪個分割槽中。

常用的Transformation:

轉換

含義

map(func)

返回一個新的RDD,該RDD由每一個輸入元素經過func函式轉換後組成

filter(func)

返回一個新的RDD,該RDD由經過func函式計算後返回值為true的輸入元素組成

flatMap(func)

先map,再flatten壓平

union(otherDataset)

對源RDD和引數RDD求並集後返回一個新的RDD

intersection(otherDataset)

對源RDD和引數RDD求交集後返回一個新的RDD

subtract(otherDataset)

求差集後返回新的RDD,出現在源rdd中,不在otherrdd中

distinct([numTasks]))

對源RDD進行去重後返回一個新的RDD

mapPartitions(func)

類似於map,但獨立地在RDD的每一個分片上執行,因此在型別為T的RDD上執行時,func的函式型別必須是Iterator[T] => Iterator[U]

mapPartitionsWithIndex(func)

類似於mapPartitions,但func帶有一個整數引數表示分片的索引值,因此在型別為T的RDD上執行時,func的函式型別必須是

(Int, Interator[T]) => Iterator[U]

sortBy(func,[ascending], [numTasks])

與sortByKey類似,但是更靈活

sortByKey([ascending], [numTasks])

在一個(K,V)的RDD上呼叫,K必須實現Ordered介面,返回一個按照key進行排序的(K,V)的RDD

join(otherDataset, [numTasks])

在型別為(K,V)和(K,W)的RDD上呼叫,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD

cogroup(otherDataset, [numTasks])

在型別為(K,V)和(K,W)的RDD上呼叫,返回一個(K,(Iterable<V>,Iterable<W>))型別的RDD

cartesian(otherDataset)

笛卡爾積

mapValues(func)

在一個(K,V)的RDD上呼叫

groupBy (func, [numTasks])

根據自定義條件進行分組

groupByKey([numTasks])

在一個(K,V)的RDD上呼叫,返回一個(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一個(K,V)的RDD上呼叫,返回一個(K,V)的RDD,使用指定的reduce函式,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的引數來設定

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

針對分割槽內部使用seqOp方法,針對最後的結果使用combOp方法。

coalesce(numPartitions)      

用於對RDD進行重新分割槽,第一個引數是分割槽的數量,第二個引數是是否進行shuffle,可不傳,預設不shuffle

repartition(numPartitions)

用於對RDD進行重新分割槽,相當於shuffle版的calesce

groupBy的返回值型別:

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

T: 元素的型別   K : 指定的key

groupByKey

defgroupByKey(): RDD[(K, Iterable[V])]

reduceByKey

優先選擇reduceByKey,  語法更簡潔

效能優越

reduceByKey會進行分割槽內聚合,再經過網路傳輸,傳送到相對應的分割槽中。

sortBy既可以作用於RDD[K] ,還可以作用於RDD[(k,v)]

sortByKey  只能作用於 RDD[K,V] 型別上。

      1. Action

動作

含義

reduce(func)

通過func函式聚集RDD中的所有元素

collect()

在驅動程式中,以陣列的形式返回資料集的所有元素

collectAsMap

類似於collect。該函式用於Pair RDD,最終返回Map型別的結果。

count()

返回RDD的元素個數

first()

返回RDD的第一個元素(類似於take(1))

take(n)

返回一個由資料集的前n個元素組成的陣列

saveAsTextFile(path)

將資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統

top(n)

按照預設排序(降序) 取資料

takeOrdered(n[ordering])

與top類似,順序相反  預設是升序

countByKey()

針對(K,V)型別的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。

foreach(func)

在資料集的每一個元素上,執行函式func進行更新。foreach,任務在executor中執行,列印資訊也會在executor中顯示

foreachPartition

對分割槽進行操作

foreach和foreachPartition

foreachParititon 每次迭代一個分割槽,foreach每次迭代一個元素。

該方法沒有返回值,或者Unit

主要作用於,沒有返回值型別的操作(列印結果,寫入到mysql資料庫中)

在寫入到mysql資料庫的時候,優先使用foreachPartititon* 結果 存入到 mysql* foreachPartition * 1,map mapPartition   轉換類的運算元, 返回值* 2, mysql  資料庫的連線* 100萬         100萬次的連線* 200 個分割槽     200次連線  一個分割槽中的資料,共用一個連線

foreach和map的區別:

map有返回值,foreach沒有返回值(Unit型別)

map是transformation,lazy執行,foreach是action運算元,觸發任務執行

處理的都是每一條資料。

rdd1.foreach(println) rdd1.foreachPartition(it=>println(it.mkString("")))

coalesce和repartition

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]

coalesce(n) 原來的分割槽中的資料,不會被分配到多個分割槽中,

將RDD分割槽的數量修改為numPartitions,常用於減少分割槽

第一個引數為重分割槽的數目,第二個為是否進行shuffle,預設為false

當需要調大分割槽時,必須設定shuffle為true,才能有效,否則分割槽數不變

隨機重新shuffle RDD中的資料,並建立numPartitions個分割槽。這個操作總會通過網路來shuffle全部資料。常用於擴大分割槽

分割槽數調大調小,都會shuffle全部資料,是重量級運算元

常用用法?

coalesce(10,true) = reparititon(10)

如果不需要資料的shuffle,減少或者合併分割槽,就使用coalesce(num)

如果需要資料的shuffle,或者需要擴大分割槽數量,優先使用repartition(num)

擴大分割槽,作用:提升並行度(業務邏輯比較複雜,需要提升並行度)

// 重分割槽的api     rdd1.coalesce(2)     // coalesce(numPartitions: Int, shuffle: Boolean = false     rdd1.repartition(2// coalesce(numPartitions, shuffle = true) //    repartition 就是coalesce,第二個引數為true     /**  repartition 要進行資料的shuffle   不管是擴大分割槽,還是減少分割槽,都進行shuffle       *  coalesce預設沒有進行資料的shuffle   減少分割槽,直接是分割槽合併。       *  coalesce   擴大分割槽,        */

val f = (i:Int,it:Iterator[Int]) =>

     it.map(t=> s"part:$i,values:$t")

另外還有一類可以修改分割槽的方式:

在呼叫shuffle類的運算元時,可以在引數中設定分割槽的數量:

def reduceByKey(func: (V, V) => V, numPartitions: Int):

mapPartitions和mapPartitionsWithIndex

def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

該函式和map函式類似,只不過對映函式的引數由RDD中的每一個元素變成了RDD中每一個分割槽的迭代器。

如果在對映的過程中需要頻繁建立額外的物件,使用mapPartitions要比map高效。

該方法,看上去是操作的每一條資料,實際上是對RDD中的每一個分割槽進行iterator,

mapPartitions( it: Iterator => {it.map(x => x * 10)})

mapPartitionsWithIndex

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

類似於mapPartitions, 不過提供了兩個引數,第一個引數為分割槽的索引。

mapPartitionsWithIndex的func接受兩個引數,第一個引數是分割槽的索引,第二個是一個數據集分割槽的迭代器。而輸出的是一個包含經過該函式轉換的迭代器。

val func = (index: Int, iter: Iterator[Int]) => {

  iter.map(x => "[partID:" +  index + ", val: " + x + "]")

}

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)

rdd1.mapPartitionsWithIndex(func).collect

/**       * map       * mapValues       * mapPartition      操作的是每一個分割槽    函式的輸入引數型別是Iterator[元素型別]       * mapPartitionWithIndex       *       */     // 建立rdd  同時指定分割槽的數量為2個  資料會被打散,然後平均分配     val rdd = sc.makeRDD(List(1, 3, 5, 7, 9), 2) // part 0 1     rdd.map({       i =>         // 具體的元素         i * 10     })     // 該方法每次操作的物件是一個迭代器,對應的就是一個分割槽的資料     rdd.mapPartitions({       it =>         // 分割槽         it.map(_ * 10)     })     // 函式     val f=(index:Int,it:Iterator[Int])=>{       it.map({         t=> s"part:$index,value=$t"       })     }     val index1: RDD[String] = rdd.mapPartitionsWithIndex({       // 第一個引數,是分割槽的索引  分割槽編號       // 第二個引數: 分割槽的資料 Iterator[Int]       case (index, it) =>         it.map({           t =>             s"part:$index,value=$t"         })     })//    ArrayBuffer(part:0,value=1, part:0,value=3, part:1,value=5, part:1,value=7, part:1,value=9)     // 收集資料並列印     println(index1.collect().toBuffer)

collect方法:

不能直接把資料收集到driver段,然後再執行入庫操作。

效率太低,容易引起dirver端崩潰了。OOM