1. 程式人生 > >spark調優之數據傾斜

spark調優之數據傾斜

ref etc substr -- comm fff hive hello read

(1)數據傾斜的介紹

  1)數據分區的策略:

     - 隨機分區:每一個數據分配的任意一個分區的概率是均等的
     - Hash分區:使用數據的Hash分區值,%分區數。(導致數據傾斜的原因)
     - 範圍分區:將數據範圍劃分,數據分配到不同的範圍中(分布式的全局排序)

  2)數據傾斜的原因:

  Shuffle數據之後導致數據分布不均勻,但是所有節點的機器的性能都是一樣的,程序也是一樣的,就是數據量不一致,所以決定了task的執行時長就被數據量決定了。

  3)定位數據傾斜的代碼:

  數據傾斜發生在shuffle過程,有shuffle過程的算子有:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。或者查看哪一個task執行緩慢、內存溢出...

  4)查看數據傾斜的key的分布情況:

//使用spark中的抽樣算子sample,查看相應的key的分布
val sampledPairs = pairs.sample(false, 0.1)  //抽樣
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

(2)數據傾斜的解決方案

  1)過濾掉少數數據傾斜的key:

  如果發現導致數據傾斜的key是極少數,並且對計算本身影響不大,那麽這種方案比較適用。
   實現原理:通過spark的sample算子,定位到數據傾斜的key,然後使用filter算子將其過濾即可。

  2)提高shuffle的並行度:

   這是一種嘗試性策略:就是提高增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。

  3)兩階段的聚合(局部聚合和全局聚合):

   適用場景:對RDD執行reduceByKey等這類有聚合操作的shuffle算子或者spark SQL使用使用group by語句進行分組聚合,比較適用。
技術分享圖片
原理:將原本相同的key通過附加隨機前綴的方式,變成多個不同key,就可以讓原本被一個task處理的數據分散到多個task上做局部聚合,進行解決單個task處理數據量過多的問題。接著去除隨機前綴,再次進行全局的聚合,就可以得到最終的結果。
代碼實現:

object _01SparkDataSkewTwoStageOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        val sc = SparkUtil.sparkContext("local[2]", "_01SparkDataSkewTwoStageOps")
        val list = List(
            "hello you hello hello me",
            "hello you hello hello shit",
            "oh hello she study"
        )

        val listRDD = sc.parallelize(list)

        val pairsRDD = listRDD.flatMap(line => line.split("\\s+")).map((_, 1))
        //step 1 找到發生數據傾斜key
        val sampleRDD = pairsRDD.sample(false, 0.6)
        val cbk= sampleRDD.countByKey()
//        cbkRDD.foreach(println)
        val sortedInfo = cbk.toBuffer.sortWith((t1, t2) => t1._2 > t2._2)
        val dataSkewKey = sortedInfo.head._1
//        sortedInfo.foreach(println)
        println("發生了數據傾斜的Key:" + dataSkewKey)
        //step 2 給對應的key打上N以內的隨機前綴
        val prefixPairsRDD = pairsRDD.map{case (word, count) => {
            if(word.equals(dataSkewKey)) {
                val random = new Random()
                val prefix = random.nextInt(2)//0 1
                (s"${prefix}_${word}", count)
            } else {
                (word, count)
            }
        }}
        prefixPairsRDD.foreach(println)
        //step 3 局部聚合
        val partAggrInfo = prefixPairsRDD.reduceByKey(_+_)
        println("===============>局部聚合之後的結果:")
        partAggrInfo.foreach(println)
        //step 4 全局聚合
        //step 4.1 去掉前綴
        val unPrefixPairRDD = partAggrInfo.map{case (word, count) => {
            if(word.contains("_")) {
                (word.substring(word.indexOf("_") + 1), count)
            } else {
                (word, count)
            }
        }}
        println("================>去掉隨機前綴之後的結果:")
        unPrefixPairRDD.foreach(println)
        // step 4.2 全局聚合
        val fullAggrInfo = unPrefixPairRDD.reduceByKey(_+_)
        println("===============>全局聚合之後的結果:")
        fullAggrInfo.foreach(println)
        sc.stop()
    }
}

  4)將reduce join 轉換為map join(大小表):

   適用場景:在對RDD使用join操作,或者是在sparksql 中使用join語句的時候,而且join操作中的一個RDD或者表的數據量比較小,此方法適用
   實現原理:有reduce join的過程一定有shuffle,有shuffle就可能出現數據的傾斜,所以將reduce join使用map join 代替。如果一個RDD是比較小的,那麽可以使用廣播變量的方式,將小RDD發送到各個worker的executor中,實現本地的連接
代碼實現:

object _02SparkRDDBroadcastOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        val conf = new SparkConf()
            .setMaster("local[2]")
            .setAppName(s"${_02SparkRDDBroadcastOps.getClass.getSimpleName}")
        val sc = new SparkContext(conf)

        val stu = List(
            "1  鄭祥楷 1",
            "2  王佳豪 1",
            "3  劉鷹 2",
            "4  宋誌華 3",
            "5  劉帆 4",
            "6  OLDLi 5"
        )

        val cls = List(
            "1 1807bd-bj",
            "2 1807bd-sz",
            "3 1807bd-wh",
            "4 1807bd-xa",
            "7 1805bd-bj"
        )
        /*
            使用廣播變量來完成上述操作
            一般用戶表都比較大,而班級表相對很小,符合我們在共享變量中提出的第一個假設
            所以我們可以嘗試使用廣播變量來進行解決
         */
        val stuRDD = sc.parallelize(stu)
        //cls-->map---->
        val map = cls.map{case line => {
            (line.substring(0, line.indexOf(" ")), line.substring(line.indexOf(" ")).trim)
        }}.toMap
        //map--->broadcast
        val clsMapBC:Broadcast[Map[String, String]] = sc.broadcast(map)

        stuRDD.map{case line => {
            val map = clsMapBC.value
            val fields = line.split("\\s+")
            val cid = fields(2)
//            map.get(cid)
            val className = map.getOrElse(cid, "UnKnown")
            s"${fields(0)}\t${fields(1)}\t${className}"//在mr中學習到的map join
        }}.foreach(println)
        sc.stop()
    }

  5)采樣傾斜的key並拆分join操作(大大表):

  適用場景:在hive兩張表進行join的時候,如果兩張表的數據都很大,並且,一張表的數據很均勻,但是另一張表的數據有少量的key數據量過大,此時使用這個解決方案
  實現原理:對於join導致的數據傾斜,如果只是某幾個key導致了傾斜,可以將少數幾個key分拆成獨立RDD,並附加隨機前綴打散成n份去進行join,此時這幾個key對應的數據就不會集中在少數幾個task上,而是分散到多個task進行join了。
代碼實現:

一張表中:
Id  num
1   100W
2    10
3    10
4    10
5    10

可以使用union的方式來啟動多個job並行執行:

//通過分離數據量大key來解決數據傾斜
select count(*) from t_test where id !=1 group by id
union
select count(*) from t_test where id ==1 group by id

  6)使用隨機前綴和擴容RDD進行join(大量key的數據傾斜):

  適用場景:如果進行join操作時,RDD中有大量的key導致數據傾斜,那麽進行拆分可以也沒有意義,此時使用這種方法
技術分享圖片
  實現原理:這一種方案是針對有大量傾斜key的情況,沒法將部分key拆分出來進行單獨處理,因此只能對整個RDD進行數據擴容,對內存資源要求很高。
代碼實現:
技術分享圖片
左表的連接條件的值,可以在某個範圍內進行隨機,並且這個隨機值有多少個,那麽右表的數據就要復制多少份。

object _03SparkJoinDataSkewOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        val sc = SparkUtil.sparkContext("local[2]", "_03SparkJoinDataSkewOps")
        val list1 = List(
            "hello 1",
            "hello 2",
            "hello 3",
            "hello 4",
            "you 1",
            "me 2"
        )

        val list2 = List(
            "hello zhangsan dfsadfasdfsa",
            "hello lisi adfasdfasd",
            "you wangwu adfasdfs",
            "me zhouqi adfadfa"
        )
        //<key, value>
        val listRDD1 = sc.parallelize(list1).map(line => {
            val fields = line.split("\\s+")
            (fields(0), fields(1))
        })
        //<key, value>
        val listRDD2 = sc.parallelize(list2).map(line => {
            val fields = line.split("\\s+")
            (fields(0), fields(1))
        })
        val joinRDD: RDD[(String, (String, String))] = dataSkewRDDJoin(sc, listRDD1, listRDD2)
        println("最後進行join的結果:")
        joinRDD.foreach(println)
        sc.stop()
    }

    private def dataSkewRDDJoin(sc: SparkContext, listRDD1: RDD[(String, String)], listRDD2: RDD[(String, String)]) = {
        //假設listRDD1中的部分key有數據傾斜,所以我在進行join操作的時候,需要進行拆分計算
        //step 1 找到發生數據傾斜的key
        val dataSkewKeys = listRDD1.sample(false, 0.6).countByKey().toList.sortWith((t1, t2) => t1._2 > t2._2).take(1).map(t => t._1)
        println("通過sample算子得到的可能發生數據傾斜的key:" + dataSkewKeys)
        //step 2 對listRDD1和listRDD2中的數據按照dataSkewKeys各拆分成兩個部分
        //step 2.1 講dataSkewKeys進行廣播
        val dskBC = sc.broadcast(dataSkewKeys)
        // step 2.2 進行拆分
        val dataSkewRDD1 = listRDD1.filter { case (word, value) => {
            //有數據傾斜的rdd--->dataskewRDD1
            val dsks = dskBC.value
            dsks.contains(word)
        }
        }

        val commonRDD1 = listRDD1.filter { case (word, value) => {
            //沒有數據傾斜的rdd--->commonRDD1
            val dsks = dskBC.value
            !dsks.contains(word)
        }
        }
        val dataSkewRDD2 = listRDD2.filter { case (word, value) => {
            //有數據傾斜的rdd--->dataskewRDD1
            val dsks = dskBC.value
            dsks.contains(word)
        }
        }

        val commonRDD2 = listRDD2.filter { case (word, value) => {
            //沒有數據傾斜的rdd--->commonRDD1
            val dsks = dskBC.value
            !dsks.contains(word)
        }
        }
      }
        //step 3 對dataskewRDD進行添加N以內隨機前綴
        // step 3.1 添加隨機前綴
        val prefixDSRDD1:RDD[(String, String)] = dataSkewRDD1.map { case (word, value) => {
            val random = new Random()
            val prefix = random.nextInt(2)
            (s"${prefix}_${word}", value)
        }
        }
        // step 3.2 另一個rdd進行擴容
        val prefixDSRDD2:RDD[(String, String)] = dataSkewRDD2.flatMap { case (word, value) => {
            val ab = ArrayBuffer[(String, String)]()
            for (i <- 0 until 2) {
                ab.append((s"${i}_${word}", value))
            }
            ab
        }
        }
        println("---->有數據傾斜RDD1添加前綴成prefixDSRDD1的結果:" + prefixDSRDD1.collect().mkString(","))
        println("---->有數據傾斜RDD2擴容之後成prefixDSRDD2的結果:" + prefixDSRDD2.collect().mkString(","))

        // step 4 分步進行join操作
        // step 4.1 有數據傾斜的prefixDSRDD1和prefixDSRDD2進行join
        val prefixJoinDSRDD = prefixDSRDD1.join(prefixDSRDD2)
        //ste 4.2 無數據傾斜的commonRDD1和commonRDD2進行join
        val commonJoinRDD = commonRDD1.join(commonRDD2)
        // step 4.3 將隨機前綴去除
        val dsJionRDD = prefixJoinDSRDD.map { case (word, (value1, value2)) => {
            (word.substring(2), (value1, value2))
        }
        }
        //step 5 將拆分進行join之後的結果進行union連接,得到最後的結果 sql union all
        val joinRDD = dsJionRDD.union(commonJoinRDD)
        joinRDD
    }
}

本博文參考至美團的spark調優:https://tech.meituan.com/

spark調優之數據傾斜