1. 程式人生 > >【Big Data 每日一題】Spark開發效能調優總結

【Big Data 每日一題】Spark開發效能調優總結

1. 分配資源調優

Spark效能調優的王道就是分配資源,即增加和分配更多的資源對效能速度的提升是顯而易見的,基本上,在一定範圍之內,增加資源與效能的提升是成正比的,當公司資源有限,能分配的資源達到頂峰之後,那麼才去考慮做其他的調優

如何分配及分配哪些資源

在生產環境中,提交spark作業時,使用spark-submit shell指令碼,裡面調整對應的引數 
常用引數

/opt/modules/spark-1.6.1-bin-2.5.0-cdh5.3.6/bin/spark-submit \
--class com.hypers.sparkproject.spark.session.UserVisitSessionAnalyzeSpark \
--num-executors 3 \                           --配置executor的數量
--driver-memory  1024M \                      --配置driver的記憶體,影響不大
--executor-memory 2G \                        --配置每個executor的記憶體大小
--executor-cores 3 \ --Spark standalone and YARN only --配置每個executor的cpu核數
/usr/loacl/recommend-1.0-SNAPSHOT.jar \

分配多少

  • 第一種:Spark Standalone即Spark執行在自己的分散式框架時,需要知道每臺機器能夠使用的記憶體,CPU核數,假如每臺機器能夠使用4G記憶體和2個CPU核數,一共20臺機器,那麼就可以executor數量設定20,每個executor記憶體設定4G,每個executor設定2 CPU core
  • 第二種: Yarn 當Spark執行在yarn上時,需要檢視資源佇列有多少資源,假如資源佇列有500G記憶體,100個CPU core可用,那麼就可以設定50個executor,每個executor記憶體設定10G,每個executor設定2個CPU core

總之,就是儘量的去調節到最大的大小(executor的數量和executor的記憶體)

資源分配是如何影響效能的

當我們從客戶端提交一個Spark應用程式時,SparkContext,DAGScheduler,TaskScheduler會將程式中的運算元,切分成大量的task,提交到executor上面執行

  • 增加executor數量和executor的CPU核數 : 增加了並行執行能力,加入原來20個executor,每個executor的CPU核數為2個,那麼能夠並行執行的task數量就是40個task,當在資源允許的情況下增加這兩個指標,執行速度將會成倍增加
  • 增加executor的記憶體 : 增加記憶體後,對效能的提升主要有三點: 
    1. 如果要對RDD進行cache,那麼更多的記憶體就可以快取更多的資料,將更少的資料寫入磁碟,甚至不寫入磁碟,減少了磁碟IO
    2. 對於shuffle操作,在reduce端,會需要記憶體來存放拉取的資料並進行聚合,如果記憶體不足,也會寫入磁碟,如果給executor分配更多的記憶體,同樣減少磁碟IO
    3. 對於task的執行,可能會建立很多物件,如果記憶體較小,可能會頻繁導致JVM堆記憶體滿了,然後頻繁GC,垃圾回收,minor GC和full GC,速度很慢

2. 並行度調優

何為並行度

並行度指Spark作業中,各個stage的task數量 
為什麼要設定並行度 ? 
假設,現在已經在spark-submit腳本里面,給spark作業分配了足夠多的資源,比如50個executor,每個executor有10G記憶體,每個executor有3個CPU核。基本已經達到了叢集或者yarn佇列的資源上限。但是task沒有設定,或者設定的很少,比如就設定了,100個task。50個executor,每個executor有3個cpu核,也就是說,你的Application任何一個stage執行的時候,都有總數在150個cpu核,可以並行執行。但是你現在,只有100個task,平均分配一下,每個executor分配到2個task,那麼同時在執行的task,只有100個,每個executor只會並行執行2個task。每個executor剩下的一個cpu核,就浪費掉了。所以你的資源雖然分配足夠了,但是問題是,並行度沒有與資源相匹配,導致你分配下去的資源都浪費掉了。

合理的設定並行度

理想情況下,task數量設定成Spark Application 的總CPU核數,但是現實中總有一些task執行慢一些task快,導致快的先執行完,空餘的cpu 核就浪費掉了,所以官方推薦task數量要設定成Spark Application的總cpu核數的2~3 倍

如何設定並行度

SparkConf conf = new SparkConf()
  .set("spark.default.parallelism", "500")

 

3. 重構RDD架構及RDD持久化 序列化

  • RDD架構重構與優化 
    預設情況下,多次對一個RDD執行運算元,去獲取不同的RDD,都會對這個RDD以及之前的父RDD全部重新計算一次,在實際專案中,一定要避免出現一個RDD重複計算的情況, 所以,要儘量去複用RDD,差不多的RDD可以抽取為一個共同的RDD,供後面的RDD計算時反覆使用
  • 公共RDD實現持久化 
    對於要多次計算和使用的公共RDD,一定要進行持久化 
    持久化:也就是說,將RDD的資料快取在記憶體中/磁碟中,(BlockManager),之後無論對這個RDD做多少次計算,都是直接取這個RDD的持久化的資料
  • 持久化資料序列化 
    如果正常將資料持久化在記憶體中,那麼可能會導致記憶體的佔用過大,會導致OOM,當純記憶體無法支撐公共RDD資料完全存放的時候,就需要優先考慮使用序列化的方式在純記憶體中儲存,將RDD的每個partition的資料,序列化成一個大的位元組陣列,就一個物件,序列化後,大大減少記憶體的空間佔用 
    序列化的唯一缺點就是在獲取資料的時候需要反序列化, 如果序列化後純記憶體的方式還導致OOM,就只能考慮記憶體+無序列化的普通方式
  • 持久化+雙副本機制 
    為了資料的高可靠性,而且記憶體充足,可以使用雙副本機制進行持久化 
    持久化的雙副本機制,持久化後的一個副本,因為機器宕機了,副本丟了,就還是得重新計算一次;持久化的每個資料單元,儲存一份副本,放在其他節點上面;從而進行容錯;一個副本丟了,不用重新計算,還可以使用另外一份副本。

4. 廣播變數

廣播變數:Broadcast,將大變數廣播出去,而不是直接使用

  • 為什麼要用Broadcast 
    當進行隨機抽取一些操作,或者從某個表裡讀取一些維度的資料,比如所有商品品類的資訊,在某個運算元函式中要使用到,加入該資料大小為100M,那麼1000個task將會消耗100G的記憶體, 叢集損失不可估量
  • Broadcast的原理 
    預設的情況下,每個task執行的運算元中,使用到了外部的變數,每個task都會獲取一份變數的副本,所以會消耗很多的記憶體,進而導致RDD持久化記憶體不夠等情況,大大影響執行速度 
    廣播變數,在driver上會有一份初始的副本,task在執行的時候,如果要使用廣播變數中的資料,首先會在自己本地的Executor對應的BlockManager中嘗試獲取變數副本,並儲存在本地的BlockManager中,此後這個Executor上的所有task,都會直接使用本地的BlockManager中的副本,Executor的BlockManager除了從driver上拉取,也可能從其他節點的BlockManager上拉取變數副本,距離越近越好. 
    總而言之: 廣播變數的好處不是每一個task一份變數副本,而是變成每個節點的executor才一份副本,這樣的話就可以變數產生的副本大大減少

5. Kryo序列化的使用

什麼是序列化

預設情況下,Spark內部是使用java的序列化機制ObjectInputStream/ObjectOutputStream,即物件輸入輸出流機制來進行序列化, 這種序列化機制的好處在於,處理起來比較方便,只需在在運算元裡面使用的變數,必須是實現Serializable介面的,可序列化即可,但是這種預設的序列化機制的效率不高,序列化速度慢,序列化以後的資料佔用的記憶體空間相對很大 
Spark支援使用Kryo序列化機制,比預設的java序列化機制速度要快很多,而且序列化後的資料大小大概是java序列化機制的1/10 
Kryo序列化機制,一旦啟用,會生效的幾個地方

  • 運算元函式中使用的外部變數 
    運算元函式中使用的外部變數,在經過kryo序列化之後,會優化網路傳輸的效能,優化叢集中記憶體的佔用和消耗

  • 持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER 
    持久化RDD的時候,優化記憶體的佔用和消耗

  • shuffle 
    優化網路傳輸的效能

在Spark程式中如何使用序列化

  • 第一步: 在SparkConf中設定一個屬性
SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

 

  • 第二步: 註冊需要使用Kryo序列化的自定義的類 
    如果要達到Kryo的最佳效能的話,那麼就一定要註冊自定義的類
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})

6.fastutil

fastutil 是擴充套件了java標準集合框架(Map,List,Set,HashMap,ArrayList,HashSet)的類庫,提供了特殊型別的Map,Set,List和queue,fastutil能夠提供更小的記憶體佔用,更快的存取速度,fastutil也提供了64位的array、set和list,以及高效能快速的,以及實用的IO類,來處理二進位制和文字型別的檔案; 
fastutil最新版本要求Java 7以及以上版本 
Spark中fastutil應用場景:

  • 1.如果運算元函式使用了外部變數,那麼可以用三步來優化: a.使用Broadcast廣播變數優化, b. 使用Kryo序列化類庫優化,提升效能和效率,c.如果外部變數是某種比較大的集合,可以使用fastutil改寫外部變數
  • 2.在運算元函式中,如果要建立比較大的Map.List等集合,可以考慮將這些集合型別使用fastutil類庫重寫

fastutil的使用:

<dependency>
    <groupId>fastutil</groupId>
    <artifactId>fastutil</artifactId>
    <version>5.0.9</version>
</dependency>

7.資料本地化等待時長

什麼是本地化等待時長

Spark在Driver上,對Application的每一個stage的task,在進行分配之前,都會計算出每個task要計算的是哪個分片的資料也即是RDD的某個partition.Spark的task分配演算法優先會希望每個task正好分配到它要計算的資料所在的節點,這樣就避免了網路間傳輸資料 
但是,task可能沒有機會分配到它的資料所在的節點,因為可能計算資源和計算能力都滿了,這種情況下,Spark會等待一段時間,過了這個時間,才會選擇一個比較差的本地化級別,比如將這個task分配到相鄰的一個節點上,這個時候肯定發生網路傳輸,會通過一個getRemote()方法,通過TransferService(網路資料傳輸元件)從資料所在節點的BlockManager中獲取資料,上述中的一段時間即為本地化等待時長

如何調節本地化等待時長

PROCESS_LOCAL:程序本地化,程式碼和資料在同一個程序中,也就是在同一個executor中;計算資料的task由executor執行,資料在executor的BlockManager中;效能最好
NODE_LOCAL:節點本地化,程式碼和資料在同一個節點中;比如說,資料作為一個HDFSblock塊,就在節點上,而task在節點上某個executor中執行;或者是,資料和task在一個節點上的不同executor中;資料需要在程序間進行傳輸
NO_PREF:對於task來說,資料從哪裡獲取都一樣,沒有好壞之分
RACK_LOCAL:機架本地化,資料和task在一個機架的兩個節點上;資料需要通過網路在節點之間進行傳輸
ANY:資料和task可能在叢集中的任何地方,而且不在一個機架中,效能最差

spark.locality.wait,預設是3s

在使用client模式測試時,在本地就可以看到比較全的日誌,日誌裡面會顯示: 
image_1bh8t33kl1k5r1veui9u1cjr1bbd9.png-80.2kB
如果大多都是PROCESS_LOCAL,那就不用調節了 
如果是發現,好多的級別都是NODE_LOCAL、ANY,那麼最好就去調節一下資料本地化的等待時長 
調節完,應該是要反覆調節,每次調節完以後,再來執行,觀察日誌 
看看大部分的task的本地化級別有沒有提升;看看,整個spark作業的執行時間有沒有縮短 
調節方法:

spark.locality.wait,預設是3s;可以調節為6s,10s
預設情況下,下面3個的等待時長,都是跟上面那個是一樣的,都是3s
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack

new SparkConf()
  .set("spark.locality.wait", "10")

8.JVM調優

JVM體系結構

image_1bh909ge5fl315j1d7916rt3241g.png-240.3kB

  • CLASS FILES : scala或者java類程式
  • CLASS LOADER : 類載入子系統
  • RUNTIME DATA AREA : 執行資料域元件

    • Java Stack(棧)
    • HEAP (堆)
    • Method Area(方法區)
    • PC Register(程式計數器)
    • Native Method Stack (本地方法棧)
  • Execution Engine : 執行引擎子系統

  • Native Interface : 本地介面元件

Java Stack (棧)

棧也叫棧記憶體,是Java程式的執行區,是線上程建立時建立,它的生命期是跟隨執行緒的生命期,執行緒結束棧記憶體也就釋放,對於棧來說不存在垃圾回收問題,只要執行緒一結束,棧也隨之釋放 
棧中的資料是以棧幀(Stack Frame)的格式存在,棧幀是一個記憶體區塊,是一個數據集,是一個有關方法(Method)和執行期資料的資料集,當一個方法A被呼叫時就產生了一個棧幀F1,並被壓入到棧中,A方法又呼叫B方法,於是產生棧幀F2也被壓入棧中,執行完畢後,先彈出F2,後彈出F1,即”先進後出”原則

Java Heap(堆)

一個JVM例項只存在一個堆記憶體,堆記憶體的大小是可以調節的,堆記憶體也是JVM中用於存放物件與陣列例項的地方,垃圾回收的主要區域就是這裡(還有可能是方法區Method Area),類載入器讀取了類檔案之後,需要把類,方法,常變數放到堆記憶體中,以方便執行器執行,堆記憶體分為三個部分:

  • Permanent Space : 永久儲存區
  • Young Generation Space 新生區 / New Generation
  • Tenure Generation Space 養老區 / Old Generation

image_1bh92ep9u19fq1ot41lh0hmd1urc1t.png-259.9kB
- New /Young Generation 
又稱新生代,程式中新建的物件都會分配到新生代中,新生代又由Eden Space 和兩塊Survivor Space構成,可通過-Xmn引數來指定其大小,Eden 和兩塊Survivor Space的大小比例預設是8:1:1,這個比例可通過-XX:SurvivorRatio來指定 
- Old Generation 
又稱老年代,用於存放程式中經過幾次垃圾回收還存活的物件,例如快取的物件,老年代所佔的記憶體大小即為-Xmx大小減去-Xmn大小 
- Permanent Generation 
一個常住記憶體區域,用於存放JDK自身所攜帶的Class,Interface的元資料,即儲存的是執行環境必須的類資訊,被裝載到此區域的資料是不會被;垃圾回收掉的,直至關閉JVM才會釋放

Young/New Generation

年輕代與Spark調優息息相關,所以這裡單獨拿出來講解 
所有新生成的物件首先都是放在年輕代中,年輕代的目標就是儘可能快速的收集掉那些生命週期短的物件 
大部分物件在Eden區生成,當Eden區滿時,還存活的物件將被複制到Survivor區中(兩中的一個),當這個Survivor區滿的時候,此區存放的物件會被放在另一個Survivor區中,當另一個Survivor也滿的時候,從第一個Survivor複製過來的還存活的物件將被複制到老年代中,Survivor的兩個區是對稱的,沒有先後關係,所以同一個Survivor取中可能存在從Eden複製過來的物件和從另一個Survivor複製過來的物件,而且Survivor總有一個是空的,而且可以配置多餘兩個

Spark的JVM調優

  • 降低cache操作的記憶體佔比 
    Spark task執行運算元函式時會生成大量物件,這些物件會被放入年輕代中,當年輕代記憶體比較小時,會導致年輕代中Eden區和Survivor區頻繁記憶體溢滿,導致頻繁的minor GC,而頻繁的minorGC或導致一些存活的短宣告週期(其實就是在後面用不到的物件)物件直接放入老年代中,而當老年代記憶體溢滿是,則會導致Full GC 
    full gc / minor gc,無論是快,還是慢,都會導致jvm的工作執行緒停止工作,簡而言之,就是說,gc的時候,spark停止工作了。等著垃圾回收結束。 
    總而言之,上面的情況都是由記憶體不足引起的即記憶體不充足的時候,問題: 
    1、頻繁minor gc,也會導致頻繁spark停止工作 
    2、老年代囤積大量活躍物件(短生命週期的物件),導致頻繁full gc,full gc時間很長,短則數十秒,長則數分鐘,甚至數小時。可能導致spark長時間停止工作。 
    3、嚴重影響咱們的spark的效能和執行的速度。

如何增大記憶體? 
Spark中,堆記憶體又被劃分成了兩塊,一塊是專門用來給RDD的cache,persist操作進行RDD快取用的,另一塊就是用來給Spark運算元函式用的,存放函式中自己建立的物件 
預設情況下,給RDD的cache操作的記憶體佔比是0.6,即百分之六十的記憶體用來給RDD做快取用,但其實RDD並不需要這麼大的記憶體,我們可以通過檢視每個stage中每個task執行的時間,GC時間等來判斷是否發生了頻繁的minorGC和fullGC,從而來調低這個比例 
調節方法

spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2
  • Ececutor堆外記憶體 

當Spark處理超大資料量時(數十億,百億級別),executor的堆外記憶體可能會不夠用,出現shuffle file can’t find, task lost,OOM等情況 
預設情況下,這個堆外記憶體是300M,當執行超大資料量時,通常會出現問題,因此需要調節到1G,2G,4G等大小 
調節方法必須在spark-submit提交指令碼中設定而不能在程式中設定

--conf spark.yarn.executor.memoryOverhead=2048
  • GC引起的連線等待時長 
    Spark在處理超大資料量時,task可能會建立很大很多的物件,頻繁的讓JVM記憶體溢滿,導致頻繁GC,而前面提到過executor獲取資料優先的從本地關聯的blockmanager獲取,如果沒有的話,會通過transferService去遠端連線其他executor的blockmanager,如果正好碰到那個executor垃圾回收,那麼程式就會卡住,spark預設網路連線時長是60s,當超過60s沒有獲取到資料,則直接宣告任務失敗,也有可能DAGscheduler反覆提交幾次stage,TaskScheduler反覆提交task,則會大大影響spark執行速度,所以可以考慮適當調節等待時長 
    調節方式同調節堆外記憶體一樣,必須在提交spark程式的指令碼中設定
--conf spark.core.connection.ack.wait.timeout=300

 

9. shuffle 調優

shuffle原理

什麼情況下發生shuffle? 
在Spark中,主要有以下幾個運算元 
- groupByKey : 把分佈在各個節點上的資料中的同一個key對應的value都集中到一塊兒,集中到叢集中的一個節點中,也即是集中到一個節點的executor的一個task中 
- reduceByKey : 運算元函式對values集合進行reduce操作,最後生成一個value 
- countByKey : 在一個task中獲取同一個key對應的所有value,然後計數,統計總共有多少value 
- join : 兩個RDD,Key相同的兩個value都集中到一個executor的task中 
shuffle過程: 
image_1bh9qd7te2551p4n1itf1p42eij13.png-73.1kB
在一個shuffle過程中,前半部分stage中,每個task都會建立後半部分stage中相同task數量的檔案,比如stage後半部分有100個task,那麼前半部分的每個task都會建立100個檔案(先寫入到記憶體緩衝中,後溢滿寫入到磁碟),會將同一個key對應的values寫入同一個檔案中,shuffle後半部分的stage中的task,每個task都會從各個節點的task建立其中一份屬於自己的那份檔案中,拉取屬於自己的key-value對,然後task會有一個記憶體緩衝區,然後呼叫HashMap進行key-values的聚合,最終呼叫我們定義的聚合函式來進行相應的操作

shuffle調優之合併map端輸出檔案

預設情況下,Spark是不開啟合併map端輸出檔案機制的,所以當分批次執行task時,每批的task都會建立新的檔案,而不會共用,大大影響了效能,所以當有大量map檔案生成時,需要開啟該機制 
設定方法

new SparkConf().set("spark.shuffle.consolidateFiles", "true")

設定合併機制之後: 
第一個stage,並行執行2個task,執行這兩個task時會建立下一個stage的檔案,執行完之後,會執行下一批次的2個task,而這一批次的task則不會建立新的檔案,會複用上一批次的task建立的檔案 
第二stage的task在拉取上一個stage建立的檔案時就不會拉取那麼多檔案了,而是拉取少量檔案,每個輸出檔案都可能包含了多個task給自己的map端輸出

shuffle調優之map端記憶體緩衝和reduce記憶體佔比

預設情況下: 
每個task的記憶體緩衝為32kb,reduce端記憶體佔比為0.2(即預設executor記憶體中劃分給reduce task的微20%) 
所以在不調優的情況下,如果map端task處理的比較大,記憶體不足則溢滿寫入磁碟 
比如: 
每個task就處理320kb,32kb,總共會向磁碟溢寫320 / 32 = 10次。 
每個task處理32000kb,32kb,總共會向磁碟溢寫32000 / 32 = 1000次。 
同理,ruduce端也一樣 
何時調優? 
通過Spark UI檢視shuffle磁碟的read和write是不是很大,如果很大則應相應調優 
如何調優?

spark.shuffle.file.buffer : 32kb -> 128kb 
spark.shuffle.memoryFraction: 0.2 -> 0.3

 

10. Spark運算元調優之MapPartitions

MapPartitions提升Map類操作效能 
spark中,最基本的原則是每個task處理一個RDD的partition 
如果是普通的Map,假如一個partition中有一萬條資料,那麼map中的function就要執行和計算一萬次,但是使用MapPartitions操作之後,一個task只會執行一次function,function一次接收了所有partition資料,效能比較高 
MapPartitions的缺點: 
如果是普通的Map,一條一條處理資料,當出現記憶體不夠的情況時,那麼就可以將已經處理掉的資料從記憶體裡面垃圾回收掉,所以普通map通常不會出現OOM情況 
如果是MapPartitions,對於大量資料來說,如果一個partiton資料有一百萬條,一次性傳入function之後,可能導致記憶體不足,但是又沒辦法騰出空間,直接就導致了記憶體溢位,OOM 
所以,當使用MapPartitons運算元時,要估算每個partiton的資料能不能一下子快取到分配給executor的記憶體中,如果可以,就是用該運算元,對效能有顯著提升

11. Spark運算元調優之filter過後使用coalesce減少分割槽數量

image_1bha894gat251o2h1rpnlu57743h.png-38kB 
spark程式,通常情況下,RDD在經過filter之後,會出現兩個情況:

  • 每個partition內的資料可能就不太一樣,有的很多有的很少 
    這樣會導致後面在處理這些partition資料的時候,每個task處理的資料量相差懸殊,最終導致很嚴重的問題->資料傾斜

  • partition資料量減少 
    由於partition資料量減少,但是在後面進行處理的時候,還是按照跟partition相同數量的task來進行處理,這就導致的資源浪費

針對上面兩個問題,需要使用coalesce運算元來處理,該運算元能壓縮partiton的數量,減少partiton的資料量,而且讓每個partition的資料量都儘量均衡緊湊,從而便於後續task進行處理,從某種程度上提升spark程式的效能

12. Spark運算元調優之foreachPartition優化寫資料庫效能

foreach是對每條資料進行處理的,task對partition中的每一條資料都會執行function操作,如果function中有寫資料庫的操作,那麼有多少條資料就會建立和銷燬多少個數據庫連線,這對效能的影響很大 
在生產環境中,通常都是使用foreachPartition來寫資料庫的,使用了該運算元之後,對於使用者自定義的function函式,就呼叫一次,一次傳入一個partition的所有資料,這裡只需建立一個數據庫連線,然後向資料庫傳送一條sql語句外加多組引數即可,但是這個時候要配合資料庫的批處理 
同樣,該運算元在超大資料量面前同樣會出現OOM情況

13. Spark運算元調優之使用repartition解決SparkSQL低並行度效能問題

通常情況下,在上面第二條的並行度調優時,使用spark.default.parallelism來設定並行度,那麼這個設定在什麼地方有效,什麼地方無效? 
當程式中沒有使用SparkSQL,那麼整個sparkapplication的所有stage的並行度都是設定的這個引數,除非使用了coalesce運算元縮減過partition. 
當程式中使用了SparkSQL,那麼SparkSQl的那麼stage的並行度無法設定,因為SparkSQL會預設的根據Hive表對應的hdfs檔案的block,自動設定SparkSQL那個stage的並行度,所以這就導致出現了用少量task來處理複雜邏輯的情況, 這種情況下,需要使用repartition來設定SparkSQL的並行度,即對從Hive中讀取出來的RDD,使用repartiton重新分割槽為預期的數量來設定並行度

14.Spark運算元調優之reduceByKey的本地聚合

reduceByKey相對於普通的shuffle操作(比如groupByKey),它的一個重要特點就是map端的本地聚合,見圖: 
image_1bhadbmvb4v8b48lva1jmb1eum3u.png-39.7kB 
在map端,給下個stage的每個task建立的輸出檔案中,寫資料之前,會進行本地的combiner操作,也就是說,對每一個key,對應的values都會執行使用者自定義的運算元函式,比如+_,當進行了這個combiner操作之後,減少了資料量,也即是減少了磁碟IO,同時減少了網路傳輸,對效能有明顯提升,所以,在實際的專案中,能用reduceByKey實現的就儘量用該運算元實現

資料傾斜

資料傾斜原理及現象分析

資料傾斜是Spark中極其影響效能的現象,它甚至能導致程式無法跑完,更不用提效能調優什麼的了 
資料傾斜如何產生的? 
在shuffle操作的時候,是按照key來進行value的資料的輸出,拉取和聚合的,同一個key的values,一定是分配到同一個reduce task進行處理的,假如多個key對應的value一共有90萬條資料,但是可能某條key對應了88萬條,其他key最多也就對應數萬條資料,那麼處理這88萬條資料的reduce task肯定會特別耗費時間,甚至會直接導致OOM,這就是所謂的資料傾斜

資料傾斜解決方案

1.聚合源資料

Spark的資料來源通常情況下都是來自於Hive表,(HDFS或其它大資料分散式系統),而Hive本身就是適合做離線資料分析的,所以說通常要變換一下思路,能在Hive中做聚合的,通常就可以跑定時任務在Hive中做聚合,最終spark拿到的只是每個key對應的一個value值,然後就可以使用map來對這個特殊的value串來處理,省去了groupByKey的過程

2.過濾掉導致傾斜的key

這種情況只適合使用者能夠接受摒棄某些特殊的資料,比如大部分key都對應了幾十萬條,而少數key只對應了幾十條,那麼直接在Hive中過濾掉這些key就從源頭上避免了資料傾斜

3.提高shuffle操作reduce端並行度

提高shuffle操作reduce端並行度會有更多task來處理資料,那麼每個task處理的資料會相對來說更少一些 
如何操作? 
給shuffle運算元傳遞進去一個引數,即一個數字,這個數字就代表了shuffle操作時reduce端的並行度,然後在進行shuffle操作的時候,就會對應建立指定數量的reduce task

4.使用隨機key實現雙重聚合

image_1bhblioiiqqmabg1kgml6pirn9.png-23.8kB 
這個場景主要用於reduceByKey,groupByKey,而非join 
主要原理就是: 
在第一輪聚合時,對key進行打散,將原先一樣的key,變成不一樣的key,相當於將每個key分組,然後針對key的多個分組,進行key的區域性聚合,接著再去掉key的字首,然後對所有key進行全域性聚合,這種方案對解決這兩個運算元產生的資料傾斜有比較好的效果

5.join運算元操作的資料傾斜解決方案

將reduce join轉換為map join

示例程式碼:

        List<Tuple2<Long, Row>> userInfos = userid2InfoRDD.collect();
        final Broadcast<List<Tuple2<Long, Row>>> userInfosBroadcast = sc.broadcast(userInfos);

        JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2PartAggrInfoRDD.mapToPair(

                new PairFunction<Tuple2<Long,String>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                            throws Exception {
                        // 得到使用者資訊map
                        List<Tuple2<Long, Row>> userInfos = userInfosBroadcast.value();

                        Map<Long, Row> userInfoMap = new HashMap<Long, Row>();
                        for(Tuple2<Long, Row> userInfo : userInfos) {
                            userInfoMap.put(userInfo._1, userInfo._2);
                        }

                        // 獲取到當前使用者對應的資訊
                        String partAggrInfo = tuple._2;
                        Row userInfoRow = userInfoMap.get(tuple._1);

                        String sessionid = StringUtils.getFieldFromConcatString(
                                partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);

                        int age = userInfoRow.getInt(3);
                        String professional = userInfoRow.getString(4);
                        String city = userInfoRow.getString(5);
                        String sex = userInfoRow.getString(6);

                        String fullAggrInfo = partAggrInfo + "|"
                                + Constants.FIELD_AGE + "=" + age + "|"
                                + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
                                + Constants.FIELD_CITY + "=" + city + "|"
                                + Constants.FIELD_SEX + "=" + sex;

                        return new Tuple2<String, String>(sessionid, fullAggrInfo);
                    }

                });

 

原理圖: 
image_1bhbn0j7812urii3oa4tt51q0lm.png-34.9kB 
說明: 
普通的join,肯定是要走shuffle,那麼,既然走shuffle,那麼普通的join肯定是reduce join, 即將所有相同的key對應的values,聚合到一個task中,再進行join操作 
那麼如何將reduce join轉換為map join? 
當兩個RDD要進行join時,其中一個RDD是比較小的,那麼就可將該小資料量RDD廣播出去,該RDD資料將會在每個executor的blockmanager中駐留一份資料,然後在map操作中就可以使用該資料,這種方式下,根本就不會發生shuffle操作,從而從根本上杜絕了資料傾斜

sample取樣傾斜key進行兩次join

示例程式碼

        JavaPairRDD<Long, String> sampledRDD = userid2PartAggrInfoRDD.sample(false, 0.1, 9);

        JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(

                new PairFunction<Tuple2<Long,String>, Long, Long>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                            throws Exception {
                        return new Tuple2<Long, Long>(tuple._1, 1L);
                    }

                });

        JavaPairRDD<Long, Long> computedSampledRDD = mappedSampledRDD.reduceByKey(

                new Function2<Long, Long, Long>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Long call(Long v1, Long v2) throws Exception {
                        return v1 + v2;
                    }

                });

        JavaPairRDD<Long, Long> reversedSampledRDD = computedSampledRDD.mapToPair(

                new PairFunction<Tuple2<Long,Long>, Long, Long>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                            throws Exception {
                        return new Tuple2<Long, Long>(tuple._2, tuple._1);
                    }

                });

        final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;  

        JavaPairRDD<Long, String> skewedRDD = userid2PartAggrInfoRDD.filter(

                new Function<Tuple2<Long,String>, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                        return tuple._1.equals(skewedUserid);
                    }

                });

        JavaPairRDD<Long, String> commonRDD = userid2PartAggrInfoRDD.filter(

                new Function<Tuple2<Long,String>, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                        return !tuple._1.equals(skewedUserid);
                    }

                });

        JavaPairRDD<String, Row> skewedUserid2infoRDD = userid2InfoRDD.filter(

                new Function<Tuple2<Long,Row>, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<Long, Row> tuple) throws Exception {
                        return tuple._1.equals(skewedUserid);
                    }

                }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterable<Tuple2<String, Row>> call(
                            Tuple2<Long, Row> tuple) throws Exception {
                        Random random = new Random();
                        List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();

                        for(int i = 0; i < 100; i++) {
                            int prefix = random.nextInt(100);
                            list.add(new Tuple2<String, Row>(prefix + "_" + tuple._1, tuple._2));
                        }

                        return list;
                    }

                });

        JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(

                new PairFunction<Tuple2<Long,String>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                            throws Exception {
                        Random random = new Random();
                        int prefix = random.nextInt(100);
                        return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
                    }

                }).join(skewedUserid2infoRDD).mapToPair(

                        new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {

                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<Long, Tuple2<String, Row>> call(
                                    Tuple2<String, Tuple2<String, Row>> tuple)
                                    throws Exception {
                                long userid = Long.valueOf(tuple._1.split("_")[1]);  
                                return new Tuple2<Long, Tuple2<String, Row>>(userid, tuple._2);  
                            }

                        });

        JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(userid2InfoRDD);

        JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

        JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = joinedRDD.mapToPair(

                new PairFunction<Tuple2<Long,Tuple2<String,Row>>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<Long, Tuple2<String, Row>> tuple)
                            throws Exception {
                        String partAggrInfo = tuple._2._1;
                        Row userInfoRow = tuple._2._2;

                        String sessionid = StringUtils.getFieldFromConcatString(
                                partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);

                        int age = userInfoRow.getInt(3);
                        String professional = userInfoRow.getString(4);
                        String city = userInfoRow.getString(5);
                        String sex = userInfoRow.getString(6);

                        String fullAggrInfo = partAggrInfo + "|"
                                + Constants.FIELD_AGE + "=" + age + "|"
                                + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
                                + Constants.FIELD_CITY + "=" + city + "|"
                                + Constants.FIELD_SEX + "=" + sex;

                        return new Tuple2<String, String>(sessionid, fullAggrInfo);
                    }

                });

 

這個方案的實現思路關鍵之處在於: 
將發生資料傾斜的key單獨拉出來,放到一個RDD中,然後用這個原本會產生資料傾斜的key RDD和其他RDD單獨去join一下,這個時候,key對應的資料,可能就會分散到多個task中進行join操作 
,如果該傾斜key與其他key混合在一起中,肯定會導致這個key對應的所有資料都聚合到一個task中,然後導致資料傾斜 
image_1bhcr0hpm1a9ghsi1mq71v4v128i13.png-51.3kB
最後,當取出來這個傾斜key之後,還可以通過增加隨機數字首的方法進行優化,如上面程式碼中: 
針對取出來的這個資料傾斜key,從它要join的另一個RDD中過濾出來此key對應的資料,有可能只有一條,那麼就為該條資料的RDD使用flatmap運算元,打上100個隨機數,作為字首,返回一百條資料,然後對該資料傾斜key的RDD,給每條資料也打上一個100以內的隨機數作為字首,再去join操作,效能會提升很多,join之後使用map操作去除字首,然後再與非傾斜key join的結果進行union,即可得到預期結果 
注意:當資料中產生資料傾斜的key很多時,就不適合使用這種方案了,需要考慮下一種方案

使用隨機數以及擴容表近join

該方案其實對資料傾斜的一種緩解,而非解決,它的思路是: 
選擇一個RDD,使用flatmap進行擴容,將每條資料對映為多條資料,每個映射出來的資料,都帶了一個n以內的隨機數,通常是10以內的,對另外一個RDD,做普通的map對映操作,每條資料都打上10以內的隨機數,然後將兩個處理後的RDD,進行join操作 
示例程式碼

        JavaPairRDD<String, Row> expandedRDD = userid2InfoRDD.flatMapToPair(

                new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)
                            throws Exception {
                        List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();

                        for(int i = 0; i < 10; i++) {
                            list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));
                        }

                        return list;
                    }

                });

        JavaPairRDD<String, String> mappedRDD = userid2PartAggrInfoRDD.mapToPair(

                new PairFunction<Tuple2<Long,String>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                            throws Exception {
                        Random random = new Random();
                        int prefix = random.nextInt(10);
                        return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);  
                    }

                });

        JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);

        JavaPairRDD<String, String> finalRDD = joinedRDD.mapToPair(

                new PairFunction<Tuple2<String,Tuple2<String,Row>>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<String, Tuple2<String, Row>> tuple)
                            throws Exception {
                        String partAggrInfo = tuple._2._1;
                        Row userInfoRow = tuple._2._2;

                        String sessionid = StringUtils.getFieldFromConcatString(
                                partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);

                        int age = userInfoRow.getInt(3);
                        String professional = userInfoRow.getString(4);
                        String city = userInfoRow.getString(5);
                        String sex = userInfoRow.getString(6);

                        String fullAggrInfo = partAggrInfo + "|"
                                + Constants.FIELD_AGE + "=" + age + "|"
                                + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
                                + Constants.FIELD_CITY + "=" + city + "|"
                                + Constants.FIELD_SEX + "=" + sex;

                        return new Tuple2<String, String>(sessionid, fullAggrInfo);
                    }

                });

 

該方案的缺點就是,因為我們本身的RDD資料量非常大,無法進行大量擴容,一般也就10倍,所以這個方案也僅僅對資料傾斜起到了緩解作用

troubleshooting

troubleshooting之控制shuffle reduce端緩衝大小以避免OOM

Map端的task是不斷的輸出資料的,資料量可能是很大的,但是Reduce端的task並不是等到map端task將屬於自己的那份資料全部寫入磁碟檔案之後才去拉取的,實際是,map寫一點資料,reduce端的task就會去拉取一點資料,然後立即進行後面的聚合,運算元函式的應用 
Reduce端task每次拉取多少資料是由buffer來決定的,因為拉取過來的資料都是先放在buffer的,然後才用後面的executor分配的堆記憶體佔比 
那麼問題就出在這個buffer上面: 
該buffer預設為48M,大部分情況下,reduce task不會拉取很多的資料,一般10M左右就計算處理了,所以大多數時候,不會出現什麼問題,但是有的時候,map端資料量非常大,寫出的速度特別快,reduce端的所有task拉取資料的時候,全部達到自己緩衝的極限值,也就是48M全部填滿,這個時候,再加上聚合函式的程式碼,可能會建立大量的物件,可能一下子會把記憶體爆掉,導致程式直接崩潰 
針對這個情況,就應該減少reduce端task緩衝的大小,寧願多拉取幾次,也不一下子拉取很多資料,犧牲效能換取執行 
換言之,當叢集資源充足,map端資料量又不是很大的情況下,就可以考慮調大此緩衝大小來提高效能 
調節方式:

spark.reducer.maxSizeInFlight,48
spark.reducer.maxSizeInFlight,24

troubleshooting之解決JVM的GC導致拉取檔案失敗

在Spark作業中,有一種錯誤非常普遍,即shuffle file not found...... 
解析: 
executor的JVM程序,可能記憶體不夠,那麼此時就會執行GC,minorGC 和 FullGC,無論哪種,一旦發生了GC,就會導致executor內,所有的工作執行緒全部停止,比如BlockManager,基於netty的網路通訊,而下一個stage的executor可能還沒有停止掉,task想要去上一個stage的的task所在的executor去拉取屬於自己的資料,結果由於對方在GC,就導致了拉取了半天的資料沒有拉取到,就直接報出shuffle file not found...的錯誤,可能下一個stage又重新提交了stage或task之後,再執行就沒有問題了 
如何避免上述情況呢? 兩個引數:

spark.shuffle.io.maxRetries 3
spark.shuffle.io.retryWait 5s

第一個引數的意思是shuffle檔案拉取的時候,如果沒有拉取到或拉取失敗,最多會重試幾次,第二個引數的意思是每一次重新拉取檔案的時間間隔是5s 
所以說,當上一個stage的executor正在發生漫長的FullGC,導致第二個stage的executor嘗試去拉取檔案,而拉取不到,預設情況下,會反覆重試3次,每次間隔是5秒鐘,最多隻會等待3*5=15秒,如果15秒沒有拉取到資料,就會報出shuffle file not found... 錯誤 
所以當程式頻繁報該錯誤時,需要調節大這兩個引數

troubleshooting之解決YARN佇列資源不足導致的Application連線失敗

當提交一個Spark程式時,如果沒有足夠的資源,一般會出現兩種情況(根據叢集的配置,hadoop的版本等情況不同): 
一是Yarn發現資源不足時,不會Hang在那裡,而是直接列印fail日誌,直接fail 
二是Yarn發現資源不足,會Hang在那裡直到有足夠的資源來執行 
另外,當某個spark程式耗時比較長時,如果在同一個佇列中再提交一個耗時短的spark程式,那麼耗時短的會等很長時間,會很不合適,所以要考慮多個排程佇列 
在CDH叢集中,可以設定多個排程佇列,動態資源選項新建,然後在J2EE中,通過執行緒池的方法(一個執行緒池對應一個資源佇列)來實現上述方案

ExecutorService threadPool = Executors.newFixedThreadPool(1);
threadPool.submit(new Runnable() {
@Override
public void run() {
}
});

troubleshooting之解決各種序列化導致的報錯

序列化報錯的出現:當用client模式去提交spark作業,檢視本地打印出來的log,如果出現了Serializable,Serialize等字眼,那麼就是序列化導致的報錯 
處理方法:

1.運算元函式裡面如果使用到了外部自定義型別的變數,那麼自定義的型別必須是可序列化的

2.如果要將自定義的型別作為RDD的元素型別,那麼自定義的型別必須也是可序列化的

JavaPairRDD<Integer, Teacher> teacherRDD
JavaPairRDD<Integer, Student> studentRDD
studentRDD.join(teacherRDD)

public class Teacher implements Serializable {

}
public class Student implements Serializable {

}

3.不能在上述兩種情況下,使用一些第三方的,不支援序列化的型別

比如Connection conn = ....

troubleshooting之解決運算元函式返回NULL導致的問題

在某些運算元函式裡面,是需要有一個返回值的,但是有時候我們不想有返回值,如果直接返回NULL的話,會報錯的,遇到這種情況,在返回的時候,需要返回一些特殊值,比如”-999”,那麼通過這個運算元得到的RDD,再經過filter過濾操作,過濾掉就可以了

troubleshooting之解決yarn-client模式導致的網絡卡流量激增問題

當使用yarn-client模式提交spark程式時,driver是啟動在本地機器的,而且driver是全權負責所有的任務的排程的,也就是說,要跟yarn叢集上執行的多個executor進行頻繁的通訊,有可能導致網絡卡流量激增,那麼解決辦法就是使用yarn-cluster模式,這樣就不是本地的機器的driver來進行task排程了

troubleshooting之解決yarn-cluster模式的JVM記憶體溢位無法執行問題

有的時候,執行一些包含了sparkSQL的spark作業,可能會碰到yarn-client模式下提交可以正常執行,但是yarn-cluster模式下可能無法執行,報出JVM的PermGen永久代的記憶體溢位,即OOM 
出現這種情況的原因: 
yarn-client模式下,driver是執行在本地機器上的,spark使用的JVM的PermGen的配置,是本地的spark-class檔案(spark客戶端預設是有配置的),JVM永久代的大小是128M,但是在yarn-cluster模式下,driver是執行在叢集的某個節點上的,使用的是沒有經過配置的預設值82M 
sparkSQL,它的內部是要進行很複雜的SQL的語義解析,語法樹的轉換等等,特別複雜,在這種複雜的情況下,如果說sql本身特別複雜的話,很可能會導致效能的消耗,記憶體的消耗,對PermGen佔用比較大,所以會出現這種情況 
如何解決這種情況:

//yarn-cluster模式下
--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

 

最後 
sparkSQL,如果有大量的or語句,比如有成百上千的時候,此時可能就會出現一個driver端的jvm stack overflow,即JVM棧記憶體溢位的問題 
JVM棧記憶體溢位,基本上就是由於呼叫的方法層級過多,因為產生了大量的,非常深的,超出了JVM棧深度限制的遞迴,所以這個時候可能要將sql語句拆分為多個sql子句來執行

troubleshooting之錯誤的持久化方式及checkpoint的使用

持久化方式

userRDD,如果想要對這個RDD做一個cache,希望能在後面多次使用這個RDD的時候,不用反覆重新計算RDD,而是可以直接使用各個節點的executor的BlockManager管理的記憶體/磁碟上的資料 
錯誤方式:

uesrRDD.cache()
userRDD.count()
userRDD.take()

上面會持久化方式不會生效,反而會報file not found的錯誤 
正確方式:

userRDD=userRDD.cache()
val cacheuserRDD=userRDD.cache()

 

checkpoint使用

1.在程式碼中,用SparkContext設定一個checkpoint目錄,可以是一個容錯檔案系統的目錄,比如HDFS 
2.在程式碼中,對需要進行checkpoint的RDD,執行RDD.checkpoint() 
3.RDDCheckpointData(Spark內部的API),會接管該RDD,標記為marked for checkpoint,準備進行checkpoint 
4.job執行完之後,會呼叫一個finalRDD.doCheckpoint()方法,會順著RDD lineage,回溯掃描,發現有標記為待checkpoint的RDD,就會進行二次標記,inProgressCheckpoint,即正在接受checkpoint操作 
5.job執行完後,會啟動一個內部的新的job,去將標記為inProgressCheckpoint的RDD的資料,都寫入hdfs檔案中(如果rdd之前cache過,會直接從快取中獲取資料,寫入hdfs中,如果沒有cache過,那麼就會重新計算一遍這個RDD,再checkpoint) 
6.將checkpoint過的RDD之前的依賴RDD,改成一個CheckpointRDD*,強制改變RDD的lineage,後面如果RDD的cache資料獲取失敗,直接會通過它的上游CheckpointRDD,去容錯的檔案系統中,比如hdfs,獲取Checkpoint資料 
Checkpoint其實是cache的一個備胎

 

參考:https://blog.csdn.net/vinfly_li/article/details/79415342