1. 程式人生 > >Spark調優基本策略

Spark調優基本策略

1. RDD的持久化
    cahce()
    persist()
    checkpoint()
2. 避免建立重複的RDD
     儘可能複用同一個RDD,類似於多個RDD的資料有重疊或者包含的情況,應該儘量複用一個RDD,以儘可能減少RDD的數量,從而減少運算元計算次數
4.儘量避免使用shuffle類運算元
    spark執行過程中,最消耗效能的地方就是shuffle過程(簡單說,就是將分佈在叢集中多個節點上的同一個key拉取到同一個節點上進行操作)
    shuffle過程中,各個節點上相同的key都會先寫入本地磁碟檔案中,然後其他節點需要通過網路傳輸拉取各個節點上的磁碟檔案中的相同的key,而且相同key都拉取到同一個節點進行聚合操作時,還可能會因為一個節點上處理的key過多,導致記憶體不夠存放,從而溢寫到磁碟檔案中。
    磁碟IO和網路資料傳輸也是shuffle效能較差的主要原因。
    儘量使用map類的非shuffle運算元。
    repartition(重分割槽)類操作:repartition、repartitionAndSortWithinPartitions、coalesce等
    bykey類操作:reduceByKey、groupByKey、sortByKey等
    join類操作:join、cogroup等
    類如:join –>Broadcast+map  (Broadcast:資料量小於1G內的RDD,每份資料放入executor副本中)
5.使用map-side預聚合的shuffle操作
    因為業務需要,一定要使用shuffle操作,無法用map類運算元替代,儘量使用map-side預聚合的運算元。(在每個節點本地對相同的key進行一次聚合操作)
    map-side預聚合之後,每個節點本地就只會有一條相同的key,因為多條相同的key都被聚合起來了。其他節點在拉取所有節點上的相同key時,就會大大減少需要拉取的資料數量,從而也就減少了磁碟IO以及網路傳輸開銷。
    通常來說,在可能的情況下,建議使用reduceByKey或者aggregateByKey運算元來替代掉groupByKey運算元。因為reduceByKey和aggregateByKey運算元都會使用使用者自定義的函式對每個節點本地的相同key進行預聚合。而groupByKey運算元是不會進行預聚合的,全量的資料會在叢集的各個節點之間分發和傳輸,效能相對來說比較差。
   groupByKey中 shuffle操作中沒有預聚合操作
6.使用高效能的運算元
   除了shuffle相關的運算元有優化原則外,其他的運算元也有相應的優化原則。
     a . 使用mapPartitions替代map。一次函式呼叫會處理一個partition所有的資料,而不是一次函式呼叫處理一條。但是可能會出現記憶體溢位(OOM)問題。
     b .  使用foreachPartitions提到foreach。
     c . 使用filter之後進行coalesce操作。使用filter後,建議使用coalesce運算元,手動減少RDD的資料量,將RDD的資料壓縮到更少的partition中去。  (coalesce:RDD的分割槽進行重新劃分,repartition只是coalesce介面中shuffle為true的簡易實現)
     d . 使用repartitionAndSortWithinPartitions替代repartition與sort類操作。如果需要在repartition重分割槽後,還有進行排序,建議使用repartitionAndSortWithinPartitions。因為該運算元可以一邊進行重分割槽的shuffle操作,一邊進行排序(同時進行)。
 7.廣播大變數
    在開發中,遇到需要在運算元函式中使用外部變數的場景(如配置檔案)(尤其是大變數,比如100M以上的大集合)。此時就應該使用Spark的廣播功能來提升效能
 在運算元函式中使用到外部變數時,預設情況下,spark會將變數複製多個副本,通過網路傳遞到task中,此時每個task都有一個變數副本。若使用的外部變數比較大,建議使用Spark的廣播功能,對該變數進行廣播。廣播後的變數會保證每個executor的記憶體中,只駐留一份變數副本,而executor中的task執行時共享該executor中的那份變數副本。從而大大減少變數副本的數量,減少網路傳輸的效能開銷,並減少對executor記憶體的佔用開銷。
8.使用kryo優化序列化效能
     JAVA序列化:是指把JAVA物件轉換位位元組序列的過程
     JAVA反序列化:是指把位元組序列恢復為JAVA物件的過程。
 當兩個程序進行遠端通訊時,可以相互發送各種型別的資料,包括文字、圖片、音訊、視訊等,而這些資料都會以二進位制序列的形式在網路上傳送。
 那麼當兩個JAVA程序進行通訊時,要實現程序間物件傳送就需要JAVA序列化和反序列化,也就是傳送方需要把這個JAVA物件轉換為位元組序列,然後再網路上傳送;另一方面,接收方需要從位元組序列中回覆出JAVA物件。
 JAVA序列化的好處有:實現了資料的持久化,通過序列化可以把資料永久地儲存到硬碟上;利用序列化實現遠端通訊,即在網路上傳送物件的位元組序列。
 spark中主要有三個地方涉及到了序列化:
      1.在運算元函式中使用到外部變數時,該變數會被序列化後進行網路傳輸
      2.將自定義的型別作為RDD的泛型型別(如JavaRDD,student是自定義型別),所有自定義型別物件都會進行序列化,這種情況下,也要求自定義的類必須實現Serializable介面
      3. 使用可序列化的持久化策略時(如MEMORY_ONLY_SER),spark會將RDD中每個partition都序列化成一個大的位元組陣列
      對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化。Spark預設使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是spark同時支援使用Keyo序列化庫,Kryo序列化類庫的效能比Java序列化類庫的效能高很多。
        SparkConf().set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
        Scala版本:
                 val conf = new SparkConf().setMaster(…).setAppName(…)
                 conf.registerKryoClasses(Array(classOf[Counter] ))
                 val sc = new SparkContext(conf)
        Java版本:
                 SparkConf conf = new SparkConf().setMaster(…).setAppName(…)
                 conf.registerKryoClasses(Counter.class)
                 JavaSparkContext sc = new JavaSparkContext(conf)
        如果註冊的要序列化的自定義的型別,本身特別大,比如包含了超過100個field。那麼就會導致要序列化的物件過大。此時就需要對Kryo本身進行優化。因為Kryo內部的快取可能不夠存放那麼大的class物件。此時就需要呼叫SparkConf.set()方法,設定spark.kryoserializer.buffer.mb引數的值,將其調大。
        預設情況下它的值是2,就是說最大能快取2M的物件,然後進行序列化。可以在必要時將其調大。比如設定為10。
9.優化資料結構
         Java中,有三種類型比較耗費記憶體:
         1.物件:每個JAVA物件都有物件頭、引用等額外的資訊。比較佔用記憶體空間
         2.字串:每個字串內部都有一個字元陣列以及長度等額外資訊
         3.集合型別:比如HashMap、LinkedList等,因為集合型別內部通常會使用一些內部類來封裝集合元素,比如Map.Entry
原博主 fanyao4144 的CSDN 部落格:https://blog.csdn.net/fanyao4144/article/details/78759931