1. 程式人生 > >spark調優之資料傾斜

spark調優之資料傾斜

(1)資料傾斜的介紹

  1)資料分割槽的策略:

     - 隨機分割槽:每一個數據分配的任意一個分割槽的概率是均等的
     - Hash分割槽:使用資料的Hash分割槽值,%分割槽數。(導致資料傾斜的原因)
     - 範圍分割槽:將資料範圍劃分,資料分配到不同的範圍中(分散式的全域性排序)

  2)資料傾斜的原因:

  Shuffle資料之後導致資料分佈不均勻,但是所有節點的機器的效能都是一樣的,程式也是一樣的,就是資料量不一致,所以決定了task的執行時長就被資料量決定了。

  3)定位資料傾斜的程式碼:

  資料傾斜發生在shuffle過程,有shuffle過程的運算元有:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。或者檢視哪一個task執行緩慢、記憶體溢位...

  4)檢視資料傾斜的key的分佈情況:

//使用spark中的抽樣運算元sample,檢視相應的key的分佈
val sampledPairs = pairs.sample(false, 0.1)  //抽樣
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

(2)資料傾斜的解決方案

  1)過濾掉少數資料傾斜的key:

  如果發現導致資料傾斜的key是極少數,並且對計算本身影響不大,那麼這種方案比較適用。
   實現原理:通過spark的sample運算元,定位到資料傾斜的key,然後使用filter運算元將其過濾即可。

  2)提高shuffle的並行度:

   這是一種嘗試性策略:就是提高增加shuffle read task的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的資料。

  3)兩階段的聚合(區域性聚合和全域性聚合):

   適用場景:對RDD執行reduceByKey等這類有聚合操作的shuffle運算元或者spark SQL使用使用group by語句進行分組聚合,比較適用。
spark調優之資料傾斜
原理:將原本相同的key通過附加隨機字首的方式,變成多個不同key,就可以讓原本被一個task處理的資料分散到多個task上做區域性聚合,進行解決單個task處理資料量過多的問題。接著去除隨機字首,再次進行全域性的聚合,就可以得到最終的結果。
程式碼實現:

object _01SparkDataSkewTwoStageOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        val sc = SparkUtil.sparkContext("local[2]", "_01SparkDataSkewTwoStageOps")
        val list = List(
            "hello you hello hello me",
            "hello you hello hello shit",
            "oh hello she study"
        )

        val listRDD = sc.parallelize(list)

        val pairsRDD = listRDD.flatMap(line => line.split("\\s+")).map((_, 1))
        //step 1 找到發生資料傾斜key
        val sampleRDD = pairsRDD.sample(false, 0.6)
        val cbk= sampleRDD.countByKey()
//        cbkRDD.foreach(println)
        val sortedInfo = cbk.toBuffer.sortWith((t1, t2) => t1._2 > t2._2)
        val dataSkewKey = sortedInfo.head._1
//        sortedInfo.foreach(println)
        println("發生了資料傾斜的Key:" + dataSkewKey)
        //step 2 給對應的key打上N以內的隨機字首
        val prefixPairsRDD = pairsRDD.map{case (word, count) => {
            if(word.equals(dataSkewKey)) {
                val random = new Random()
                val prefix = random.nextInt(2)//0 1
                (s"${prefix}_${word}", count)
            } else {
                (word, count)
            }
        }}
        prefixPairsRDD.foreach(println)
        //step 3 區域性聚合
        val partAggrInfo = prefixPairsRDD.reduceByKey(_+_)
        println("===============>區域性聚合之後的結果:")
        partAggrInfo.foreach(println)
        //step 4 全域性聚合
        //step 4.1 去掉字首
        val unPrefixPairRDD = partAggrInfo.map{case (word, count) => {
            if(word.contains("_")) {
                (word.substring(word.indexOf("_") + 1), count)
            } else {
                (word, count)
            }
        }}
        println("================>去掉隨機字首之後的結果:")
        unPrefixPairRDD.foreach(println)
        // step 4.2 全域性聚合
        val fullAggrInfo = unPrefixPairRDD.reduceByKey(_+_)
        println("===============>全域性聚合之後的結果:")
        fullAggrInfo.foreach(println)
        sc.stop()
    }
}

  4)將reduce join 轉換為map join(大小表):

   適用場景:在對RDD使用join操作,或者是在sparksql 中使用join語句的時候,而且join操作中的一個RDD或者表的資料量比較小,此方法適用
   實現原理:有reduce join的過程一定有shuffle,有shuffle就可能出現數據的傾斜,所以將reduce join使用map join 代替。如果一個RDD是比較小的,那麼可以使用廣播變數的方式,將小RDD傳送到各個worker的executor中,實現本地的連線
程式碼實現:

object _02SparkRDDBroadcastOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        val conf = new SparkConf()
            .setMaster("local[2]")
            .setAppName(s"${_02SparkRDDBroadcastOps.getClass.getSimpleName}")
        val sc = new SparkContext(conf)

        val stu = List(
            "1  鄭祥楷 1",
            "2  王佳豪 1",
            "3  劉鷹 2",
            "4  宋志華 3",
            "5  劉帆 4",
            "6  OLDLi 5"
        )

        val cls = List(
            "1 1807bd-bj",
            "2 1807bd-sz",
            "3 1807bd-wh",
            "4 1807bd-xa",
            "7 1805bd-bj"
        )
        /*
            使用廣播變數來完成上述操作
            一般使用者表都比較大,而班級表相對很小,符合我們在共享變數中提出的第一個假設
            所以我們可以嘗試使用廣播變數來進行解決
         */
        val stuRDD = sc.parallelize(stu)
        //cls-->map---->
        val map = cls.map{case line => {
            (line.substring(0, line.indexOf(" ")), line.substring(line.indexOf(" ")).trim)
        }}.toMap
        //map--->broadcast
        val clsMapBC:Broadcast[Map[String, String]] = sc.broadcast(map)

        stuRDD.map{case line => {
            val map = clsMapBC.value
            val fields = line.split("\\s+")
            val cid = fields(2)
//            map.get(cid)
            val className = map.getOrElse(cid, "UnKnown")
            s"${fields(0)}\t${fields(1)}\t${className}"//在mr中學習到的map join
        }}.foreach(println)
        sc.stop()
    }

  5)取樣傾斜的key並拆分join操作(大大表):

  適用場景:在hive兩張表進行join的時候,如果兩張表的資料都很大,並且,一張表的資料很均勻,但是另一張表的資料有少量的key資料量過大,此時使用這個解決方案
  實現原理:對於join導致的資料傾斜,如果只是某幾個key導致了傾斜,可以將少數幾個key分拆成獨立RDD,並附加隨機字首打散成n份去進行join,此時這幾個key對應的資料就不會集中在少數幾個task上,而是分散到多個task進行join了。
程式碼實現:

一張表中:
Id  num
1   100W
2    10
3    10
4    10
5    10

可以使用union的方式來啟動多個job並行執行:

//通過分離資料量大key來解決資料傾斜
select count(*) from t_test where id !=1 group by id
union
select count(*) from t_test where id ==1 group by id

  6)使用隨機字首和擴容RDD進行join(大量key的資料傾斜):

  適用場景:如果進行join操作時,RDD中有大量的key導致資料傾斜,那麼進行拆分可以也沒有意義,此時使用這種方法
spark調優之資料傾斜
  實現原理:這一種方案是針對有大量傾斜key的情況,沒法將部分key拆分出來進行單獨處理,因此只能對整個RDD進行資料擴容,對記憶體資源要求很高。
程式碼實現:
spark調優之資料傾斜
左表的連線條件的值,可以在某個範圍內進行隨機,並且這個隨機值有多少個,那麼右表的資料就要複製多少份。

object _03SparkJoinDataSkewOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        val sc = SparkUtil.sparkContext("local[2]", "_03SparkJoinDataSkewOps")
        val list1 = List(
            "hello 1",
            "hello 2",
            "hello 3",
            "hello 4",
            "you 1",
            "me 2"
        )

        val list2 = List(
            "hello zhangsan dfsadfasdfsa",
            "hello lisi adfasdfasd",
            "you wangwu adfasdfs",
            "me zhouqi adfadfa"
        )
        //<key, value>
        val listRDD1 = sc.parallelize(list1).map(line => {
            val fields = line.split("\\s+")
            (fields(0), fields(1))
        })
        //<key, value>
        val listRDD2 = sc.parallelize(list2).map(line => {
            val fields = line.split("\\s+")
            (fields(0), fields(1))
        })
        val joinRDD: RDD[(String, (String, String))] = dataSkewRDDJoin(sc, listRDD1, listRDD2)
        println("最後進行join的結果:")
        joinRDD.foreach(println)
        sc.stop()
    }

    private def dataSkewRDDJoin(sc: SparkContext, listRDD1: RDD[(String, String)], listRDD2: RDD[(String, String)]) = {
        //假設listRDD1中的部分key有資料傾斜,所以我在進行join操作的時候,需要進行拆分計算
        //step 1 找到發生資料傾斜的key
        val dataSkewKeys = listRDD1.sample(false, 0.6).countByKey().toList.sortWith((t1, t2) => t1._2 > t2._2).take(1).map(t => t._1)
        println("通過sample運算元得到的可能發生資料傾斜的key:" + dataSkewKeys)
        //step 2 對listRDD1和listRDD2中的資料按照dataSkewKeys各拆分成兩個部分
        //step 2.1 講dataSkewKeys進行廣播
        val dskBC = sc.broadcast(dataSkewKeys)
        // step 2.2 進行拆分
        val dataSkewRDD1 = listRDD1.filter { case (word, value) => {
            //有資料傾斜的rdd--->dataskewRDD1
            val dsks = dskBC.value
            dsks.contains(word)
        }
        }

        val commonRDD1 = listRDD1.filter { case (word, value) => {
            //沒有資料傾斜的rdd--->commonRDD1
            val dsks = dskBC.value
            !dsks.contains(word)
        }
        }
        val dataSkewRDD2 = listRDD2.filter { case (word, value) => {
            //有資料傾斜的rdd--->dataskewRDD1
            val dsks = dskBC.value
            dsks.contains(word)
        }
        }

        val commonRDD2 = listRDD2.filter { case (word, value) => {
            //沒有資料傾斜的rdd--->commonRDD1
            val dsks = dskBC.value
            !dsks.contains(word)
        }
        }
      }
        //step 3 對dataskewRDD進行新增N以內隨機字首
        // step 3.1 新增隨機字首
        val prefixDSRDD1:RDD[(String, String)] = dataSkewRDD1.map { case (word, value) => {
            val random = new Random()
            val prefix = random.nextInt(2)
            (s"${prefix}_${word}", value)
        }
        }
        // step 3.2 另一個rdd進行擴容
        val prefixDSRDD2:RDD[(String, String)] = dataSkewRDD2.flatMap { case (word, value) => {
            val ab = ArrayBuffer[(String, String)]()
            for (i <- 0 until 2) {
                ab.append((s"${i}_${word}", value))
            }
            ab
        }
        }
        println("---->有資料傾斜RDD1新增字首成prefixDSRDD1的結果:" + prefixDSRDD1.collect().mkString(","))
        println("---->有資料傾斜RDD2擴容之後成prefixDSRDD2的結果:" + prefixDSRDD2.collect().mkString(","))

        // step 4 分步進行join操作
        // step 4.1 有資料傾斜的prefixDSRDD1和prefixDSRDD2進行join
        val prefixJoinDSRDD = prefixDSRDD1.join(prefixDSRDD2)
        //ste 4.2 無資料傾斜的commonRDD1和commonRDD2進行join
        val commonJoinRDD = commonRDD1.join(commonRDD2)
        // step 4.3 將隨機字首去除
        val dsJionRDD = prefixJoinDSRDD.map { case (word, (value1, value2)) => {
            (word.substring(2), (value1, value2))
        }
        }
        //step 5 將拆分進行join之後的結果進行union連線,得到最後的結果 sql union all
        val joinRDD = dsJionRDD.union(commonJoinRDD)
        joinRDD
    }
}

本博文參考至美團的spark調優https://tech.meituan.com/