1. 程式人生 > >spark中常用運算元含義及區別

spark中常用運算元含義及區別


Transform:
1.
map:rdd中的每項資料進行map裡的操作後,會形成一個個新的元素的新rdd
flatMap:在map的基礎上進行扁平化,形成一個新的rdd
2.
distinct:轉換操作,去重
filter:對rdd中的元素進行過濾
filterByRange:範圍過濾,作用於鍵值對RDD,對RDD中元素進行過濾,返回鍵在指定範圍內的元素
3.
union:並集操作,不去重
intersection:交集操作,去重
subtract:類似intersection,返回在rdd中出現並且不在otherRdd中出現的元素
subtractByKey:與subtract類似,只不過這裡是針對key的,返回在主RDD中出現並且不在otherRDD中出現的元素
4.
join:相當於sql中的內聯,只返回兩個RDD根據key關聯上的結果 leftOuterJoin:相當於sql中的左外關聯,返回結果以前面的RDD為主,關聯不上的記錄為空 rightOuterJoin:相當於sql中的右外關聯,返回結果以後面的RDD為主,關聯不上的記錄為空 cartesian:做笛卡爾積 5. cogroup:將輸入的資料集(k,v)和另外的資料集(k,w)進行cogroup,得到的資料集是(k,Seq(v),Seq(w))的資料集 groupBy:接收一個函式,這個函式的返回值作為key,然後通過key對裡面的元素進行分組 groupByKey:會對每一個RDD中的value聚合成一個序列,此操作發生在reduce端,所以勢必所有的資料 將會通過網路傳輸,造成不必要的浪費,同時如果資料量十分大,可能還會造成OutOfMemoryError 6.
reduceByKey:會在結果傳送至reduce之前,會對每個mapper在本地進行merge,有點類似於MapReduce的combiner。 這樣做的好處是在map端進行一次reduce之後,資料量會大幅度減小,從而減小傳輸,保證reduce端能夠更快的計算出結果 reduceByKeyLocally:對RDD中的每個k對應的v值根據對映函式來計算,運算結果對映到Map[k,v],而不是RDD[k,v] 7. mapPartitions:與map函式類似,只不過對映函式的引數由RDD的每一個元素變成了RDD中每一個分割槽的迭代(如果在對映過程中 需要頻繁建立額外的物件,使用mapPartitions要比map高效,比如將RDD中所有資料通過JDBC寫入資料庫,如果使用map, 可能每個元素都要建立一個connection,這樣開銷很大,如果使用mapPartitions,那麼只需要針對每一個分割槽建立一個connection)
mapPartitionsWithIndex:函式作用同mapPartitions,不過提供了兩個引數,第一個引數為翻去的索引。 8. zip:用於兩個RDD組合成Key/Value形式的RDD,預設兩個RDD的partition的數量以及元素的數量相同,否則會丟擲異常 zipPartitions:將多個RDD按照partition組合成新的RDD,該函式需要組合的RDD具有相同的分割槽數,但對於每個分割槽內的元素數量沒有要求(三個引數,大致三類實現) 9. zipWithIndex:將RDD中的元素和這個元素在RDD中的ID(索引號)組合成鍵值對 zipWithUniqueId:將RDD中的元素和一個唯一ID組合成鍵值對(該唯一ID生成演算法: 每個分割槽中第一個元素的唯一ID值為:該分割槽索引號, 每個分割槽中第N個元素的唯一ID值為:(前一個元素的唯一ID值) + (該RDD總的分割槽數)) 10. randomSplit:根據weights權重,將一個RDD切分成多個RDD,權重引數是一個Double陣列 11. glom:將RDD中每一個分割槽中型別為T的元素轉換成Array[T],這樣每個分割槽就只有一個數組元素 12. coalesce:用於將RDD重新分割槽,使用HashPartitioner,第一個為重新分割槽的數目,第二個為時候進行shuffle,預設是false repartition:該函式是coalesce函式的第二個引數為true的實現 13. combineByKey:用於將RDD[K,V]轉換成RDD[K,C],V和C的型別可相同可不同 第一個引數x:原封不動的取出來,第二個引數是函式:區域性運算,第三個引數是函式:對區域性運算後的結果在做運算 每個分割槽中每個key的value中的第一個值:(hello,1)(hello,1)(good,1)-->(hello(1,1),good(1))-->x就相當於hello的第一個1, good中的1 引數(createCombiner:組合器函式,用於將V型別轉換成C型別,輸入引數為RDD[K,V]中的V,輸出為C mergeValue:合併值函式,將一個C型別和一個V型別值合併成一個C型別,輸入引數為(C,V),輸出為C mergeCombiners:合併組合器函式,用於將兩個C型別值合併成一個C型別,輸入引數為(C,C),輸出為C numPartitions:結果RDD分割槽數,預設保持原有的分割槽數 partitioner:分割槽函式,預設為HashPartitioner mapSideCombine:是否需要在Map端進行combine操作,類似於MapReduce中的combine,預設為true) foldByKey:該函式用於RDD[K,V]根據K將V做摺疊、合併處理,其中的引數zeroValue表示先根據對映函式將zeroValue應用於V, 進行初始化V,再將對映函式應用於初始化後的V. aggregateByKey:aggregate針對於序列操作,aggregateByKey針對於K,V操作,可以先進行區域性操作,在進行全域性操作 從原始碼可看出,aggregateByKey呼叫的就是combinByKey方法 14. partitionBy:根據partitioner函式生成新的ShuffleRDD,將原RDD重新分割槽 15. mapValues:同map,只不過mapValues針對的是[K,V]中的V進行map操作 flatMapValues:同flatMap,只不過flatMapValues是針對[K,V]中的V值進行flatMap操作,根據V去扁平化 16. foreachRDD:sparkStreaming中的轉換運算元,處理每一個時間段內的RDD資料。 Action: 1. sortBy:排序,有shuffle,預設是true升序,可以按照k或者v進行排序 sortByKey:排序,有shuffle,預設是true升序,按照k進行排序 2. aggregate:使用者聚合RDD中的元素,先使用seqOp將RDD中每個分割槽的T型別元素聚合成U型別,再使用combOp 將之前每個分割槽聚合後的U型別聚合成U型別,特別注意seqOp和combOp都會使用zeroValue的值,zeroValue的型別為U fold:是aggregate的簡化,將aggregate中的seqOp和combOp使用同一個函式Op 3. lookup:用於(K,V)型別的RDD,指定K值,返回RDD中該K對應的所有V值 4. collectAsMap:List集合轉成Map集合(List(("a", 1), ("b", 2))=>Map(b -> 2, a -> 1)) 5. countByKey:計算key的數量 countByValue:計算value的數量 6. froeach:foreach也是對每個partition中的iterator時行迭代處理,通過使用者傳入的function(即函式f)對iterator進行內容的處理, 而不同的是,函式f中的引數傳入的不再是一個迭代器,而是每次的foreach得到的一個rdd的kv例項,也就是具體的資料 foreachPartition:是對每個partition中的iterator時行迭代的處理.通過使用者傳入的function(即函式f)對iterator進行內容的處理, 原始碼中函式f傳入的引數是一個迭代器,也就是說在foreachPartiton中函式處理的是分割槽迭代器,而非具體的資料,不會生成一個新的RDD 7. keys:RDD[K,V],列印key values:RDD[K,V],列印value 8. keyBy:以傳入的引數作為key(以單詞的第一個字母:rdd19.keyBy(_ (0)).collect() => ArrayBuffer((d,dog), (s,salmon)) 9. count:返回RDD中的元素數量。 reduce:根據對映函式f,對RDD中的元素進行二元計算,返回計算結果。 collect:用於將一個RDD轉換成陣列。 10. first:返回RDD中的第一個元素,不排序。 take:用於獲取RDD中從0到num-1下標的元素,不排序。 top:用於從RDD中,按照預設(降序)或者指定的排序規則,返回前num個元素。 takeOrdered:和top類似,只不過以和top相反的順序返回元素。 11. saveAsTextFile:用於將RDD以文字檔案的格式儲存到檔案系統中,只會儲存在Executor所在機器的本地目錄。 saveAsSequenceFile:用於將RDD以SequenceFile的檔案格式儲存到HDFS上. saveAsObjectFile:用於將RDD中的元素序列化成物件,儲存到檔案中。對於HDFS,預設採用SequenceFile儲存。 12. saveAsHadoopFile:將RDD儲存在HDFS上的檔案中,支援老版本Hadoop API,可以指定outputKeyClass、outputValueClass以及壓縮格式。 saveAsHadoopDataset:用於將RDD儲存到除了HDFS的其他儲存中,比如HBase。 (在JobConf中,通常需要關注或者設定五個引數:檔案的儲存路徑、key值的class型別、value值的class型別、RDD的輸出格式(OutputFormat)、以及壓縮相關的引數。) 13. saveAsNewAPIHadoopFile:用於將RDD資料儲存到HDFS上,使用新版本Hadoop API。 saveAsNewAPIHadoopDataset:採用新版本Hadoop API。