關於Spark實際操作的一些實用乾貨(持續更新中.....)
1.有關local模式
曾經在國外網站上看到一篇帖子,覺著很受用,拿來分享。
說Spark的local模式,就是指在本機上執行的本機模式,所以,有關於你設定的executor.memory以及driver.memory的,並不會影響Spark本身記憶體設定問題。經本人測試之後發現,local模式下設定這兩個引數完全沒有作用。
2.spark.memory.fraction與spark.storage.memoryFraction
記憶體佔比,用於對Spark效能進行調優,官方文件上就有介紹:
spark.memory.fraction
expresses the size ofM
spark.memory.storageFraction
expresses the size ofR
as a fraction ofM
R
is the storage space withinM
where cached blocks immune to being evicted by execution.
spark.memory.fraction(暫稱之為M)表示:片段M的大小是(JVM堆空間- 300MB)(預設為0.6)的一部分。其餘的空間(25%)保留給使用者資料結構、Spark中的內部元資料以及在稀疏和異常大的情況下防止OOM錯誤。
spark.storage.memoryFraction(暫稱之為M.R):表示R的大小為M的一部分(預設為0.5)。R是M中的儲存空間,其中快取的塊不會被執行逐出
這也就是為什麼明明設定記憶體大小為比如20G(-Xmx即最大允許堆記憶體決定了記憶體大小),而Spark會報出一下資訊:
18/06/14 16:01:42 INFO MemoryStore: MemoryStore started with capacity 10.5 GB
3.-Xms -Xmx及-XX:PermSize -XX:MaxPermSize引數設定
首先,引數建議設定為:
-Xms 實體記憶體的1/64
-Xmx 實體記憶體的1/4
-XX:PermSize 實體記憶體的1/64
-XX:MaxPermSize 實體記憶體的1/4
其次是Java的GC(Garbage Collection),所設定初始堆記憶體大小-Xms,當記憶體使用達到這個值的時候,GC才開始回收垃圾,意味著如果將-Xms與-Xmx設定大小相同的話,只有當記憶體耗盡時才會進行GC回收。
4.RDD通過collect轉陣列
RDD型別的資料可以通過collect方法轉化為陣列,但由於collect方法屬於對RDD操作中的Action操作,根據RDD的惰性機制可知,多一次Action操作就會多一次shuffle,而一次shuffle操作的結果一次排程stage的結果,然而一次stage又包含許多個Task,最終會導致程式執行耗時增加。
另外,RDD通過collect,將原有分佈在各個executor程序的資料全部彙集到一個driver節點上,這個操作的實質是把分散式的環境拉出來做單機處理了,所以後續對collect而來的資料來說,已經脫離了Spark的分散式環境。
再者,將所有資料聚集到一個driver節點,容易產生記憶體溢位是可想而知的。
5.RDD與JavaRDD
本人曾在Java環境下用RDD做map等操作,結果發現Java環境下使用map與Scala環境下使用map並不很一樣。比如:
data.map(it => {it +=1})),而Java中做同樣的map則需要data.map(Function<T, U> f, ClassTag<U> evidence$3);
本以為是Scala中scala.reflect.ClassTag類相關部分自己設定出了問題,找了半天Scala官方api也沒搞明白問題出在哪,後來嘗試用JavaRDD進行同樣的操作,問題解決。
具體方法就是匯入org.apache.spark.api.java.JavaRDD,並呼叫toJavaRDD方法data.toJavaRDD();然後進行map等操作即可。