1. 程式人生 > >關於Spark實際操作的一些實用乾貨(持續更新中.....)

關於Spark實際操作的一些實用乾貨(持續更新中.....)

1.有關local模式

    曾經在國外網站上看到一篇帖子,覺著很受用,拿來分享。

    說Spark的local模式,就是指在本機上執行的本機模式,所以,有關於你設定的executor.memory以及driver.memory的,並不會影響Spark本身記憶體設定問題。經本人測試之後發現,local模式下設定這兩個引數完全沒有作用。

2.spark.memory.fraction與spark.storage.memoryFraction

    記憶體佔比,用於對Spark效能進行調優,官方文件上就有介紹:

  • spark.memory.fraction expresses the size of M
     as a fraction of the (JVM heap space - 300MB) (default 0.6). The rest of the space (25%) is reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records.
  • spark.memory.storageFraction expresses the size of R as a fraction of M
     (default 0.5). R is the storage space within M where cached blocks immune to being evicted by execution.

    spark.memory.fraction(暫稱之為M)表示:片段M的大小是(JVM堆空間- 300MB)(預設為0.6)的一部分。其餘的空間(25%)保留給使用者資料結構、Spark中的內部元資料以及在稀疏和異常大的情況下防止OOM錯誤。

    spark.storage.memoryFraction(暫稱之為M.R):表示R的大小為M的一部分(預設為0.5)。R是M中的儲存空間,其中快取的塊不會被執行逐出

    這也就是為什麼明明設定記憶體大小為比如20G(-Xmx即最大允許堆記憶體決定了記憶體大小),而Spark會報出一下資訊:

    18/06/14 16:01:42 INFO MemoryStore: MemoryStore started with capacity 10.5 GB

3.-Xms -Xmx及-XX:PermSize -XX:MaxPermSize引數設定

首先,引數建議設定為:

        -Xms    實體記憶體的1/64

        -Xmx    實體記憶體的1/4

        -XX:PermSize    實體記憶體的1/64

        -XX:MaxPermSize    實體記憶體的1/4

    其次是Java的GC(Garbage Collection),所設定初始堆記憶體大小-Xms,當記憶體使用達到這個值的時候,GC才開始回收垃圾,意味著如果將-Xms與-Xmx設定大小相同的話,只有當記憶體耗盡時才會進行GC回收。

4.RDD通過collect轉陣列

RDD型別的資料可以通過collect方法轉化為陣列,但由於collect方法屬於對RDD操作中的Action操作,根據RDD的惰性機制可知,多一次Action操作就會多一次shuffle,而一次shuffle操作的結果一次排程stage的結果,然而一次stage又包含許多個Task,最終會導致程式執行耗時增加。

    另外,RDD通過collect,將原有分佈在各個executor程序的資料全部彙集到一個driver節點上,這個操作的實質是把分散式的環境拉出來做單機處理了,所以後續對collect而來的資料來說,已經脫離了Spark的分散式環境。

    再者,將所有資料聚集到一個driver節點,容易產生記憶體溢位是可想而知的。

5.RDD與JavaRDD

本人曾在Java環境下用RDD做map等操作,結果發現Java環境下使用map與Scala環境下使用map並不很一樣。比如:

data.map(it => {it +=1})),而Java中做同樣的map則需要data.map(Function<T, U> f, ClassTag<U> evidence$3);

本以為是Scala中scala.reflect.ClassTag類相關部分自己設定出了問題,找了半天Scala官方api也沒搞明白問題出在哪,後來嘗試用JavaRDD進行同樣的操作,問題解決。

具體方法就是匯入org.apache.spark.api.java.JavaRDD,並呼叫toJavaRDD方法data.toJavaRDD();然後進行map等操作即可。