1. 程式人生 > >Spark程式設計指南入門之Java篇二-基本操作

Spark程式設計指南入門之Java篇二-基本操作

4. RDD的操作

4.1 基本操作


RDD有2種類型的操作,一種是轉換transformations,它基於一個存在的資料集創建出一個新的資料集;另一種是行動actions,它通過對一個存在的資料集進行運算得出結果。例如,map方法是轉換操作,它將資料集的每一個元素按指定的函式轉換為一個新的RDD;reduce方法是行動操作,它將資料集的所有元素按指定的函式進行聚合運算得出結果給驅動節點。Spark的所有轉換操作都是延時載入執行的,當需要執行行動操作返回結果時才會載入執行,這種設計讓Spark的運算更加高效。例如:

distData.map(a -> {
	System.out.println(a);
	return (a + 1);
});

執行上述程式碼後控制檯是沒有輸出的,因為map方法實際是沒有執行的。如果結合reduce使用的話就會被執行,例如以下例子,控制檯輸出1 2 3 4 5。

distData.map(a -> {
	System.out.println(a);
	return (a + 1);
}).reduce((a, b) -> (a + b));

預設地,每個轉換的RDD在執行action操作時都會重新計算,不過可以使用persist或cache方法把RDD持久化到記憶體以便下次使用而不需要重新運算建立;Spark也支援將RDD持久化到磁碟,或在多個節點上覆制。例如下面程式碼把map的結果持久化到記憶體:

distData.map(a -> a + 1).persist(StorageLevel.MEMORY_ONLY());

4.2 傳遞方法

Spark的API在很大程度上依賴於把驅動節點中的方法傳遞到叢集上執行,那些方法都是通過實現了定義在org.apache.spark.api.java.function包裡面介面的類來表示,有2種建立該方法的方式:

- 使用匿名內部類或命名內部類實現該方法介面
- 使用Java 8的lambda表示式定義實現

在上述的3.1程式碼裡面也有相關的例子,又例如下面的2個例子,分別使用了匿名內部類和實體類:

- 匿名內部類方式:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
	public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
	public Integer call(Integer a, Integer b) { return a + b; }
});

- 命名內部類方式:

class GetLength implements Function<String, Integer> {
	public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
    public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

5. 理解閉包

理解程式在叢集中執行時變數和方法的使用範圍和生命週期是Spark的其中一個難點。在變數使用範圍之外修改變數的RDD操作是一個經常引起困惑的原因。例如下面的例子,使用了foreach()方法實現一個計數器,根據程式是否在同一個JVM中執行會有不同的結果(類似的問題在其它的RDD操作中也會出現),例如當程式執行在Spark本地模式(--master = local[n])和叢集模式(例如通過spark-submit提交任務到YARN):

int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// 錯誤: 不要使用下面操作!!
rdd.foreach(x -> counter += x);

System.out.println("Counter value: " + counter);

5.1 本地模式 VS 叢集模式

在叢集模式下,Spark執行jobs的時候會把處理的RDD操作分割為多個任務,每個任務被一個執行器executor處理,在執行之前,Spark首先會計算任務的閉包。閉包是必須對executor可見的變數和方法,用於對RDD執行運算(上例的foreach())。閉包會被序列化和傳送給每個executor,被髮送給每個executor的閉包內的變數是原來的副本。因此,當在foreach方法內引用的計數器已經不是原來驅動節點的計數器了。在驅動節點的記憶體裡面雖然仍有一個計數器,但它對其它的executors都是不可見的!executors只能訪問對應序列化閉包中的計數器副本。因此,最終的計數器值仍然是0,因為所有對計數器的操作都是對對應序列化閉包中的計數器副本執行的。

在本地模式下,一般情況foreach方法和驅動節點是在同一個JVM執行,因此操作的是同一個計數器,會得出正確的運算結果。

在類似的場景下,為了確保得出正確的結果應該使用累加器Accumulator。Spark中的累加器專門為在叢集中的多個節點間更新同一變數提供了一種安全的機制。在後續的指南里面會對累加器做進一步的詳細介紹。一般來說,像迴圈或本地定義方法這樣的閉包結構,不應該用於更改全域性狀態。

5.2 列印RDD的元素

另外一種常用的操作就是使用rdd.foreach(x -> System.out.println(x))或rdd.map(x -> System.out.println(x))列印一個RDD的所有元素。在單機模式下,該方法會輸出期待的列印對應RDD的所有元素結果。然而在叢集模式下,輸出的結果是在對應executor的stdout,而不是在驅動節點,所以在驅動節點的stdout是看不見輸出的。如果想把RDD的所有元素列印到驅動節點上,可以使用collect()方法:

JavaRDD<Integer> rdd = sc.parallelize(data);
List<Integer> list = rdd.collect();
for (Integer i : list) {
    System.out.println(i);
}

但這樣會導致驅動節點的記憶體不足,因為collect()方法會把整個RDD的資料都傳送到驅動節點上;如果只需要列印RDD的少量元素,可以使用較安全的take()方法獲取前X個元素,例如下面例子獲取RDD的前100個元素到驅動節點並打印出來:

JavaRDD<Integer> rdd = sc.parallelize(data);
List<Integer> list = rdd.take(100);
for (Integer i : list) {
    System.out.println(i);
}

6. 鍵值對的操作

大部分Spark操作的RDD都是包含任意型別的物件,也有少量特殊的僅支援含有鍵值對的RDD操作。最常用的一個操作是分散式“移動(shuffle)”操作,例如按照key將RDD的元素進行分組或聚合操作。

在Java中,鍵值對使用的是Scala標準庫裡面的scala.Tuple2類,可以通過呼叫new Tuple2(a, b)建立,然後通過tuple._1()和tuple._2()方法訪問它的屬性。鍵值對RDD使用的是JavaPairRDD類,可以使用特定的map操作將JavaRDDs轉換為JavaPairRDDs,例如mapToPair和flatMapToPair。JavaPairRDD擁有標準RDD和特殊鍵值對的方法,例如,在下面的程式碼中使用了reduceByKey對鍵值對操作,計算每行的文字出現的次數:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

我們還可以使用counts.sortByKey()按照字母順序將鍵值對排序,然後使用counts.collect()將結果以一個數組的形式傳送給驅動節點。需要注意的是,當在鍵值對操作中使用自定義物件作為key時,必須確保自定義的equals()方法有對應的hashCode()方法。

TO BE CONTINUED...O(∩_∩)O