1. 程式人生 > >Spark RDD collect與collectPartitions

Spark RDD collect與collectPartitions

確切的應該說是RDD collect^_^

1.collect的作用    

    Spark內有collect方法,是Action操作裡邊的一個運算元,這個方法可以將RDD型別的資料轉化為陣列,你可以隨時val arr = data.collect(),將RDD型別資料轉化為陣列來存放並參與後續運算。

2.已知的弊端

    首先,從時間上來講,前邊已經說過了,collect是Action裡邊的,根據RDD的惰性機制,真正的計算髮生在RDD的Action操作。由於collect是從各節點將資料拉到driver端,需要重新分割槽,所以,一次collect就會導致一次Shuffle,而一次Shuffle排程一次stage,然而一次stage包含很多個已分解的任務碎片Task。這麼一來,會導致程式執行時間大大增加,屬於比較耗時的操作,即使是在local模式下也同樣耗時。

    其次,從環境上來講,本機local模式下執行並無太大區別,可若放在分散式環境下執行,一次collect操作會將分散式各個節點上的資料匯聚到一個driver節點上,而這麼一來,後續所執行的運算和操作就會脫離這個分散式環境而相當於單機環境下執行,這也與Spark的分散式理念不合。

    最後,將大量資料彙集到一個driver節點上,並且像這樣val arr = data.collect(),將資料用陣列存放,佔用了jvm堆記憶體,可想而知,是有多麼輕鬆就會記憶體溢位。

3.如何規避 

    若需要遍歷RDD中元素,大可不必使用collect,可以使用foreach語句;

    若需要列印RDD中元素,可用take語句,data.take(1000).foreach(println),這點官方文件裡有說明;

    若需要檢視其中內容,可用saveAsTextFile方法。

總之,單機環境下使用collect問題並不大,但分散式環境下儘量規避,如有其他需要,手動編寫程式碼實現相應功能就好。

補充:

    collectPartitions:同樣屬於Action的一種操作,同樣也會將資料彙集到Driver節點上,與collect區別並不是很大,唯一的區別是:collectPartitions產生資料型別不同於collect,collect是將所有RDD彙集到一個數組裡,而collectPartitions是將各個分割槽內所有元素儲存到一個數組裡,再將這些陣列彙集到driver端產生一個數組;collect產生一維陣列,而collectPartitions產生二維陣列。

例:

   RDD型別data,資料型別為[labeledPoint],labeledPoint為(label,features)

   那麼 val collectArr = data.collect();(collectArr內陣列元素為labeledPoint[label,features]) 

   而val collectPArr= data.collectPartitions();(collectPArr內陣列元素為Array[label,features],即為二維陣列)