Spark2.x 如何實現自定義排序(利用元組,類--隱式轉換Ordering,Ordered等實現)
阿新 • • 發佈:2019-02-01
交流QQ: 824203453
需求:
對於有複雜排序條件的需求,可以利用自定義排序來實現,同時可以使用多種方案實現自定義排序需求。
對指定的資料(欄位分別為:名稱 年齡 顏值,資料以空格分割),按照指定的要求排序,排序要求為:根據顏值降序,如果顏值相同,再按照年齡升序排序。
示例資料:
"pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999"
1.2 方案一:利用類或者樣例類來封裝資料
把資料封裝成類或者case class,然後類繼承Ordered[型別] ,然後可以自定義排序規則。
如果是class,需要實現序列化特質,Serializable,如果是case class,可以不實現該序列化特質。
這種處理方式,返回值型別是類的例項物件。
普通類:
objectSortDemo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val data: RDD[String] = sc.parallelize(List("wx 28 85 wx.zfb.com", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999")) // 獲得的資料型別是Person val prdd: RDD[Person] = data.map(t => { val strings = t.split(" ") val name = strings(0) val age = strings(1).toInt val fv = strings(2).toInt new Person(name, age, fv) }) // sortBy val sortedrd:RDD[Person] = prdd.sortBy(t => t) sortedrd.foreach(println) } } class Person(val name: String, val age: Int, val fv: Int) extends Serializable withOrdered[Person] { override def compare(that: Person): Int = { // 根據顏值降序 如果顏值相同 再按照年齡的升序 if (this.fv == that.fv) { this.age - that.age } else { that.fv - this.fv } } override def toString: String = s"$name,$age,$fv" }
樣例類:
objectSortDemo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val data: RDD[String] = sc.parallelize(List("wx 28 85 wx.zfb.com", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999")) // 獲得的資料型別是Person val prdd: RDD[Person] = data.map(t => { val strings = t.split(" ") val name = strings(0) val age = strings(1).toInt val fv = strings(2).toInt Person(name, age, fv) }) // sortBy val sortedrd:RDD[Person] = prdd.sortBy(t => t) sortedrd.foreach(println) } } case class Person(val name: String, val age: Int, val fv: Int) extends Ordered[Person] { override def compare(that: Person): Int = { // 根據顏值降序 如果顏值相同 再按照年齡的升序 if (this.fv == that.fv) { this.age - that.age } else { that.fv - this.fv } } override def toString: String = s"$name,$age,$fv" }
1.3 方案二:利用class或者case class指定排序規則
對原始資料不進行封裝,僅僅在排序的時候,利用class或者case class指定排序的規則。
如果使用類,需要繼承Ordered[型別],實現序列化特質,
如果使用case class,不需實現序列化特質。
返回值的結果型別:還是原來的資料型別。和類本身無關。僅僅是利用類的規則來實現了排序。
objectSortDemo2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
.setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)
val data: RDD[String] = sc.parallelize(List("wx 28 85 ", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999"))
// 獲得的資料型別是 元組
val prdd = data.map(t => {
val strings = t.split(" ")
val name = strings(0)
val age = strings(1).toInt
val fv = strings(2).toInt
(name, age, fv)
})
// sortBy 在排序的時候 指定使用類的規則 利用已經存在的來來指定排序規則
// val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => new Person2(t._1,t._2,t._3))
val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => Person2(t._1,t._2,t._3))
sortedrd.foreach(println)
}
}
/*
class Person2(val name: String, val age: Int, val fv: Int) extends Serializable with Ordered[Person2] {
override def compare(that: Person2): Int = {
// 根據顏值降序 如果顏值相同 再按照年齡的升序
if (this.fv == that.fv) {
this.age - that.age
} else {
that.fv - this.fv
}
}
override def toString: String = s"$name,$age,$fv"
}
*/
case class Person2(val name: String, val age: Int, val fv: Int) extends Ordered[Person2] {
override def compare(that: Person2): Int = {
// 根據顏值降序 如果顏值相同 再按照年齡的升序
if (this.fv == that.fv) {
this.age - that.age
} else {
that.fv - this.fv
}
}
override def toString: String = s"$name,$age,$fv"
}
1.4 方案三:利用隱式轉換
利用隱式轉換時,類可以不實現Ordered的特質,普通的類或者普通的樣例類即可。
隱式轉換支援,隱式方法,隱式函式,隱式的object和隱式的變數,
如果都同時存在,優先使用隱式的object,隱式方法和隱式函式中,會優先使用隱式函式。
隱式轉換可以寫在任意地方(當前物件中,外部的類中,外部的物件中),如果寫在外部,需要匯入到當前的物件中即可。
objectSortDemo3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
.setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)
val data: RDD[String] = sc.parallelize(List("wx 28 85 ", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999"))
// 獲得的資料型別是 元組
val prdd = data.map(t => {
val strings = t.split(" ")
val name = strings(0)
val age = strings(1).toInt
val fv = strings(2).toInt
(name, age, fv)
})
// 如果類沒有繼承 Ordered 特質
// 可以利用隱式轉換 隱式方法 隱式函式 隱式值 隱式object都可以 implicit ord: Ordering[K]
implicit def ordMethod(p: Person3): Ordered[Person3] = new Ordered[Person3] {
override def compare(that: Person3): Int = {
if (p.fv == that.fv) {
-(p.age - that.age)
} else {
that.fv - p.fv
}
}
}
implicit val ordFunc = (p: Person3) => new Ordered[Person3] {
override def compare(that: Person3): Int = {
if (p.fv == that.fv) {
-(p.age - that.age)
} else {
that.fv - p.fv
}
}
}
// 隱式的Object 優先順序更高
implicit object ord extends Ordering[Person3] {
override def compare(x: Person3, y: Person3): Int = {
if (x.fv == y.fv) {
x.age - y.age
} else {
y.fv - x.fv
}
}
}
// 隱式的變數
/* implicit val ord2: Ordering[Person3] = new Ordering[Person3] {
override def compare(x: Person3, y: Person3): Int = {
// 顏值相同 降序
if (x.fv == y.fv) {
- (x.age - y.age)
} else {
y.fv - x.fv
}
}
}*/
// 如果把隱式轉換寫在其他的object中,就使用import a._ 如果是寫在其他的類中,val obj = new 類() import obj._
val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => new Person3(t._1, t._2, t._3))
sortedrd.foreach(println)
}
}
class Person3(val name: String, val age: Int, val fv: Int) extends Serializable {
override def toString: String = s"$name,$age,$fv"
}
在隱式轉換中,Ordered和Ordering都是可以相互轉換的。
1.5 方案四:利用Ordering的on方法
無需藉助任何的類或者物件
只需要利用Ordering特質的on方法即可。
object SortDemo4 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
.setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)
val data: RDD[String] = sc.parallelize(List("wx 28 85", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999"))
// 獲得的資料型別是 元組
val prdd = data.map(t => {
val strings = t.split(" ")
val name = strings(0)
val age = strings(1).toInt
val fv = strings(2).toInt
(name, age, fv)
})
/** t=>(-t._3,t._2) 具體的排序的規則
* Ordering[T].on[U](f)
* U (String,Int,Int) 原始的資料型別
* T (Int,Int) 具體的函式的返回值的型別
*/
implicit val obj = Ordering[(Int,Int)].on[(String,Int,Int)](t=>(-t._3,t._2))
val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => t)
sortedrd.foreach(println)
}
}
1.6 方案五:利用元組封裝排序條件
最簡單的實現方案,直接利用元組來封裝要排序的條件,預設升序,降序使用-號即可
object SortDemo5 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
.setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)
val data: RDD[String] = sc.parallelize(List("wx 28 85 ", "pp 30 85", "dd 18 100", "taoge 35 100", "laozhao 30 120", "huge 26 9999"))
// 獲得的資料型別是 元組
val prdd = data.map(t => {
val strings = t.split(" ")
val name = strings(0)
val age = strings(1).toInt
val fv = strings(2).toInt
(name, age, fv)
})
// 利用元組直接封裝要排序的條件
val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => (-t._3,-t._2))
sortedrd.foreach(println)
}
}
交流QQ: 824203453