1. 程式人生 > >Spark 執行時常見異常及資料傾斜的解決方法

Spark 執行時常見異常及資料傾斜的解決方法

spark執行異常:
     現象1:
          有時會出現的一種情況非常普遍,在spark的作業中;shuffle file not found。(spark作業中,非常非常常見的)而且,有的時候,它是偶爾才會出現的一種情況。有的時候,出現這種情況以後,會重新去提交stage、task。重新執行一遍,發現就好了。沒有這種錯誤了。
     現象2:
          如果說,你是基於yarn來提交spark。比如yarn-cluster或者yarn-client。你可以指定提交到某個hadoop佇列上的。每個佇列都是可以有自己的資源的。
     比如在一個生產環境中的,給spark用的yarn資源佇列的情況:500G記憶體,200個cpu core。
     比如說,某個spark application,在spark-submit裡面你自己配了,executor,80個;每個executor,4G記憶體;每個executor,2個cpu core。你的spark作業每次執行,大概要消耗掉320G記憶體,以及160個cpu core。
     乍看起來,咱們的佇列資源,是足夠的,500G記憶體,280個cpu core。
     首先,第一點,你的spark作業實際執行起來以後,耗費掉的資源量,可能是比你在spark-submit裡面配置的,以及你預期的,是要大一些的。400G記憶體,190個cpu core。
     那麼這個時候,的確,咱們的佇列資源還是有一些剩餘的。但是問題是,如果你同時又提交了一個spark作業上去,一模一樣的。那就可能會出問題。
     第二個spark作業,又要申請320G記憶體+160個cpu core。結果,發現佇列資源不足。。。。
     此時,可能會出現兩種情況:(備註,具體出現哪種情況,跟你的YARN、Hadoop的版本,你們公司的一些運維引數,以及配置、硬體、資源肯能都有關係)
         1、YARN,發現資源不足時,你的spark作業,並沒有等在那裡,等待資源的分配,而是直接列印一行fail的log,直接就fail掉了。
         2、YARN,發現資源不足,你的spark作業,就h等在那裡。一直等待之前的spark作業執行完,等待有資源分配給自己來執行。
     採用如下方案:
         1、在你的J2EE(我們這個專案裡面,spark作業的執行,之前說過了,J2EE平臺觸發的,執行spark-submit指令碼),限制,同時只能提交一個spark作業到yarn上去執行,確保一個spark作業的資源肯定是有的。
         2、你應該採用一些簡單的排程區分的方式,比如說,你有的spark作業可能是要長時間執行的,比如執行30分鐘;有的spark作業,可能是短時間執行的,可能就執行2分鐘。此時,都提交到一個佇列上去,肯定不合適。
         很可能出現30分鐘的作業卡住後面一大堆2分鐘的作業。分佇列,可以申請(跟你們的YARN、Hadoop運維的同學申請)。你自己給自己搞兩個排程佇列。每個佇列的根據你要執行的作業的情況來設定。在你的J2EE程式裡面,
         要判斷,如果是長時間執行的作業,就乾脆都提交到某一個固定的佇列裡面去把;如果是短時間執行的作業,就統一提交到另外一個佇列裡面去。這樣,避免了長時間執行的作業,阻塞了短時間執行的作業。
         3、你的佇列裡面,無論何時,只會有一個作業在裡面執行。那麼此時,就應該用我們之前講過的效能調優的手段,去將每個佇列能承載的最大的資源,分配給你的每一個spark作業,比如80個executor;6G的記憶體;3個
         cpu core。儘量讓你的spark作業每一次執行,都達到最滿的資源使用率,最快的速度,最好的效能;並行度,240個cpu core,720個task。
         4、在J2EE中,通過執行緒池的方式(一個執行緒池對應一個資源佇列),來實現上述我們說的方案。
     現象3:用client模式去提交spark作業,觀察本地打印出來的log。如果出現了類似於Serializable、Serialize等等字眼,報錯的log,那麼恭喜大家,就碰到了序列化問題導致的報錯,雖然是報錯,但是序列化報錯,應該是屬於比較簡單的了,很好處理。
     序列化報錯要注意的三個點:
          1、你的運算元函式裡面,如果使用到了外部的自定義型別的變數,那麼此時,就要求你的自定義型別,必須是可序列化的。
              final Teacher teacher = new Teacher("leo");
              studentsRDD.foreach(new VoidFunction() {
                String teacherName = teacher.getName();
                ....
              }
              });
              public class Teacher implements Serializable {
              }
          2、如果要將自定義的型別,作為RDD的元素型別,那麼自定義的型別也必須是可以序列化的
              JavaPairRDD<Integer, Teacher> teacherRDD
              JavaPairRDD<Integer, Student> studentRDD
              studentRDD.join(teacherRDD)
              public class Teacher implements Serializable {
              }
              public class Student implements Serializable {
              }
          3、不能在上述兩種情況下,去使用一些第三方的,不支援序列化的型別
              Connection conn =
              studentsRDD.foreach(new VoidFunction() {
              public void call(Row row) throws Exception {
                conn.....
              }
              });
              Connection是不支援序列化的
     現象3:
          在運算元函式中,返回null
              //      return actionRDD.mapToPair(new PairFunction<Row, String, Row>() {
              //         private static final long serialVersionUID = 1L;
              //         @Override
              //         public Tuple2<String, Row> call(Row row) throws Exception {
              //            return new Tuple2<String, Row>("-999", RowFactory.createRow("-999"));
              //         }
              //      });

              大家可以看到,在有些運算元函式裡面,是需要我們有一個返回值的。但是,有時候,我們可能對某些值,就是不想有什麼返回值。我們如果直接返回NULL的話,那麼可以不幸的告訴大家,是不行的,會報錯的。
              Scala.Math(NULL),異常
              如果碰到你的確是對於某些值,不想要有返回值的話,有一個解決的辦法:
              1、在返回的時候,返回一些特殊的值,不要返回null,比如“-999”
              2、在通過運算元獲取到了一個RDD之後,可以對這個RDD執行filter操作,進行資料過濾。filter內,可以對資料進行判定,如果是-999,那麼就返回false,給過濾掉就可以了。
              3、大家不要忘了,之前咱們講過的那個運算元調優裡面的coalesce運算元,在filter之後,可以使用coalesce運算元壓縮一下RDD的partition的數量,讓各個partition的資料比較緊湊一些。也能提升一些效能。

     現象4:
          實踐經驗,碰到的yarn-cluster的問題:
          有的時候,執行一些包含了spark sql的spark作業,可能會碰到yarn-client模式下,可以正常提交執行;yarn-cluster模式下,可能是無法提交執行的,會報出JVM的PermGen(永久代)的記憶體溢位,OOM。
          yarn-client模式下,driver是執行在本地機器上的,spark使用的JVM的PermGen的配置,是本地的spark-class檔案(spark客戶端是預設有配置的),JVM的永久代的大小是128M,這個是沒有問題的;但是呢,在yarn-cluster模式下,driver是執行在yarn叢集的某個節點上的,使用的是沒有經過配置的預設設定(PermGen永久代大小),82M。
          spark-sql,它的內部是要進行很複雜的SQL的語義解析、語法樹的轉換等等,特別複雜,在這種複雜的情況下,如果說你的sql本身特別複雜的話,很可能會比較導致效能的消耗,記憶體的消耗。可能對PermGen永久代的佔用會比較大。
          所以,此時,如果對永久代的佔用需求,超過了82M的話,但是呢又在128M以內;就會出現如上所述的問題,yarn-client模式下,預設是128M,這個還能執行;如果在yarn-cluster模式下,預設是82M,就有問題了。會報出PermGen Out of Memory error log。
          解決方案:
             既然是JVM的PermGen永久代記憶體溢位,那麼就是記憶體不夠用。咱們呢,就給yarn-cluster模式下的,driver的PermGen多設定一些。
             spark-submit指令碼中,加入以下配置即可:
             --conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
             這個就設定了driver永久代的大小,預設是128M,最大是256M。那麼,這樣的話,就可以基本保證你的spark作業不會出現上述的yarn-cluster模式導致的永久代記憶體溢位的問題。
     spark資料傾斜,有兩種表現:
             1、你的大部分的task,都執行的特別特別快,刷刷刷,就執行完了(你要用client模式,standalone client,yarn client,本地機器主要一執行spark-submit指令碼,就會開始列印log),task175 finished;
             剩下幾個task,執行的特別特別慢,前面的task,一般1s可以執行完5個;最後發現1000個task,998,999 task,要執行1個小時,2個小時才能執行完一個task, 出現數據傾斜了
             還算好的,因為雖然老牛拉破車一樣,非常慢,但是至少還能跑。
             2、執行的時候,其他task都刷刷刷執行完了,也沒什麼特別的問題;但是有的task,就是會突然間,啪,報了一個OOM,JVM Out Of Memory,記憶體溢位了,task failed,task lost,resubmitting task。反覆執行幾次都
             到了某個task就是跑不通,最後就掛掉。 某個task就直接OOM,那麼基本上也是因為資料傾斜了,task分配的數量實在是太大了!!!所以記憶體放不下,然後你的task每處理一條資料,還要建立大量的物件。記憶體爆掉了。
             原因定位:
                根據log去定位出現數據傾斜的原因,基本只可能是因為發生了shuffle操作,在shuffle的過程中,出現了資料傾斜的問題。因為某個,或者某些key對應的資料,遠遠的高於其他的key。
                1、你在自己的程式裡面找找,哪些地方用了會產生shuffle的運算元,groupByKey、countByKey、reduceByKey、join
                2、看log log一般會報是在你的哪一行程式碼,導致了OOM異常;或者呢,看log,看看是執行到了第幾個stage!!!
                我們這裡不會去剖析stage的劃分演算法,(如果之前不瞭解,但是想要了解,建議先學習北風網的《Spark從入門到精通》),spark程式碼,是怎麼劃分成一個一個的stage的。哪一個stage,task特別慢,
                就能夠自己用肉眼去對你的spark程式碼進行stage的劃分,就能夠通過stage定位到你的程式碼,哪裡發生了資料傾斜
                去找找,程式碼那個地方,是哪個shuffle操作。
              結果方案:
                   一.聚合源資料
                   二.過濾導致傾斜的key
                   三.提高shuffle操作的reduce並行度(rdd.reduceByKey(new function(),1000)這樣寫就可以,這時reduce端的task就是1000個)
                   四.使用隨機key實現雙重聚合
                      1、使用場景
                        (1)groupByKey
                        (2)reduceByKey
                   五. 將reduce join 改成map join
                   六.sample取樣傾斜key單獨進行join
                         使用抽樣運算元:抽出10%或者更多的資料,然後從這些資料中找出key出現最多的那個,那他和需要join的物件進行join
                                       剩下的過濾掉這些key對應的資料,然後進行jion
                         原理:將key,從另外一個RDD中過濾出的資料,可能只有一條,或者幾條,此時,咱們可以任意進行擴容,擴成1000倍。
                               將從第一個RDD中拆分出來的那個傾斜key RDD,打上1000以內的一個隨機數。
                               這種情況下,還可以配合上,提升shuffle reduce並行度,join(rdd, 1000)。通常情況下,效果還是非常不錯的。
                               打散成100份,甚至1000份,2000份,去進行join,那麼就肯定沒有資料傾斜的問題了吧
                   七.採用隨機數和擴容表進行join解決數傾斜
                        缺點:這個方案是沒辦法徹底解決資料傾斜的,更多的,是一種對資料傾斜的緩解。
                        步驟:
                            1、選擇一個RDD,要用flatMap,進行擴容,將每條資料對映為多條資料,每個映射出來的資料,都帶了一個n以內的隨機數,通常來說會選擇10。
                            2、將另外一個RDD,做普通的map對映操作,每條資料都打上一個10以內的隨機數。
                            3、最後,將兩個處理後的RDD,進行join操作
                        侷限性:
                            1、因為你的兩個RDD都很大,所以你沒有辦法去將某一個RDD擴的特別大,一般咱們就是10倍。
                            2、如果就是10倍的話,那麼資料傾斜問題,的確是只能說是緩解和減輕,不能說徹底解決。