1. 程式人生 > >Learning Spark中文版--第三章--RDD編程(2)

Learning Spark中文版--第三章--RDD編程(2)

翻譯 瓶頸 並集 ria multi guide 第六章 rabl 函數式

Common Transformations and Actions

??本章中,我們瀏覽了Spark中大多數常見的transformation(轉換)和action(動作)。在包含特定數據類型的RDD上可以進行額外的操作,例如,可以對純數字RDD使用統計函數,對鍵值對的RDD進行聚合操作。後面的章節我們會介紹這些特別的操作和RDD類型間的轉換。

Basic RDD

??我們將首先描述所有的RDD上可以執行的transformation(轉換)和action(動作),忽略數據的影響。

element-wise transformation(逐元素的轉換)

??你將會很喜歡使用map()和filter()這兩個最常用的transformation(轉換)(見圖3-2)。map()轉換接受一個函數參數,RDD中的每個元素都會經過map中的函數處理,生成新的元素組成的RDD。filter()轉換接受一個函數參數,並且返回一個能通過filter()函數的元素組成的RDD。

技術分享圖片


圖3-2,輸入RDD到mapRDD和filterRDD

??map()函數可以做太多太多事情了,從抓取網站關聯的每個URL放入集合到對數字進行平方。非常有用的一點是map()的返回值不必須和輸入類型一樣,這樣如果你有個String類型的RDD並且map()函數會把字符串轉換成Double類型返回,我們的輸入RDD會是RDD[String],結果RDD會是RDD[Double]。

??看一個基礎的例子,通過map()對數字平方。(Example3-26到3-28):

Example 3-26. Python squaring the values in an RDD

nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print "%i " % (num)
    
Example 3-27. Scala squaring the values in an RDD

val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))

Example 3-28. Java squaring the values in an RDD

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
    public Integer call(Integer x) { return x*x; }
});
System.out.println(StringUtils.join(result.collect(), ","));

??有事我們想為每個輸入元素產生多個輸出元素。這種操作叫做flatMap()。和
map()一樣,RDD中的每個元素都會被flatMap()中的函數調用。我們不是返回一個元素,而是返回一個叠代器,其中包含要返回的值。我們沒有生成叠代器的RDD,而是返回一個有所有叠代器中元素組成的RDD。flatMap()一個簡單的用處就是把輸入的字符串切割成單詞,示例:

Example 3-29. flatMap() in Python, splitting lines into words

lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first() # returns "hello"

Example 3-30. flatMap() in Scala, splitting lines into multiple words

val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // returns "hello

Example 3-31. flatMap() in Java, splitting lines into multiple words

JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
    return Arrays.asList(line.split(" "));
    }
});
words.first(); // returns "hello"

??在圖3-3中有map()和flatMap()差異的圖解。你可以把flatMap()看做返回壓平了的叠代器(如Example中展示,flatMap中的函數把傳入的String切割成了單詞數組,但是返回的是一個個單詞組成的RDD。就好比我們看到了flatMap()把傳入的東西轉換(就是傳入flatMap()函數的操作)成一串珠子,但是最終返回的是一個個珠子),所以當操作結束返回的不是一個列表RDD,而是列表中每個元素組成的RDD。

技術分享圖片

Pseudo set operations(偽set操作)

??即使RDD本身不是很標準的集合,RDD也支持很多數學集合操作,如交集(intersection)和聯合(union)。圖3-4有展示了四個操作。很重要的一點是,所有的操作都需要使用相同類型的RDD。

(第一句話翻譯的不好,大體談下自己的理解,集合原文用的set,狹義set沒有重復元素(如Java中的set),但是下圖中的set有很多重復元素,set若取集合的意思,說這些RDD是set肯定不會用不標準來形容,但是取狹義的沒有重復元素的集合就可以用不是很準確來形容了。原文:when the RDDs themselves are not properly sets)

技術分享圖片


??我們RDD中最經常不能滿足set特性的一點就是元素的唯一性,因為RDD中經常有元素的重復。如果想要去重我們可以使用RDD.distinc()這個transformation(轉換)來產生一個新的沒有重復元素的RDD。註意distinct()的代價非常高昂,因為它會在網絡上整理(shuffling)所有的數據來確保我們只收到每個元素的唯一副本。我們會在第四章來詳細討論洗牌(shuffling)和如何避免洗牌。

??最簡單的集合操作是union(other),會返回由兩個源數據組成的RDD。這在許多用例中都很有用,例如從多個來源處理日誌文件。不像數學中的並集操作,如果在輸入RDD中有重復的元素,Spark的union操作會包含重復的元素(當然我們可以使用distinct來修正)。

??Spark也提供了intersection(other)方法,會返回兩個RDD中共有的元素。intersection()運行時會刪除所有重復的元素(包括單個RDD中的重復元素)。盡管intersection()和union()是很相似的概念,但是intersection()性能差很多,因為他需要整理整個網絡的數據去確定共有元素。

??有時候經過考慮我們需要刪除一些數據。subtract(other)函數可以從一個RDD中刪除另一個RDD中包含的元素,即第一個RDD減去第二個RDD的差。與intersection()一樣,subtract會進行耗時耗力的洗牌(shuffle)。

??我們還可以計算兩個RDD的笛卡爾積,如圖3-5。a是一個源RDD中的元素,b是另一個源RDD中的元素,兩個RDD做笛卡爾積transformation(轉換)會返回所有可能的(a,b)元素對兒。當我們想要思考所有可能的元素對兒的相似性的時候笛卡爾積就很有用,比如計算每個用戶對所提供價格的期望興趣。我們還可以對RDD自身做笛卡爾積,來實現很有用的需求如用戶相似度。需要註意的是笛卡爾積在較大數據的處理上代價非常高。

解釋一下笛卡爾積,笛卡爾積就是兩個集合的乘積。如果兩個集合c(數學,語文),s(Alice,Bob,Carl),一個課程集合,一個學生集合,兩個集合的笛卡爾積就是((數學,Alice),(數學,Bob),(數學,Carl),(語文,Alice),(語文,Bob),(語文,Carl)),這就是學生集合所有選課的可能性。

技術分享圖片


表3-2和3-3總結了上述的和常用的RDD轉換。

技術分享圖片


技術分享圖片

貼完圖發現時英文的,我再手打一遍吧。

表3-2 包含{1,2,3,3}的RDD的基本轉換

函數名 目的 示例 結果
map() 對RDD中的每個元素應用一個函數,返回一個RDD rdd.map(x => x+1) {2,3,4,4}
flatMap() 對RDD中的每個元素應用一個函數返回叠代器中的內容。經常用來提取單詞 rdd.flatMap(x => x.to(3) {1,2,3,2,3,3,3}
filter() 返回由每個通過filter()條件的元素組成的RDD rdd.filter(x => x!=1) {2,3,3}
distinct() 刪除重復元素 rdd.distinct() {1,2,3}
sample(withReplacement,fraction,[seed]) 對RDD進行替換或不替換采樣 rdd.sample,0.5) Nondeterministic

Sample是對rdd中的數據集進行采樣,並生成一個新的RDD,這個新的RDD只有原來RDD的部分數據,這個保留的數據集大小由fraction來進行控制。

參數說明

withReplacement,這個值如果是true時,采用PoissonSampler取樣器(Poisson分布),
否則使用BernoulliSampler的取樣器。

Fraction,一個大於0,小於或等於1的小數值,用於控制要讀取的數據所占整個數據集的概率。

Seed,這個值如果沒有傳入,默認值是一個0~Long.maxvalue之間的整數。

表3-2 包含{1,2,3}和{3,4,5}兩個RDD的轉換

函數名 目的 示例 結果
union() 產生由兩個RDD元素組成的RDD rdd.union(other) {1,2,3,3,4,5}
intersection() 產生兩個RDD共有元素的RDD rdd.intersection(other) {3}
subtract() 刪除一個RDD的內容(如,刪除訓練數據) rdd.subtract(other) {1,2}
cartesian() 求與另一個RDD的笛卡爾積 rdd.cartesian(othre) {(1,3)(1,4)...(3,5)}
Actions

??在基本的RDD上最常見的Action(動作)是reduce(),它會使用一個函數操作同一RDD上兩個的元素,然後返回一個新的和RDD類型相同元素。一個簡單的例子就是相加函數,可以用來計算RDD的總和。使用reduce()函數,我們可以非常簡單的求得RDD上元素之和,計算元素數量,或者執行其他聚合操作。示例:

Example 3-32. reduce() in Python
sum = rdd.reduce(lambda x, y: x + y)

Example 3-33. reduce() in Scala
val sum = rdd.reduce((x, y) => x + y)

Example 3-34. reduce() in Java
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer x, Integer y) { return x + y; }
});

??和reduce()很相似的是fold(),它和reduce()輸入函數的簽名是一樣的,但是除此之外,對於每個分區的初始調用它還需要一個“初始值(zero value)”。你提供的初始值應該是你操作的標識元素,也就是說,多次應用同意函數初始值不應該改變(例如,加法初始值是0,乘法初始值是1,或者元素串連成列表的初始值是個空列表)。

你可以通過修改並返回兩個參數中的第一個參數來最小化fold()中的對象創建,但是,不應該修改第二個參數。

??fold()和reduce()都需要返回結果的類型和我們操作的RDD中元素的類型一樣。這種條件對於sum這種操作就很有效,但是有時候我們需要返回一種不同的類型。例如,當我們計算運行平均值時,我們需要保持記錄總值和總數量,最終返回一對兒結果。我們可以使用map()函數將每個元素轉換成(元素,1),就變成了我們想要的返回類型,這樣reduce()函數可以對成對數據進行計算,順利輸出成對數據。

??aggregate()函數把我們從必須返回和操作RDD類型一樣的元素的約束中解脫出來。使用aggregate(),類似fold,我們提供一個想要返回的初始值類型。然後我們提供一個元素將RDD中的元素和累加器結合起來。最後,我們需要提供第二個函數將兩個累加器合並,因為每個節點都在本地累加自己的結果。

??我們可以使用aggregate來計算一個RDD的平均值,避免了在fold()函數之前使用map()函數進行處理。示例:

Example 3-35. aggregate() in Python

sumCount = nums.aggregate((0, 0),
                (lambda acc, value: (acc[0] + value, acc[1] + 1),
                (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))))
return sumCount[0] / float(sumCount[1])

Example 3-36. aggregate() in Scala

val result = input.aggregate((0, 0))(
                (acc, value) => (acc._1 + value, acc._2 + 1),
                (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble

Example 3-37. aggregate() in Java

class AvgCount implements Serializable {
    public AvgCount(int total, int num) {
        this.total = total;
        this.num = num;
    }
    public int total;
    public int num;
    public double avg() {
        return total / (double) num;
    }
}
Function2<AvgCount, Integer, AvgCount> addAndCount =
    new Function2<AvgCount, Integer, AvgCount>() {
        public AvgCount call(AvgCount a, Integer x) {
            a.total += x;
            a.num += 1;
            return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine =
    new Function2<AvgCount, AvgCount, AvgCount>() {
    public AvgCount call(AvgCount a, AvgCount b) {
        a.total += b.total;
        a.num += b.num;
        return a;
    }
};
AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());

??在RDD上的一些action(動作)將一些或所有的數據以常規集合或值的形式返回給我們的驅動程序。

??最簡單和常用的將數據返回給驅動程序的操作是collect(),這將返回整個RDD的內容。collect()常用在單元測試中,整個RDD的內容裝載進內存也是在預期之中,這使得我們很容易將RDD的值和預期結果進行比較。collect()受限於需要將所有的數據塞進內存,因為他需要把所有的數據拷貝到驅動程序。

??take(n)返回n個RDD上的元素並且盡可能最小化訪問的分區數量,所以他返回的集合可能有一些偏頗。比較重要的需要註意的一點是這些操作返回的元素可能和你預期的順序不一樣。

??這些操作對於單元測試和快速debug來說很有用,但是當數據量過大是,可能會導致運行瓶頸。

??如果數據有規定的順序,我們可以使用top()函數提取RDD最上面的元素。top()會使用默認排序來提取數據,但我們也可以提供比較函數來提取頂端元素。

??有時我們需要對驅動程序中的數據進行采樣。takeSample(withReplacement,num,seed)函數允許我們對數據使用替換或非替換采樣。

??有時對RDD中的所有元素進行action(動作)卻不返回結果給驅動程序很有用。一個很好的例子就是把JSON數據發送給web服務器,或者將記錄插入到數據庫中。另一種情況,foreach()操作允許我們對RDD中的每個元素執行計算,而不需要將其帶回到本地。

??除了我們講的基本RDD操作之外,更深入的操作的函數名可讀性非常好,通過他們的名字你大體就能理解他們所表現的操作方式。count()返回元素的總數,countByValue()返回一個唯一值的計數組成的map。表3-4總結了這些action(動作)。

Table3-4包含{1,2,3,3}RDD上的基本action(動作)

函數名 目的 示例 結果
collect() 返回RDD的所有元素 rdd.collect() {1,2,3,3}
count() 返回RDD元素數量 rdd.count 4
countByValue() 每個元素在RDD中出現的次數 rdd.countByValue() {(1,1),(2,1),(3,2)}
take(num) 返回RDD中num個元素 rdd.take(2) {1,2}
top(num) 返回RDD頂端的num個元素 rdd.top(2) {3,3}
takeOrdered(num)(ordering) 返回基於提供的排序的num個元素 rdd.takeOrdered(2)(myOrdering) {3,3}
takeSample(withReplacement,num,[seed]) 返回num個隨機元素 rdd.takeSample(false,1) 不確定
reduce(func) 並行地把RDD中的元素結合在一起 rdd.reduce((x,y)=>x+y) 9
fold(zero)(func) 和reduce一樣但是有初始值 rdd.fold(0)((x,y)=>x+y) 9
aggregate(zeroValue)(seqOp,combOp) 類似reduce但是用來返回一個不同類型 rdd.agregate((0,0))
((x,y)=>
(x._1+y,x._2+1),
(x,y) =>
(x._1+y._1,x._2+y._2)
(9,4)
foreach(func) RDD中每個元素都會應用在func函數上 rdd.foreach(func)

Converting Betwenn RDD Types(RDD之間的轉換)

??有些函數只能在確定RDD類型的時候使用,如mean()和variance()能用在數字類型的RDD,join()用在鍵值對的RDD上。我們在第六章會介紹數字數據,第四章介紹鍵值對RDD。在Sacla和Java中,這些方法沒有在標準RDD類中定義,所以為了使用這些額外功能我們必須確保得到了正確的專門化類。

Scala

??在Scala中,用特殊的函數轉換RDD(如,在Double類型的RDD上聲明數字(numberic)函數),特殊的函數是指使用隱式轉換自動處理。之前在17頁提到過的“初始化SparkContext”為了這些轉換工作,我們需要添加import org.apache.spark.SparkContext._。你可以在SparkContext 對象文檔中查看隱式轉換的清單(scala的隱式轉換很好用噢)。這些隱式轉換把RDD轉換成不同的包裝類,比如DoubleRDDFunctions(包裝數字數據的RDD)和PairRDDFunctions(包裝鍵值對),用來暴露附加功能如mean()和variance()。

??隱式轉換功能非常強大,但有時會讓人困惑。如果你可以在RDD上調用一個mean()函數,你可能查看了RDD類的Scala文檔並且註意到沒有mean()函數。但是調用卻因為RDD[Double]隱式轉換成DoubleRDDFunctions成功了。當在你的RDD的Scala文檔中查找函數式,確定一下是否在包裝類中能夠使用這些函數。

Java

??在JAVA中轉換兩個特別類型的RDD就比較明顯。特別地,對於這些類型的RDD,有專門的類成為JavaDoubleRDD和JavaPairRDD,這些類型的數據提供了額外的方法。這中方法的一個有點就是你可以很清楚的理解到底發生了什麽,但是這種方式可能有點麻煩。

??要構建這些特殊類型的RDD,而不是總是使用函數類,我們將需要使用專門的版本。如果我們想從T泛型的RDD中創建一個DoubleRDD,我們使用DoubleFunction<T>而不是Function<T,Double>。表3-5展示了這些特殊化函數和他們的用法。

??我們還需要在RDD上調用不同的函數(這樣我們不能創建一個Double函數並傳遞給map()函數)。當我們需要一個DoubleRDD時,不是調用map()進行轉換,而我們需要調用mapToDouble(),所有其他的函數都遵循相同的模式。

表3-5特定類型函數的Java接口

函數名 等價函數 用法
DoubleFlatMapFunction<T> Function<T,Iterable<Double>> 從flatMapToDouble中得到DoubleRDD
DoubleFunction<T> Function<T,double> 從mapToDouble中得到DoubleRDD
PairFlatMapFunction<T,K,V> Function<T,Iterable<Tuple2<K,V>>> 從flatMapToPair中得到PairRDD<K,V>
PairFunction<T,K,V> Function<T,Tuple2<K,V>> 從mapToPair中得到PairRDD<K,V>



??我們可以修改一下Example3-28,對RDD中的元素平方,產生一個JavaDoubleRDD,如Example3-38。這能夠使我們訪問額外的DoubleRDD特定的函數功能如mean()和variance()。

Example 3-38. Creating DoubleRDD in Java
JavaDoubleRDD result = rdd.mapToDouble(
    new DoubleFunction<Integer>() {
        public double call(Integer x) {
            return (double) x * x;
        }
});
System.out.println(result.mean());
Python

??Python的API和Java、Scala的結構不同。在Python中所有的函數都是在基本RDD類上實現的,但是如果RDD的類型不正確,運行時會失敗。

Persistence(Caching)

??向我們之前討論的,Spark的RDD是惰性求值的,並且我們有時會希望多次使用同一個RDD。如果我們天真地這樣做,每當我們調用一個action(動作),SPark將重新計算RDD和他所有的依賴項。對於叠代算法,這可能導致非常高昂的代價,因為它要多次查看數據。Example3-39展示了另一個小例子,統計並寫出相同的RDD。

Example 3-39. Double execution in Scala
val result = input.map(x => x*x)
println(result.count())
println(result.collect().mkString(","))

??為了防止對一個RDD計算多次,我們可以讓Spark持久化數據。當我們讓Spark持久化一個RDD時,計算RDD的節點將存儲他們的分區。如果一個將數據持久化了的節點失敗了,Spark會在需要時重新計算剩下數據的分區。如果我們希望處理節點失敗時避免機器卡頓,可以將數據復制在多個節點上。

??Spark針對我們的目的提供了多種可供選擇的持久化級別,如表3-6。在Scala和Java中,默認的persist()會如不可序列化的對象一樣把數據保存在JVM的堆中。在Python中,我們總是通過序列化數據來持久化存儲,所以默認的是pickle(python中的序列化)對象而不是保存在JVM堆中。當我們將數據寫入磁盤或非堆存儲時,這些數據總是要被序列化。

表3-6.org.apche.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化級別。如果需要我們可以通過在存儲級別尾部添加_2 來在兩臺機器上復制數據。

技術分享圖片

堆外緩存是實驗性的並且使用了Tachyon。如果你對Spark的堆外緩存感興趣,可以查看一下Running Spark on Tachyon guide。

Example 3-40. persist() in Scala
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

??需要註意的是我們在第一次action(動作)之前調用了RDD的persist()。persist()調用本身不強制求值。

??如果你試圖在內存中緩存太多數據,Spark會使用最近最少使用的緩存策略來自動清除老分區。對於MEMORY_ONLY存儲級別,它會在它們下一次訪問是重新計算這些分區,對於MEMORY_AND_DISK級別,會把它們寫入磁盤。這意味著你不用擔心如果你緩存太多數據而導致job的崩潰。但是,大量重復計算的時間。

??最後,RDD還有一個unpersist()方法,可以讓你手動將RDD從緩存中刪除。

Conclusion

??這一章中,我們介紹了RDD的操作模型和大量的常用操作。如果你看到這,恭喜你已經學習了所有SPark中的核心概念。下一章中,我們將介紹關於鍵值對RDD上可使用的特殊操作集合,這些操作在並行條件下聚合和分組數據非常常用。在此之後,我們將討論各種數據源的輸入和輸出,以及使用SparkContext的高級話題。

Learning Spark中文版--第三章--RDD編程(2)