1. 程式人生 > >Spark Transformation和Action運算元速查表

Spark Transformation和Action運算元速查表

Transformation運算元

Transformation運算元 作用
map(func) 返回一個新的分散式資料集,其中每個元素都是由源RDD中每一個元素經過func函式轉換得到的
filter(func) 返回一個新的資料集,其中包含的元素來自源RDD中元素經過func函式過濾後的結果(func函式返回true的結果)
flatMap(func) 類似於map, 但是每個元素可以對映到0到n個輸出元素(func函式必須返回的是一個Seq而不是單個元素)
mapPartitions(func) 類似於map, 但是它是基於RDD的每個Partition(或者資料block)獨立執行,所以如果RDD包含元素型別為T,則func函式必須是Iterator => Iterator 的對映函式
mapPartitionsWithIndex(func) 類似於mapPartitions,只是func多了一個整型的分割槽索引值,因此如果RDD包含元素型別為T,則func必須是Iterator => Iterator的對映函式
sample(withReplacement, fraction, seed) 取樣部分(比例取決於fraction)資料,同時可以指定是否使用回置取樣(withReplacement),以及隨機數種子(seed)
union(otherDataset) 返回源資料集和引數資料集(otherDataset)的並集
intersection(otherDataset) 返回源資料集和引數資料集(otherDataset)的交集
distinct([numTasks]) 返回對源資料集做元素去重後的新的資料集
groupByKey([numTasks]) 必須應用於鍵值對的元素型別,如源RDD包含(K,V)對,則該運算元返回一個新的資料集包含(K, Iterator)對。 注意:如果你需要按Key分組聚合的話(如sum或average),推薦使用reduceByKey或者aggregateByKey以獲得更好的效能。* 注意*:預設情況下,輸出計算的並行度取決於源RDD的分割槽個數。當然,你也可以通過設定可選引數numTasks來指定並行任務的個數
reduceByKey(func, [numTasks]) 如果源RDD包含元素型別為(K,V)對,則該運算元也返回包含(K, V)對的RDD, 只不過每個Key對應的Value是經過func函式聚合後的結果,而func函式本身是一個(V, V) => V的對映函式。 另外,和groupByKey類似,可以通過可選引數numTasks指定reduce任務的個數
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 如果源RDD包含(K, V)對,則返回的新RDD包含(K, V)對,其中每個Key對應的Value都是由combOp函式和一個”0”值zeroValue聚合得到。允許聚合後Value型別和輸入Value型別不同,避免了不必要的開銷。和groupByKey類似,可以通過可選引數numTasks指定reducer任務的個數
sortByKey([ascending], [numTasks]) 如果源RDD包含元素型別(K, V)對,其中K可以排序,則返回新的RDD包含(K, V)對,並按照K進行排序(由ascending引數決定是升序還是降序)
join(otherDataset, [numTasks]) 如果源RDD包含元素型別(K, V)且引數RDD(otherDataset)包含元素型別(K, W), 則返回的新RDD中將包含內聯後Key對應的(K, (V, W)對。外關聯(Outer joins)操作請參考leftOuterJoin、rightOuterJoin以及fullOuterJoin運算元)
cogroup(otherDataset, [numTasks]) 如果源RDD包含元素型別(K, V)且引數RDD(otherDataset)包含元素型別(K, W),則返回的新的RDD中包含(K, (Iterable, Iterable))。該運算元還有個別名:groupWith
cartesian(otherDataset) 如果源RDD包含元素型別T且引數RDD(otherDataset)包含元素型別U,則返回的新RDD包含前二者的笛卡爾積,其元素型別為(T, U)對
pipe(command, [envVars]) 以shell命令列管道處理RDD的每個分割槽,如:Perl或者bash指令碼。RDD中每個元素都將依次寫入程序的標準輸入(stdin),然後按行輸出到標準輸出(stdout),每一行輸出字串即成為一個新的RDD元素
coalesce(numPartitions) 將RDD的分割槽數減少到numPartitons。當以後大資料集被過濾成小資料集後,減少分割槽,可以提升效率
repartition(numPartitions) 將RDD資料重新混洗(reshuffle)並隨機分步到新的分割槽中,使資料分佈更均衡,新的分割槽個數取決於numPartitions。該運算元總是需要通過網路混洗所有資料。
repartitionAndSortWithPartitions(partitioner) 根據Partitioner(spark自帶有HashPartioner和RangePartitoner等)重新分割槽RDD,並且在每個結果分割槽中按Key做排序。這是一個組合運算元,功能上等價於先reparation再在每個分割槽內排序,但這個運算元內部做了優化(將排序過程下推到混洗同時進行),因此效能更好

Action運算元

Action運算元 作用
reduce(func) 將RDD中元素按func函式進行聚合,func函式是一個(T, T) => T 的對映函式,其中T為源RDD的元素型別,並且func需要滿足交換律和結合律以便支援平行計算
collect() 將資料集中所有元素以陣列形式返回驅動器(driver)程式。通常用於在RDD進行了filter或其他過濾後,將足夠小的資料子集返回到驅動器記憶體中,否則會OOM
count() 返回資料集中元素個數
first() 返回資料中首個元素(類似於take(1))
take(n) 返回資料集中前n個元素
takeSample(withReplacement, num, [seed]) 返回資料集的隨機取樣子集,最多包含num個元素,withReplacement表示是否使用回置取樣,最後一個引數為可選引數seed,隨機數生成器的種子
takeOrdered(n, [ordering]) 按元素排序(可以通過ordering自定義排序規則)後,返回前n個元素
saveAsTextFile(path) 將資料集中元素儲存到指定目錄下的文字檔案中(或者多個文字檔案),支援本地檔案系統、HDFS或者其他Hadoop支援的檔案系統。儲存過程中,Spark會呼叫每個元素的toString方法,將結果儲存成檔案中的一行。
saveAsSequenceFile(path) 將資料集中元素儲存到指定目錄下的Hadoop Sequence檔案中,支援本地檔案系統、HDFS或者其他任何Hadoop支援的檔案系統。適用於實現了Writeable介面的鍵值對RDD。在Scala中,同樣也適用於能夠被隱式轉換為Writeable的型別
saveAsObjectFile(path) 將RDD元素以Java序列化的格式儲存成檔案,儲存結果的檔案可以使用SparkContext.objectFile來讀取。
countByKey() 只能適用於包含鍵值對(K, V)的RDD,並返回一個雜湊表,包含(K, Int)對,表示每個Key的個數。
foreach(func) 在RDD的每個元素上執行func函式。通常被用於累加操作,如:更新一個累加器或者和外部儲存系統互操作。注意:用foreach操作除了累加器之外的變數可能導致未定義的行為,更詳細請參考“理解閉包”