1. 程式人生 > >Spark效能優化之道——解決Spark資料傾斜(Data Skew)的N種姿勢

Spark效能優化之道——解決Spark資料傾斜(Data Skew)的N種姿勢

摘要

本文結合例項詳細闡明瞭Spark資料傾斜的幾種場景以及對應的解決方案,包括避免資料來源傾斜,調整並行度,使用自定義Partitioner,使用Map側Join代替Reduce側Join,給傾斜Key加上隨機字首等。

為何要處理資料傾斜(Data Skew)

什麼是資料傾斜

對Spark/Hadoop這樣的大資料系統來講,資料量大並不可怕,可怕的是資料傾斜。

何謂資料傾斜?資料傾斜指的是,並行處理的資料集中,某一部分(如Spark或Kafka的一個Partition)的資料顯著多於其它部分,從而使得該部分的處理速度成為整個資料集處理的瓶頸。

資料傾斜是如何造成的

在Spark中,同一個Stage的不同Partition可以並行處理,而具有依賴關係的不同Stage之間是序列處理的。假設某個Spark Job分為Stage 0和Stage 1兩個Stage,且Stage 1依賴於Stage 0,那Stage 0完全處理結束之前不會處理Stage 1。而Stage 0可能包含N個Task,這N個Task可以並行進行。如果其中N-1個Task都在10秒內完成,而另外一個Task卻耗時1分鐘,那該Stage的總時間至少為1分鐘。換句話說,一個Stage所耗費的時間,主要由最慢的那個Task決定。

由於同一個Stage內的所有Task執行相同的計算,在排除不同計算節點計算能力差異的前提下,不同Task之間耗時的差異主要由該Task所處理的資料量決定。

Stage的資料來源主要分為如下兩類
- 從資料來源直接讀取。如讀取HDFS,Kafka
- 讀取上一個Stage的Shuffle資料

如何緩解/消除資料傾斜

儘量避免資料來源的資料傾斜

以Spark Stream通過DirectStream方式讀取Kafka資料為例。由於Kafka的每一個Partition對應Spark的一個Task(Partition),所以Kafka內相關Topic的各Partition之間資料是否平衡,直接決定Spark處理該資料時是否會產生資料傾斜。

如《Kafka設計解析(一)- Kafka背景及架構介紹》一文所述,Kafka某一Topic內訊息在不同Partition之間的分佈,主要由Producer端所使用的Partition實現類決定。如果使用隨機Partitioner,則每條訊息會隨機發送到一個Partition中,從而從概率上來講,各Partition間的資料會達到平衡。此時源Stage(直接讀取Kafka資料的Stage)不會產生資料傾斜。

但很多時候,業務場景可能會要求將具備同一特徵的資料順序消費,此時就需要將具有相同特徵的資料放於同一個Partition中。一個典型的場景是,需要將同一個使用者相關的PV資訊置於同一個Partition中。此時,如果產生了資料傾斜,則需要通過其它方式處理。

調整並行度分散同一個Task的不同Key

原理

Spark在做Shuffle時,預設使用HashPartitioner(非Hash Shuffle)對資料進行分割槽。如果並行度設定的不合適,可能造成大量不相同的Key對應的資料被分配到了同一個Task上,造成該Task所處理的資料遠大於其它Task,從而造成資料傾斜。

如果調整Shuffle時的並行度,使得原本被分配到同一Task的不同Key發配到不同Task上處理,則可降低原Task所需處理的資料量,從而緩解資料傾斜問題造成的短板效應。
spark change parallelism

案例

現有一張測試表,名為student_external,內有10.5億條資料,每條資料有一個唯一的id值。現從中取出id取值為9億到10.5億的共1.5條資料,並通過一些處理,使得id為9億到9.4億間的所有資料對12取模後餘數為8(即在Shuffle並行度為12時該資料集全部被HashPartition分配到第8個Task),其它資料集對其id除以100取整,從而使得id大於9.4億的資料在Shuffle時可被均勻分配到所有Task中,而id小於9.4億的資料全部分配到同一個Task中。處理過程如下

INSERT OVERWRITE TABLE test
SELECT CASE WHEN id < 940000000 THEN (9500000  + (CAST (RAND() * 8 AS INTEGER)) * 12 )
       ELSE CAST(id/100 AS INTEGER)
       END,
       name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

通過上述處理,一份可能造成後續資料傾斜的測試資料即以準備好。接下來,使用Spark讀取該測試資料,並通過groupByKey(12)對id分組處理,且Shuffle並行度為12。程式碼如下

public class SparkDataSkew {
  public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder()
      .appName("SparkDataSkewTunning")
      .config("hive.metastore.uris", "thrift://hadoop1:9083")
      .enableHiveSupport()
      .getOrCreate();

    Dataset<Row> dataframe = sparkSession.sql( "select * from test");
    dataframe.toJavaRDD()
      .mapToPair((Row row) -> new Tuple2<Integer, String>(row.getInt(0),row.getString(1)))
      .groupByKey(12)
      .mapToPair((Tuple2<Integer, Iterable<String>> tuple) -> {
        int id = tuple._1();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        tuple._2().forEach((String name) -> atomicInteger.incrementAndGet());
        return new Tuple2<Integer, Integer>(id, atomicInteger.get());
      }).count();

      sparkSession.stop();
      sparkSession.close();
  }

}

本次實驗所使用叢集節點數為4,每個節點可被Yarn使用的CPU核數為16,記憶體為16GB。使用如下方式提交上述應用,將啟動4個Executor,每個Executor可使用核數為12(該配置並非生產環境下的最優配置,僅用於本文實驗),可用記憶體為12GB。

spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jar

GroupBy Stage的Task狀態如下圖所示,Task 8處理的記錄數為4500萬,遠大於(9倍於)其它11個Task處理的500萬記錄。而Task 8所耗費的時間為38秒,遠高於其它11個Task的平均時間(16秒)。整個Stage的時間也為38秒,該時間主要由最慢的Task 8決定。
data skew

在這種情況下,可以通過調整Shuffle並行度,使得原來被分配到同一個Task(即該例中的Task 8)的不同Key分配到不同Task,從而降低Task 8所需處理的資料量,緩解資料傾斜。

通過groupByKey(48)將Shuffle並行度調整為48,重新提交到Spark。新的Job的GroupBy Stage所有Task狀態如下圖所示。
add parallelism

從上圖可知,記錄數最多的Task 20處理的記錄數約為1125萬,相比於並行度為12時Task 8的4500萬,降低了75%左右,而其耗時從原來Task 8的38秒降到了24秒。

在這種場景下,調整並行度,並不意味著一定要增加並行度,也可能是減小並行度。如果通過groupByKey(11)將Shuffle並行度調整為11,重新提交到Spark。新Job的GroupBy Stage的所有Task狀態如下圖所示。
reduce parallelism

從上圖可見,處理記錄數最多的Task 6所處理的記錄數約為1045萬,耗時為23秒。處理記錄數最少的Task 1處理的記錄數約為545萬,耗時12秒。

總結

適用場景
大量不同的Key被分配到了相同的Task造成該Task資料量過大。

解決方案
調整並行度。一般是增大並行度,但有時如本例減小並行度也可達到效果。

優勢
實現簡單,可在需要Shuffle的操作運算元上直接設定並行度或者使用spark.default.parallelism設定。如果是Spark SQL,還可通過SET spark.sql.shuffle.partitions=[num_tasks]設定並行度。可用最小的代價解決問題。一般如果出現數據傾斜,都可以通過這種方法先試驗幾次,如果問題未解決,再嘗試其它方法。

劣勢
適用場景少,只能將分配到同一Task的不同Key分散開,但對於同一Key傾斜嚴重的情況該方法並不適用。並且該方法一般只能緩解資料傾斜,沒有徹底消除問題。從實踐經驗來看,其效果一般。

自定義Partitioner

原理

使用自定義的Partitioner(預設為HashPartitioner),將原本被分配到同一個Task的不同Key分配到不同Task。

案例

以上述資料集為例,繼續將併發度設定為12,但是在groupByKey運算元上,使用自定義的Partitioner(實現如下)

.groupByKey(new Partitioner() {
  @Override
  public int numPartitions() {
    return 12;
  }

  @Override
  public int getPartition(Object key) {
    int id = Integer.parseInt(key.toString());
    if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
      return (id - 9500000) / 12;
    } else {
      return id % 12;
    }
  }
})

由下圖可見,使用自定義Partition後,耗時最長的Task 6處理約1000萬條資料,用時15秒。並且各Task所處理的資料集大小相當。
customizec partitioner

總結

適用場景
大量不同的Key被分配到了相同的Task造成該Task資料量過大。

解決方案
使用自定義的Partitioner實現類代替預設的HashPartitioner,儘量將所有不同的Key均勻分配到不同的Task中。

優勢
不影響原有的並行度設計。如果改變並行度,後續Stage的並行度也會預設改變,可能會影響後續Stage。

劣勢
適用場景有限,只能將不同Key分散開,對於同一Key對應資料集非常大的場景不適用。效果與調整並行度類似,只能緩解資料傾斜而不能完全消除資料傾斜。而且需要根據資料特點自定義專用的Partitioner,不夠靈活。

將Reduce side Join轉變為Map side Join

原理

通過Spark的Broadcast機制,將Reduce側Join轉化為Map側Join,避免Shuffle從而完全消除Shuffle帶來的資料傾斜。
spark map join

案例

通過如下SQL建立一張具有傾斜Key且總記錄數為1.5億的大表test。

INSERT OVERWRITE TABLE test
SELECT CAST(CASE WHEN id < 980000000 THEN (95000000  + (CAST (RAND() * 4 AS INT) + 1) * 48 )
       ELSE CAST(id/10 AS INT) END AS STRING),
       name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

使用如下SQL建立一張資料分佈均勻且總記錄數為50萬的小表test_new。

INSERT OVERWRITE TABLE test_new
SELECT CAST(CAST(id/10 AS INT) AS STRING),
       name
FROM student_delta_external
WHERE id BETWEEN 950000000 AND 950500000;

直接通過Spark Thrift Server提交如下SQL將表test與表test_new進行Join並將Join結果存於表test_join中。

INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

該SQL對應的DAG如下圖所示。從該圖可見,該執行過程總共分為三個Stage,前兩個用於從Hive中讀取資料,同時二者進行Shuffle,通過最後一個Stage進行Join並將結果寫入表test_join中。
reduce join DAG

從下圖可見,最近Join Stage各Task處理的資料傾斜嚴重,處理資料量最大的Task耗時7.1分鐘,遠高於其它無資料傾斜的Task約2s秒的耗時。
reduce join DAG

接下來,嘗試通過Broadcast實現Map側Join。實現Map側Join的方法,並非直接通過CACHE TABLE test_new將小表test_new進行cache。現通過如下SQL進行Join。

CACHE TABLE test_new;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

通過如下DAG圖可見,該操作仍分為三個Stage,且仍然有Shuffle存在,唯一不同的是,小表的讀取不再直接掃描Hive表,而是掃描記憶體中快取的表。
reduce join DAG

並且資料傾斜仍然存在。如下圖所示,最慢的Task耗時為7.1分鐘,遠高於其它Task的約2秒。
reduce join DAG

正確的使用Broadcast實現Map側Join的方式是,通過SET spark.sql.autoBroadcastJoinThreshold=104857600;將Broadcast的閾值設定得足夠大。

再次通過如下SQL進行Join。

SET spark.sql.autoBroadcastJoinThreshold=104857600;
INSERT OVERWRITE TABLE test_join
SELECT test_new.id, test_new.name
FROM test
JOIN test_new
ON test.id = test_new.id;

通過如下DAG圖可見,該方案只包含一個Stage。
reduce join DAG

並且從下圖可見,各Task耗時相當,無明顯資料傾斜現象。並且總耗時為1.5分鐘,遠低於Reduce側Join的7.3分鐘。
reduce join DAG

總結

適用場景
參與Join的一邊資料集足夠小,可被載入進Driver並通過Broadcast方法廣播到各個Executor中。

解決方案
在Java/Scala程式碼中將小資料集資料拉取到Driver,然後通過broadcast方案將小資料集的資料廣播到各Executor。或者在使用SQL前,將broadcast的閾值調整得足夠多,從而使用broadcast生效。進而將Reduce側Join替換為Map側Join。

優勢
避免了Shuffle,徹底消除了資料傾斜產生的條件,可極大提升效能。

劣勢
要求參與Join的一側資料集足夠小,並且主要適用於Join的場景,不適合聚合的場景,適用條件有限。

為skew的key增加隨機前/字尾

原理

為資料量特別大的Key增加隨機前/字尾,使得原來Key相同的資料變為Key不相同的資料,從而使傾斜的資料集分散到不同的Task中,徹底解決資料傾斜問題。Join另一則的資料中,與傾斜Key對應的部分資料,與隨機字首集作笛卡爾乘積,從而保證無論資料傾斜側傾斜Key如何加字首,都能與之正常Join。
spark random prefix

案例

通過如下SQL,將id為9億到9.08億共800萬條資料的id轉為9500048或者9500096,其它資料的id除以100取整。從而該資料集中,id為9500048和9500096的資料各400萬,其它id對應的資料記錄數均為100條。這些資料存於名為test的表中。

對於另外一張小表test_new,取出50萬條資料,並將id(遞增且唯一)除以100取整,使得所有id都對應100條資料。

INSERT OVERWRITE TABLE test
SELECT CAST(CASE WHEN id < 908000000 THEN (9500000  + (CAST (RAND() * 2 AS INT) + 1) * 48 )
  ELSE CAST(id/100 AS INT) END AS STRING),
  name
FROM student_external
WHERE id BETWEEN 900000000 AND 1050000000;

INSERT OVERWRITE TABLE test_new
SELECT CAST(CAST(id/100 AS INT) AS STRING),
  name
FROM student_delta_external
WHERE id BETWEEN 950000000 AND 950500000;

通過如下程式碼,讀取test表對應的資料夾內的資料並轉換為JavaPairRDD存於leftRDD中,同樣讀取test表對應的資料存於rightRDD中。通過RDD的join運算元對leftRDD與rightRDD進行Join,並指定並行度為48。

public class SparkDataSkew{
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("DemoSparkDataFrameWithSkewedBigTableDirect");
    sparkConf.set("spark.default.parallelism", parallelism + "");
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

    JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
      .mapToPair((String row) -> {
        String[] str = row.split(",");
        return new Tuple2<String, String>(str[0], str[1]);
      });

    JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
      .mapToPair((String row) -> {
        String[] str = row.split(",");
          return new Tuple2<String, String>(str[0], str[1]);
      });

    leftRDD.join(rightRDD, parallelism)
      .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2()))
      .foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
        AtomicInteger atomicInteger = new AtomicInteger();
          iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
      });

    javaSparkContext.stop();
    javaSparkContext.close();
  }
}

從下圖可看出,整個Join耗時1分54秒,其中Join Stage耗時1.7分鐘。
few skewed key join

通過分析Join Stage的所有Task可知,在其它Task所處理記錄數為192.71萬的同時Task 32的處理的記錄數為992.72萬,故它耗時為1.7分鐘,遠高於其它Task的約10秒。這與上文準備資料集時,將id為9500048為9500096對應的資料量設定非常大,其它id對應的資料集非常均勻相符合。
few skewed key join

現通過如下操作,實現傾斜Key的分散處理
- 將leftRDD中傾斜的key(即9500048與9500096)對應的資料單獨過濾出來,且加上1到24的隨機字首,並將字首與原資料用逗號分隔(以方便之後去掉字首)形成單獨的leftSkewRDD
- 將rightRDD中傾斜key對應的資料抽取出來,並通過flatMap操作將該資料集中每條資料均轉換為24條資料(每條分別加上1到24的隨機字首),形成單獨的rightSkewRDD
- 將leftSkewRDD與rightSkewRDD進行Join,並將並行度設定為48,且在Join過程中將隨機字首去掉,得到傾斜資料集的Join結果skewedJoinRDD
- 將leftRDD中不包含傾斜Key的資料抽取出來作為單獨的leftUnSkewRDD
- 對leftUnSkewRDD與原始的rightRDD進行Join,並行度也設定為48,得到Join結果unskewedJoinRDD
- 通過union運算元將skewedJoinRDD與unskewedJoinRDD進行合併,從而得到完整的Join結果集

具體實現程式碼如下

public class SparkDataSkew{
    public static void main(String[] args) {
      int parallelism = 48;
      SparkConf sparkConf = new SparkConf();
      sparkConf.setAppName("SolveDataSkewWithRandomPrefix");
      sparkConf.set("spark.default.parallelism", parallelism + "");
      JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

      JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
        .mapToPair((String row) -> {
          String[] str = row.split(",");
            return new Tuple2<String, String>(str[0], str[1]);
        });

        JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
          .mapToPair((String row) -> {
            String[] str = row.split(",");
              return new Tuple2<String, String>(str[0], str[1]);
          });

        String[] skewedKeyArray = new String[]{"9500048", "9500096"};
        Set<String> skewedKeySet = new HashSet<String>();
        List<String> addList = new ArrayList<String>();
        for(int i = 1; i <=24; i++) {
            addList.add(i + "");
        }
        for(String key : skewedKeyArray) {
            skewedKeySet.add(key);
        }

        Broadcast<Set<String>> skewedKeys = javaSparkContext.broadcast(skewedKeySet);
        Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);

        JavaPairRDD<String, String> leftSkewRDD = leftRDD
          .filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
          .mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new Random().nextInt(24) + 1) + "," + tuple._1(), tuple._2()));

        JavaPairRDD<String, String> rightSkewRDD = rightRDD.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
          .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
          .map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2()))
          .collect(Collectors.toList())
          .iterator()
        );

        JavaPairRDD<String, String> skewedJoinRDD = leftSkewRDD
          .join(rightSkewRDD, parallelism)
          .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));

        JavaPairRDD<String, String> leftUnSkewRDD = leftRDD.filter((Tuple2<String, String> tuple) -> !skewedKeys.value().contains(tuple._1()));
        JavaPairRDD<String, String> unskewedJoinRDD = leftUnSkewRDD.join(rightRDD, parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2()));

        skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
          AtomicInteger atomicInteger = new AtomicInteger();
          iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
        });

        javaSparkContext.stop();
        javaSparkContext.close();
    }
}

從下圖可看出,整個Join耗時58秒,其中Join Stage耗時33秒。
few skewed key join

通過分析Join Stage的所有Task可知
- 由於Join分傾斜資料集Join和非傾斜資料集Join,而各Join的並行度均為48,故總的並行度為96
- 由於提交任務時,設定的Executor個數為4,每個Executor的core數為12,故可用Core數為48,所以前48個Task同時啟動(其Launch時間相同),後48個Task的啟動時間各不相同(等待前面的Task結束才開始)
- 由於傾斜Key被加上隨機字首,原本相同的Key變為不同的Key,被分散到不同的Task處理,故在所有Task中,未發現所處理資料集明顯高於其它Task的情況

few skewed key join

實際上,由於傾斜Key與非傾斜Key的操作完全獨立,可並行進行。而本實驗受限於可用總核數為48,可同時執行的總Task數為48,故而該方案只是將總耗時減少一半(效率提升一倍)。如果資源充足,可併發執行Task數增多,該方案的優勢將更為明顯。在實際專案中,該方案往往可提升數倍至10倍的效率。

總結

適用場景
兩張表都比較大,無法使用Map則Join。其中一個RDD有少數幾個Key的資料量過大,另外一個RDD的Key分佈較為均勻。

解決方案
將有資料傾斜的RDD中傾斜Key對應的資料集單獨抽取出來加上隨機字首,另外一個RDD每條資料分別與隨機字首結合形成新的RDD(相當於將其資料增到到原來的N倍,N即為隨機字首的總個數),然後將二者Join並去掉字首。然後將不包含傾斜Key的剩餘資料進行Join。最後將兩次Join的結果集通過union合併,即可得到全部Join結果。

優勢
相對於Map則Join,更能適應大資料集的Join。如果資源充足,傾斜部分資料集與非傾斜部分資料集可並行進行,效率提升明顯。且只針對傾斜部分的資料做資料擴充套件,增加的資源消耗有限。

劣勢
如果傾斜Key非常多,則另一側資料膨脹非常大,此方案不適用。而且此時對傾斜Key與非傾斜Key分開處理,需要掃描資料集兩遍,增加了開銷。

大表隨機新增N種隨機字首,小表擴大N倍

原理

如果出現數據傾斜的Key比較多,上一種方法將這些大量的傾斜Key分拆出來,意義不大。此時更適合直接對存在資料傾斜的資料集全部加上隨機字首,然後對另外一個不存在嚴重資料傾斜的資料集整體與隨機字首集作笛卡爾乘積(即將資料量擴大N倍)。
spark random prefix

案例

這裡給出示例程式碼,讀者可參考上文中分拆出少數傾斜Key新增隨機字首的方法,自行測試。

public class SparkDataSkew {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("ResolveDataSkewWithNAndRandom");
    sparkConf.set("spark.default.parallelism", parallelism + "");
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

    JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
      .mapToPair((String row) -> {
        String[] str = row.split(",");
        return new Tuple2<String, String>(str[0], str[1]);
      });

    JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
      .mapToPair((String row) -> {
        String[] str = row.split(",");
        return new Tuple2<String, String>(str[0], str[1]);
    });

    List<String> addList = new ArrayList<String>();
    for(int i = 1; i <=48; i++) {
      addList.add(i + "");
    }

    Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);

    JavaPairRDD<String, String> leftRandomRDD = leftRDD.mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>(new Random().nextInt(48) + "," + tuple._1(), tuple._2()));

    JavaPairRDD<String, String> rightNewRDD = rightRDD
      .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
      .map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2()))
      .collect(Collectors.toList())
      .iterator()
    );

    JavaPairRDD<String, String> joinRDD = leftRandomRDD
      .join(rightNewRDD, parallelism)
      .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));

    joinRDD.foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
      AtomicInteger atomicInteger = new AtomicInteger();
      iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
    });

    javaSparkContext.stop();
    javaSparkContext.close();
  }
}

總結

適用場景
一個數據集存在的傾斜Key比較多,另外一個數據集資料分佈比較均勻。

優勢
對大部分場景都適用,效果不錯。

劣勢
需要將一個數據集整體擴大N倍,會增加資源消耗。

總結

對於資料傾斜,並無一個統一的一勞永逸的方法。更多的時候,是結合資料特點(資料集大小,傾斜Key的多少等)綜合使用上文所述的多種方法。