1. 程式人生 > >Spark高級排序與TopN問題揭密

Spark高級排序與TopN問題揭密

大數據 Spark

[TOC]


引入

前面進行過wordcount的單詞統計例子,關鍵是,如何對統計的單詞按照單詞個數來進行排序?

如下:

scala> val retRDD = sc.textFile("hdfs://ns1/hello").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)

scala> val retSortRDD = retRDD.map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1))

scala> retSortRDD.collect().foreach(println)
...
(hello,3)
(me,1)
(you,1)
(he,1)

下面的測試都需要引入maven的依賴

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.5</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>

Spark二次排序

測試數據與說明

需要進行二次排序的數據格式如下:

field_1‘ ‘field_2(使用空格分割)
 20 21
 50 51
 50 52
 50 53
 50 54
 60 51
 60 53
 60 52
 60 56
 60 57
 70 58
 60 61
 70 54

思路下面的代碼註釋會有詳細的說明,這裏要指出的是,在下面的排序過程中,分別使用Java和Scala進行排序的操作,並且:

  • Java版本
    • 方式1:使元素具備比較性--->需要使用SecondarySort對象
    • 方式2:提供比較器--->需要使用SecondarySort對象
    • 不管使用哪一種方式,都需要使用一個新的變量對象SecondarySort
  • Scala版本
    • 方式1:使元素具備比較性,其實就是Java版本方式1的scala實現--->需要使用SecondarySort對象
    • 方式2:使用sortBy的第一種方式,基於原始的數據進行排序--->不需要使用SecondarySort對象
    • 方式3:使用sortBy的第二種方式,將原始數據進行轉換--->需要使用SecondarySort對象

所以這個二次排序的例子包含Java和Scala總共5個版本的實現,非常有價值!

公共對象

其實就是SecondarySort對象,如下:

package cn.xpleaf.bigdata.spark.java.core.domain;

import scala.Serializable;

public class SecondarySort implements Comparable<SecondarySort>, Serializable {
    private int first;
    private int second;

    public SecondarySort(int first, int second) {
        this.first = first;
        this.second = second;
    }

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public int compareTo(SecondarySort that) {
        int ret = this.getFirst()  - that.getFirst();
        if(ret == 0) {
            ret = that.getSecond() - this.getSecond();
        }
        return ret;
    }

    @Override
    public String toString() {
        return this.first + " " + this.second;
    }
}

Java版本

測試代碼如下:

package cn.xpleaf.bigdata.spark.java.core.p3;

import cn.xpleaf.bigdata.spark.java.core.domain.SecondarySort;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Serializable;
import scala.Tuple2;

import java.util.Comparator;

/**
 * Java 版本的二次排序
 *   field_1‘ ‘field_2(使用空格分割)
 *   20 21
 50 51
 50 52
 50 53
 50 54
 60 51
 60 53
 60 52
 60 56
 60 57
 70 58
 60 61
 70 54
 需求:首先按照第一列升序排序,如果第一列相等,按照第二列降序排序
 分析:要排序的話,使用sortByKey,也可以使用sortBy
 如果用sortByKey的話,只能按照key來排序,現在的是用第一列做key?還是第二列?
 根據需求,只能使用復合key(既包含第一列,也包含第二列),因為要進行比較,所以該復合key必須具備比較性,要麽該操作提供一個比較器
 問題是查看該操作的時候,並沒有給我們提供比較器,沒得選只能讓元素具備比較性

 使用自定義的對象 可以使用comprable接口
 */
public class _01SparkSecondarySortOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSecondarySortOps.class.getSimpleName());
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> linesRDD = jsc.textFile("D:/data/spark/secondsort.csv");
        JavaPairRDD<SecondarySort, String> ssRDD = linesRDD.mapToPair(new PairFunction<String, SecondarySort, String>() {
            @Override
            public Tuple2<SecondarySort, String> call(String line) throws Exception {
                String[] fields = line.split(" ");
                int first = Integer.valueOf(fields[0].trim());
                int second = Integer.valueOf(fields[1].trim());
                SecondarySort ss = new SecondarySort(first, second);
                return new Tuple2<SecondarySort, String>(ss, "");
            }
        });
        /*
        // 第一種方式:使元素具備比較性
        JavaPairRDD<SecondarySort, String> sbkRDD = ssRDD.sortByKey(true, 1);   // 設置partition為1,這樣數據才整體有序,否則只是partition中有序
        */

        /**
         * 第二種方式,提供比較器
         *      與前面方式相反,這次是:第一列降序,第二列升序
         */
        JavaPairRDD<SecondarySort, String> sbkRDD = ssRDD.sortByKey(new MyComparator<SecondarySort>() {
            @Override
            public int compare(SecondarySort o1, SecondarySort o2) {
                int ret = o2.getFirst() - o1.getFirst();
                if(ret == 0) {
                    ret = o1.getSecond() - o2.getSecond();
                }
                return ret;
            }
        }, true, 1);

        sbkRDD.foreach(new VoidFunction<Tuple2<SecondarySort, String>>() {
            @Override
            public void call(Tuple2<SecondarySort, String> tuple2) throws Exception {
                System.out.println(tuple2._1);
            }
        });

        jsc.close();
    }
}

/**
 * 做一個中間的過渡接口
 * 比較需要實現序列化接口,否則也會報異常
 * 是用到了適配器Adapter模式
 * 適配器模式(Adapter Pattern)是作為兩個不兼容的接口之間的橋梁,這裏就是非常好的體現了。
 */
interface MyComparator<T> extends Comparator<T>, Serializable{}

輸出結果如下:

740 58
730 54
530 54
203 21
74 58
73 57
71 55
71 56
70 54
70 55
70 56
70 57
70 58
70 58
63 61
60 51
60 52
60 53
60 56
60 56
60 57
60 57
60 61
50 51
50 52
50 53
50 53
50 54
50 62
50 512
50 522
40 511
31 42
20 21
20 53
20 522
12 211
7 8
7 82
5 6
3 4
1 2

Scala版本

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.core.p3

import cn.xpleaf.bigdata.spark.java.core.domain.SecondarySort
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.reflect.ClassTag

object _05SparkSecondarySortOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_05SparkSecondarySortOps.getClass.getSimpleName)
        val sc = new SparkContext(conf)

        val linesRDD = sc.textFile("D:/data/spark/secondsort.csv")

        /*
        val ssRDD:RDD[(SecondarySort, String)] = linesRDD.map(line => {
            val fields = line.split(" ")
            val first = Integer.valueOf(fields(0).trim())
            val second = Integer.valueOf(fields(1).trim())
            val ss = new SecondarySort(first, second)
            (ss, "")
        })

        // 第一種方式,使用元素具備比較性
        val sbkRDD:RDD[(SecondarySort, String)] = ssRDD.sortByKey(true, 1)

        sbkRDD.foreach{case (ss:SecondarySort, str:String) => { // 使用模式匹配的方式
            println(ss)
        }}
        */

        /*
        // 使用sortBy的第一種方式,基於原始的數據
        val retRDD = linesRDD.sortBy(line => line, numPartitions = 1)(new Ordering[String] {
            override def compare(x: String, y: String): Int = {
                val xFields = x.split(" ")
                val yFields = y.split(" ")

                var ret = xFields(0).toInt - yFields(0).toInt
                if(ret == 0) {
                    ret = yFields(1).toInt - xFields(1).toInt
                }
                ret
            }
        }, ClassTag.Object.asInstanceOf[ClassTag[String]])
        */

        // 使用sortBy的第二種方式,將原始數據做轉換--->sortBy()第一個參數的作用,就是做數據的轉換
        val retRDD:RDD[String] = linesRDD.sortBy(line => {
            // f: (T) => K
            // 這裏T的類型為String,K是SecondarySort類型
            val fields = line.split(" ")
            val first = Integer.valueOf(fields(0).trim())
            val second = Integer.valueOf(fields(1).trim())
            val ss = new SecondarySort(first, second)
            ss
        }, true, 1)(new Ordering[SecondarySort] {
            override def compare(x: SecondarySort, y: SecondarySort): Int = {
                var ret = x.getFirst - y.getFirst
                if(ret == 0) {
                    ret = y.getSecond - x.getSecond
                }
                ret
            }
        }, ClassTag.Object.asInstanceOf[ClassTag[SecondarySort]])

        retRDD.foreach(println)

        sc.stop()
    }
}

輸出結果如下:

1 2
3 4
5 6
7 82
7 8
12 211
20 522
20 53
20 21
31 42
40 511
50 522
50 512
50 62
50 54
50 53
50 53
50 52
50 51
60 61
60 57
60 57
60 56
60 56
60 53
60 52
60 51
63 61
70 58
70 58
70 57
70 56
70 55
70 54
71 56
71 55
73 57
74 58
203 21
530 54
730 54
740 58

TopN問題

需求與說明

需求與數據說明如下:

  * TopN問題的說明:
  *     TopN問題顯然是可以使用action算子take來完成,但是因為take需要將所有數據都拉取到Driver上才能完成操作,
  *     所以Driver的內存壓力非常大,不建議使用take.
  *
  * 這裏要進行TopN問題的分析,數據及需求如下:
  * chinese ls 91
  * english ww 56
  * chinese zs 90
  * chinese zl 76
  * english zq 88
  * chinese wb 95
  * chinese sj 74
  * english ts 87
  * english ys 67
  * english mz 77
  * chinese yj 98
  * english gk 96
  *
  * 需求:排出每個科目的前三名

下面分別使用性能很低的groupByKey和性能很好的combineByKey來進行操作,詳細的說明已經在代碼中給出,註意其思想非常重要,尤其是使用combineByKey來解決groupByKey出現的性能問題,有興趣的話,可以好好閱讀一下代碼,以及其所體現的思想,因為這都跟Spark本身的理論緊密相關。

使用groupByKey解決

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.core.p3

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
  * TopN問題的說明:
  *     TopN問題顯然是可以使用action算子take來完成,但是因為take需要將所有數據都拉取到Driver上才能完成操作,
  *     所以Driver的內存壓力非常大,不建議使用take.
  *
  * 這裏要進行TopN問題的分析,數據及需求如下:
  * chinese ls 91
  * english ww 56
  * chinese zs 90
  * chinese zl 76
  * english zq 88
  * chinese wb 95
  * chinese sj 74
  * english ts 87
  * english ys 67
  * english mz 77
  * chinese yj 98
  * english gk 96
  *
  * 需求:排出每個科目的前三名
  *
  * 思路:先進行map操作轉換為(subject, name + score)的元組
  * 再根據subject這個key進行groupByKey,這樣就可以得到gbkRDD
  * 之後再對其進行map操作,在map操作中使用treeSet得到前三名(既能控制大小,又能進行排序)
  *
  * 問題:
  * 上面的方案在生產過程中慎用
  * 因為,執行groupByKey,會將key相同的數據都拉取到同一個partition中,再執行操作,
  * 拉取的過程是shuffle,是分布式性能殺手!再一個,如果key對應的數據過多,很有可能造成數據傾斜,或者OOM,
  * 那麽就需要盡量的避免這種操作方式。
  * 那如何做到?可以參考MR中TopN問題的思想,MR中,是在每個map task中對數據進行篩選,雖然最後還是需要shuffle到一個節點上,但是數據量會大大減少。
  * Spark中參考其中的思想,就是可以在每個partition中對數據進行篩選,然後再對各個分區篩選出來的數據進行合並,再做一次排序,從而得到最終排序的結果。
  * 顯然,這樣就可以解決前面說的數據到同一個partition中導致數據量過大的問題!因為分區篩選的工作已經可以大大減少數據量。
  * 那麽在Spark中有什麽算子可以做到這一點呢?那就是combineByKey或者aggregateByKey,其具體的用法可以參考我前面的博客文章,這裏我使用combineByKey來操作。
  */
object _06SparkTopNOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_06SparkTopNOps.getClass.getSimpleName())
        val sc = new SparkContext(conf)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
        Logger.getLogger("org.spark_project").setLevel(Level.OFF)

        // 1.轉換為linesRDD
        val linesRDD:RDD[String] = sc.textFile("D:/data/spark/topn.txt")

        // 2.轉換為pairsRDD
        val pairsRDD:RDD[(String, String)] = linesRDD.map(line => {
            val fields = line.split(" ")
            val subject = fields(0).trim()
            val name = fields(1).trim()
            val score = fields(2).trim()
            (subject, name + " " + score)   // ("chinese", "zs 90")
        })

        // 3.轉換為gbkRDD
        val gbkRDD:RDD[(String, Iterable[String])] = pairsRDD.groupByKey()
        println("==========TopN前==========")
        gbkRDD.foreach(println)
        // (english,CompactBuffer(ww 56, zq 88, ts 87, ys 67, mz 77, gk 96))
        // (chinese,CompactBuffer(ls 91, zs 90, zl 76, wb 95, sj 74, yj 98))

        // 4.轉換為retRDD
        val retRDD:RDD[(String, Iterable[String])] = gbkRDD.map(tuple => {
            var ts = new mutable.TreeSet[String]()(new MyOrdering())
            val subject = tuple._1          // chinese
            val nameScores = tuple._2       //  ("ls 91", "ww 56", "zs 90", ...)
            for(nameScore <- nameScores) {  // 遍歷每一份成績"ls 91"
                // 添加到treeSet中
                ts.add(nameScore)
                if(ts.size > 3) {   // 如果大小大於3,則彈出最後一份成績
                    ts = ts.dropRight(1)
                }
            }
            (subject, ts)
        })

        println("==========TopN後==========")
        retRDD.foreach(println)

        sc.stop()
    }
}

// gbkRDD.map中用於排序的treeSet的排序比較規則,根據需求,應該為降序
class MyOrdering extends Ordering[String] {
    override def compare(x: String, y: String): Int = {
        // x或者y的格式為:"zs 90"
        val xFields = x.split(" ")
        val yFields = y.split(" ")
        val xScore = xFields(1).toInt
        val yScore = yFields(1).toInt
        val ret = yScore - xScore
        ret
    }
}

輸出結果如下:

==========TopN前==========
(chinese,CompactBuffer(ls 91, zs 90, zl 76, wb 95, sj 74, yj 98))
(english,CompactBuffer(ww 56, zq 88, ts 87, ys 67, mz 77, gk 96))
==========TopN後==========
(chinese,TreeSet(yj 98, wb 95, ls 91))
(english,TreeSet(gk 96, zq 88, ts 87))

使用combineByKey解決

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.core.p3

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable

/**
  * 使用combineByKey算子來優化前面的TopN問題
  * 關於combineByKey算子的使用,可以參考我的博客文章,上面有非常詳細的例子
  * 一定要掌握,因為非常重要
  */
object _07SparkTopNOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_07SparkTopNOps.getClass().getSimpleName())
        val sc = new SparkContext(conf)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
        Logger.getLogger("org.spark_project").setLevel(Level.OFF)

        // 1.轉換為linesRDD
        val linesRDD:RDD[String] = sc.textFile("D:/data/spark/topn.txt")

        // 2.轉換為pairsRDD
        val pairsRDD:RDD[(String, String)] = linesRDD.map(line => {
            val fields = line.split(" ")
            val subject = fields(0).trim()
            val name = fields(1).trim()
            val score = fields(2).trim()
            (subject, name + " " + score)   // ("chinese", "zs 90")
        })

        println("==========TopN前==========")
        pairsRDD.foreach(println)
        // (chinese,sj 74)
        // (chinese,ls 91)
        // (english,ts 87)
        // (english,ww 56)
        // (english,ys 67)
        // (chinese,zs 90)
        // (english,mz 77)
        // (chinese,zl 76)
        // (chinese,yj 98)
        // (english,zq 88)
        // (english,gk 96)
        // (chinese,wb 95)

        // 3.轉換為cbkRDD
        val cbkRDD:RDD[(String, mutable.TreeSet[String])] = pairsRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)

        println("==========TopN後==========")
        cbkRDD.foreach(println)
        // (chinese,TreeSet(yj 98, wb 95, ls 91))
        // (english,TreeSet(gk 96, zq 88, ts 87))
    }

    // 創建一個容器,這裏返回一個treeSet,作為每個分區中相同key的value的容器
    def createCombiner(nameScore: String):mutable.TreeSet[String] = {
        // nameScore格式為:"zs 90"
        // 指定排序規則MyOrdering,為降序排序
        val ts = new mutable.TreeSet[String]()(new MyOrdering())
        ts.add(nameScore)
        ts
    }

    // 合並分區中key相同的value,同時使用treeSet來進行排序
    def mergeValue(ts:mutable.TreeSet[String], nameScore:String):mutable.TreeSet[String] = {
        ts.add(nameScore)
        if(ts.size > 3) {   // 如果超過3個,刪除一個再返回
            ts.dropRight(1) // scala中的集合進行操作後,本身不變,但是會返回一個新的集合
        }
        ts
    }

    // 合並不同分區中key相同的value集合,同時使用treeSet來進行排序
    def mergeCombiners(ts1:mutable.TreeSet[String], ts2:mutable.TreeSet[String]):mutable.TreeSet[String] = {
        var newTS = new mutable.TreeSet[String]()(new MyOrdering())
        // 將分區1中集合的value添加到新的treeSet中,同時進行排序和控制大小
        for(nameScore <- ts1) {
            newTS.add(nameScore)
            if(newTS.size > 3) {    // 如果數量大於3,則刪除一個後再賦值給本身
                newTS = newTS.dropRight(1)
            }
        }
        // 將分區2中集合的value添加到新的treeSet中,同時進行排序和控制大小
        for(nameScore <- ts2) {
            newTS.add(nameScore)
            if(newTS.size > 3) {    // 如果數量大於3,則刪除一個後再賦值給本身
                newTS = newTS.dropRight(1)
            }
        }
        newTS
    }

}

輸出結果如下:

==========TopN前==========
(chinese,ls 91)
(chinese,sj 74)
(english,ww 56)
(english,ts 87)
(chinese,zs 90)
(english,ys 67)
(chinese,zl 76)
(english,mz 77)
(english,zq 88)
(chinese,yj 98)
(chinese,wb 95)
(english,gk 96)
==========TopN後==========
(english,TreeSet(gk 96, zq 88, ts 87))
(chinese,TreeSet(yj 98, wb 95, ls 91))

Spark高級排序與TopN問題揭密