1. 程式人生 > >零基礎入門大資料之spark中rdd部分運算元詳解

零基礎入門大資料之spark中rdd部分運算元詳解

先前文章介紹過一些spark相關知識,本文繼續補充一些細節。

我們知道,spark中一個重要的資料結構是rdd,這是一種並行集合的資料格式,大多數操作都是圍繞著rdd來的,rdd裡面擁有眾多的方法可以呼叫從而實現各種各樣的功能,那麼通常情況下我們讀入的資料來源並非rdd格式的,如何轉換為rdd呢?

一個基本的方法是初始化,或者格式化操作函式parallelize。

  • parallelize

比如一個數組Array(1,2,3,4,5),經過parallelize後就變成了rdd格式的陣列。

scala> val d = Array(1,2,3,4,5)
d: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd_d = sc.parallelize(d)
rdd_d: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:27

rdd的初始化parallelize函式可以接受兩個引數,上面省略了第二個預設引數,這個引數是分片個數slices,表示資料集切分的份數。一份資料存在某臺機器上的時候,可以指定切分的份數,可以想象,切分的越多,處理起來因為是並行的,速度越快,當然這是要佔資源的,當資料小的時候完全沒必要。典型地,你可以在叢集的每個CPU上分佈2-4個slices. 一般來說,Spark會嘗試根據叢集的狀況,來自動設定slices的數目。也可以通過傳遞給parallelize的第二個引數來進行手動設定,例如:sc.parallelize(data, 10))。

  • aggregate

再來看一個稍複雜的函式:aggregate。這是一個比較底層的使用也很廣泛的函式,字面意思就是聚合的意思。不過它有兩層聚集:第一層,對分片的資料進行聚集,這裡的片就是上面的slices,第二層,對聚集的結果再進行聚集。看一下函式原型:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U)

這個函式需要初始化一個值zeroValue。第一層聚合seqOp,第二層聚合combOp。

看一個例子:

scala> val d = Array(1,2,3,4,5,6)
scala> val rdd_d = sc.parallelize(d,2)
scala> val res = rdd_d.aggregate(0)(math.max(_,_),_+_)

res: Int = 9

解釋一下:
1)首先將一組陣列分成兩片儲存起來:parallelize(d,2);預設是平均分,也就是1,2,3一片,4,5,6一片

2)對每一片執行取最大值操作,因為是聚合函式,所以輸入是兩個,輸出是一個的函式,math.max(_,_)就是這種,下劃線代表所有元素執行。
可以看到這一步過後每一片的結果就是3,6。

3)之後對3,6執行相加的操作得到9.

一般來說這兩個函式設定成一樣的,比如,就是取最大值,那麼第二個函式也可以變成math.max(_,_)

  • cache方法

cache是將RDD的結果暫時存放在記憶體裡面,方便後面需要用到這個rdd的時候不用再計算。

我們知道,spark裡面包含兩種運算運算元,一種是轉換運算元,一種是行為運算元,轉換運算元再spark裡面是惰性計算的,什麼意思呢?就是你寫了程式碼,但是實際上程式執行到這一步並沒有實際發生計算,只有碰到了行為運算元才算正兒八經的計算。轉換運算元,就好比畫了計算流程圖一樣,只是單純的框架而已。這讓我想起了tensorflow裡面的一些運算也是這樣,果然都是一家的程式設計師,思路都完美繼承。

說遠了,這和cache運算元有什麼關係呢?舉個例子,假設資料a經過三層map變成了d,也就是a->b->c->d,a到d之間都是轉換操作,這個時候如果某個計算需要用到d,那麼就會把a到d的過程重新走一遍,因為d在一次計算以後不會儲存在記憶體裡面的,這就導致了一個嚴重的問題就是需要重複計算很多東西。如果d被程式在不同地方多次呼叫的話將帶來效能的下降。這個時候有沒有辦法把d第一次被計算的結果儲存起來呢,有,這就是cache方法,起到快取的作用。

  • 笛卡爾操作 cartesian

什麼是笛卡爾操作?就是兩個集合中的元素分別兩兩排列組合,舉個例子:

val s1 = sc.parallelize(Array(1,2,3,4,5))
val s2 = sc.parallelize(Array("a","b","c"))
val res = s1.cartesian(s2)
res.collect()

>>>  Array[(Int, String)] = Array((1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c), (4,a), (4,b), (4,c), (5,a), (5,b), (5,c))

看例子非常容易理解。笛卡爾操作在某些時候還是很有效的。

  • 去重方法:distinct

顧名思義,去掉rdd中重疊的元素。實際中我經常也會用到,比如想把幾個rdd進行合併起來,可以用union方法,但是呢裡面會有重複的,這個時候再接這個函式即可。如下:

val s1 = sc.parallelize(Array(1,2,3,4,5))
val s2 = sc.parallelize(Array(4,5,6,7,8))
val res = s1.union(s2).distinct()
res.collect()
    
>>> Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)

  • 過濾方法:filter

這個方法可以說非常的重要了,靈活運用可以實現非常多的功能。filter方法就是過濾掉rdd中滿足一定條件的rdd,filter函式裡面可以定義各種過濾函式。比如下面:

val s1 = sc.parallelize(Array(1,2,3,4,5))
val res = s1.filter(x => x>3)
res.collect()

>>> Array[Int] = Array(4, 5)

  • 生成鍵值對的keyBy方法

keyBy方法是為rdd中的每個資料額外增加一個key構成(key,value)的資料對,進而這種結構的資料可以使用(key,value)專門的一些聚合函式,這些函式在以前的文章中記錄過。keyBy的方法也比較簡單,舉個例子就明白了:

val s1 = sc.parallelize(Array(1,2,3,4,5))
val res = s1.keyBy(x => x*x)
res.collect()

>>> Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5))

這裡將rdd中每個數的平方值當作自己的key,生成的結果可以看到。所以keyBy只是生成對應元素key。

本文暫時記錄這麼多方法吧。


關注公號【AInewworld】,第一時間獲取精彩內容
在這裡插入圖片描述