1. 程式人生 > >spark RDD和RDD運算元

spark RDD和RDD運算元

什麼是RDD?

RDD全稱resilient distributed dataset(彈性分散式資料集)。他是一個彈性分散式資料集,是spark裡面抽象的概念。代表的是一個不可變的,集合裡面的元素可以分割槽的支援並行化的操作。

RDD產生的意義在於降低開發分散式應用程式的門檻和提高執行效率。它是一個可以容錯的不可變集合,集合中的元素可以進行並行化地處理,Spark是圍繞RDDs的概念展開的。RDD可以通過有兩種建立的方式,一種是通過已經存在的驅動程式中的集合進行建立,另一種是通引用外部儲存系統中的資料集進行建立,這裡的外部系統可以是像HDFS或HBase這樣的共享檔案系統,也可以是任何支援hadoop InputFormat的資料。

在原始碼中,RDD是一個具備泛型的可序列化的抽象類。具備泛型意味著RDD內部儲存的資料型別不定,大多數型別的資料都可以儲存在RDD之中。RDD是一個抽象類則意味著RDD不能直接使用,我們使用的時候通常使用的是它的子類,如HadoopRDD,BlockRDD,JdbcRDD,MapPartitionsRDD,CheckpointRDD等。


RDD的五大特性:

1、RDD是由一系列的分割槽組成。

2、一個操作作用於每一個分割槽。

3、RDD之間存在各種依賴關係。

4、可選的特性,key-value型的RDD是通過hash進行分割槽。

5、RDD的每一個分割槽在計算時會選擇最佳的計算位置。(資料本地化)


RDD的建立方式:

sc.parallelize

外部資料來源


Spark中的運算元:

1、RDD可以分為兩類,transformations和actions。
2、Transformations 變換/轉換運算元:將一個RDD轉換成另一個RDD,所有的Transformation都是lazy的,只有發生action是才會觸發計算。
3、Action 行動運算元:這類運算元會觸發 SparkContext 提交 作業。
####思考:spark官網說這樣設定運算元會使spark執行地更加的高效,請問這是為什麼呢?
答:1)假設執行一個rdda.map().reduce()的操作,如果作為轉換運算元map()也觸發計算,則肯定得將結果寫出來,降低效率。
        2)由於lineage的關係,之後詳細講解。
4、當你對一個RDD進行轉換時,只要觸發action操作就可能會引起RDD的重算,RDD的重算機制使得當某個RDD資料丟失重算可以恢復該RDD。
5、你可以通過persist()或cache()方法將RDD儲存在記憶體中,這樣的話,在下次查詢時可以更快地訪問到RDD中的元素。spark也支援將RDD儲存在磁碟上,也支援RDD的跨節點複製。


Transformations中的運算元


map(func) 
返回一個新的分散式資料集,由每個原元素經過func函式處理後的新元素組成 
scala> var data = sc.parallelize(1 to 9,3)
//內容為 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> data.map(x=>x*2).collect()
//輸出內容 Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

filter(func) 
返回一個新的資料集,由經過func函式處理後返回值為true的原元素組成 
scala> var data = sc.parallelize(1 to 9,3)
//內容為 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> data.filter(x=> x%2==0).collect()
//輸出內容 Array[Int] = Array(2, 4, 6, 8)

flatMap(func) 
類似於map,但是每一個輸入元素,會被對映為0個或多個輸出元素,(因此,func函式的返回值是一個seq,而不是單一元素) 
scala> var data = sc.parallelize(1 to 4,1)
//輸出內容為 Array[Int] = Array(1, 2, 3, 4)
scala> data.flatMap(x=> 1 to x).collect()
//輸出內容為 Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

mapPartitions(func) 
類似於map,對RDD的每個分割槽起作用,在型別為T的RDD上執行時,func的函式型別必須是Iterator[T]=>Iterator[U]
//首先建立三個分割槽
scala> var data = sc.parallelize(1 to 9,3)
//輸出為 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
//檢視分割槽的個數
scala> data.partitions.size
//輸出為 Int = 3
//使用mapPartitions
scala> var result = data.mapPartitions{ x=> {
var res = List[Int]()
var i = 0
while(x.hasNext){
i+=x.next()
}
res.::(i).iterator
 }}
//:: 該方法被稱為cons,意為構造,向佇列的頭部追加資料,創造新的列表。用法為 x::list,其中x為加入到頭部的元素,無論x是列表與否,它都只將成為新生成列表的第一個元素,也就是說新生成的列表長度為list的長度+1(btw, x::list等價於list.::(x))
scala> result.collect
//輸出為 Array[Int] = Array(6, 15, 24)

mapPartitionsWithIndex(func) 和mapPartitions類似,但func帶有一個整數引數表上分割槽的索引值,在型別為T的RDD上執行時,func的函式引數型別必須是(int,Iterator[T])=>Iterator[U] 
//首先建立三個分割槽
scala> var data = sc.parallelize(1 to 9,3)
//輸出為 Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
//檢視分割槽的個數
scala> data.partitions.size
//輸出為 Int = 3
scala> var result = data.mapPartitionsWithIndex{
(x,iter) => {
var result = List[String]()
var i = 0
while(iter.hasNext){
i += iter.next()
}
result.::( x + "|" +i).iterator
}}
result.collect
//輸出結果為 Array[String] = Array(0|6, 1|15, 2|24)

sample(withReplacement,fraction,seed) 
根據給定的隨機種子seed,隨機抽樣出數量為fraction的資料 (第一個引數withReplacement這個值如果是true時,採用PoissonSampler取樣器(Poisson分佈),否則使用BernoulliSampler的取樣器.;第二個引數fraction 一個大於0,小於或等於1的小數值,用於控制要讀取的資料所佔整個資料集的概率;第三個引數seed 表示隨機的種子)
//建立資料
var data = sc.parallelize(1 to 1000,1)
//採用固定的種子seed隨機
data.sample(true,0.005,0).collect
//輸出為 Array[Int] = Array(192, 435, 459, 647, 936)
data.sample(false,0.005,0).collect
//輸出為 Array[Int] = Array(192, 795, 826)
//採用隨機種子
data.sample(false,0.005,scala.util.Random.nextInt(1000)).collect
//輸出為 Array[Int] = Array(136, 158)


union(otherDataSet) 
返回一個新的資料集,由原資料集合引數聯合而成 (不會去重)
//建立第一個資料集
scala> var data1 = sc.parallelize(1 to 5,1)
//建立第二個資料集
scala> var data2 = sc.parallelize(3 to 7,1)
//取並集
scala> data1.union(data2).collect
//輸出為 Array[Int] = Array(1, 2, 3, 4, 5, 3, 4, 5, 6, 7)

intersection(otherDataset) 
求兩個RDD的交集 
//建立第一個資料集
scala> var data1 = sc.parallelize(1 to 5,1)
//建立第二個資料集
scala> var data2 = sc.parallelize(3 to 7,1)
//取交集
scala> data1.intersection(data2).collect
//輸出為 Array[Int] = Array(4, 3, 5)


distinct([numtasks]) 
返回一個包含源資料集中所有不重複元素的新資料集(去重)
//建立資料集
scala> var data = sc.parallelize(List(1,1,1,2,2,3,4),1)
//執行去重
scala> data.distinct.collect
//輸出為 Array[Int] = Array(4, 1, 3, 2)
//如果是鍵值對的資料,kv都相同,才算是相同的元素
scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))
//執行去重
scala> data.distinct.collect
//輸出為 Array[(String, Int)] = Array((A,1), (B,1), (A,2))

groupByKey([numtasks]) 
在一個由(K,v)對組成的資料集上呼叫,返回一個(K,Seq[V])對組成的資料集。
如果想要對key進行聚合的話,使用reduceByKey或者combineByKey會有更好的效能 
預設情況下,輸出結果的並行度依賴於父RDD的分割槽數目
//建立資料集
scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))
//分組輸出
scala> data.groupByKey.collect
//輸出為 Array[(String, Iterable[Int])] = Array((B,CompactBuffer(1)), (A,CompactBuffer(1, 1, 2)))

reduceByKey(func,[numTasks]) 
在一個(K,V)對的資料集上使用,返回一個(K,V)對的資料集,key相同的值,都被使用指定的reduce函式聚合到一起,reduce任務的個數是可以通過第二個可選引數來配置的 
//建立資料集
scala> var data = sc.parallelize(List(("A",1),("A",1),("A",2),("B",1)))
scala> data.reduceByKey((x,y) => x+y).collect
//輸出為 Array[(String, Int)] = Array((B,1), (A,4))


aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
在(k,v)對的資料集上呼叫時,返回一個數據集(k,u)對,其中每個鍵的值使用給定的組合函式和一箇中立的“0”值聚合。
允許與輸入值型別不同的聚合值型別,同時避免不必要的分配。像groupbykey,數量減少的任務是通過配置一個可選引數。
scala> var data = sc.parallelize(List((1,1),(1,2),(1,3),(2,4)),2)
scala> def sum(a:Int,b:Int):Int = { a+b }
scala> data.aggregateByKey(0)(sum,sum).collect
res42: Array[(Int, Int)] = Array((2,4), (1,6))
scala> def max(a:Int,b:Int):Int = { math.max(a,b) }
scala> data.aggregateByKey(0)(max,sum).collect
res44: Array[(Int, Int)] = Array((2,4), (1,5))

sortByKey([ascending],[numTasks]) 
在型別為(K,V)的資料集上呼叫,返回以K為鍵進行排序的(K,V)對資料集,升序或者降序有boolean型的ascending引數決定
//建立資料集
scala> var data = sc.parallelize(List(("A",2),("B",2),("A",1),("B",1),("C",1)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at <console>:27
//對資料集按照key進行預設排序
scala> data.sortByKey().collect
res23: Array[(String, Int)] = Array((A,2), (A,1), (B,2), (B,1), (C,1))
//升序排序
scala> data.sortByKey(true).collect
res24: Array[(String, Int)] = Array((A,2), (A,1), (B,2), (B,1), (C,1))
//降序排序
scala> data.sortByKey(false).collect
res25: Array[(String, Int)] = Array((C,1), (B,2), (B,1), (A,2), (A,1))

join(otherDataset,[numTasks]) 
在型別為(K,V)和(K,W)型別的資料集上呼叫,返回一個(K,(V,W))對,每個key中的所有元素都在一起的資料集 ,外連線是通過leftouterjoin,rightouterjoin支援,和fullouterjoin。
//建立第一個資料集
scala> var data1 = sc.parallelize(List(("A",1),("A",2),("C",3)))
//建立第二個資料集
scala> var data2 = sc.parallelize(List(("A",4)))
//建立第三個資料集
scala> var data3 = sc.parallelize(List(("A",4),("A",5)))
data1.join(data2).collect
//輸出為 Array[(String, (Int, Int))] = Array((A,(1,4)), (A,(2,4)))
data1.join(data3).collect
//輸出為 Array[(String, (Int, Int))] = Array((A,(1,4)), (A,(1,5)), (A,(2,4)), (A,(2,5)))

cogroup(otherDataset,[numTasks]) 
在型別為(K,V)和(K,W)型別的資料集上呼叫,返回一個數據集,為(K,Iterable[V],Iterable[W]) 元組,這種操作也被稱為分組。
//建立第一個資料集
scala> var data1 = sc.parallelize(List(("A",1),("A",2),("C",3)))
//建立第二個資料集
scala> var data2 = sc.parallelize(List(("A",4)))
//建立第三個資料集
scala> var data3 = sc.parallelize(List(("A",4),("A",5)))
scala> data1.cogroup(data2).collect
//Array[(String, (Iterable[Int], Iterable[Int]))] = Array((A,(CompactBuffer(1, 2),CompactBuffer(4))), (C,(CompactBuffer(3),CompactBuffer())))
scala> data1.cogroup(data3).collect
//Array[(String, (Iterable[Int], Iterable[Int]))] = Array((A,(CompactBuffer(1, 2),CompactBuffer(4, 5))), (C,(CompactBuffer(3),CompactBuffer())))

cartesian(otherDataset) 
笛卡爾積,但在資料集T和U上呼叫時,返回一個(T,U)對的資料集,所有元素互動進行笛卡爾積 
//建立第一個資料集
scala> var a = sc.parallelize(List(1,2))
//建立第二個資料集
scala> var b = sc.parallelize(List("A","B"))
//計算笛卡爾積
scala> a.cartesian(b).collect
//輸出結果 res2: Array[(Int, String)] = Array((1,A), (1,B), (2,A), (2,B))


pipe(command,[envVars]) 
通過管道的方式對RDD的每個分割槽使用shell命令進行操作,返回對應的結果 。分割槽的元素將會被當做輸入,指令碼的輸出則被當做返回的RDD值。
//建立資料集
scala> var data = sc.parallelize(1 to 9,3)
//測試指令碼
scala> data.pipe("head -n 1").collect
res26: Array[String] = Array(1, 4, 7)
scala> data.pipe("tail -n 1").collect
res27: Array[String] = Array(3, 6, 9)
scala> data.pipe("tail -n 2").collect
res28: Array[String] = Array(2, 3, 5, 6, 8, 9)


coalesce(numPartitions) 
對RDD中的分割槽減少指定的數目,通常在過濾完一個大的資料集之後進行此操作 ,用於合併小檔案(第一個引數是分割槽的數量,第二個引數是是否進行shuffle)
//建立資料集
scala> var data = sc.parallelize(1 to 9,3)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27
//檢視分割槽的大小
scala> data.partitions.size
res3: Int = 3
//不使用shuffle重新分割槽
scala> var result = data.coalesce(2,false)
result: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[19] at coalesce at <console>:29
scala> result.partitions.length
res12: Int = 2
scala> result.toDebugString
res13: String = 
(2) CoalescedRDD[19] at coalesce at <console>:29 []
 |  ParallelCollectionRDD[9] at parallelize at <console>:27 []
//使用shuffle重新分割槽
scala> var result = data.coalesce(2,true)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[23] at coalesce at <console>:29
scala> result.partitions.length
res14: Int = 2
scala> result.toDebugString
res15: String = 
(2) MapPartitionsRDD[23] at coalesce at <console>:29 []
 |  CoalescedRDD[22] at coalesce at <console>:29 []
 |  ShuffledRDD[21] at coalesce at <console>:29 []
 +-(3) MapPartitionsRDD[20] at coalesce at <console>:29 []
    |  ParallelCollectionRDD[9] at parallelize at <console>:27 []

repartition(numpartitions) 
重新洗牌的RDD資料隨機到建立更多或更少的分割槽中並且使資料再平衡。
用於增加或減少RDD的並行度(分割槽數)和解決資料傾斜的。是一個預設使用shuffle的coalesce,如果是用於減少分割槽數量,請使用coalesce,避免使用shuffle。
//建立資料集
scala> var data = sc.parallelize(1 to 9,3)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27
//檢視分割槽的大小
scala> data.partitions.size
res3: Int = 3
scala> var result = data.repartition(2)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at repartition at <console>:29
scala> result.partitions.length
res16: Int = 2
scala> result.toDebugString
res17: String = 
(2) MapPartitionsRDD[27] at repartition at <console>:29 []
 |  CoalescedRDD[26] at repartition at <console>:29 []
 |  ShuffledRDD[25] at repartition at <console>:29 []
 +-(3) MapPartitionsRDD[24] at repartition at <console>:29 []
    |  ParallelCollectionRDD[9] at parallelize at <console>:27 []

repartitionAndSortWithinPartitions(partitioner)
這個方法是在分割槽中按照key進行排序,這種方式比先分割槽再sort更高效,因為相當於在shuffle階段就進行排序。
scala> var data = sc.parallelize(List((1,2),(1,1),(2,3),(2,1),(1,4),(3,5)),2)
data: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[60] at parallelize at <console>:27
scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(2)).collect
res52: Array[(Int, Int)] = Array((2,3), (2,1), (1,2), (1,1), (1,4), (3,5))
scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(1)).collect
res53: Array[(Int, Int)] = Array((1,2), (1,1), (1,4), (2,3), (2,1), (3,5))
scala> data.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(3)).collect
res54: Array[(Int, Int)] = Array((3,5), (1,2), (1,1), (1,4), (2,3), (2,1))


Action中的運算元:


reduce(func) 
通過函式func聚集資料集中的所有元素,這個函式必須是關聯性的,確保可以被正確的併發執行 
//建立資料集
scala> var data = sc.parallelize(1 to 3,1)
scala> data.collect
res6: Array[Int] = Array(1, 2, 3)
//collect計算
scala> data.reduce((x,y)=>x+y)
res5: Int = 6

collect() 
在driver的程式中,以陣列的形式,返回資料集的所有元素,這通常會在使用filter或者其它操作後,返回一個足夠小的資料子集再使用 
//建立資料集
scala> var data = sc.parallelize(1 to 3,1)
scala> data.collect
res6: Array[Int] = Array(1, 2, 3)

count() 
返回資料集的元素個數 
//建立資料集
scala> var data = sc.parallelize(1 to 3,1)
//統計個數
scala> data.count
res7: Long = 3
scala> var data = sc.parallelize(List(("A",1),("B",1)))
scala> data.count
res8: Long = 2


first() 
返回資料集的第一個元素(類似於take(1)) 
//建立資料集
scala> var data = sc.parallelize(List(("A",1),("B",1)))
//獲取第一條元素
scala> data.first
res9: (String, Int) = (A,1)


take(n) 
返回一個數組,由資料集的前n個元素組成。注意此操作目前並非並行執行的,而是driver程式所在機器 
//建立資料集
scala> var data = sc.parallelize(List(("A",1),("B",1),("B",2),("C",1)))
scala> data.take(1)
res10: Array[(String, Int)] = Array((A,1))
//如果n大於總數,則會返回所有的資料
scala> data.take(8)
res12:  Array[(String, Int)] = Array((A,1), (B,1), (B,2), (C,1))
//如果n小於等於0,會返回空陣列
scala> data.take(-1)
res13: Array[(String, Int)] = Array()
scala> data.take(0)
res14: Array[(String, Int)] = Array()


takeSample(withReplacement,num,seed) 
返回一個數組,在資料集中隨機取樣num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定的隨機數生成器種子 
這個方法與sample還是有一些不同的,主要表現在:
返回具體個數的樣本(第二個引數指定)
直接返回array而不是RDD
內部會將返回結果隨機打散
//建立資料集
scala> var data = sc.parallelize(List(1,3,5,7))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21
//隨機2個數據
scala> data.takeSample(true,2,1)
res0: Array[Int] = Array(7, 1)
//隨機4個數據,注意隨機的資料可能是重複的
scala> data.takeSample(true,4,1)
res1: Array[Int] = Array(7, 7, 3, 7)
//第一個引數是是否重複
scala> data.takeSample(false,4,1)
res2: Array[Int] = Array(3, 5, 7, 1)
scala> data.takeSample(false,5,2)
res3: Array[Int] = Array(3, 5, 7, 1)

takeOrderd(n,[ordering]) 
排序後的limit(n) 
//建立資料集
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21
//返回排序資料
scala> data.takeOrdered(3)
res4: Array[String] = Array(a, b, c)

saveAsTextFile(path) 
將資料集的元素,以textfile的形式儲存到本地檔案系統hdfs或者任何其他hadoop支援的檔案系統,spark將會呼叫每個元素的toString方法,並將它轉換為檔案中的一行文字 
//建立資料集
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21
//儲存為test_data_save檔案
scala> data.saveAsTextFile("test_data_save")
scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
<console>:24: error: not found: type GzipCodec
              data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
                                                           ^
//引入必要的class
scala> import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.io.compress.GzipCodec
//儲存為壓縮檔案
scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])


saveAsSequenceFile(path) 
將資料集的元素,以sequencefile的格式儲存到指定的目錄下,本地系統,hdfs或者任何其他hadoop支援的檔案系統,RDD的元素必須由key-value對組成。並都實現了hadoop的writable介面或隱式可以轉換為writable 

saveAsObjectFile(path) 
使用java的序列化方法儲存到本地檔案,可以被sparkContext.objectFile()載入 

countByKey() 
對(K,V)型別的RDD有效,返回一個(K,Int)對的map,表示每一個可以對應的元素個數 
//建立資料集
scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:22
//統計個數
scala> data.countByKey
res9: scala.collection.Map[String,Long] = Map(B -> 1, A -> 2)


foreache(func) 
在資料集的每一個元素上,執行函式func,t通常用於更新一個累加器變數,或者和外部儲存系統做互動
// 建立資料集
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:22
// 遍歷
scala> data.foreach(x=>println(x+" hello"))
b hello
a hello
e hello
f hello
c hello

參考部落格:
http://www.cnblogs.com/xing901022/p/5947706.html
https://www.cnblogs.com/xing901022/p/5944297.html


【來自@若澤大資料】