1. 程式人生 > >零基礎入門大資料探勘之spark中的幾種map

零基礎入門大資料探勘之spark中的幾種map

今天再來說一下spark裡面的幾種map方法。前面的文章介紹過單純的map,但是spark還有幾種map值得對比一下,主要是下面幾種:

  • map:普通的map
  • flatMap:在普通map的基礎上多了一個操作,扁平化操作;
  • mapPartitions:相對於分割槽Partition而言的,即對每個分割槽分別進行一次性的map。
  • mapValues(function) :適合key-value對的map操作。
  • flatMapValues(function) :同樣適合key-value對的map操作,與mapValues略有不同。

這幾類都是實現輸入輸出一對一型的,即輸入一個東西,輸出一個東西。

(1)普通的map比較簡單,假設一個文字:

hello world
a new line
hello
...
the end

如果我想對每一行進行空格分割,返回的依然是個矩陣的話,就是一般的map方法:

val arr=sc.parallelize(Array("hello world","a new line","the end"))
val res = arr.map(x=>x.split(" "))

得到的結果就會是這樣的:

[[hello],[world] ;
[a], [new], [line] ;
[the], [end]]

也就是每一行獨立進行執行分割,得到的結果也相互獨立;

(2)flatMap,在普通map後面再進行壓平,所謂壓平,就是把每一行得到的結果進行串聯成一行。比如上面map後本來有三行結果的,每行結果裡面又有分割的結果,flatMap後就只有一行。

val arr=sc.parallelize(Array("hello world","a new line","the end"))
val res = arr.flatMap(x=>x.split(" "))

得到的結果就是下面這樣:

[[hello],[world], [a], [new], [line], [the], [end]]

這就是壓平。

(3)mapPartitions,是一種分割槽進行的map操作。

理解這個之前,先理解分割槽Partitions的概念。所謂分割槽,就是分散式儲存的,整個資料不是存在一臺機器上,而是分散存在好幾臺機器上的不同區域,就是分割槽。而mapPartitions就是對每個分割槽單獨處理,每個分割槽中又是進行統一的讀入記憶體,然後一次性處理的。

來看一下map是怎麼操作的,map雖然也是分散式處理的,但是是一條條讀入記憶體,處理完以後出記憶體存結果。而mapPartitions是一個分割槽中一次性讀入記憶體,然後一起一個map處理,節省了很多單次讀記憶體的操作,相對來說速度更快。

當資料量很大,又有很多分割槽的時候,mapPartitions的速度是更快的,但是有一個前提是每一個小分割槽的量不能太大,必須保證小分割槽可以一次性全部讀入自己機器的記憶體,如果不滿足這個條件,mapPartitions將報錯。而對比來說map就沒有這個問題,反正map是一條一條的進行分散式處理的。

(4)mapValues(function) :該函式只適用於key-value對形式的資料。對裡面每一對資料,保持key不變,對value進行function函式處理,得到的結果與key形成新的key-value對。所以該函式返回也是key-value對。比如:

>>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
>>> def f(x): return len(x)
>>> x.mapValues(f).collect()
    [('a', 3), ('b', 1)]

(5)flatMapValues(function):flatMapValues類似於mapValues,不同的在於flatMapValues應用於元素為key-value對的RDD中Value,每個一元素的Value被輸入函式f對映為一系列的值,然後這些值再與原RDD中的Key組成一系列新的key-value對。所以與mapValues最大的區別是,mapValues每一對輸出是一對,而flatMapValues每一對輸出可不止一對,可以是很多對。

>>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
    >>> def f(x): return x
    >>> x.flatMapValues(f).collect()
    [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

關注公號【AInewworld】,第一時間獲取精彩內容
在這裡插入圖片描述