1. 程式人生 > >資料傾斜原理及解決方案

資料傾斜原理及解決方案

導讀

相信很多接觸MapReduce的朋友對'資料傾斜'這四個字並不陌生,那麼究竟什麼是資料傾斜?又該怎樣解決這種該死的情況呢?

何為資料傾斜?

在弄清什麼是資料傾斜之前,我想讓大家看看資料分佈的概念:

正常的資料分佈理論上都是傾斜的,就是我們所說的20-80原理:80%的財富集中在20%的人手中, 80%的使用者只使用20%的功能 , 20%的使用者貢獻了80%的訪問量 , 不同的資料欄位可能的資料傾斜一般有兩種情況:

一種是唯一值非常少,極少數值有非常多的記錄值(唯一值少於幾千)

一種是唯一值比較多,這個欄位的某些值有遠遠多於其他值的記錄數,但是它的佔比也小於百分之一或千分之一

資料傾斜:

資料傾斜在MapReduce程式設計模型中十分常見,用最通俗易懂的話來說,資料傾斜無非就是大量的相同key被partition分配到一個分割槽裡,造成了'一個人累死,其他人閒死'的情況,這種情況是我們不能接受的,這也違背了平行計算的初衷,首先一個節點要承受著巨大的壓力,而其他節點計算完畢後要一直等待這個忙碌的節點,也拖累了整體的計算時間,可以說效率是十分低下的。

資料傾斜發生時的現象: 

1、絕大多數task執行得都非常快,但個別task執行的極慢。 
2、原本能正常執行的Spark作業,某天突然爆出OOM(記憶體溢位)異常。觀察異常棧,是我們寫的業務程式碼造成的

資料傾斜發生的原理 :

在進行shuffle的時候,必須將各個節點上相同的Key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合或者join操作。如果某個key對應的資料量特別大的話,會發生資料傾斜。比如大部分key對應的10條資料,但個別key卻對應了100萬條資料,那麼大部分task會只分配到10條資料,而個別task可能會分配了100萬資料。整個spark作業的執行進度是由執行時間最長的那個task決定的。 
因此出現數據傾斜的時候,spark作業看起來會執行得非常緩慢,甚至可能因為某個task處理的資料量過大導致OOM。

解決方案

1、增加jvm記憶體,這適用於第一種情況(唯一值非常少,極少數值有非常多的記錄值(唯一值少於幾千)),這種情況下,往往只能通過硬體的手段來進行調優,增加jvm記憶體可以顯著的提高執行效率。

2、增加reduce的個數,這適用於第二種情況(唯一值比較多,這個欄位的某些值有遠遠多於其他值的記錄數,但是它的佔比也小於百分之一或千分之一),我們知道,這種情況下,最容易造成的結果就是大量相同key被partition到一個分割槽,從而一個reduce執行了大量的工作,而如果我們增加了reduce的個數,這種情況相對來說會減輕很多,畢竟計算的節點多了,就算工作量還是不均勻的,那也要小很多。

3、自定義分割槽,這需要使用者自己繼承partition類,指定分割槽策略,這種方式效果比較顯著。

4、重新設計key,有一種方案是在map階段時給key加上一個隨機數,有了隨機數的key就不會被大量的分配到同一節點(小几率),待到reduce後再把隨機數去掉即可。

5、使用combinner合併,combinner是在map階段,reduce之前的一箇中間階段,在這個階段可以選擇性的把大量的相同key資料先進行一個合併,可以看做是local reduce,然後再交給reduce來處理,這樣做的好處很多,即減輕了map端向reduce端傳送的資料量(減輕了網路頻寬),也減輕了map端和reduce端中間的shuffle階段的資料拉取數量(本地化磁碟IO速率),推薦使用這種方法。

如何定位發生資料傾斜的程式碼 

1、資料傾斜只會發生在shuffle中,下面是常用的可能會觸發shuffle操作的運算元:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出現數據傾斜時,可能就是程式碼中使用了這些運算元的原因 

2、通過觀察spark UI的節目定位資料傾斜發生在第幾個stage中,如果是用yarn-client模式提交,那麼本地是可以直接看到log的,可以在log中找到當前執行到了第幾個stage;如果用yarn-cluster模式提交,可以通過Spark Web UI 來檢視當前執行到了第幾個stage。此外,無論是使用了yarn-client模式還是yarn-cluster模式,我們都可以在Spark Web UI 上深入看一下當前這個stage各個task分配的資料量,從而進一步確定是不是task分配的資料不均勻導致了資料傾斜。 

3、根據之前學的stage的劃分演算法定位到極有可能發生資料傾斜的程式碼

這是沒有發生傾斜的例子,若41ms為1h即表示發生傾斜。 
也可檢視屬於第幾個stage。

檢視導致資料傾斜的key的分佈情況 

1. 如果是Spark SQL中的group by、join語句導致的資料傾斜,那麼就查詢一下SQL中使用的表的key分佈情況。 
2. 如果是對Spark RDD執行shuffle運算元導致的資料傾斜,那麼可以在Spark作業中加入檢視key分佈的程式碼,比如RDD.countByKey()。然後對統計出來的各個key出現的次數,collect/take到客戶端列印一下,就可以看到key的分佈情況。

資料傾斜詳細解決方案

一、使用Hive ETL(提取、轉換和載入) 預處理資料 

方案使用場景: 
導致資料傾斜的是Hive表。如果該Hive表中的資料本身很不均勻,而且業務場景需要頻繁的使用Spark對Hive表執行某個分析操作,那麼比較適合使用這種技術方案。 
思路:
此時可以評估,是否可以通過Hive來進行資料預處理。即通過Hive ETL 預先對資料按照Key進行聚合,或者是預先和其他表進行join,然後再Spark作業中針對的資料來源就是預處理後的Hive表。此時由於資料已經預先進行過聚合或者join操作了,那麼在Spark作業中也就不需要使用原先的shuffle類運算元執行這類操作了。 
原理:
從根源上解決了資料傾斜,因為徹底避免了在Spark中執行shuffle類運算元。 
但是因為畢竟資料本身就存在分佈不均勻的問題,所以在Hive ETL中進行groubBy或者join等shuffle操作時,還是會發生資料傾斜,導致Hive ETL速度很慢。只是避免了Spark程式發生資料傾斜。 
經驗:
在一些Java系統與Spark結合使用的專案中,會出現Java程式碼頻繁呼叫Spark作業的場景,而且對Spark作業的執行效能要求很高,就比較適合使用這種方案。將資料傾斜提前到上游的Hive ETL,每天僅執行一次,只有那一次是比較慢的,而之後每次Java呼叫Spark作業時,執行速度都會很快,能夠提供更好的使用者體驗。

二、過濾少數導致傾斜的key 

方案使用場景:
若發現導致傾斜的key就少數幾個,並且對計算本身的影響並不大。比如99%的key對應10條資料,但只有一個key對應100萬資料。 
思路:
若判斷少數幾個資料量特別多的key對作業的執行和計算結果不是那麼特別重要,可以直接過濾掉那幾個key。如在Spark SQL中就可以使用where子句過濾掉這些key,或者在Spark Core 中對RDD執行filter運算元過濾掉這些key。如果需要每次作業執行時,動態判定哪些key的資料量最多然後過濾,可以使用sample運算元對RDD進行取樣,然後計算每個key的數量,取資料量最多的key過濾即可。 
缺點:
適用場景不多,大多數情況下,導致傾斜的key還是很多的,並不是只有少數幾個。

三、提高shuffle操作的並行度 

方案使用場景:
若我們必須要面對資料傾斜問題,要這麼使用。 
思路:
在對RDD執行shuffle運算元時,給shuffle運算元傳入一個引數,如reduceByKey(1000),該引數設定了這個shuffle運算元執行時shuffle read task 的數量。對於Spark SQL中的shuffle類語句,如 groupBy 、join 等需要設定一個引數,即spark.sql.shuffle.partitions。該引數代表了shuffle read task 的並行度,預設值是200。 
原理:
增加shuffle read task 的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的資料。舉例來說,如果原本有5個key,每個key對應10條資料,這5個key都是分配給一個task的,那麼這個task就要處理50條資料。而增加了shuffle read task以後,每個task就分配到一個key,即每個task就處理10條資料,那麼自然每個task的執行時間都會變短了。 

實現起來比較簡單,可以有效緩解和減輕資料傾斜的影響。 
只是緩解了資料傾斜而已,沒有徹底根除問題,根據實踐經驗來看,其效果有限。

四、兩階段聚合(區域性聚合+全域性聚合) 

方案使用場景:
對RDD執行reduceByKey等聚合類shuffle運算元或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。 
思路: 
這個方案的核心實現思路就是進行兩階段聚合。第一次是區域性聚合,先給每個key都打上一個隨機數,比如10以內的隨機數,此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對打上隨機數後的資料,執行reduceByKey等聚合操作,進行區域性聚合,那麼區域性聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。然後將各個key的字首給去掉,就會變成(hello,2)(hello,2),再次進行全域性聚合操作,就可以得到最終結果了,比如(hello, 4)。 

import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class AggWordCount11 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("AggWordCount11");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> list = Arrays.asList("you  jump","i    jump","you  jump","jump jump","jump jump","jump jump");
        JavaRDD<String> listRDD = sc.parallelize(list);
        JavaRDD<String> flatMap = listRDD.flatMap(new FlatMapFunction<String, String>() {

            public Iterable<String> call(String t) throws Exception {
                // TODO Auto-generated method stub
                return Arrays.asList(t.split("\t"));
            }

        });
        JavaPairRDD<String, Integer> wordRDD = flatMap.mapToPair(new PairFunction<String, String, Integer>() {

            public Tuple2<String, Integer> call(String t) throws Exception {

                return new Tuple2<String, Integer>(t,1);
            }
        });
        /**
         * 給rdd中的每一個key的字首都打上隨機數
         */
        JavaPairRDD<String, Integer> prefixRDD = wordRDD.mapToPair(new PairFunction<Tuple2<String,Integer>, String, Integer>() {

            public Tuple2<String, Integer> call(Tuple2<String, Integer> t)
                    throws Exception {
               Random random = new Random();
               int prefix = random.nextInt(4);
                return  new Tuple2<String, Integer>(prefix+"_"+t._1,t._2);
            }

        });
        /**
         * 進行區域性聚合
         */
        JavaPairRDD<String, Integer> aggRDD = prefixRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {

            public Integer call(Integer v1, Integer v2) throws Exception {
                // TODO Auto-generated method stub
                return v1+v2;
            }
        });
        /**
         * 去除rdd中每個key的字首
         */
        JavaPairRDD<String, Integer> removePrefixRDD = aggRDD.mapToPair(new PairFunction<Tuple2<String,Integer>,String, Integer>() {

            public Tuple2<String, Integer> call(Tuple2<String, Integer> t)
                    throws Exception {
                String key = t._1.split("_")[1];
                return new Tuple2<String, Integer>(key,t._2);
            }
        });
        /**
         * 進行全域性聚合
         */

        removePrefixRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {

            public Integer call(Integer v1, Integer v2) throws Exception {
                // TODO Auto-generated method stub
                return v1+v2;
            }

        }).foreach(new VoidFunction<Tuple2<String,Integer>>() {

            public void call(Tuple2<String, Integer> t) throws Exception {
            System.out.println(t._1 + "    => "+t._2);

            }
        });

    }

}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object AggWordCount {
  def main(args: Array[String]): Unit = {
     val conf=new SparkConf().setMaster("local").setAppName("AggWordCount")
     val sc=new SparkContext(conf);
     val list=Array("you    jump","jump jump","jump jump","jump jump","jump jump","jump jump","jump jump","jump jump")
     val listRDD=sc.parallelize(list, 1);
     listRDD.flatMap { line => line.split("\t") }
     .map { word => (word,1) }
     .map(word =>{
      val prefix= (new util.Random).nextInt(4)
      (prefix+"_"+word._1,word._2)
     })
     .reduceByKey(_+_)
     .map(word =>{
       val key=word._1.split("_")(1)
       (key,word._2)
     })
     .reduceByKey(_+_)
     .foreach(result => println(result._1 + "  : "+result._2))
  }
}

方案優點: 
對於聚合類的shuffle操作導致的資料傾斜,效果是非常不錯的。通常都可以解決掉資料傾斜,或者至少是大幅度緩解資料傾斜,將Spark作業的效能提升數倍以上。 
方案缺點: 
僅僅適用於聚合類的shuffle操作,適用範圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。

五、將reduce join 轉為map join 

方案使用場景:
在對RDD使用join類操作,或者是在Spark SQL中使用join語句時,而且join操作中的一個RDD或表的資料量比較小(幾百M或者一兩G)。 
實現思路:
不使用join運算元進行連線操作,而使用Broadcast變數與map類運算元實現join操作,進而完全規避掉shuffle類操作,徹底避免資料傾斜的發生和出現。將較小RDD中的資料直接通過collect運算元拉取到Driver端的記憶體中來,然後對其建立一個Broadcast變數;接著對另外RDD執行map類運算元,在運算元函式內,從Broadcast變數中獲取較小RDD 的全量資料,與當前RDD的每一條資料按照連線key進行比對,如果連線key相同的話,那麼就將兩個RDD的資料用你需要的方式連線起來。 
實現原理: 
普通的join是會走shuffle過程的,而一旦shuffle,就相當於會將相同key的資料拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以採用廣播小RDD全量資料+map運算元來實現與join同樣的效果,也就是mao join ,而此時不會發生shuffle操作,也就不會發生資料傾斜。 
方案優點: 
對join操作導致的資料傾斜,效果非常好,因為根本就不會發生shuffle,也就根本不會發生資料傾斜。 
方案缺點: 
適用場景較少,因為這個方案只適用於一個大表和一個小表的情況。畢竟我們需要將小表進行廣播,此時會比較消耗記憶體資源,driver和每個Executor記憶體中都會駐留一份小RDD的全量資料。如果我們廣播出去的RDD資料比較大,比如10G以上,那麼就可能發生記憶體溢位了。因此並不適合兩個都是大表的情況。

package org.xtwy.sparkcore;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple2;

public class MapjoinTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("MapjoinTest");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<String, String>> list1 = Arrays.asList(
                new Tuple2<String, String>("001", "令狐沖"),
                new Tuple2<String, String>("002", "任盈盈")
                );
        List<Tuple2<String, String>> list2 = Arrays.asList(
                new Tuple2<String, String>("001", "一班"),
                new Tuple2<String, String>("002", "二班")
                );
        JavaRDD<Tuple2<String, String>> list1RDD = sc.parallelize(list1);
        JavaRDD<Tuple2<String, String>> list2RDD = sc.parallelize(list2);
        //假設list1RDD資料量較小,一定防止記憶體溢位
        List<Tuple2<String, String>> rdd1data = list1RDD.collect();
        final Broadcast<List<Tuple2<String, String>>> rdd1broadcast = sc.broadcast(rdd1data);
        //返回值(,())
        JavaPairRDD<String, Tuple2<String, String>> resultRDD = list2RDD.mapToPair(new PairFunction<Tuple2<String,String>, String, Tuple2<String, String>>() {

            public Tuple2<String, Tuple2<String, String>> call(Tuple2<String, String> arg0) throws Exception {
                List<Tuple2<String, String>> rdd1data = rdd1broadcast.value();
                //ctrl shift o 匯入包
                Map<String,String> rdd1dataMap = new HashMap<String,String>();
                /**
                 * 將RDD1內資料放入Map結構中
                 */
                for(Tuple2<String, String> data:rdd1data){
                    rdd1dataMap.put(data._1, data._2);
                }
                String key=arg0._1;
                String value=arg0._2;
                String rdd1value = rdd1dataMap.get(key);
                return new Tuple2<String, Tuple2<String, String>>(key,new Tuple2<String, String>(value,rdd1value));
            }
        });
        resultRDD.foreach(new VoidFunction<Tuple2<String,Tuple2<String,String>>>() {

            public void call(Tuple2<String, Tuple2<String, String>> arg0) throws Exception {
                System.out.println(arg0._1+" "+arg0._2._1 + "姓名 " + arg0._2._2);

            }
        });

    }
}

六、取樣傾斜key並分拆join操作 

方案使用場景:
兩個RDD/Hive表進行join的時候,如果資料量都比較大,無法採用上第五點解決方案,那麼此時可以看一下兩個RDD/Hive表中key的分佈情況,若出現數據傾斜,是因為其中某一個RDD/Hive表中的少數幾個key的資料量過大,而另一箇中的所有key都分佈比較均勻,那麼採用這個解決方案是比較合適的。 
實現思路:
對包含少數幾個資料量過大的key的那個RDD,通過sample運算元取樣出一份樣本來,然後統計一下每個key的資料量,計算出資料量最大的是哪幾個key。 
然後將這幾個key對應資料從原來的RDD中拆分出來,形成一個單獨的RDD,並給每個key打上n以內的隨機數作為字首,而不會導致傾斜的大部分key形成另外一個RDD。

接著將需要join的另一個RDD,也就是過濾出來的那幾個傾斜key對應的資料並形成一個單獨的RDD,將每條資料膨脹成n條資料,這n條資料都按順序附加一個0~n的字首,不會導致傾斜的大部分key也形成另外一個RDD 
(此時一共生存了四個RDD:兩個key有傾斜的RDD,兩個正常RDD) 
再將附加了隨機字首的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join。 
而另外兩普通的RDD就照常join即可。 
最後將兩次join的結果使用union運算元合併起來即可。

原理: 
對於join導致的資料傾斜,如果只是某幾個key導致了傾斜,可以將少數幾個key拆分為獨立RDD,並附加隨機字首打散成n份去進行join,此時這幾個key對於的資料就不會集中在少數幾個task上,而是分散到多個task進行join。 
優點:
對於join導致的資料傾斜,如果只是某幾個key導致了傾斜,此方法可以用最有效的方式打散key進行join,且只需要針對少數傾斜的key對應的資料進行擴容n倍,不需要對全量資料進行擴容,避免佔用過多記憶體。 
缺點: 
若key特別多,則不合適。

七、使用隨機字首和擴容RDD進行join 

方案使用場景:
若在進行join操作時,RDD中有大量的key導致資料傾斜的時候。 
思路:
首先檢視RDD/Hive表中的資料分佈情況,找到造成資料傾斜的RDD/Hive表,比如有多個key都對應了炒股哦萬條資料。 
然後將該RDD 的每條資料都打上一個n以內的隨即字首。 
同時對另外一個正常的RDD進行擴容,將每條資料都擴容成n條資料,擴容出來的每條資料都依次打上一個0~n的字首 
最後將兩個處理後的RDD進行join即可。 
原理: 
將原先一樣的key通過附加字首變成不一樣的key,然後就看可以將這些處理後的“不同的key”分散到多個task中那個去處理,而不是讓一個task去處理大量相同的key。此方法與方法六的區別在於,有大量傾斜key的情況,沒法將部分key拆分出來單獨處理,因此只能對整個RDD 進行資料擴容,對資源要求很高。 
缺點:
更多的是緩解資料傾斜,而不是徹底避免,而且需要對整個RDD進行擴容,對記憶體資源要求較高。

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class ExtendRDDTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("ExtendRDDTest");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<Tuple2<String, String>> list1 = Arrays.asList(
                new Tuple2<String, String>("001", "令狐沖"),
                new Tuple2<String, String>("002", "任盈盈")
                );
        List<Tuple2<String, String>> list2 = Arrays.asList(
                new Tuple2<String, String>("001", "一班"),
                new Tuple2<String, String>("002", "二班")
                );
        JavaRDD<Tuple2<String, String>> list1RDD = sc.parallelize(list1);
        JavaRDD<Tuple2<String, String>> list2RDD = sc.parallelize(list2);
        //首先將其中key分佈比較均勻的RDD擴容100倍。
        JavaPairRDD<String, String> extendRDD = list1RDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String,String>, String, String>() {

            public Iterable<Tuple2<String, String>> call(Tuple2<String, String> arg0) throws Exception {
                ArrayList<Tuple2<String,String>> list = new ArrayList<Tuple2<String,String>>();
                for (int i =0;i<100;i++){
                    list.add(new Tuple2<String,String>(i+"_"+arg0._1,arg0._2));
                }
                return list;
            }
        });
        //將另外一個key分佈不均勻的RDD加上0~99的隨機數
        JavaPairRDD<String, String> mappedRDD = list2RDD.mapToPair(new PairFunction<Tuple2<String,String>, String, String>() {

            public Tuple2<String, String> call(Tuple2<String, String> arg0) throws Exception {
                Random random = new Random();
                int prefix = random.nextInt(100);
                return new Tuple2<String, String>(prefix+"_"+arg0._1,arg0._2);
            }
        });
        mappedRDD.join(extendRDD)
        .foreach(new VoidFunction<Tuple2<String,Tuple2<String,String>>>() {

            public void call(Tuple2<String, Tuple2<String, String>> arg0) throws Exception {
                System.out.println(arg0._1.split("_")[1]+" "+arg0._2._1 +"_"+arg0._2._2);

            }
        });
    }
}