1. 程式人生 > >spark RDD collect()

spark RDD collect()

  1. collect的作用
    Spark內有collect方法,是Action操作裡邊的一個運算元,這個方法可以將RDD型別的資料轉化為陣列,同時會從遠端叢集是拉取資料到driver端。
  2. 已知的弊端
    首先,collect是Action裡邊的,根據RDD的惰性機制,真正的計算髮生在RDD的Action操作。那麼,一次collect就會導致一次Shuffle,而一次Shuffle排程一次stage,然而一次stage包含很多個已分解的任務碎片Task。這麼一來,會導致程式執行時間大大增加,屬於比較耗時的操作,即使是在local模式下也同樣耗時。
    其次,從環境上來講,本機local模式下執行並無太大區別,可若放在分散式環境下執行,一次collect操作會將分散式各個節點上的資料匯聚到一個driver節點上,而這麼一來,後續所執行的運算和操作就會脫離這個分散式環境而相當於單機環境下執行,這也與Spark的分散式理念不合。
    最後,將大量資料彙集到一個driver節點上,並且像這樣val arr = data.collect()
    ,將資料用陣列存放,佔用了jvm堆記憶體,可想而知,是有多麼輕鬆就會記憶體溢位。
  3. 如何規避
    若需要遍歷RDD中元素,大可不必使用collect,可以使用foreach語句;
    若需要列印RDD中元素,可用take語句,返回資料集前n個元素,data.take(1000).foreach(println),這點官方文件裡有說明;
    若需要檢視其中內容,可用saveAsTextFile方法。
    總之,單機環境下使用collect問題並不大,但分散式環境下儘量規避,如有其他需要,手動編寫程式碼實現相應功能就好。

  4. 補充:
    collectPartitions:同樣屬於Action的一種操作,同樣也會將資料彙集到Driver節點上,與collect區別並不是很大,唯一的區別是:collectPartitions產生資料型別不同於collect,collect是將所有RDD彙集到一個數組裡,而collectPartitions是將各個分割槽內所有元素儲存到一個數組裡,再將這些陣列彙集到driver端產生一個數組;collect產生一維陣列,而collectPartitions產生二維陣列。