Spark之鍵值對操作-Java篇(三)
一、簡介
鍵值對 RDD 是 Spark 中許多操作所需要的常見資料型別。本章就來介紹如何操作鍵值對 RDD。鍵值對 RDD 通常用來進行聚合計算。我們一般要先通過一些初始 ETL(抽取、轉 化、裝載)操作來將資料轉化為鍵值對形式。鍵值對 RDD 提供了一些新的操作介面(比如 統計每個產品的評論,將資料中鍵相同的分為一組,將兩個不同的 RDD 進行分組合並等)。
二、建立Pair RDD
在Spark中有很多種建立 pair RDD 的方式,此外,當需要把一個普通的RDD轉為 pairRDD 時,可以呼叫 map() 函式來實現,傳遞的函式需要返回鍵值對
public static JavaPairRDD createPairRDD() { List list = Arrays.asList(5, 4, 3, 2, 1, 6, 9, 5, 8, 9); SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("PairRDDDemo"); sc = new JavaSparkContext(conf); sc.setLogLevel("ERROR"); JavaRDD rdd = sc.parallelize(list); PairFunction keyData = new PairFunction() { private static final long serialVersionUID = 1L; @Override public Tuple2 call(Integer x) throws Exception { return new Tuple2(x, x + 1);// 鍵值對轉換,key=x ,value=x+1 } }; JavaPairRDD pairs = rdd.mapToPair(keyData); System.out.println("轉換後的鍵值對=" + pairs.collect()); return pairs; }
三、Pair RDD 轉化成操作
1、相同鍵的值進行相加 reduceByKey
JavaPairRDD pair = pairs.reduceByKey((v1, v2) -> v1 + v2);
2、相同鍵的值進行分組 groupByKey
JavaPairRDD<Integer, Iterable> rdd = pairs.groupByKey();
3、獲取所以得 keys
JavaRDD keys = pairs.keys();
4、獲取全部的 values
JavaRDD values = pairs.values();
5、根據鍵排序 sortByKey
JavaPairRDD sortByKey = pairs.sortByKey();
6、相同的鍵值當中取出最大的那個鍵值對 如:[(1,2), (2,3), (3,4),(3,8)] 結果就是[(1,2), (2,3),(3,8)] JavaPairRDD max = pairs.reduceByKey((v1, v2) -> Math.max(v1, v2));
7、改變value的值 mapvalues
JavaPairRDD mapValues = pairs.mapValues(v1 -> v1 + new Random().nextInt(10));
8、批量更改value的值 flatMapValues(和mapValues是有區別的)
JavaPairRDD flatMapValues = pairs.flatMapValues(v1 -> Lists.newArrayList(10));
四、Pair RDD 行動操作
1、對每個鍵對應的元素分別計數 countByKey
Map countByKey = pairs.countByKey();
2、將結果以對映表的形式返回,以便查詢 collectAsMap
Map collectAsMap = pairs.collectAsMap();
3、返回給定鍵對應的所有值 lookup
List lookup = pairs.lookup(9);
五、RDD分割槽
1、什麼是分割槽
RDD 內部的資料集合在邏輯上(以及物理上)被劃分成多個小集合,這樣的每一個小集合被稱為分割槽。RDDprdd作為一個分散式的資料集,是分佈在多個worker節點上的。如下圖所示,RDD1有五個分割槽(partition),他們分佈在了四個worker nodes 上面,RDD2有三個分割槽,分佈在了三個worker nodes上面。
2、為什麼要分割槽
分割槽的個數決定了平行計算的粒度。多個分割槽平行計算,能夠充分利用計算資源。
3、如何手動分割槽
java的分割槽可以這樣(parallelize)
JavaRDDrdd = sc.parallelize(list, 2); // 這個是分割槽用了,指定建立得到的 RDD 分割槽個數為 2。
pairs.partitions().size() 分割槽數量檢視