1. 程式人生 > >spark中實現自定義排序

spark中實現自定義排序

排序的方式可以分為6中:

(1)使用一個自定義一個普通的類繼承Ordered[User] with Serializable

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CustomSort1 {

  //排序規則:首先按照顏值的降序,如果顏值相等,再按照年齡的升序
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CustomerSort1").setMaster("local[2]")
    val sc = new SparkContext(conf)
    //定義一個數組型別的值
    val user = Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
    //轉換成RDD的型別
    val lines = sc.parallelize(user)
    //將整個字串切分為元組的形式
    val sorted: RDD[User] = lines.map(x => {
      val line = x.split(" ")
      val name = line(0)
      val age = line(1).toInt
      val face = line(2).toInt
      new User(name, age, face)
    })
    //實現自定義排序需要呼叫sortBy才可以自動呼叫自定義排序
    val r = sorted.sortBy(u=>u)
    println(r.collect().toBuffer)

  }
  class User(val name:String,val age:Int,val face:Int)extends Ordered[User] with Serializable{
    override def compare(that: User): Int = {
      if (this.face == that.face){
        this.age-that.age
      }else{
       - (this.face-that.face)
      }
    }
  override def toString: String = s"name :$name,age: $age,face:$face"
  }
}

(2)和上面的差不多隻是new 的位置是不太一樣的


import org.apache.spark.{SparkConf, SparkContext}

object CustomSort2 {
  //排序規則:首先按照顏值的降序,如果顏值相等,再按照年齡的升序
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CustomSort2").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val user = Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
    val lines = sc.parallelize(user)
    val tpRdd=lines.map(x=>{
      val line = x.split(" ")
      val name = line(0)
      val age = line(1).toInt
      val face = line(2).toInt
     (name, age, face)
    })
    val sorted = tpRdd.sortBy(x=>new User1(x._2,x._3))
    sorted.foreach(println)
  }
  //這裡定義的引數必須新增型別,傳的引數只是自己需要比較的引數,沒有重寫toString()方法
  class User1(val age:Int,val face:Int)extends Ordered[User1] with Serializable {
    override def compare(that: User1): Int = {
      if (this.face == that.face){
        this.age-that.age
      }else{
        - (this.face-that.face)
      }
    }
  }
}

(3)使用了樣例類的方式此時可以不用實現序列化,並且輸出不能使用foreach,和並行度有關

import org.apache.spark.{SparkConf, SparkContext}

object CustomSort3 {
  //排序規則:首先按照顏值的降序,如果顏值相等,再按照年齡的升序
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("CustomSort3").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val user = Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
    val lines = sc.parallelize(user)
    val tpRdd=lines.map(x=>{
      val line = x.split(" ")
      val name = line(0)
      val age = line(1).toInt
      val face = line(2).toInt
      (name, age, face)
    })
    val sorted = tpRdd.sortBy(x=> Man(x._2,x._3))
   // sorted.foreach(println)
   // sc.stop()
    //不能使用foreach
    println(sorted.collect().toBuffer)
  }
  //這裡定義的引數必須新增型別,傳的引數只是自己需要比較的引數,沒有重寫toString()方法
  case class Man(age:Int,face:Int)extends Ordered[Man]  {
    override def compare(that: Man): Int = {
      if (this.face == that.face){
        this.age-that.age
      }else{
        - (this.face-that.face)
      }
    }
  }

}

(4)利用隱式轉換的方式

利用隱式轉換時,類可以不實現Ordered的特質,普通的類或者普通的樣例類即可。
隱式轉換支援,隱式方法,隱式函式,隱式的object和隱式的變數,
如果都同時存在,優先使用隱式的object,隱式方法和隱式函式中,會優先使用隱式函式。
隱式轉換可以寫在任意地方(當前物件中,外部的類中,外部的物件中),如果寫在外部,需要匯入到當前的物件中即可。



import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
  * 利用隱式轉換時,類可以不實現Ordered的特質,普通的類或者普通的樣例類即可。
    隱式轉換支援,隱式方法,  隱式函式,  隱式的object  和隱式的變數,
如果都同時存在,優先使用隱式的object,隱式方法和隱式函式中,會優先使用隱式函式。
隱式轉換可以寫在任意地方(當前物件中,外部的類中,外部的物件中),如果寫在外部,需要匯入到當前的物件中即可。
  */
object CustomSort4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CustomSort4").setMaster("local[*]")
    val sc = new SparkContext(conf)
    //排序規則:首先按照顏值的降序,如果顏值相等,再按照年齡的升序
    val users= Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
    //將Driver端的資料並行化變成RDD
    val lines: RDD[String] = sc.parallelize(users)
    //切分整理資料
    val tpRDD: RDD[(String, Int, Int)] = lines.map(line => {
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      (name, age, fv)
    })

    //隱式的object方式
      implicit object OrderingXiaoRou extends Ordering[XianRou]{
        override def compare(x: XianRou, y: XianRou): Int = {
          if(x.fv == y.fv) {
            x.age - y.age
          } else {
            y.fv - x.fv
          }
        }
      }
    // 如果類沒有繼承 Ordered 特質
    // 可以利用隱式轉換  隱式方法  隱式函式  隱式值  隱式object都可以  implicit ord: Ordering[K]
    implicit def ordMethod(p: XianRou): Ordered[XianRou] = new Ordered[XianRou] {
      override def compare(that: XianRou): Int = {
        if (p.fv == that.fv) {
          -(p.age - that.age)
        } else {
          that.fv - p.fv
        }
      }
    }

    //利用隱式的函式方式
    implicit val ordFunc = (p: XianRou) => new Ordered[XianRou] {
      override def compare(that: XianRou): Int = {
        if (p.fv == that.fv) {
          -(p.age - that.age)
        } else {
          that.fv - p.fv
        }
      }
    }


    //排序(傳入了一個排序規則,不會改變資料的格式,只會改變順序)
    val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => XianRou(tp._2, tp._3))
    println(sorted.collect().toBuffer)
    sc.stop()
  }
}
case class XianRou(age: Int, fv: Int)

(5)利用Ordering的on方法

無需藉助任何的類或者物件

只需要利用Ordering特質的on方法即可。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CustomSort5 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    val users= Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
    //將Driver端的資料並行化變成RDD
    val lines: RDD[String] = sc.parallelize(users)
    // 獲得的資料型別是 元組
    val prdd = lines.map(t => {
      val strings = t.split(" ")
      val name = strings(0)
      val age = strings(1).toInt
      val fv = strings(2).toInt
      (name, age, fv)
    })
    implicit  val obj = Ordering[(Int,Int)].on[(String,Int,Int)](t=>(-t._3,t._2))

    val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => t)
    sortedrd.foreach(println)
  }
}

(6)利用元組封裝排序條件

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CustomSort6 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CustomSort5").setMaster("local[*]")
    val sc = new SparkContext(conf)
    //排序規則:首先按照顏值的降序,如果顏值相等,再按照年齡的升序
    val users= Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
    //將Driver端的資料並行化變成RDD
    val lines: RDD[String] = sc.parallelize(users)
    //切分整理資料
    val tpRDD: RDD[(String, Int, Int)] = lines.map(line => {
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      (name, age, fv)})
    //充分利用元組的比較規則,元組的比較規則:先比第一,相等再比第二個
    val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => (-tp._3, tp._2))
    println(sorted.collect().toBuffer)
    sc.stop()
  }
}

每天多努力一點