【Big Data 每日一題】Spark開發效能調優總結
1. 分配資源調優
Spark效能調優的王道就是分配資源,即增加和分配更多的資源對效能速度的提升是顯而易見的,基本上,在一定範圍之內,增加資源與效能的提升是成正比的,當公司資源有限,能分配的資源達到頂峰之後,那麼才去考慮做其他的調優
如何分配及分配哪些資源
在生產環境中,提交spark作業時,使用spark-submit shell指令碼,裡面調整對應的引數
常用引數
/opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/bin/spark-submit \ --class com.hypers.sparkproject.spark.session.UserVisitSessionAnalyzeSpark \ --num-executors 3 \ --配置executor的數量 --driver-memory 1024M \ --配置driver的記憶體,影響不大 --executor-memory 2G \ --配置每個executor的記憶體大小 --executor-cores 3 \ --Spark standalone and YARN only --配置每個executor的cpu核數 /usr/loacl/recommend-1.0-SNAPSHOT.jar \
分配多少
- 第一種:Spark Standalone即Spark執行在自己的分散式框架時,需要知道每臺機器能夠使用的記憶體,CPU核數,假如每臺機器能夠使用4G記憶體和2個CPU核數,一共20臺機器,那麼就可以executor數量設定20,每個executor記憶體設定4G,每個executor設定2 CPU core
- 第二種: Yarn 當Spark執行在yarn上時,需要檢視資源佇列有多少資源,假如資源佇列有500G記憶體,100個CPU core可用,那麼就可以設定50個executor,每個executor記憶體設定10G,每個executor設定2個CPU core
總之,就是儘量的去調節到最大的大小(executor的數量和executor的記憶體)
資源分配是如何影響效能的
當我們從客戶端提交一個Spark應用程式時,SparkContext,DAGScheduler,TaskScheduler會將程式中的運算元,切分成大量的task,提交到executor上面執行
- 增加executor數量和executor的CPU核數 : 增加了並行執行能力,加入原來20個executor,每個executor的CPU核數為2個,那麼能夠並行執行的task數量就是40個task,當在資源允許的情況下增加這兩個指標,執行速度將會成倍增加
- 增加executor的記憶體 : 增加記憶體後,對效能的提升主要有三點:
- 如果要對RDD進行cache,那麼更多的記憶體就可以快取更多的資料,將更少的資料寫入磁碟,甚至不寫入磁碟,減少了磁碟IO
- 對於shuffle操作,在reduce端,會需要記憶體來存放拉取的資料並進行聚合,如果記憶體不足,也會寫入磁碟,如果給executor分配更多的記憶體,同樣減少磁碟IO
- 對於task的執行,可能會建立很多物件,如果記憶體較小,可能會頻繁導致JVM堆記憶體滿了,然後頻繁GC,垃圾回收,minor GC和full GC,速度很慢
2. 並行度調優
何為並行度
並行度指Spark作業中,各個stage的task數量
為什麼要設定並行度 ?
假設,現在已經在spark-submit腳本里面,給spark作業分配了足夠多的資源,比如50個executor,每個executor有10G記憶體,每個executor有3個CPU核。基本已經達到了叢集或者yarn佇列的資源上限。但是task沒有設定,或者設定的很少,比如就設定了,100個task。50個executor,每個executor有3個cpu核,也就是說,你的Application任何一個stage執行的時候,都有總數在150個cpu核,可以並行執行。但是你現在,只有100個task,平均分配一下,每個executor分配到2個task,那麼同時在執行的task,只有100個,每個executor只會並行執行2個task。每個executor剩下的一個cpu核,就浪費掉了。所以你的資源雖然分配足夠了,但是問題是,並行度沒有與資源相匹配,導致你分配下去的資源都浪費掉了。
合理的設定並行度
理想情況下,task數量設定成Spark Application 的總CPU核數,但是現實中總有一些task執行慢一些task快,導致快的先執行完,空餘的cpu 核就浪費掉了,所以官方推薦task數量要設定成Spark Application的總cpu核數的2~3 倍
如何設定並行度
SparkConf conf = new SparkConf()
.set("spark.default.parallelism", "500")
3. 重構RDD架構及RDD持久化 序列化
- RDD架構重構與優化
預設情況下,多次對一個RDD執行運算元,去獲取不同的RDD,都會對這個RDD以及之前的父RDD全部重新計算一次,在實際專案中,一定要避免出現一個RDD重複計算的情況, 所以,要儘量去複用RDD,差不多的RDD可以抽取為一個共同的RDD,供後面的RDD計算時反覆使用 - 公共RDD實現持久化
對於要多次計算和使用的公共RDD,一定要進行持久化
持久化:也就是說,將RDD的資料快取在記憶體中/磁碟中,(BlockManager),之後無論對這個RDD做多少次計算,都是直接取這個RDD的持久化的資料 - 持久化資料序列化
如果正常將資料持久化在記憶體中,那麼可能會導致記憶體的佔用過大,會導致OOM,當純記憶體無法支撐公共RDD資料完全存放的時候,就需要優先考慮使用序列化的方式在純記憶體中儲存,將RDD的每個partition的資料,序列化成一個大的位元組陣列,就一個物件,序列化後,大大減少記憶體的空間佔用
序列化的唯一缺點就是在獲取資料的時候需要反序列化, 如果序列化後純記憶體的方式還導致OOM,就只能考慮記憶體+無序列化的普通方式 - 持久化+雙副本機制
為了資料的高可靠性,而且記憶體充足,可以使用雙副本機制進行持久化
持久化的雙副本機制,持久化後的一個副本,因為機器宕機了,副本丟了,就還是得重新計算一次;持久化的每個資料單元,儲存一份副本,放在其他節點上面;從而進行容錯;一個副本丟了,不用重新計算,還可以使用另外一份副本。
4. 廣播變數
廣播變數:Broadcast,將大變數廣播出去,而不是直接使用
- 為什麼要用Broadcast
當進行隨機抽取一些操作,或者從某個表裡讀取一些維度的資料,比如所有商品品類的資訊,在某個運算元函式中要使用到,加入該資料大小為100M,那麼1000個task將會消耗100G的記憶體, 叢集損失不可估量 - Broadcast的原理
預設的情況下,每個task執行的運算元中,使用到了外部的變數,每個task都會獲取一份變數的副本,所以會消耗很多的記憶體,進而導致RDD持久化記憶體不夠等情況,大大影響執行速度
廣播變數,在driver上會有一份初始的副本,task在執行的時候,如果要使用廣播變數中的資料,首先會在自己本地的Executor對應的BlockManager中嘗試獲取變數副本,並儲存在本地的BlockManager中,此後這個Executor上的所有task,都會直接使用本地的BlockManager中的副本,Executor的BlockManager除了從driver上拉取,也可能從其他節點的BlockManager上拉取變數副本,距離越近越好.
總而言之: 廣播變數的好處不是每一個task一份變數副本,而是變成每個節點的executor才一份副本,這樣的話就可以變數產生的副本大大減少
5. Kryo序列化的使用
什麼是序列化
預設情況下,Spark內部是使用java的序列化機制ObjectInputStream/ObjectOutputStream,即物件輸入輸出流機制來進行序列化, 這種序列化機制的好處在於,處理起來比較方便,只需在在運算元裡面使用的變數,必須是實現Serializable介面的,可序列化即可,但是這種預設的序列化機制的效率不高,序列化速度慢,序列化以後的資料佔用的記憶體空間相對很大
Spark支援使用Kryo序列化機制,比預設的java序列化機制速度要快很多,而且序列化後的資料大小大概是java序列化機制的1/10
Kryo序列化機制,一旦啟用,會生效的幾個地方
-
運算元函式中使用的外部變數
運算元函式中使用的外部變數,在經過kryo序列化之後,會優化網路傳輸的效能,優化叢集中記憶體的佔用和消耗 -
持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER
持久化RDD的時候,優化記憶體的佔用和消耗 -
shuffle
優化網路傳輸的效能
在Spark程式中如何使用序列化
- 第一步: 在SparkConf中設定一個屬性
SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- 第二步: 註冊需要使用Kryo序列化的自定義的類
如果要達到Kryo的最佳效能的話,那麼就一定要註冊自定義的類
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
6.fastutil
fastutil 是擴充套件了java標準集合框架(Map,List,Set,HashMap,ArrayList,HashSet)的類庫,提供了特殊型別的Map,Set,List和queue,fastutil能夠提供更小的記憶體佔用,更快的存取速度,fastutil也提供了64位的array、set和list,以及高效能快速的,以及實用的IO類,來處理二進位制和文字型別的檔案;
fastutil最新版本要求Java 7以及以上版本
Spark中fastutil應用場景:
- 1.如果運算元函式使用了外部變數,那麼可以用三步來優化: a.使用Broadcast廣播變數優化, b. 使用Kryo序列化類庫優化,提升效能和效率,c.如果外部變數是某種比較大的集合,可以使用fastutil改寫外部變數
- 2.在運算元函式中,如果要建立比較大的Map.List等集合,可以考慮將這些集合型別使用fastutil類庫重寫
fastutil的使用:
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
7.資料本地化等待時長
什麼是本地化等待時長
Spark在Driver上,對Application的每一個stage的task,在進行分配之前,都會計算出每個task要計算的是哪個分片的資料也即是RDD的某個partition.Spark的task分配演算法優先會希望每個task正好分配到它要計算的資料所在的節點,這樣就避免了網路間傳輸資料
但是,task可能沒有機會分配到它的資料所在的節點,因為可能計算資源和計算能力都滿了,這種情況下,Spark會等待一段時間,過了這個時間,才會選擇一個比較差的本地化級別,比如將這個task分配到相鄰的一個節點上,這個時候肯定發生網路傳輸,會通過一個getRemote()方法,通過TransferService(網路資料傳輸元件)從資料所在節點的BlockManager中獲取資料,上述中的一段時間即為本地化等待時長
如何調節本地化等待時長
PROCESS_LOCAL:程序本地化,程式碼和資料在同一個程序中,也就是在同一個executor中;計算資料的task由executor執行,資料在executor的BlockManager中;效能最好
NODE_LOCAL:節點本地化,程式碼和資料在同一個節點中;比如說,資料作為一個HDFSblock塊,就在節點上,而task在節點上某個executor中執行;或者是,資料和task在一個節點上的不同executor中;資料需要在程序間進行傳輸
NO_PREF:對於task來說,資料從哪裡獲取都一樣,沒有好壞之分
RACK_LOCAL:機架本地化,資料和task在一個機架的兩個節點上;資料需要通過網路在節點之間進行傳輸
ANY:資料和task可能在叢集中的任何地方,而且不在一個機架中,效能最差
spark.locality.wait,預設是3s
在使用client模式測試時,在本地就可以看到比較全的日誌,日誌裡面會顯示:
如果大多都是PROCESS_LOCAL,那就不用調節了
如果是發現,好多的級別都是NODE_LOCAL、ANY,那麼最好就去調節一下資料本地化的等待時長
調節完,應該是要反覆調節,每次調節完以後,再來執行,觀察日誌
看看大部分的task的本地化級別有沒有提升;看看,整個spark作業的執行時間有沒有縮短
調節方法:
spark.locality.wait,預設是3s;可以調節為6s,10s
預設情況下,下面3個的等待時長,都是跟上面那個是一樣的,都是3s
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
new SparkConf()
.set("spark.locality.wait", "10")
8.JVM調優
JVM體系結構
- CLASS FILES : scala或者java類程式
- CLASS LOADER : 類載入子系統
-
RUNTIME DATA AREA : 執行資料域元件
- Java Stack(棧)
- HEAP (堆)
- Method Area(方法區)
- PC Register(程式計數器)
- Native Method Stack (本地方法棧)
-
Execution Engine : 執行引擎子系統
- Native Interface : 本地介面元件
Java Stack (棧)
棧也叫棧記憶體,是Java程式的執行區,是線上程建立時建立,它的生命期是跟隨執行緒的生命期,執行緒結束棧記憶體也就釋放,對於棧來說不存在垃圾回收問題,只要執行緒一結束,棧也隨之釋放
棧中的資料是以棧幀(Stack Frame)的格式存在,棧幀是一個記憶體區塊,是一個數據集,是一個有關方法(Method)和執行期資料的資料集,當一個方法A被呼叫時就產生了一個棧幀F1,並被壓入到棧中,A方法又呼叫B方法,於是產生棧幀F2也被壓入棧中,執行完畢後,先彈出F2,後彈出F1,即”先進後出”原則
Java Heap(堆)
一個JVM例項只存在一個堆記憶體,堆記憶體的大小是可以調節的,堆記憶體也是JVM中用於存放物件與陣列例項的地方,垃圾回收的主要區域就是這裡(還有可能是方法區Method Area),類載入器讀取了類檔案之後,需要把類,方法,常變數放到堆記憶體中,以方便執行器執行,堆記憶體分為三個部分:
- Permanent Space : 永久儲存區
- Young Generation Space 新生區 / New Generation
- Tenure Generation Space 養老區 / Old Generation
- New /Young Generation
又稱新生代,程式中新建的物件都會分配到新生代中,新生代又由Eden Space 和兩塊Survivor Space構成,可通過-Xmn引數來指定其大小,Eden 和兩塊Survivor Space的大小比例預設是8:1:1,這個比例可通過-XX:SurvivorRatio來指定
- Old Generation
又稱老年代,用於存放程式中經過幾次垃圾回收還存活的物件,例如快取的物件,老年代所佔的記憶體大小即為-Xmx大小減去-Xmn大小
- Permanent Generation
一個常住記憶體區域,用於存放JDK自身所攜帶的Class,Interface的元資料,即儲存的是執行環境必須的類資訊,被裝載到此區域的資料是不會被;垃圾回收掉的,直至關閉JVM才會釋放
Young/New Generation
年輕代與Spark調優息息相關,所以這裡單獨拿出來講解
所有新生成的物件首先都是放在年輕代中,年輕代的目標就是儘可能快速的收集掉那些生命週期短的物件
大部分物件在Eden區生成,當Eden區滿時,還存活的物件將被複制到Survivor區中(兩中的一個),當這個Survivor區滿的時候,此區存放的物件會被放在另一個Survivor區中,當另一個Survivor也滿的時候,從第一個Survivor複製過來的還存活的物件將被複制到老年代中,Survivor的兩個區是對稱的,沒有先後關係,所以同一個Survivor取中可能存在從Eden複製過來的物件和從另一個Survivor複製過來的物件,而且Survivor總有一個是空的,而且可以配置多餘兩個
Spark的JVM調優
- 降低cache操作的記憶體佔比
Spark task執行運算元函式時會生成大量物件,這些物件會被放入年輕代中,當年輕代記憶體比較小時,會導致年輕代中Eden區和Survivor區頻繁記憶體溢滿,導致頻繁的minor GC,而頻繁的minorGC或導致一些存活的短宣告週期(其實就是在後面用不到的物件)物件直接放入老年代中,而當老年代記憶體溢滿是,則會導致Full GC
full gc / minor gc,無論是快,還是慢,都會導致jvm的工作執行緒停止工作,簡而言之,就是說,gc的時候,spark停止工作了。等著垃圾回收結束。
總而言之,上面的情況都是由記憶體不足引起的即記憶體不充足的時候,問題:
1、頻繁minor gc,也會導致頻繁spark停止工作
2、老年代囤積大量活躍物件(短生命週期的物件),導致頻繁full gc,full gc時間很長,短則數十秒,長則數分鐘,甚至數小時。可能導致spark長時間停止工作。
3、嚴重影響咱們的spark的效能和執行的速度。
如何增大記憶體?
Spark中,堆記憶體又被劃分成了兩塊,一塊是專門用來給RDD的cache,persist操作進行RDD快取用的,另一塊就是用來給Spark運算元函式用的,存放函式中自己建立的物件
預設情況下,給RDD的cache操作的記憶體佔比是0.6,即百分之六十的記憶體用來給RDD做快取用,但其實RDD並不需要這麼大的記憶體,我們可以通過檢視每個stage中每個task執行的時間,GC時間等來判斷是否發生了頻繁的minorGC和fullGC,從而來調低這個比例
調節方法
spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2
- Ececutor堆外記憶體
當Spark處理超大資料量時(數十億,百億級別),executor的堆外記憶體可能會不夠用,出現shuffle file can’t find, task lost,OOM等情況
預設情況下,這個堆外記憶體是300M,當執行超大資料量時,通常會出現問題,因此需要調節到1G,2G,4G等大小
調節方法必須在spark-submit提交指令碼中設定而不能在程式中設定
--conf spark.yarn.executor.memoryOverhead=2048
- GC引起的連線等待時長
Spark在處理超大資料量時,task可能會建立很大很多的物件,頻繁的讓JVM記憶體溢滿,導致頻繁GC,而前面提到過executor獲取資料優先的從本地關聯的blockmanager獲取,如果沒有的話,會通過transferService去遠端連線其他executor的blockmanager,如果正好碰到那個executor垃圾回收,那麼程式就會卡住,spark預設網路連線時長是60s,當超過60s沒有獲取到資料,則直接宣告任務失敗,也有可能DAGscheduler反覆提交幾次stage,TaskScheduler反覆提交task,則會大大影響spark執行速度,所以可以考慮適當調節等待時長
調節方式同調節堆外記憶體一樣,必須在提交spark程式的指令碼中設定
--conf spark.core.connection.ack.wait.timeout=300
9. shuffle 調優
shuffle原理
什麼情況下發生shuffle?
在Spark中,主要有以下幾個運算元
- groupByKey : 把分佈在各個節點上的資料中的同一個key對應的value都集中到一塊兒,集中到叢集中的一個節點中,也即是集中到一個節點的executor的一個task中
- reduceByKey : 運算元函式對values集合進行reduce操作,最後生成一個value
- countByKey : 在一個task中獲取同一個key對應的所有value,然後計數,統計總共有多少value
- join : 兩個RDD,Key相同的兩個value都集中到一個executor的task中
shuffle過程:
在一個shuffle過程中,前半部分stage中,每個task都會建立後半部分stage中相同task數量的檔案,比如stage後半部分有100個task,那麼前半部分的每個task都會建立100個檔案(先寫入到記憶體緩衝中,後溢滿寫入到磁碟),會將同一個key對應的values寫入同一個檔案中,shuffle後半部分的stage中的task,每個task都會從各個節點的task建立其中一份屬於自己的那份檔案中,拉取屬於自己的key-value對,然後task會有一個記憶體緩衝區,然後呼叫HashMap進行key-values的聚合,最終呼叫我們定義的聚合函式來進行相應的操作
shuffle調優之合併map端輸出檔案
預設情況下,Spark是不開啟合併map端輸出檔案機制的,所以當分批次執行task時,每批的task都會建立新的檔案,而不會共用,大大影響了效能,所以當有大量map檔案生成時,需要開啟該機制
設定方法
new SparkConf().set("spark.shuffle.consolidateFiles", "true")
設定合併機制之後:
第一個stage,並行執行2個task,執行這兩個task時會建立下一個stage的檔案,執行完之後,會執行下一批次的2個task,而這一批次的task則不會建立新的檔案,會複用上一批次的task建立的檔案
第二stage的task在拉取上一個stage建立的檔案時就不會拉取那麼多檔案了,而是拉取少量檔案,每個輸出檔案都可能包含了多個task給自己的map端輸出
shuffle調優之map端記憶體緩衝和reduce記憶體佔比
預設情況下:
每個task的記憶體緩衝為32kb,reduce端記憶體佔比為0.2(即預設executor記憶體中劃分給reduce task的微20%)
所以在不調優的情況下,如果map端task處理的比較大,記憶體不足則溢滿寫入磁碟
比如:
每個task就處理320kb,32kb,總共會向磁碟溢寫320 / 32 = 10次。
每個task處理32000kb,32kb,總共會向磁碟溢寫32000 / 32 = 1000次。
同理,ruduce端也一樣
何時調優?
通過Spark UI檢視shuffle磁碟的read和write是不是很大,如果很大則應相應調優
如何調優?
spark.shuffle.file.buffer : 32kb -> 128kb
spark.shuffle.memoryFraction: 0.2 -> 0.3
10. Spark運算元調優之MapPartitions
MapPartitions提升Map類操作效能
spark中,最基本的原則是每個task處理一個RDD的partition
如果是普通的Map,假如一個partition中有一萬條資料,那麼map中的function就要執行和計算一萬次,但是使用MapPartitions操作之後,一個task只會執行一次function,function一次接收了所有partition資料,效能比較高
MapPartitions的缺點:
如果是普通的Map,一條一條處理資料,當出現記憶體不夠的情況時,那麼就可以將已經處理掉的資料從記憶體裡面垃圾回收掉,所以普通map通常不會出現OOM情況
如果是MapPartitions,對於大量資料來說,如果一個partiton資料有一百萬條,一次性傳入function之後,可能導致記憶體不足,但是又沒辦法騰出空間,直接就導致了記憶體溢位,OOM
所以,當使用MapPartitons運算元時,要估算每個partiton的資料能不能一下子快取到分配給executor的記憶體中,如果可以,就是用該運算元,對效能有顯著提升
11. Spark運算元調優之filter過後使用coalesce減少分割槽數量
spark程式,通常情況下,RDD在經過filter之後,會出現兩個情況:
-
每個partition內的資料可能就不太一樣,有的很多有的很少
這樣會導致後面在處理這些partition資料的時候,每個task處理的資料量相差懸殊,最終導致很嚴重的問題->資料傾斜 -
partition資料量減少
由於partition資料量減少,但是在後面進行處理的時候,還是按照跟partition相同數量的task來進行處理,這就導致的資源浪費
針對上面兩個問題,需要使用coalesce運算元來處理,該運算元能壓縮partiton的數量,減少partiton的資料量,而且讓每個partition的資料量都儘量均衡緊湊,從而便於後續task進行處理,從某種程度上提升spark程式的效能
12. Spark運算元調優之foreachPartition優化寫資料庫效能
foreach是對每條資料進行處理的,task對partition中的每一條資料都會執行function操作,如果function中有寫資料庫的操作,那麼有多少條資料就會建立和銷燬多少個數據庫連線,這對效能的影響很大
在生產環境中,通常都是使用foreachPartition來寫資料庫的,使用了該運算元之後,對於使用者自定義的function函式,就呼叫一次,一次傳入一個partition的所有資料,這裡只需建立一個數據庫連線,然後向資料庫傳送一條sql語句外加多組引數即可,但是這個時候要配合資料庫的批處理
同樣,該運算元在超大資料量面前同樣會出現OOM情況
13. Spark運算元調優之使用repartition解決SparkSQL低並行度效能問題
通常情況下,在上面第二條的並行度調優時,使用spark.default.parallelism
來設定並行度,那麼這個設定在什麼地方有效,什麼地方無效?
當程式中沒有使用SparkSQL,那麼整個sparkapplication的所有stage的並行度都是設定的這個引數,除非使用了coalesce運算元縮減過partition.
當程式中使用了SparkSQL,那麼SparkSQl的那麼stage的並行度無法設定,因為SparkSQL會預設的根據Hive表對應的hdfs檔案的block,自動設定SparkSQL那個stage的並行度,所以這就導致出現了用少量task來處理複雜邏輯的情況, 這種情況下,需要使用repartition來設定SparkSQL的並行度,即對從Hive中讀取出來的RDD,使用repartiton重新分割槽為預期的數量來設定並行度
14.Spark運算元調優之reduceByKey的本地聚合
reduceByKey相對於普通的shuffle操作(比如groupByKey),它的一個重要特點就是map端的本地聚合,見圖:
在map端,給下個stage的每個task建立的輸出檔案中,寫資料之前,會進行本地的combiner操作,也就是說,對每一個key,對應的values都會執行使用者自定義的運算元函式,比如+_,當進行了這個combiner操作之後,減少了資料量,也即是減少了磁碟IO,同時減少了網路傳輸,對效能有明顯提升,所以,在實際的專案中,能用reduceByKey實現的就儘量用該運算元實現
資料傾斜
資料傾斜原理及現象分析
資料傾斜是Spark中極其影響效能的現象,它甚至能導致程式無法跑完,更不用提效能調優什麼的了
資料傾斜如何產生的?
在shuffle操作的時候,是按照key來進行value的資料的輸出,拉取和聚合的,同一個key的values,一定是分配到同一個reduce task進行處理的,假如多個key對應的value一共有90萬條資料,但是可能某條key對應了88萬條,其他key最多也就對應數萬條資料,那麼處理這88萬條資料的reduce task肯定會特別耗費時間,甚至會直接導致OOM,這就是所謂的資料傾斜
資料傾斜解決方案
1.聚合源資料
Spark的資料來源通常情況下都是來自於Hive表,(HDFS或其它大資料分散式系統),而Hive本身就是適合做離線資料分析的,所以說通常要變換一下思路,能在Hive中做聚合的,通常就可以跑定時任務在Hive中做聚合,最終spark拿到的只是每個key對應的一個value值,然後就可以使用map來對這個特殊的value串來處理,省去了groupByKey的過程
2.過濾掉導致傾斜的key
這種情況只適合使用者能夠接受摒棄某些特殊的資料,比如大部分key都對應了幾十萬條,而少數key只對應了幾十條,那麼直接在Hive中過濾掉這些key就從源頭上避免了資料傾斜
3.提高shuffle操作reduce端並行度
提高shuffle操作reduce端並行度會有更多task來處理資料,那麼每個task處理的資料會相對來說更少一些
如何操作?
給shuffle運算元傳遞進去一個引數,即一個數字,這個數字就代表了shuffle操作時reduce端的並行度,然後在進行shuffle操作的時候,就會對應建立指定數量的reduce task
4.使用隨機key實現雙重聚合
這個場景主要用於reduceByKey,groupByKey,而非join
主要原理就是:
在第一輪聚合時,對key進行打散,將原先一樣的key,變成不一樣的key,相當於將每個key分組,然後針對key的多個分組,進行key的區域性聚合,接著再去掉key的字首,然後對所有key進行全域性聚合,這種方案對解決這兩個運算元產生的資料傾斜有比較好的效果
5.join運算元操作的資料傾斜解決方案
將reduce join轉換為map join
示例程式碼:
List<Tuple2<Long, Row>> userInfos = userid2InfoRDD.collect();
final Broadcast<List<Tuple2<Long, Row>>> userInfosBroadcast = sc.broadcast(userInfos);
JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2PartAggrInfoRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple)
throws Exception {
// 得到使用者資訊map
List<Tuple2<Long, Row>> userInfos = userInfosBroadcast.value();
Map<Long, Row> userInfoMap = new HashMap<Long, Row>();
for(Tuple2<Long, Row> userInfo : userInfos) {
userInfoMap.put(userInfo._1, userInfo._2);
}
// 獲取到當前使用者對應的資訊
String partAggrInfo = tuple._2;
Row userInfoRow = userInfoMap.get(tuple._1);
String sessionid = StringUtils.getFieldFromConcatString(
partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);
int age = userInfoRow.getInt(3);
String professional = userInfoRow.getString(4);
String city = userInfoRow.getString(5);
String sex = userInfoRow.getString(6);
String fullAggrInfo = partAggrInfo + "|"
+ Constants.FIELD_AGE + "=" + age + "|"
+ Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
+ Constants.FIELD_CITY + "=" + city + "|"
+ Constants.FIELD_SEX + "=" + sex;
return new Tuple2<String, String>(sessionid, fullAggrInfo);
}
});
原理圖:
說明:
普通的join,肯定是要走shuffle,那麼,既然走shuffle,那麼普通的join肯定是reduce join, 即將所有相同的key對應的values,聚合到一個task中,再進行join操作
那麼如何將reduce join轉換為map join?
當兩個RDD要進行join時,其中一個RDD是比較小的,那麼就可將該小資料量RDD廣播出去,該RDD資料將會在每個executor的blockmanager中駐留一份資料,然後在map操作中就可以使用該資料,這種方式下,根本就不會發生shuffle操作,從而從根本上杜絕了資料傾斜
sample取樣傾斜key進行兩次join
示例程式碼
JavaPairRDD<Long, String> sampledRDD = userid2PartAggrInfoRDD.sample(false, 0.1, 9);
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._1, 1L);
}
});
JavaPairRDD<Long, Long> computedSampledRDD = mappedSampledRDD.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
JavaPairRDD<Long, Long> reversedSampledRDD = computedSampledRDD.mapToPair(
new PairFunction<Tuple2<Long,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._2, tuple._1);
}
});
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;
JavaPairRDD<Long, String> skewedRDD = userid2PartAggrInfoRDD.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
}
});
JavaPairRDD<Long, String> commonRDD = userid2PartAggrInfoRDD.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return !tuple._1.equals(skewedUserid);
}
});
JavaPairRDD<String, Row> skewedUserid2infoRDD = userid2InfoRDD.filter(
new Function<Tuple2<Long,Row>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
}
}).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<Tuple2<String, Row>> call(
Tuple2<Long, Row> tuple) throws Exception {
Random random = new Random();
List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
for(int i = 0; i < 100; i++) {
int prefix = random.nextInt(100);
list.add(new Tuple2<String, Row>(prefix + "_" + tuple._1, tuple._2));
}
return list;
}
});
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(100);
return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
}
}).join(skewedUserid2infoRDD).mapToPair(
new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Tuple2<String, Row>> call(
Tuple2<String, Tuple2<String, Row>> tuple)
throws Exception {
long userid = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Tuple2<String, Row>>(userid, tuple._2);
}
});
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(userid2InfoRDD);
JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);
JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = joinedRDD.mapToPair(
new PairFunction<Tuple2<Long,Tuple2<String,Row>>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(
Tuple2<Long, Tuple2<String, Row>> tuple)
throws Exception {
String partAggrInfo = tuple._2._1;
Row userInfoRow = tuple._2._2;
String sessionid = StringUtils.getFieldFromConcatString(
partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);
int age = userInfoRow.getInt(3);
String professional = userInfoRow.getString(4);
String city = userInfoRow.getString(5);
String sex = userInfoRow.getString(6);
String fullAggrInfo = partAggrInfo + "|"
+ Constants.FIELD_AGE + "=" + age + "|"
+ Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
+ Constants.FIELD_CITY + "=" + city + "|"
+ Constants.FIELD_SEX + "=" + sex;
return new Tuple2<String, String>(sessionid, fullAggrInfo);
}
});
這個方案的實現思路關鍵之處在於:
將發生資料傾斜的key單獨拉出來,放到一個RDD中,然後用這個原本會產生資料傾斜的key RDD和其他RDD單獨去join一下,這個時候,key對應的資料,可能就會分散到多個task中進行join操作
,如果該傾斜key與其他key混合在一起中,肯定會導致這個key對應的所有資料都聚合到一個task中,然後導致資料傾斜
最後,當取出來這個傾斜key之後,還可以通過增加隨機數字首的方法進行優化,如上面程式碼中:
針對取出來的這個資料傾斜key,從它要join的另一個RDD中過濾出來此key對應的資料,有可能只有一條,那麼就為該條資料的RDD使用flatmap運算元,打上100個隨機數,作為字首,返回一百條資料,然後對該資料傾斜key的RDD,給每條資料也打上一個100以內的隨機數作為字首,再去join操作,效能會提升很多,join之後使用map操作去除字首,然後再與非傾斜key join的結果進行union,即可得到預期結果
注意:當資料中產生資料傾斜的key很多時,就不適合使用這種方案了,需要考慮下一種方案
使用隨機數以及擴容表近join
該方案其實對資料傾斜的一種緩解,而非解決,它的思路是:
選擇一個RDD,使用flatmap進行擴容,將每條資料對映為多條資料,每個映射出來的資料,都帶了一個n以內的隨機數,通常是10以內的,對另外一個RDD,做普通的map對映操作,每條資料都打上10以內的隨機數,然後將兩個處理後的RDD,進行join操作
示例程式碼
JavaPairRDD<String, Row> expandedRDD = userid2InfoRDD.flatMapToPair(
new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
throws Exception {
List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();
for(int i = 0; i < 10; i++) {
list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
}
return list;
}
});
JavaPairRDD<String, String> mappedRDD = userid2PartAggrInfoRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(10);
return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
}
});
JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);
JavaPairRDD<String, String> finalRDD = joinedRDD.mapToPair(
new PairFunction<Tuple2<String,Tuple2<String,Row>>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(
Tuple2<String, Tuple2<String, Row>> tuple)
throws Exception {
String partAggrInfo = tuple._2._1;
Row userInfoRow = tuple._2._2;
String sessionid = StringUtils.getFieldFromConcatString(
partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);
int age = userInfoRow.getInt(3);
String professional = userInfoRow.getString(4);
String city = userInfoRow.getString(5);
String sex = userInfoRow.getString(6);
String fullAggrInfo = partAggrInfo + "|"
+ Constants.FIELD_AGE + "=" + age + "|"
+ Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
+ Constants.FIELD_CITY + "=" + city + "|"
+ Constants.FIELD_SEX + "=" + sex;
return new Tuple2<String, String>(sessionid, fullAggrInfo);
}
});
該方案的缺點就是,因為我們本身的RDD資料量非常大,無法進行大量擴容,一般也就10倍,所以這個方案也僅僅對資料傾斜起到了緩解作用
troubleshooting
troubleshooting之控制shuffle reduce端緩衝大小以避免OOM
Map端的task是不斷的輸出資料的,資料量可能是很大的,但是Reduce端的task並不是等到map端task將屬於自己的那份資料全部寫入磁碟檔案之後才去拉取的,實際是,map寫一點資料,reduce端的task就會去拉取一點資料,然後立即進行後面的聚合,運算元函式的應用
Reduce端task每次拉取多少資料是由buffer來決定的,因為拉取過來的資料都是先放在buffer的,然後才用後面的executor分配的堆記憶體佔比
那麼問題就出在這個buffer上面:
該buffer預設為48M,大部分情況下,reduce task不會拉取很多的資料,一般10M左右就計算處理了,所以大多數時候,不會出現什麼問題,但是有的時候,map端資料量非常大,寫出的速度特別快,reduce端的所有task拉取資料的時候,全部達到自己緩衝的極限值,也就是48M全部填滿,這個時候,再加上聚合函式的程式碼,可能會建立大量的物件,可能一下子會把記憶體爆掉,導致程式直接崩潰
針對這個情況,就應該減少reduce端task緩衝的大小,寧願多拉取幾次,也不一下子拉取很多資料,犧牲效能換取執行
換言之,當叢集資源充足,map端資料量又不是很大的情況下,就可以考慮調大此緩衝大小來提高效能
調節方式:
spark.reducer.maxSizeInFlight,48
spark.reducer.maxSizeInFlight,24
troubleshooting之解決JVM的GC導致拉取檔案失敗
在Spark作業中,有一種錯誤非常普遍,即shuffle file not found......
解析:
executor的JVM程序,可能記憶體不夠,那麼此時就會執行GC,minorGC 和 FullGC,無論哪種,一旦發生了GC,就會導致executor內,所有的工作執行緒全部停止,比如BlockManager,基於netty的網路通訊,而下一個stage的executor可能還沒有停止掉,task想要去上一個stage的的task所在的executor去拉取屬於自己的資料,結果由於對方在GC,就導致了拉取了半天的資料沒有拉取到,就直接報出shuffle file not found...
的錯誤,可能下一個stage又重新提交了stage或task之後,再執行就沒有問題了
如何避免上述情況呢? 兩個引數:
spark.shuffle.io.maxRetries 3
spark.shuffle.io.retryWait 5s
第一個引數的意思是shuffle檔案拉取的時候,如果沒有拉取到或拉取失敗,最多會重試幾次,第二個引數的意思是每一次重新拉取檔案的時間間隔是5s
所以說,當上一個stage的executor正在發生漫長的FullGC,導致第二個stage的executor嘗試去拉取檔案,而拉取不到,預設情況下,會反覆重試3次,每次間隔是5秒鐘,最多隻會等待3*5=15秒,如果15秒沒有拉取到資料,就會報出shuffle file not found...
錯誤
所以當程式頻繁報該錯誤時,需要調節大這兩個引數
troubleshooting之解決YARN佇列資源不足導致的Application連線失敗
當提交一個Spark程式時,如果沒有足夠的資源,一般會出現兩種情況(根據叢集的配置,hadoop的版本等情況不同):
一是Yarn發現資源不足時,不會Hang在那裡,而是直接列印fail日誌,直接fail
二是Yarn發現資源不足,會Hang在那裡直到有足夠的資源來執行
另外,當某個spark程式耗時比較長時,如果在同一個佇列中再提交一個耗時短的spark程式,那麼耗時短的會等很長時間,會很不合適,所以要考慮多個排程佇列
在CDH叢集中,可以設定多個排程佇列,動態資源選項新建,然後在J2EE中,通過執行緒池的方法(一個執行緒池對應一個資源佇列)來實現上述方案
ExecutorService threadPool = Executors.newFixedThreadPool(1);
threadPool.submit(new Runnable() {
@Override
public void run() {
}
});
troubleshooting之解決各種序列化導致的報錯
序列化報錯的出現:當用client模式去提交spark作業,檢視本地打印出來的log,如果出現了Serializable,Serialize等字眼,那麼就是序列化導致的報錯
處理方法:
1.運算元函式裡面如果使用到了外部自定義型別的變數,那麼自定義的型別必須是可序列化的
2.如果要將自定義的型別作為RDD的元素型別,那麼自定義的型別必須也是可序列化的
JavaPairRDD<Integer, Teacher> teacherRDD
JavaPairRDD<Integer, Student> studentRDD
studentRDD.join(teacherRDD)
public class Teacher implements Serializable {
}
public class Student implements Serializable {
}
3.不能在上述兩種情況下,使用一些第三方的,不支援序列化的型別
比如Connection conn = ....
troubleshooting之解決運算元函式返回NULL導致的問題
在某些運算元函式裡面,是需要有一個返回值的,但是有時候我們不想有返回值,如果直接返回NULL的話,會報錯的,遇到這種情況,在返回的時候,需要返回一些特殊值,比如”-999”,那麼通過這個運算元得到的RDD,再經過filter過濾操作,過濾掉就可以了
troubleshooting之解決yarn-client模式導致的網絡卡流量激增問題
當使用yarn-client模式提交spark程式時,driver是啟動在本地機器的,而且driver是全權負責所有的任務的排程的,也就是說,要跟yarn叢集上執行的多個executor進行頻繁的通訊,有可能導致網絡卡流量激增,那麼解決辦法就是使用yarn-cluster模式,這樣就不是本地的機器的driver來進行task排程了
troubleshooting之解決yarn-cluster模式的JVM記憶體溢位無法執行問題
有的時候,執行一些包含了sparkSQL的spark作業,可能會碰到yarn-client模式下提交可以正常執行,但是yarn-cluster模式下可能無法執行,報出JVM的PermGen永久代的記憶體溢位,即OOM
出現這種情況的原因:
yarn-client模式下,driver是執行在本地機器上的,spark使用的JVM的PermGen的配置,是本地的spark-class檔案(spark客戶端預設是有配置的),JVM永久代的大小是128M,但是在yarn-cluster模式下,driver是執行在叢集的某個節點上的,使用的是沒有經過配置的預設值82M
sparkSQL,它的內部是要進行很複雜的SQL的語義解析,語法樹的轉換等等,特別複雜,在這種複雜的情況下,如果說sql本身特別複雜的話,很可能會導致效能的消耗,記憶體的消耗,對PermGen佔用比較大,所以會出現這種情況
如何解決這種情況:
//yarn-cluster模式下
--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
最後
sparkSQL,如果有大量的or語句,比如有成百上千的時候,此時可能就會出現一個driver端的jvm stack overflow,即JVM棧記憶體溢位的問題
JVM棧記憶體溢位,基本上就是由於呼叫的方法層級過多,因為產生了大量的,非常深的,超出了JVM棧深度限制的遞迴,所以這個時候可能要將sql語句拆分為多個sql子句來執行
troubleshooting之錯誤的持久化方式及checkpoint的使用
持久化方式
userRDD,如果想要對這個RDD做一個cache,希望能在後面多次使用這個RDD的時候,不用反覆重新計算RDD,而是可以直接使用各個節點的executor的BlockManager管理的記憶體/磁碟上的資料
錯誤方式:
uesrRDD.cache()
userRDD.count()
userRDD.take()
上面會持久化方式不會生效,反而會報file not found
的錯誤
正確方式:
userRDD=userRDD.cache()
val cacheuserRDD=userRDD.cache()
checkpoint使用
1.在程式碼中,用SparkContext設定一個checkpoint目錄,可以是一個容錯檔案系統的目錄,比如HDFS
2.在程式碼中,對需要進行checkpoint的RDD,執行RDD.checkpoint()
3.RDDCheckpointData(Spark內部的API),會接管該RDD,標記為marked for checkpoint,準備進行checkpoint
4.job執行完之後,會呼叫一個finalRDD.doCheckpoint()方法,會順著RDD lineage,回溯掃描,發現有標記為待checkpoint的RDD,就會進行二次標記,inProgressCheckpoint,即正在接受checkpoint操作
5.job執行完後,會啟動一個內部的新的job,去將標記為inProgressCheckpoint的RDD的資料,都寫入hdfs檔案中(如果rdd之前cache過,會直接從快取中獲取資料,寫入hdfs中,如果沒有cache過,那麼就會重新計算一遍這個RDD,再checkpoint)
6.將checkpoint過的RDD之前的依賴RDD,改成一個CheckpointRDD*,強制改變RDD的lineage,後面如果RDD的cache資料獲取失敗,直接會通過它的上游CheckpointRDD,去容錯的檔案系統中,比如hdfs,獲取Checkpoint資料
Checkpoint其實是cache的一個備胎