1. 程式人生 > >Spark之鍵值對操作-Java篇(三)

Spark之鍵值對操作-Java篇(三)

一、簡介

 鍵值對 RDD 是 Spark 中許多操作所需要的常見資料型別。本章就來介紹如何操作鍵值對 RDD。鍵值對 RDD 通常用來進行聚合計算。我們一般要先通過一些初始 ETL(抽取、轉 化、裝載)操作來將資料轉化為鍵值對形式。鍵值對 RDD 提供了一些新的操作介面(比如 統計每個產品的評論,將資料中鍵相同的分為一組,將兩個不同的 RDD 進行分組合並等)。

二、建立Pair RDD

在Spark中有很多種建立 pair RDD 的方式,此外,當需要把一個普通的RDD轉為 pairRDD 時,可以呼叫 map() 函式來實現,傳遞的函式需要返回鍵值對

 public static JavaPairRDD createPairRDD() {
        List list = Arrays.asList(5, 4, 3, 2, 1, 6, 9, 5, 8, 9);
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("PairRDDDemo");
        sc = new JavaSparkContext(conf);
        sc.setLogLevel("ERROR");
        JavaRDD rdd = sc.parallelize(list);
        PairFunction keyData = new PairFunction() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2 call(Integer x) throws Exception {
                return new Tuple2(x, x + 1);// 鍵值對轉換,key=x ,value=x+1
            }
        };
        JavaPairRDD pairs = rdd.mapToPair(keyData);
        System.out.println("轉換後的鍵值對=" + pairs.collect());
        return pairs;
}

三、Pair RDD 轉化成操作

1、相同鍵的值進行相加  reduceByKey

JavaPairRDD pair = pairs.reduceByKey((v1, v2) -> v1 + v2);

2、相同鍵的值進行分組 groupByKey

JavaPairRDD<Integer, Iterable> rdd = pairs.groupByKey();

3、獲取所以得 keys

JavaRDD keys = pairs.keys();

4、獲取全部的 values

JavaRDD values = pairs.values();

5、根據鍵排序 sortByKey

JavaPairRDD sortByKey = pairs.sortByKey();

6、相同的鍵值當中取出最大的那個鍵值對 如:[(1,2), (2,3), (3,4),(3,8)] 結果就是[(1,2), (2,3),(3,8)] JavaPairRDD max = pairs.reduceByKey((v1, v2) -> Math.max(v1, v2));

7、改變value的值 mapvalues

JavaPairRDD mapValues = pairs.mapValues(v1 -> v1 + new Random().nextInt(10));

8、批量更改value的值 flatMapValues(和mapValues是有區別的)

JavaPairRDD flatMapValues = pairs.flatMapValues(v1 -> Lists.newArrayList(10));

四、Pair RDD 行動操作

1、對每個鍵對應的元素分別計數 countByKey

Map countByKey = pairs.countByKey();

2、將結果以對映表的形式返回,以便查詢 collectAsMap

Map collectAsMap = pairs.collectAsMap();

3、返回給定鍵對應的所有值 lookup

List lookup = pairs.lookup(9);

 五、RDD分割槽

1、什麼是分割槽

       RDD 內部的資料集合在邏輯上(以及物理上)被劃分成多個小集合,這樣的每一個小集合被稱為分割槽。RDDprdd作為一個分散式的資料集,是分佈在多個worker節點上的。如下圖所示,RDD1有五個分割槽(partition),他們分佈在了四個worker nodes 上面,RDD2有三個分割槽,分佈在了三個worker nodes上面。

2、為什麼要分割槽

      分割槽的個數決定了平行計算的粒度。多個分割槽平行計算,能夠充分利用計算資源。

3、如何手動分割槽

       java的分割槽可以這樣(parallelize)

       JavaRDDrdd = sc.parallelize(list, 2); // 這個是分割槽用了,指定建立得到的 RDD 分割槽個數為 2。

       pairs.partitions().size() 分割槽數量檢視