1. 程式人生 > >《Spark快速大資料分析》——讀書筆記(4)

《Spark快速大資料分析》——讀書筆記(4)

第4章 鍵值對操作

鍵值對RDD通常用來進行聚合計算。我們一般要先通過一些初試ETL(抽取、轉化、裝載)操作來將資料轉化為鍵值對形式。
本章也會討論用來讓使用者控制鍵值對RDD在各節點上分佈情況的高階特性:分割槽。

4.1 動機

pair RDD(包含鍵值對型別的RDD)提供了並行操作各個鍵或跨節點重新進行資料分組的操作介面。

4.2 建立Pair RDD

當需要把一個普通的RDD轉為pair RDD時,可以呼叫map()函式來實現,傳遞的函式需要返回鍵值對。

例4-1:在Python中使用第一個單詞作為鍵創建出一個pair RDD

pairs=lines.map(lambda x: (x.split
(" ")[0], x))

java和Scala版本的略。

4.3 Pair RDD的轉化操作

Pair RDD可以使用所有標準RDD上的可用的轉化操作。3.4節中介紹的所有關於傳遞函式的規則也都是用於pair RDD。
這裡寫圖片描述
這裡寫圖片描述
這裡寫圖片描述

例4-4:用Python對第二個元素進行篩選

result=pairs.filter(lambda keyValue:len(keyValue[1])<20)

mapValues(func)函式功能類似於map{case (x, y) : (x, func(y))}可以只訪問值部分。

4.3.1 聚合操作

基礎RDD上的fold()、combine()、reduce()等是行動操作,Spark中對鍵進行聚合的聚合操作返回RDD是轉化操作。
reduceByKey()和reduce()類似;它們都接收一個函式,並使用該函式對值進行合併。它返回一個由各鍵和對應鍵歸約出來的結果值組成的新的RDD。
foldByKey()與fold()類似;它們都使用一個與RDD和合並函式中的資料型別相同的零值作為初始值。

例4-7:在Python中使用reduceByKey()和mapValues()計算每個鍵對應的平均值

rdd.mapValues(lambda x:(x, 1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))

這裡寫圖片描述
分散式單詞計數問題,可以使用flatMap()來生成以單詞為鍵,以數字1為值的pair RDD,然後利用reduceByKey()對所有單詞進行計數。

例4-9:用Python 實現單詞計數

rdd=sc.textFile("s3://...")
words=rdd.flatMap(lambda x:x.split(" "))
result=words.map
(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)

combineByKey()是最為常用的基於鍵進行聚合的函式。和aggregate()一樣,combineByKey()可以讓使用者返回與輸入資料的型別不同的返回值。

combineByKey()有多個引數分別對應聚合操作的各個階段。

例4-12:在Python中使用combineByKey()求每個鍵對應的平均值

sumCount=nums.combineByKey((lambda x:(x,1)),
                           (lambda x,y:(x[0]+y,x[1]+1)),
                           (lambda x,y:(x[0]+y[0],x[1]+y[1])))
sumCount.map(lambda key, xy:(key, xy[0]/xy[1])).collectAsMap()

這裡寫圖片描述
有很多函式可以進行基於鍵的資料合併,在Spark中使用這些專用的聚合函式,始終要比手動將資料分組再規約快很多。
並行度調優
我們討論了所有的轉化操作的分發方式(這個是什麼?),但是還沒探討Spark是怎麼確定如何分割工作的。每個RDD都有固定數目的分割槽,分割槽樹決定了在RDD上執行操作時的並行度。
Spark有預設值,但是也可以調整分割槽數,進而對並行度調優獲取更好的效能表現。

例4-15:在Python中自定義reduceByKey()的並行度

data=[("a",3),("b",4),("a",1)]
sc.parallelize(data).reduceByKey(lambda x,y:x+y)
sc.parallelize(data).reduceByKey(lambda x,y:x+y,10)

除分組操作和聚合操作之外,Spark提供了repartition()函式,它吧資料通過網路進行混洗,並創建出新的分割槽集合。但代價較大。
coalesce()是優化版的repartition()函式。

4.3.2 資料分組

groupByKey()使用RDD中的鍵對資料進行分組。對於型別K的鍵和型別V的值的RDD,得到的結果是[K, Iterable[V]]。
groupBy()可以用在未成對的資料上,也可以根據鍵相同以外的條件進行分組。它可以接受一個函式,對源RDD中的每個元素使用該函式,將返回結果作為鍵在進行分組。
cogroup()函式對多個共享一個鍵的RDD進行分組。對兩個鍵的型別均為K而值型別分別為V和W的RDD進行cogroup()時,得到的結果RDD型別為[(K,(Iterable[V],Iterable[W]))]。cogroup()不僅可以用於實現連線操作,還可以用來求鍵的交集,除此之外,cogroup()還能同時應用於三個及以上的RDD。

4.3.3 連線

連線方式:右外連線、左外連線、交叉連線以及內連線。
普通的join操作符表示內連線。
還有leftOuterJoin(other)和rightOuterJoin(other)

4.3.4 資料排序

sortByKey()接受ascending引數,表示是否讓結果按升序排序。還可以支援自定義的比較函式。

例4-19:在Python中以字串順序對證書進行自定義排序

rdd.sortByKey(ascending=True, numPartitions=None,keyfunc=lambda x:str(x))

4.4 Pair RDD的行動操作

和轉化操作一樣,所有基礎RDD支援的傳統行動操作也都在pair RDD上可用,也有一些額外的行動操作。
這裡寫圖片描述

4.5 資料分割槽(進階)

Spark程式可以通過控制RDD分割槽方式來減少通訊開銷。
Spark中所有的鍵值對RDD都可以進行分割槽,系統會根據一個針對鍵的函式對元素進行分組。儘管Spark沒有給出顯示控制每個鍵具體落在哪一個工作節點上的方法,但Spark可以確保同一組的鍵出現在同一個節點上。

join()操作會對兩個表所有鍵的雜湊值計算出來,如果需要對每個表重複地進行join()操作,partitionBy()轉化操作可以將錶轉為雜湊分割槽,避免重複的對錶進行雜湊值計算和跨節點資料混洗。注意,parititionBy()是一個轉化操作,需要對結果進行持久化。

事實上,許多其他Spark操作會自動為結果RDD設定已知的分割槽方式資訊,而且除join()外還有很多操作也會利用到已有的分割槽資訊。比如,sortByKey()和groupByKey()會分別生成範圍分割槽的RDD和雜湊分割槽的RDD。另一方面,諸如map()這樣的操作會導致新的RDD失去父RDD的分割槽資訊,因為這樣的操作理論上可能會修改每條記錄的鍵。

4.5.1 獲取RDD的分割槽方式

在Scala和Java中,可以使用RDD的partitioner屬性(Java中使用partitioner()方法)來獲取RDD的分割槽方式。它會返回一個scala.Option物件,這是Scala中用來存放可能存在的物件的容器類,可以對其呼叫isDefined()來檢查其中是否有值,呼叫get()來獲取其中的值。
Python中沒有提供查詢分割槽方式的方法,但是Spark內部仍會裡所有已有的分割槽資訊。

4.5.2 從分割槽中獲益的操作

Spark的許多操作都引入了將資料根據鍵跨節點進行混洗的過程。這些操作都會從資料分割槽中獲益。如:cogroup()、groupwith()、join()、leftOuterJoin()、rightOuterJoin、groupByKey()、reduceByKey()、combineByKey()以及lookup()。
對於像reduceByKey()這樣只作用於單個RDD的操作,執行在未分割槽的RDD上的時候會導致每個鍵的所有對應值都在每臺機器上進行本地運算,只需要吧本地最終歸約出的結果傳回主節點,網路開銷本來就不大。而對於cogroup()和join() 這樣的二元操作,預先進行資料分割槽會導致其中至少一個RDD不發生資料混洗。如果兩個RDD使用同樣的分割槽方式,並且他們還快取在同樣的機器上,或者其中一個RDD還沒有被計算出來(這裡不懂!),那麼跨節點的資料混洗就不會發生了。

4.5.3 影響分割槽方式的操作

Spark內部知道各操作會如何影響分割槽方式,並將會對資料進行分割槽的操作的結果RDD自動設定為對應的分割槽器。
所有會為生成的結果RDD設好分割槽方式的操作:cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()、sort()、mapValues()(如果父RDD有分割槽方式的話)、flatMapValues()(如果父RDD有分割槽方式的話),以及filter()(如果父RDD有分割槽方式的話)。
最後,對於二元操作,輸出資料的分割槽方式取決於父RDD 的分割槽方式。預設情況下,結果會採用雜湊分割槽,分割槽的數量和操作的並行度一樣。不過,如果其中的一個父RDD 已經設定過分割槽方式,那麼結果就會採用那種分割槽方式;如果兩個父RDD 都設定過分割槽方式,結果RDD 會採用第一個父RDD 的分割槽方式。

4.5.4 示例:PageRank

4.5.5 自定義分割槽方式

雖然Spark提供的HashPartitioner與RangePartitioner已經能滿足大多數用例,Spark還允許你提供一個自定義的Partitioner物件來控制RDD的分割槽方式。

要實現自定義的分割槽器,需要繼承org.apache.spark.Partitioner類並實現下面三個方法:

  • numPartitions:Int:返回創建出來的分割槽數。
  • getPartition(key:Any):Int:返回給定鍵的分割槽編號(0到numPartitions-1)
  • equals():Java判斷相等性的標準方法。Spark需要該方法檢查分割槽器物件是否和其他分割槽器示例相同,這樣Spark才可以判斷兩個RDD的分割槽方式是否相同。

Java和Scala中自定義Partitioner的方法與Scala中的做法非常相似:只需要擴充套件spark.Partitioner類並且實現必要的方法即可。
Python中不需要擴充套件Partitioner類,而是吧一個特定的雜湊函式作為一個額外的引數傳給RDD.partitionBy()函式。

例4-27:Python自定義分割槽方式

import urlparse
def hash_domain(url)
    return hash(urlparse.urlparse(url).netloc)
rdd.partitionBy(20,hash_domain)

注意,這裡你所傳過去的雜湊函式會被與其他RDD 的分割槽函式區分開來。如果你想要對多個RDD 使用相同的分割槽方式,就應該使用同一個函式物件,比如一個全域性函式,而不是為每個RDD 建立一個新的函式物件。

4.6 總結

本章學習瞭如何使用Spark提供的專門的函式來操作鍵值對資料。