1. 程式人生 > >spark 常用函數介紹(python)

spark 常用函數介紹(python)

put ons value result 組成 hat 是把 mbo flat

原文引自:https://www.cnblogs.com/yxpblog/p/5269314.html

在開始之前,我先介紹一下,RDD是什麽?

RDD是Spark中的抽象數據結構類型,任何數據在Spark中都被表示為RDD。從編程的角度來看,RDD可以簡單看成是一個數組。和普通數組的區別是,RDD中的數據是分區存儲的,這樣不同分區的數據就可以分布在不同的機器上,同時可以被並行處理。因此,Spark應用程序所做的無非是把需要處理的數據轉換為RDD,然後對RDD進行一系列的變換和操作從而得到結果。

  創建RDD:

1 >>> sc.parallelize([
1,2,3,4,5], 3) #意思是將數組中的元素轉換為RDD,並且存儲在3個分區上[1]、[2,3]、[4,5]。如果是4個分區:[1]、[2]、[3]、[4,5]

  上面這種是數組創建,也可以從文件系統或者HDFS中的文件創建出來,後面會講到。

只要搞懂了spark的函數們,你就成功了一大半。

spark的函數主要分兩類,TransformationsActions。Transformations為一些數據轉換類函數,actions為一些行動類函數:

轉換:轉換的返回值是一個新的RDD集合,而不是單個值。調用一個變換方法,不會有任何求值計算,它只獲取一個RDD作為參數,然後返回一個新的RDD。

行動:行動操作計算並返回一個新的值。當在一個RDD對象上調用行動函數時,會在這一時刻計算全部的數據處理查詢並返回結果值。

下面介紹spark常用的Transformations, Actions函數:

Transformations

map(func [, preservesPartitioning=False]) --- 返回一個新的分布式數據集,這個數據集中的每個元素都是經過func函數處理過的。

1 2 3 >>> data = [1,2,3,4,5] >>> distData = sc.parallelize(data).map
(lambda x: x+1).collect() #結果:[2,3,4,5,6]

filter(func) --- 返回一個新的數據集,這個數據集中的元素是通過func函數篩選後返回為true的元素(簡單的說就是,對數據集中的每個元素進行篩選,如果符合條件則返回true,不符合返回false,最後將返回為true的元素組成新的數據集返回)。

1 2 >>> rdd = sc.parallelize(data).filter(lambda x:x%2==0).collect() #結果:[2, 4]

flatMap(func [, preservesPartitioning=False]) --- 類似於map(func), 但是不同的是map對每個元素處理完後返回與原數據集相同元素數量的數據集,而flatMap返回的元素數不一定和原數據集相同。each input item can be mapped to 0 or more output items (so funcshould return a Seq rather than a single item)

1 2 3 4 5 6 7 8 9 10 11 12 13 #### for flatMap() >>> rdd = sc.parallelize([2,3,4]) >>> sorted(rdd.flatMap(lambda x: range(1,x)).collect()) #結果:[1, 1, 1, 2, 2, 3] >>> sorted(rdd.flatMap(lambda x:[(x,x), (x,x)]).collect()) #結果:[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] #### for map() >>> rdd = sc.parallelize([2,3,4]) >>> sorted(rdd.flatMap(lambda x: range(1,x)).collect()) #結果:[[1], [1, 2], [1, 2, 3]] >>> sorted(rdd.flatMap(lambda x:[(x,x), (x,x)]).collect()) #結果:[[(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]

mapPartitions(func [, preservesPartitioning=False]) ---mapPartitions是map的一個變種。map的輸入函數是應用於RDD中每個元素,而mapPartitions的輸入函數是應用於每個分區,也就是把每個分區中的內容作為整體來處理的。

1 2 3 4 >>> rdd = sc.parallelize([1,2,3,4,5], 3) >>> def f(iterator): yield sum(iterator) >>> rdd.mapPartitions(f).collect() #結果:[1,5,9]

mapPartitionsWithIndex(func [, preservesPartitioning=False]) ---Similar to mapPartitions, but takes two parameters. The first parameter is the index of the partition and the second is an iterator through all the items within this partition. The output is an iterator containing the list of items after applying whatever transformation the function encodes.

1 2 3 4 >>> rdd = sc.parallelize([1,2,3,4,5], 3) >>> def f(splitIndex, iterator): yield splitIndex >>> rdd.mapPartitionsWithIndex(f).collect() #結果:[0,1,2] #三個分區的索引

reduceByKey(func [, numPartitions=None, partitionFunc=<function portable_hash at 0x7fa664f3cb90>]) --- reduceByKey就是對元素為kv對的RDD中Key相同的元素的value進行reduce,因此,key相同的多個元素的值被reduce為一個值,然後與原RDD中的key組成一個新的kv對。

1 2 3 4 5 >>> from operator import add >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> sorted(rdd.reduceByKey(add).collect()) >>> #或者 sorted(rdd.reduceByKey(lambda a,b:a+b).collect()) #結果:[(‘a‘, 2), (‘b‘, 1)]

aggregateByKey(zeroValue)(seqOp, combOp [, numPartitions=None]) ---

sortByKey([ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7fa665048c80>]) --- 返回排序後的數據集。該函數就是隊kv對的RDD數據進行排序,keyfunc是對key進行處理的函數,如非需要,不用管。

1 2 3 4 5 6 >>> tmp = [(‘a‘, 1), (‘b‘, 2), (‘1‘, 3), (‘D‘, 4)] >>> sc.parallelize(tmp).sortByKey(True, 1).collect() #結果: [(‘1‘, 3), (‘D‘, 4), (‘a‘, 1), (‘b‘, 2)] >>> sc.parallelize(tmp).sortByKey(True, 2, keyfunc=lambda k:k.lower()).collect() #結果:[(‘1‘, 3), (‘a‘, 1), (‘b‘, 2), (‘D‘, 4)] #註意,比較兩個結果可看出,keyfunc對鍵的處理只是在數據處理的過程中起作用,不能真正的去改變鍵名

join(otherDataset [, numPartitions=None]) --- join就是對元素為kv對的RDD中key相同的value收集到一起組成(v1,v2),然後與原RDD中的key組合成一個新的kv對,返回。

1 2 3 4 >>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> y = sc.parallelize([("a", 2), ("a", 3)]) >>> sorted(x.join(y).collect()) #結果:[(‘a‘, (1, 2)), (‘a‘, (1, 3))]

cartesian(otherDataset) --- 返回一個笛卡爾積的數據集,這個數據集是通過計算兩個RDDs得到的。

1 2 3 4 >>> x = sc.parallelize([1,2,3]) >>> y = sc.parallelize([4,5]) >>> x.cartesian(y).collect() #結果:[(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]

Action (這裏只講支持python的,java和scala的後面用到了在做詳解,當然支持python就一定支持java和scala)

reduce(func) --- reduce將RDD中元素兩兩傳遞給輸入函數,同時產生一個新的值,新產生的值與RDD中下一個元素再被傳遞給輸入函數直到最後只有一個值為止。

1 2 3 >>> from operator import add >>> sc.parallelize([1,2,3,4,5]).reduce(add) # 結果:15

collect() --- 返回RDD中的數據,以list形式。

1 2 >>> sc.parallelize([1,2,3,4,5]).collect() #結果:[1,2,3,4,5]

count() --- 返回RDD中的元素個數。

1 2 >>> sc.parallelize([1,2,3,4,5]).count #結果:5

first() --- 返回RDD中的第一個元素。

1 2 >>> sc.parallelize([1,2,3,4,5]).first() #結果:1

take(n) --- 返回RDD中前n個元素。

1 2 >>> sc.parallelize([1,2,3,4,5]).take(2) #結果:[1,2]

takeOrdered(n [, key=None]) --- 返回RDD中前n個元素,但是是升序(默認)排列後的前n個元素,或者是通過key函數指定後的RDD(這個key我也沒理解透,後面在做詳解)

1 2 3 4 >>> sc.parallelize([9,7,3,2,6,4]).takeOrdered(3) #結果:[2,3,4] >>> sc.parallelize([9,7,3,2,6,4]).takeOrdered(3, key=lambda x:-x) #結果:[9,7,6]

saveAsTextFile(path [, compressionCodecClass=None]) --- 該函數將RDD保存到文件系統裏面,並且將其轉換為文本行的文件中的每個元素調用 tostring 方法。

parameters: path - 保存於文件系統的路徑

       compressionCodecClass - (None by default) string i.e. “org.apache.hadoop.io.compress.GzipCodec”

1 2 3 4 5 6 7 >>> tempFile = NamedTemporaryFile(delete=True) >>> tempFile.close() >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) >>> from fileinput import input >>> from glob import glob >>> ‘‘.join(sorted(input(glob(tempFile.name + "/part-0000*")))) ‘0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n‘

Empty lines are tolerated when saving to text files:

1 2 3 4 5 >>> tempFile2 = NamedTemporaryFile(delete=True) >>> tempFile2.close() >>> sc.parallelize([‘‘, ‘foo‘, ‘‘, ‘bar‘, ‘‘]).saveAsTextFile(tempFile2.name) >>> ‘‘.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) ‘\n\n\nbar\nfoo\n‘

Using compressionCodecClass:

1 2 3 4 5 6 7 8 >>> tempFile3 = NamedTemporaryFile(delete=True) >>> tempFile3.close() >>> codec = "org.apache.hadoop.io.compress.GzipCodec" >>> sc.parallelize([‘foo‘, ‘bar‘]).saveAsTextFile(tempFile3.name, codec) >>> from fileinput import input, hook_compressed >>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed)) >>> b‘‘.join(result).decode(‘utf-8‘) u‘bar\nfoo\n‘

countByKey() --- 返回一個字典(key,count),該函數操作數據集為kv形式的數據,用於統計RDD中擁有相同key的元素個數。

1 2 3 4 5 >>> defdict = sc.parallelize([("a",1), ("b",1), ("a", 1)]).countByKey() >>> defdict #結果:defaultdict(<type ‘int‘>, {‘a‘: 2, ‘b‘: 1}) >>> defdict.items() #結果:[(‘a‘, 2), (‘b‘, 1)]

countByValue() --- 返回一個字典(value,count),該函數操作一個list數據集,用於統計RDD中擁有相同value的元素個數。

1 2 >>> sc.parallelize([1,2,3,1,2,5,3,2,3,2]).countByValue().items() #結果:[(1, 2), (2, 4), (3, 3), (5, 1)]

foreach(func) --- 運行函數func來處理RDD中的每個元素,這個函數常被用來updating an Accumulator或者與外部存儲系統的交互。

1 2 3 >>> def f(x): print(x) >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) #note: 打印是隨機的,並不是一定按1,2,3,4,5的順序打印

spark 常用函數介紹(python)