1. 程式人生 > >Spark:高階排序(二次排序)

Spark:高階排序(二次排序)

為了多維的排序,需要考慮多個條件,這要求我們自定義key

1 23
3 22
3 31
1 12
2 11
4 45

二、使用java實現

2.1、自定義key

使用scala.math.Ordered介面,實現Serializable介面

package com.chb.sparkDemo.secondarySort;

import java.io.Serializable;

import scala.math.Ordered;
/**
 * Composite key used for a Spark secondary sort.
 *
 * <p>Keys are ordered by {@code firstKey} and, when those are equal, by
 * {@code secondKey}. The class implements {@code scala.math.Ordered} so that
 * it can be used with {@code sortByKey}, and {@code Serializable} so that
 * instances can be serialized.
 *
 * <p>All comparison operators are derived from {@link #compare(MyKey)}, so
 * they cannot disagree with each other.
 *
 * @author 12285
 */
public class MyKey implements Ordered<MyKey>, Serializable{
    // Explicit version id, matching the anonymous function classes in this example.
    private static final long serialVersionUID = 1L;

    private int firstKey;
    private int secondKey;

    /**
     * @param firstKey  primary sort component
     * @param secondKey secondary sort component, used only to break ties
     */
    public MyKey(int firstKey, int secondKey) {
        super();
        this.firstKey = firstKey;
        this.secondKey = secondKey;
    }

    public int getFirstKey() {
        return firstKey;
    }
    public int getSecondKey() {
        return secondKey;
    }
    public void setFirstKey(int firstKey) {
        this.firstKey = firstKey;
    }
    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }

    /** Scala's {@code >} operator: true when this key sorts strictly after {@code other}. */
    public boolean $greater(MyKey other) {
        return compare(other) > 0;
    }

    /** Scala's {@code >=} operator. */
    public boolean $greater$eq(MyKey other) {
        return compare(other) >= 0;
    }

    /** Scala's {@code <} operator: true when this key sorts strictly before {@code other}. */
    public boolean $less(MyKey other) {
        return compare(other) < 0;
    }

    /** Scala's {@code <=} operator. */
    public boolean $less$eq(MyKey other) {
        return compare(other) <= 0;
    }

    /**
     * Compares by {@code firstKey}, then by {@code secondKey}.
     *
     * @return a negative, zero or positive value when this key is less than,
     *         equal to or greater than {@code other}
     */
    public int compare(MyKey other) {
        // Integer.compare instead of subtraction: "a - b" overflows when the
        // operands are far apart (e.g. Integer.MIN_VALUE - 1) and would then
        // report the wrong sign.
        int byFirst = Integer.compare(this.firstKey, other.firstKey);
        if (byFirst != 0) {
            return byFirst;
        }
        return Integer.compare(this.secondKey, other.secondKey);
    }

    /** Same ordering as {@link #compare(MyKey)}. */
    public int compareTo(MyKey other) {
        return compare(other);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + firstKey;
        result = prime * result + secondKey;
        return result;
    }

    /** Two keys are equal when they have the same class and both components match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        MyKey other = (MyKey) obj;
        return firstKey == other.firstKey && secondKey == other.secondKey;
    }
}

2.2、具體實現步驟

第一步: 自定義key 實現scala.math.Ordered介面,和Serializable介面  第二步:將要進行二次排序的資料載入,按照<key,value>格式的RDD  第三步:使用sortByKey 基於自定義的key進行二次排序  第四步:去掉排序的key,只保留排序的結果

2.2.1、 第二步:將要進行二次排序的資料載入,按照<key,value>格式的RDD(第一步的自定義key即 2.1 中實現了scala.math.Ordered介面和Serializable介面的MyKey)

        // Step 2: pair every input line with a MyKey built from its first two
        // integer columns; the untouched line is kept as the value.
        JavaPairRDD<MyKey, String> mykeyPairs = lines.mapToPair(new PairFunction<String, MyKey, String>() {

            private static final long serialVersionUID = 1L;

            public Tuple2<MyKey, String> call(String line) throws Exception {
                // Split once, and on any run of whitespace, so that leading,
                // trailing or repeated blanks do not break the parse.
                String[] fields = line.trim().split("\\s+");
                // parseInt yields the primitive directly (valueOf would box first).
                int firstKey = Integer.parseInt(fields[0]);
                int secondKey = Integer.parseInt(fields[1]);
                return new Tuple2<MyKey, String>(new MyKey(firstKey, secondKey), line);
            }
        });

2.2.2、第三步:使用sortByKey 基於自定義的key進行二次排序

    // Step 3: sort the pairs by MyKey; the sample output in this article comes
    // out ordered by firstKey and then secondKey, smallest first.
    JavaPairRDD<MyKey, String> sortPairs = mykeyPairs.sortByKey();

2.2.3、第四步:去掉排序的key,只保留排序的結果

// Step 4: drop the sort key and keep only the original line.
JavaRDD<String> result = sortPairs.map(new Function<Tuple2<MyKey,String>, String>() {
            private static final long serialVersionUID = 1L;

            public String call(Tuple2<MyKey, String> tuple) throws Exception {
                return tuple._2;//line
            }
        });
        // Print the sorted result on the driver. collect() returns the elements
        // in RDD order; RDD.foreach would run on the executors, where neither
        // the ordering across partitions nor driver-side output is guaranteed.
        // Suitable for small demo data only: collect() loads everything into
        // driver memory.
        for (String line : result.collect()) {
            System.out.println(line);
        }

三、完整程式碼

package com.chb.sparkDemo.secondarySort;

import io.netty.handler.codec.http.HttpContentEncoder.Result;

import java.awt.image.RescaleOp;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Secondary sort with Spark, step by step:
 * <ol>
 *   <li>Define a custom key that implements scala.math.Ordered and Serializable.</li>
 *   <li>Load the data to be sorted into a &lt;key, value&gt; pair RDD.</li>
 *   <li>Sort with sortByKey, which orders by the custom key.</li>
 *   <li>Drop the key and keep only the sorted values.</li>
 * </ol>
 *
 * @author 12285
 */
public class SecordSortTest {
    /** Input read when no path is passed on the command line. */
    private static final String DEFAULT_INPUT_PATH = "C:\\Users\\12285\\Desktop\\test";

    /**
     * Runs the secondary sort and prints the sorted lines to standard output.
     *
     * @param args optional; {@code args[0]} overrides the default input path
     */
    public static void main(String[] args) {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

        // NOTE(review): the application name "WordCount" looks like a
        // copy-paste leftover; it is kept unchanged here.
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            // Read the input file, one record per line.
            JavaRDD<String> lines = jsc.textFile(inputPath);

            // Step 2: pair every line with a MyKey built from its first two integer columns.
            JavaPairRDD<MyKey, String> mykeyPairs = lines.mapToPair(new PairFunction<String, MyKey, String>() {

                private static final long serialVersionUID = 1L;

                public Tuple2<MyKey, String> call(String line) throws Exception {
                    // Split once, and on any run of whitespace, so that leading,
                    // trailing or repeated blanks do not break the parse.
                    String[] fields = line.trim().split("\\s+");
                    int firstKey = Integer.parseInt(fields[0]);
                    int secondKey = Integer.parseInt(fields[1]);
                    return new Tuple2<MyKey, String>(new MyKey(firstKey, secondKey), line);
                }
            });

            // Step 3: sort by the custom key.
            JavaPairRDD<MyKey, String> sortPairs = mykeyPairs.sortByKey();

            // Step 4: drop the key and keep only the original line.
            JavaRDD<String> result = sortPairs.map(new Function<Tuple2<MyKey,String>, String>() {
                private static final long serialVersionUID = 1L;

                public String call(Tuple2<MyKey, String> tuple) throws Exception {
                    return tuple._2;
                }
            });

            // Print on the driver in sorted order. RDD.foreach would run on the
            // executors, where ordering across partitions and driver-side output
            // are not guaranteed. collect() is fine for this small demo input.
            for (String line : result.collect()) {
                System.out.println(line);
            }
        } finally {
            // Always release the context, even if the job fails.
            jsc.stop();
        }
    }
}


結果:
1 12
1 23
2 11
3 22
3 31
4 45

四、使用scala實現

4.1、自定義key


/** Composite key for a secondary sort: ordered by `firstKey`, ties broken by `secondKey`.
  *
  * @param firstKey  primary sort component
  * @param secondKey secondary sort component, used only when the first keys are equal
  */
class SecordSortKey(val firstKey: Int, val secondKey: Int) extends Ordered[SecordSortKey] with Serializable {

  /** Returns a negative, zero or positive value when this key is less than,
    * equal to or greater than `that`.
    */
  override def compare(that: SecordSortKey): Int = {
    // Integer.compare instead of subtraction: "a - b" overflows for operands
    // that are far apart (e.g. Int.MinValue - 1) and then reports the wrong sign.
    val byFirst = Integer.compare(this.firstKey, that.firstKey)
    if (byFirst != 0) byFirst
    else Integer.compare(this.secondKey, that.secondKey)
  }
}

4.2、具體實現


import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/** Secondary-sort demo: reads "first second" integer pairs, one per line, and
  * prints the lines sorted by (first, second) in descending order.
  */
object SecordSortTest {

  /** Input read when no path is passed on the command line. */
  private val DefaultInputPath = "C:\\Users\\12285\\Desktop\\test"

  /** @param args optional; `args(0)` overrides the default input path */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val conf = new SparkConf().setMaster("local[2]").setAppName("SecordSort")
    val sc = new SparkContext(conf)

    try {
      val lines = sc.textFile(inputPath)

      // Step 2: pair every line with a SecordSortKey built from its first two
      // integer columns. Split once, and on any run of whitespace, so that
      // leading, trailing or repeated blanks do not break the parse.
      val pairSortKey = lines.map { line =>
        val fields = line.trim.split("\\s+")
        (new SecordSortKey(fields(0).toInt, fields(1).toInt), line)
      }

      // Step 3: sort by the custom key, largest key first.
      val sortPair = pairSortKey.sortByKey(ascending = false)

      // Step 4: drop the key and keep only the original line.
      val sortResult = sortPair.map(_._2)

      // println, not print: each record must end up on its own line.
      sortResult.collect().foreach(println)
    } finally {
      // Always release the context, even if the job fails.
      sc.stop()
    }
  }
}