1. 程式人生 > >JavaSpark-鍵值對操作(PairRDD)

JavaSpark-鍵值對操作(PairRDD)

鍵值對 RDD 通常用來進行聚合計算。先通過一些初始 ETL(抽取、轉 化、裝載)操作來將資料轉化為鍵值對形式。鍵值對 RDD 提供了一些新的操作介面
讓使用者控制鍵值對 RDD 在各節點上分佈情況的高階特性:分割槽。
使用可控的分割槽方式把常被一起訪問的資料放到同一個節點上,可以大大減少應用的通訊 開銷。這會帶來明顯的效能提升。

動機
Spark 為包含鍵值對型別的 RDD 提供了一些專有的操作,稱為 pair RDD。它們提供了並行操作各個鍵或跨節點重新進行資料分組 的操作介面。
通常從一個 RDD 中提取某些欄位(例如代表事件時間、使用者 ID 或者其他識別符號的欄位), 使用這些欄位作為 pair RDD 操作中的鍵。

建立Pair RDD
很多儲存鍵值對的資料格式會 在讀取時直接返回由其鍵值對資料組成的 pair RDD。 RDD 轉 為 pair RDD 時,可以呼叫 map() 函式來實現,傳遞的函式需要返回鍵值對。

Java 沒有自帶的二元組型別,因此 Spark 的 Java API 讓使用者使用 scala.Tuple2 類來建立二 元組。可以通過 new Tuple2(elem1, elem2) 來建立一個新的二元 組,並且可以通過 ._1() 和 ._2() 方法訪問其中的元素

        JavaRDD<String> data = js.parallelize(Arrays.
asList("haha","heihei")); JavaPairRDD<String,Integer> line = data.mapToPair(x -> {return new Tuple2(x,1);});

使用 Java 從記憶體資料集建立pair RDD 的話,則需要使用 SparkContext.parallelizePairs()

        List<Tuple2<String,Integer>> lt = new ArrayList<>();
        Tuple2<String
,Integer> tp1 = new Tuple2<>("pinda", 2); Tuple2<String,Integer> tp2 = new Tuple2<>("qank", 6); Tuple2<String,Integer> tp3 = new Tuple2<>("panda", 5); lt.add(tp1); lt.add(tp2); lt.add(tp3); JavaPairRDD<String,Integer> data = js.parallelizePairs(lt);

Pair RDD的轉化操作
Pair RDD 可以使用所有標準 RDD 上的可用的轉化操作
於 pair RDD 中包含二元組,所以需要傳遞的函式應 當操作二元組而不是獨立的元素
這裡寫圖片描述
這裡寫圖片描述
這裡寫圖片描述
只想訪問 pair RDD 的值部分,這時操作二元組很麻煩。由於這是一種常見的 使用模式,因此 Spark 提供了 mapValues(func) 函式

聚合操作
當資料集以鍵值對形式組織的時候,聚合具有相同鍵的元素進行一些統計是很常見的操 作。
之前講解過基礎 RDD 上的 fold()、combine()、reduce() 等行動操作,pair RDD 上則 有相應的針對鍵的轉化操作。Spark 有一組類似的操作,可以組合具有相同鍵的值。這些 操作返回 RDD,因此它們是轉化操作而不是行動操作。

reduceByKey() 與 reduce() 相當類似;它們都接收一個函式,並使用該函式對值進行合併。

foldByKey() 則與 fold() 相當類似;它們都使用一個與 RDD 和合並函式中的資料型別相 同的零值作為初始值。

可以使用 reduceByKey() 和 mapValues() 來計算每個鍵的對應值的 均值
用 reduceByKey() 和 foldByKey() 會在為每個鍵計算全域性的總結果之前 先自動在每臺機器上進行本地合併
combineByKey() 介面可以讓你自定義合併的行為。

//分散式單詞計數問題,flatMap() 來生成以單詞為鍵、以數字 1 為值的 pair RDD
        SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> input = sc.textFile("E:/太古神王.txt");
        JavaRDD<String> words = input.flatMap(x -> Arrays.asList(x.split("")).iterator());
        JavaPairRDD<String,Integer> counts = words.mapToPair(s -> new Tuple2<String, Integer>(s,1));
        JavaPairRDD<String, Integer> results =  counts.reduceByKey((x,y)->{ return x+y; }); 
        List<Tuple2<String, Integer>> results1 = results.collect();

        System.out.println(results1.toString());

對第一個 RDD 使用 countByValue() 函式,以更快地實現 單詞計數:
input.flatMap(x => x.split("")).countByValue()

combineByKey() 是最為常用的基於鍵進行聚合的函式。大多數基於鍵聚合的函式都是用它 實現的。和 aggregate() 一樣,combineByKey() 可以讓使用者返回與輸入資料的型別不同的 返回值。

class AvgCount implements Serializable{
    public int total;
    public int num;
    private static final long serialVersionUID = 3325529460700487293L;
    public AvgCount(int total,int num){
        this.total = total;
        this.num = num;
    }
}
        AvgCount a =new AvgCount(0,0);
        line.aggregate(a,(x,y)->{           
                            x.total += y;
                            x.num += 1;             
                            return x;
                            }, 
                         (x,y)->{               
                            x.total +=y.total;
                            x.num +=y.num;  
                            return x;}
                );

如何處理每個元素的。由於 combineByKey() 會遍歷分割槽中的所有元素,因此每個元素的鍵要麼還沒有遇到過,要麼就 和之前的某個元素的鍵相同

如果這是一個新的元素,combineByKey() 會使用一個叫作 createCombiner() 的函式來建立 那個鍵對應的累加器的初始值。需要注意的是,這一過程會在每個分割槽中第一次出現各個 鍵時發生,而不是在整個 RDD 中第一次出現一個鍵時發生

如果這是一個在處理當前分割槽之前已經遇到的鍵,它會使用 mergeValue() 方法將該鍵的累 加器對應的當前值與這個新的值進行合併

由於每個分割槽都是獨立處理的,因此對於同一個鍵可以有多個累加器。如果有兩個或者更 多的分割槽都有對應同一個鍵的累加器,就需要使用使用者提供的 mergeCombiners() 方法將各 個分割槽的結果進行合併

public static class AvgCount implements Serializable { 
  public AvgCount(int total, int num) {   
      total_ = total; 
      num_ = num; 
      }
    public int total_;   
    public int num_;   
    public float avg() {   
        return total_/(float)num_; 
    } 
} 

Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {   
    public AvgCount call(Integer x) { 
         return new AvgCount(x, 1);   
         } 
     }; 
Function2<AvgCount,Integer,AvgCount> addAndCount = new Function2<AvgCount,Integer,AvgCount>() {   
     public AvgCount call(AvgCount a, Integer x) {   
          a.total_ += x;
          a.num_ += 1;
          return a;   
          } 
    }; 

Function2<AvgCount,AvgCount,AvgCount> combine =   new Function2<AvgCount,AvgCount,AvgCount>() {   
public AvgCount call(AvgCount a, AvgCount b) {     
            a.total_ += b.total_;
            a.num_ += b.num_;
            return a;
         }
}; 

AvgCount initial = new AvgCount(0,0); JavaPairRDD<String, AvgCount> avgCounts = nums.combineByKey(createAcc,addAndCount,combine); 
Map<String, AvgCount> countMap = avgCounts.collectAsMap(); 
for(Entry<String,AvgCount> entry : countMap.entrySet()) {
    System.out.println(entry.getKey() + ":" + entry.getValue().avg());
 }

這裡寫圖片描述

並行度調優
Spark 是怎樣 確定如何分割工作的。每個 RDD 都有固定數目的分割槽,分割槽數決定了在 RDD 上執行操作 時的並行度。

在執行聚合或分組操作時,可以要求 Spark 使用給定的分割槽數。Spark 始終嘗試根據叢集 的大小推斷出一個有意義的預設值,對並行度進行調優來獲取更好的 效能表現。

大多數操作符都能接收第二個引數,這個引數用來指定分組結果或聚合結果的 RDD 的分割槽數

//設定並行度
sc.parallelize(data).reduceByKey(x, y -> x + y, 10) 

Spark 提供了 repartition() 函式。它會把資料通過網路進行混洗,並創 建出新的分割槽集合
對資料進行重新分割槽是代價相對比較大的操作
Spark 中也 有一個優化版的 repartition(),叫作 coalesce()。
你可以使用 Java 或 Scala 中的 rdd. partitions.size() 檢視 RDD 的分割槽數,並確保調 用 coalesce() 時將 RDD 合併到比現在的分割槽數更少的分割槽中。

資料分組
如果資料已經以預期的方式提取了鍵,groupByKey() 就會使用 RDD 中的鍵來對資料進行 分組。對一個由型別 K 的鍵和型別 V 的值組成的 RDD,所得到的結果 RDD 型別會是 [K, Iterable[V]]

groupByKey() 可以用於未成對的資料上,也可以根據除鍵相同以外的條件進行分組。它可以 接收一個函式,對源 RDD 中的每個元素使用該函式,將返回結果作為鍵再進行分組。

如果你發現自己寫出了先使用 groupByKey() 然後再對值使用 reduce() 或者 fold() 的程式碼,你很有可能可以通過使用一種根據鍵進行聚合的函式來更 高效地實現同樣的效果。
對每個鍵歸約資料,返回對應每個鍵的歸約值的 RDD,而不是把 RDD 歸約為一個記憶體中的值。
例rdd.reduceByKey(func) 與 rdd.groupByKey().mapValues(value => value.reduce(func)) 等價,但是前 者更為高效,因為它避免了為每個鍵建立存放值的列表的步驟。

對單個 RDD 的資料進行分組,可使用 cogroup() 函式對多個共享同 一個鍵的 RDD 進行分組。對兩個鍵的型別均為 K 而值的型別分別為 V 和 W 的 RDD 進行 cogroup() 時,得到的結果 RDD 型別為 [(K, (Iterable[V], Iterable[W]))]。如果其中的 一個 RDD 對於另一個 RDD 中存在的某個鍵沒有對應的記錄,那麼對應的迭代器則為空。 cogroup() 提供了為多個 RDD 進行資料分組的方法。

cogroup() 不僅可以用於實現連線操作,還可以用來求鍵的交集

連線 (程式碼新增)
將有鍵的資料與另一組有鍵的資料一起使用是對鍵值對資料執行的最有用的操作之一。連 接資料可能是 pair RDD 最常用的操作之一。連線方式多種多樣:右外連線、左外連線、交 叉連線以及內連線。

普通的 join 操作符表示內連線 。只有在兩個 pair RDD 中都存在的鍵才叫輸出。當一個輸 入對應的某個鍵有多個值時,生成的 pair RDD 會包括來自兩個輸入 RDD 的每一組相對應 的記錄。

“連線”是資料庫術語,表示將兩張表根據相同的值來組合欄位。

leftOuterJoin(other) 和 rightOuterJoin(other) 都會根據鍵連線兩個 RDD,但是允許結果中存在其中的一個 pair RDD 所缺失的鍵。

使用 leftOuterJoin() 產生的 pair RDD 中,源 RDD 的每一個鍵都有對應的記錄。每個 鍵相應的值是由一個源 RDD 中的值與一個包含第二個 RDD 的值的 Option(在 Java 中為 Optional)物件組成的二元組

資料排序
如果鍵有已定義的順 序,就可以對這種鍵值對 RDD 進行排序。當把資料排好序後,後續對資料進行 collect() 或 save() 等操作都會得到有序的資料

們經常要將 RDD 倒序排列,因此 sortByKey() 函式接收一個叫作 ascending 的引數,表 示我們是否想要讓結果按升序排序(預設值為 true)
以提供自定義的比較函式
下面例會將整數轉為字串

class IntegerComparator implements Comparator<Integer> {                                                        public int compare(Integer a, Integer b) {  
    return String.valueOf(a).compareTo(String.valueOf(b)) 
        } 
} 
rdd.sortByKey(comp)

Pair RDD的行動操作
所有基礎 RDD 支援的傳統行動操作也都在 pair RDD 上可用。Pair RDD 提供了一些額外的行動操作,可以讓我們充分利用資料的鍵值對特性
這裡寫圖片描述

資料分割槽(進階)詳細見書
對資料集在節點間的分割槽進行控制
在分散式程式中, 通訊的代價是很大的,因此控制資料分佈以獲得最少的網路傳輸可以極大地提升整體性 能
和單節點的程式需要為記錄集合選擇合適的資料結構一樣,Spark 程式可以通過控制 RDD 分割槽方式來減少通訊開銷
只有當資料集多次在 諸如連線這種基於鍵的操作中使用時,分割槽才會有幫助

Spark 中所有的鍵值對 RDD 都可以進行分割槽。系統會根據一個針對鍵的函式對元素進行分組

Spark 可以確保同一組的鍵出現在同一個 節點上

你可能使用雜湊分割槽將一個 RDD 分成了 100 個分割槽,此時鍵的雜湊值對 100 取模的結果相同的記錄會被放在一個節點上。你也可以使用範圍分割槽法,將鍵在同一 個範圍區間內的記錄都放在同一個節點上

這裡寫程式碼片

預設情況下,連線操作會將兩 個數據集中的所有鍵的雜湊值都求出來,將該雜湊值相同的記錄通過網路傳到同一臺機器 上,然後在那臺機器上對所有鍵相同的記錄進行連線操作
因為 userData 表比 每五分鐘出現的訪問日誌表 events 要大得多,所以要浪費時間做很多額外工作:在每次調 用時都對 userData 表進行雜湊值計算和跨節點資料混洗,雖然這些資料從來都不會變化

解決:在程式開始時,對 userData 表使用 partitionBy() 轉化操作, 將這張錶轉為雜湊分割槽。可以通過向 partitionBy 傳遞一個 spark.HashPartitioner 物件來 實現該操作

獲取RDD的分割槽方式
以使用 RDD 的 partitioner 屬性(Java 中使用 partitioner() 方法)來獲取 RDD 的分割槽方式

中使用 partitioner 屬性不僅是檢驗各種 Spark 操作如何影響分割槽方式的一種 好辦法,還可以用來在你的程式中檢查想要使用的操作是否會生成正確的結果

確實要在後續操作中使用 partitioned,那就應當在定義 partitioned 時,在第三行輸入的最後加上 persist()

如果不呼叫 persist() 的話,後續的 RDD 操作會對 partitioned 的整個譜系重新求值,這會導致對 pairs 一遍又一遍地進行雜湊分割槽操作

從分割槽中獲益的操作
Spark 的許多操作都引入了將資料根據鍵跨節點進行混洗的過程。所有這些操作都會 從資料分割槽中獲益(減少網路傳輸)

    對於像 reduceByKey() 這樣只作用於單個 RDD 的操作,執行在未分割槽的 RDD 上的時候會 導致每個鍵的所有對應值都在每臺機器上進行本地計算,只需要把本地最終歸約出的結 果值從各工作節點傳回主節點,所以原本的網路開銷就不算大。而對於諸如 cogroup()join() 這樣的二元操作,預先進行資料分割槽會導致其中至少一個 RDD(使用已知分割槽器的那個 RDD)不發生資料混洗。如果兩個 RDD 使用同樣的分割槽方式,並且它們還快取在 同樣的機器上(比如一個 RDD 是通過 mapValues() 從另一個 RDD 中創建出來的,這兩個 RDD 就會擁有相同的鍵和分割槽方式),或者其中一個 RDD 還沒有被計算出來,那麼跨節 點的資料混洗就不會發生了。

影響分割槽方式的操作
Spark 內部知道各操作會如何影響分割槽方式,並將會對資料進行分割槽的操作的結果 RDD 自 動設定為對應的分割槽器
轉化操作的結果並不一定會按已知的分割槽方式分割槽,這時輸出的 RDD 可能就會沒 有設定分割槽器
另外兩個操作 mapValues() 和 flatMapValues() 作為替代方法,它們可以保證每個二元組的鍵保持不變
其他所有的操作生成的結果都不會存在特定的分割槽方式

示例:PageRank

自定義分割槽方式