1. 程式人生 > >spark(二)優化思路

spark(二)優化思路

hash 分區 location lock 自己 其他 jvm的內存 磁盤 包括

優化思路

內存優化

內存優化大概分為三個方向

1.所有對象的總內存(包括數據和java對象)

2.訪問這些對象的開銷

3.垃圾回收的開銷

其中Java的原生對象往往都能被很快的訪問,但是會多占據2-5倍或更多的內存,有下面4點原因

·每個單獨的java對象都有一個對象頭(16字節),其中包括指向對象的指針(棧->堆),如果該對象只有幾個屬性,那麽對象頭可能比實際數據占用的空間都大(嚴重浪費資源)

·java每個string都包含了40字節的額外開銷(因為底層其實是存儲在數組,需要記錄數組的指針,長度等信息),每個字符包含2字節(UTF-16編碼)。例如一個10字符的string,實際占用內存空間60字節

·常見的集合類,例如linkedlist,hashmap都有用到鏈表,其中的對象頭,元素指針都會占據額外的空間

·基礎類型的包裝,例如Integer

明確內存消耗

一般情況,可以把數據轉成rdd,然後通過spark自帶的UI的Storage頁面來觀察rdd占用內存大小。

其它特殊的對象,可以用spark自帶的工具類SizeEstimator來評估內存大小,包括對廣播數據的內存占用評估

優化數據結構

避免使用需要額外開銷的java原生的數據結構,比如鏈表,hashmap,包裝類。下面是常見的方法

·盡量使用數組結構和基礎類型,

·在嵌套的數據結構中,盡量避免小對象和指針

·考慮用數字或者枚舉來代替string作為key

·如果內存少於32GB,可以優化JVM -XX:+UseCompressedOops ,OOP = “ordinary object pointer” 普通對象指針。可以讓指針由8->4字節,壓縮的一般是對象的相關指針(不是用來壓縮數據的)。

一般建議場景是在分配給JVM的內存大小在[4G,32G], 如果小於4G,那麽JVM會使用低虛擬地址空間(low virutal address space,64位下模擬32位),這樣就不需要做壓解壓動作了。而對於大於32G,將采用默認的隨機地址分配特性,進行壓解壓。

數據序列化

選擇合適的序列化協議,一般而言用Kryo,比java原生序列化快很多

數據存儲

RDD persistence API

通過把數據序列化至內存,或者磁盤,或者其他策略

GC優化

所有給spark的內存資源,有一部分是用於cache RDD的,剩下的用於jvm的堆和棧等使用。

默認的比例是cache RDD占總內存的60%,可以通過spark.storage.memoryFraction來更改。

一般情況下,官方文檔建議這個比值不要超過JVM Old Gen區域的比值。這也很容易理解,因為RDD Cache數據通常都是長期駐留內存的,理論上也就是說最終會被轉移到Old Gen區域(如果該RDD還沒有被刪除的話),如果這部分數據允許的尺寸太大,勢必把Old Gen區域占滿,造成頻繁的FULL GC,這種情況就可以調小該值。

·確認資源是否給足driver的cpu和memory,executor的cpu和memory

·當出現過多的full GC時候,可以減小RDD cache的內存空間

·當出現過多的minor GC時候,可以增加JVM中Eden區的大小,通過4/3的比例增加

·其它常規JVM優化方法,線程棧的內存大小,永久代的堆內存大小等

分區數優化

分區數多就是task多,整個任務的並發度就高,但也不是越多越好,假設你有100條數據,有50個分區,平均一個分區就處理兩條數據,這樣就造成了嚴重的浪費,更多的時間浪費在分區間的shuffle,和driver的聚合。

下面是幾個優化建議

·每個cpu core上跑2~3個tasks

·當task上的數據大於20KB的時候,可以考慮

·在當前的分區數的1.5倍來進行調優

關於分區

除了顯示的聲明rdd或者dataframe的分區數外,還有兩種控制分區數的配置,

1.spark.sql.shuffle.partitions

針對dataframe和一些sql操作的分區數

默認的分區數為父RDD的最大分區數

2.spark.default.parallelism

針對rdd的默認分區數

一般分區數取決於executor的core數量,因為partition越多task越多,而task是spark的最小處理單元。executor的core數量不夠,task再多也只能排隊,反而慢了。

註:默認的shuffle後分區數為200

共享變量

廣播

當遇到全局性的數據需要使用時,可以采用廣播的方式

廣播變量的優勢:是因為不是每個task一份變量副本,而是變成每個節點的executor才一份副本。這樣的話,就可以讓變量產生的副本大大減少。

廣播變量,初始的時候,就在Drvier上有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在自己本地的Executor對應的BlockManager中,嘗試獲取變量副本;如果本地沒有,BlockManager,也許會從遠程的Driver上面去獲取變量副本;也有可能從距離比較近的其他節點的Executor的BlockManager上去獲取,並保存在本地的BlockManager中;BlockManager負責管理某個Executor對應的內存和磁盤上的數據,此後這個executor上的task,都會直接使用本地的BlockManager中的副本。

例如,50個executor,1000個task。一個map,10M。默認情況下,1000個task,1000份副本。10G的數據,網絡傳輸,在集群中,耗費10G的內存資源。如果使用了廣播變量。50個execurtor,50個副本。500M的數據,網絡傳輸,而且不一定都是從Driver傳輸到每個節點,還可能是就近從最近的節點的executor的bockmanager上拉取變量副本,網絡傳輸速度大大增加;500M,大大降低了內存消耗。

累加器

全局的累加器,可以用於統計全局性的數據

數據本地化

數據本地化是一個影響spark jobs性能的主要方面。其實運行分為兩塊,一塊數據,一塊代碼,最好的情況就是數據不動(數據量太大),代碼會部署在各個executor上。

可以通過調節spark.locality相關配置來決定任務的運行選擇。

接口優化

1.reduceBy和groupBy

同理,reduceByKey,aggregateByKey,groupByKey等

優先使用reduceBy

reduceBy會優先合並本地的rdd,這樣就大大的減少了shuffle的數據量

2.coalesce和repartition

看源碼可知repartition是采用shuffle的coalesce。從性能上來講,coalesce是本地合並,也就是同一個executor合並,這樣可以減少網絡傳輸帶來的性能損失,並且是窄依賴,數據恢復也方便。而reparation直接采用shuffle的方式合並。優先使用coalesce。但是在大量分區需要合並的時候,要考慮一下策略。比如,現在一共有1000個分區,需要合並成10個分區。

如果直接采用coalesce(10),可能導致合並的速度並不快(原因未知),而采用reparation(10)並發度會多很多。最終性能還是repartition好一點。

應用場景,設原rdd分區大小為M,現rdd分區大小為N
M>N,並且差10倍以內,考慮用coalesce
M>N,並且差10倍以上,考慮用repartition
M<N,考慮用repartition

註:也可以采用混合使用,先coalesce,把分區數降下來,然後采用repartition,當然這就需要實際測試,觀察哪種性能更加

動態分配資源

Spark on yarn支持一種特殊的資源分配機制

從spark1.2開始就提供該機制。你的application在運行過程中會返回給資源池你所擁有的資源(比如你問yarn要了2G,跑玩數據預處理後,接下來的計算只需要1G,那剩下的1G就先還給yarn)

可以通過下面配置開啟

spark.dynamicAllocation.enabled=true

參考資料

//官方配置文檔

http://spark.apache.org/docs/1.5.0/configuration.html

//spark官方提供的思路

https://spark.apache.org/docs/1.5.0/tuning.html

//cloudera提供的思路

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

//IBM提供的相關資料

https://www.ibm.com/support/knowledgecenter/en/SSZU2E_2.2.0/performance_tuning/application_overview.html

spark(二)優化思路