1. 程式人生 > >Spark2.x 如何實現自定義排序(利用元組,類--隱式轉換Ordering,Ordered等實現)

Spark2.x 如何實現自定義排序(利用元組,類--隱式轉換Ordering,Ordered等實現)

交流QQ: 824203453

需求:

對於有複雜排序條件的需求,可以利用自定義排序來實現,同時可以使用多種方案實現自定義排序需求。

對指定的資料(欄位分別為:名稱 年齡 顏值,資料以空格分割),按照指定的要求排序,排序要求為:根據顏值降序,如果顏值相同,再按照年齡升序排序。

示例資料:
"pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999"

1.2  方案一:利用類或者樣例類來封裝資料

把資料封裝成類或者case class,然後類繼承Ordered[型別] ,然後可以自定義排序規則。

如果是class,需要實現序列化特質,Serializable,如果是case class,可以不實現該序列化特質。
這種處理方式,返回值型別是類的例項物件。
普通類:
objectSortDemo1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    val data: RDD[String] = sc.parallelize(List("wx 28 85 wx.zfb.com", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999"))
    // 獲得的資料型別是Person
    val prdd: RDD[Person] = data.map(t => {
      val strings = t.split(" ")
      val name = strings(0)
      val age = strings(1).toInt
      val fv = strings(2).toInt
      new  Person(name, age, fv)
    })

    // sortBy
    val sortedrd:RDD[Person] = prdd.sortBy(t => t)
    sortedrd.foreach(println)
  }
}

class Person(val name: String, val age: Int, val fv: Int) extends Serializable withOrdered[Person] {
  override def compare(that: Person): Int = {
    //    根據顏值降序   如果顏值相同  再按照年齡的升序
    if (this.fv == that.fv) {
      this.age - that.age
    } else {
      that.fv - this.fv
    }
  }
  override def toString: String = s"$name,$age,$fv"
}

樣例類:

objectSortDemo1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    val data: RDD[String] = sc.parallelize(List("wx 28 85 wx.zfb.com", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999"))
    // 獲得的資料型別是Person
    val prdd: RDD[Person] = data.map(t => {
      val strings = t.split(" ")
      val name = strings(0)
      val age = strings(1).toInt
      val fv = strings(2).toInt
      Person(name, age, fv)
    })
    // sortBy
    val sortedrd:RDD[Person] = prdd.sortBy(t => t)
    sortedrd.foreach(println)
  }
}

case class Person(val name: String, val age: Int, val fv: Int) extends Ordered[Person] {
  override def compare(that: Person): Int = {
    //    根據顏值降序   如果顏值相同  再按照年齡的升序
    if (this.fv == that.fv) {
      this.age - that.age
    } else {
      that.fv - this.fv
    }
  }
  override def toString: String = s"$name,$age,$fv"
}

1.3  方案二:利用class或者case class指定排序規則

對原始資料不進行封裝,僅僅在排序的時候,利用class或者case class指定排序的規則。
如果使用類,需要繼承Ordered[型別],實現序列化特質,
如果使用case class,不需實現序列化特質。
返回值的結果型別:還是原來的資料型別。和類本身無關。僅僅是利用類的規則來實現了排序。
objectSortDemo2 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    val data: RDD[String] = sc.parallelize(List("wx 28 85 ", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999"))

    // 獲得的資料型別是 元組
    val prdd = data.map(t => {
      val strings = t.split(" ")
      val name = strings(0)
      val age = strings(1).toInt
      val fv = strings(2).toInt
      (name, age, fv)
    })

    // sortBy  在排序的時候 指定使用類的規則   利用已經存在的來來指定排序規則
//    val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => new Person2(t._1,t._2,t._3))
    val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => Person2(t._1,t._2,t._3))

    sortedrd.foreach(println)
  }
}
/*
class Person2(val name: String, val age: Int, val fv: Int) extends Serializable with Ordered[Person2] {

  override def compare(that: Person2): Int = {
    //    根據顏值降序   如果顏值相同  再按照年齡的升序
    if (this.fv == that.fv) {
      this.age - that.age
    } else {
      that.fv - this.fv
    }
  }
  override def toString: String = s"$name,$age,$fv"
}
*/
case class Person2(val name: String, val age: Int, val fv: Int) extends  Ordered[Person2] {
  override def compare(that: Person2): Int = {
    //    根據顏值降序   如果顏值相同  再按照年齡的升序
    if (this.fv == that.fv) {
      this.age - that.age
    } else {
      that.fv - this.fv
    }
  }
  override def toString: String = s"$name,$age,$fv"
}

1.4  方案三:利用隱式轉換

利用隱式轉換時,類可以不實現Ordered的特質,普通的類或者普通的樣例類即可。
隱式轉換支援,隱式方法,隱式函式,隱式的object和隱式的變數,
如果都同時存在,優先使用隱式的object,隱式方法和隱式函式中,會優先使用隱式函式。
隱式轉換可以寫在任意地方(當前物件中,外部的類中,外部的物件中),如果寫在外部,需要匯入到當前的物件中即可。
objectSortDemo3 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    val data: RDD[String] = sc.parallelize(List("wx 28 85 ", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999"))

    // 獲得的資料型別是 元組
    val prdd = data.map(t => {
      val strings = t.split(" ")
      val name = strings(0)
      val age = strings(1).toInt
      val fv = strings(2).toInt
      (name, age, fv)
    })

    // 如果類沒有繼承 Ordered 特質
    // 可以利用隱式轉換  隱式方法  隱式函式  隱式值  隱式object都可以  implicit ord: Ordering[K]
    implicit def ordMethod(p: Person3): Ordered[Person3] = new Ordered[Person3] {
      override def compare(that: Person3): Int = {
        if (p.fv == that.fv) {
          -(p.age - that.age)
        } else {
          that.fv - p.fv
        }
      }
    }

    implicit val ordFunc = (p: Person3) => new Ordered[Person3] {
      override def compare(that: Person3): Int = {
        if (p.fv == that.fv) {
          -(p.age - that.age)
        } else {
          that.fv - p.fv
        }
      }
    }
    // 隱式的Object  優先順序更高
    implicit object ord extends Ordering[Person3] {
      override def compare(x: Person3, y: Person3): Int = {
        if (x.fv == y.fv) {
          x.age - y.age
        } else {
          y.fv - x.fv
        }
      }
    }

    //    隱式的變數
    /*   implicit val ord2: Ordering[Person3] = new Ordering[Person3] {
         override def compare(x: Person3, y: Person3): Int = {
           // 顏值相同  降序
           if (x.fv == y.fv) {
              - (x.age - y.age)
           } else {
             y.fv - x.fv
           }
         }
       }*/
    //  如果把隱式轉換寫在其他的object中,就使用import a._  如果是寫在其他的類中,val obj = new 類()  import obj._
    val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => new Person3(t._1, t._2, t._3))

    sortedrd.foreach(println)
  }
}
class Person3(val name: String, val age: Int, val fv: Int) extends Serializable {
  override def toString: String = s"$name,$age,$fv"
}

在隱式轉換中,Ordered和Ordering都是可以相互轉換的。

1.5  方案四:利用Ordering的on方法

無需藉助任何的類或者物件

只需要利用Ordering特質的on方法即可。

object SortDemo4 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    val data: RDD[String] = sc.parallelize(List("wx 28 85", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999"))

    // 獲得的資料型別是 元組
    val prdd = data.map(t => {
      val strings = t.split(" ")
      val name = strings(0)
      val age = strings(1).toInt
      val fv = strings(2).toInt
      (name, age, fv)
    })

    /** t=>(-t._3,t._2)  具體的排序的規則
      * Ordering[T].on[U](f)
      * U   (String,Int,Int)  原始的資料型別
      * T    (Int,Int)        具體的函式的返回值的型別
      */
    implicit  val obj = Ordering[(Int,Int)].on[(String,Int,Int)](t=>(-t._3,t._2))

    val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => t)
    sortedrd.foreach(println)
  }
}

1.6 方案五:利用元組封裝排序條件

最簡單的實現方案,直接利用元組來封裝要排序的條件,預設升序,降序使用-號即可

object SortDemo5 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    val data: RDD[String] = sc.parallelize(List("wx 28 85 ", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999"))

    // 獲得的資料型別是 元組
    val prdd = data.map(t => {
      val strings = t.split(" ")
      val name = strings(0)
      val age = strings(1).toInt
      val fv = strings(2).toInt
      (name, age, fv)
    })

    // 利用元組直接封裝要排序的條件
    val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => (-t._3,-t._2))
    sortedrd.foreach(println)
  }
}

交流QQ: 824203453