spark中實現自定義排序
阿新 • • 發佈:2018-11-11
排序的方式可以分為6中:
(1)使用一個自定義一個普通的類繼承Ordered[User] with Serializable
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object CustomSort1 { //排序規則:首先按照顏值的降序,如果顏值相等,再按照年齡的升序 def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("CustomerSort1").setMaster("local[2]") val sc = new SparkContext(conf) //定義一個數組型別的值 val user = Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99") //轉換成RDD的型別 val lines = sc.parallelize(user) //將整個字串切分為元組的形式 val sorted: RDD[User] = lines.map(x => { val line = x.split(" ") val name = line(0) val age = line(1).toInt val face = line(2).toInt new User(name, age, face) }) //實現自定義排序需要呼叫sortBy才可以自動呼叫自定義排序 val r = sorted.sortBy(u=>u) println(r.collect().toBuffer) } class User(val name:String,val age:Int,val face:Int)extends Ordered[User] with Serializable{ override def compare(that: User): Int = { if (this.face == that.face){ this.age-that.age }else{ - (this.face-that.face) } } override def toString: String = s"name :$name,age: $age,face:$face" } }
(2)和上面的差不多隻是new 的位置是不太一樣的
import org.apache.spark.{SparkConf, SparkContext} object CustomSort2 { //排序規則:首先按照顏值的降序,如果顏值相等,再按照年齡的升序 def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("CustomSort2").setMaster("local[2]") val sc = new SparkContext(conf) val user = Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99") val lines = sc.parallelize(user) val tpRdd=lines.map(x=>{ val line = x.split(" ") val name = line(0) val age = line(1).toInt val face = line(2).toInt (name, age, face) }) val sorted = tpRdd.sortBy(x=>new User1(x._2,x._3)) sorted.foreach(println) } //這裡定義的引數必須新增型別,傳的引數只是自己需要比較的引數,沒有重寫toString()方法 class User1(val age:Int,val face:Int)extends Ordered[User1] with Serializable { override def compare(that: User1): Int = { if (this.face == that.face){ this.age-that.age }else{ - (this.face-that.face) } } } }
(3)使用了樣例類的方式此時可以不用實現序列化,並且輸出不能使用foreach,和並行度有關
import org.apache.spark.{SparkConf, SparkContext} object CustomSort3 { //排序規則:首先按照顏值的降序,如果顏值相等,再按照年齡的升序 def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("CustomSort3").setMaster("local[2]") val sc = new SparkContext(conf) val user = Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99") val lines = sc.parallelize(user) val tpRdd=lines.map(x=>{ val line = x.split(" ") val name = line(0) val age = line(1).toInt val face = line(2).toInt (name, age, face) }) val sorted = tpRdd.sortBy(x=> Man(x._2,x._3)) // sorted.foreach(println) // sc.stop() //不能使用foreach println(sorted.collect().toBuffer) } //這裡定義的引數必須新增型別,傳的引數只是自己需要比較的引數,沒有重寫toString()方法 case class Man(age:Int,face:Int)extends Ordered[Man] { override def compare(that: Man): Int = { if (this.face == that.face){ this.age-that.age }else{ - (this.face-that.face) } } } }
(4)利用隱式轉換的方式
利用隱式轉換時,類可以不實現Ordered的特質,普通的類或者普通的樣例類即可。
隱式轉換支援,隱式方法,隱式函式,隱式的object和隱式的變數,
如果都同時存在,優先使用隱式的object,隱式方法和隱式函式中,會優先使用隱式函式。
隱式轉換可以寫在任意地方(當前物件中,外部的類中,外部的物件中),如果寫在外部,需要匯入到當前的物件中即可。
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 利用隱式轉換時,類可以不實現Ordered的特質,普通的類或者普通的樣例類即可。
隱式轉換支援,隱式方法, 隱式函式, 隱式的object 和隱式的變數,
如果都同時存在,優先使用隱式的object,隱式方法和隱式函式中,會優先使用隱式函式。
隱式轉換可以寫在任意地方(當前物件中,外部的類中,外部的物件中),如果寫在外部,需要匯入到當前的物件中即可。
*/
object CustomSort4 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("CustomSort4").setMaster("local[*]")
val sc = new SparkContext(conf)
//排序規則:首先按照顏值的降序,如果顏值相等,再按照年齡的升序
val users= Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
//將Driver端的資料並行化變成RDD
val lines: RDD[String] = sc.parallelize(users)
//切分整理資料
val tpRDD: RDD[(String, Int, Int)] = lines.map(line => {
val fields = line.split(" ")
val name = fields(0)
val age = fields(1).toInt
val fv = fields(2).toInt
(name, age, fv)
})
//隱式的object方式
implicit object OrderingXiaoRou extends Ordering[XianRou]{
override def compare(x: XianRou, y: XianRou): Int = {
if(x.fv == y.fv) {
x.age - y.age
} else {
y.fv - x.fv
}
}
}
// 如果類沒有繼承 Ordered 特質
// 可以利用隱式轉換 隱式方法 隱式函式 隱式值 隱式object都可以 implicit ord: Ordering[K]
implicit def ordMethod(p: XianRou): Ordered[XianRou] = new Ordered[XianRou] {
override def compare(that: XianRou): Int = {
if (p.fv == that.fv) {
-(p.age - that.age)
} else {
that.fv - p.fv
}
}
}
//利用隱式的函式方式
implicit val ordFunc = (p: XianRou) => new Ordered[XianRou] {
override def compare(that: XianRou): Int = {
if (p.fv == that.fv) {
-(p.age - that.age)
} else {
that.fv - p.fv
}
}
}
//排序(傳入了一個排序規則,不會改變資料的格式,只會改變順序)
val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => XianRou(tp._2, tp._3))
println(sorted.collect().toBuffer)
sc.stop()
}
}
case class XianRou(age: Int, fv: Int)
(5)利用Ordering的on方法
無需藉助任何的類或者物件
只需要利用Ordering特質的on方法即可。
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CustomSort5 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
.setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)
val users= Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
//將Driver端的資料並行化變成RDD
val lines: RDD[String] = sc.parallelize(users)
// 獲得的資料型別是 元組
val prdd = lines.map(t => {
val strings = t.split(" ")
val name = strings(0)
val age = strings(1).toInt
val fv = strings(2).toInt
(name, age, fv)
})
implicit val obj = Ordering[(Int,Int)].on[(String,Int,Int)](t=>(-t._3,t._2))
val sortedrd: RDD[(String, Int, Int)] = prdd.sortBy(t => t)
sortedrd.foreach(println)
}
}
(6)利用元組封裝排序條件
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CustomSort6 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("CustomSort5").setMaster("local[*]")
val sc = new SparkContext(conf)
//排序規則:首先按照顏值的降序,如果顏值相等,再按照年齡的升序
val users= Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
//將Driver端的資料並行化變成RDD
val lines: RDD[String] = sc.parallelize(users)
//切分整理資料
val tpRDD: RDD[(String, Int, Int)] = lines.map(line => {
val fields = line.split(" ")
val name = fields(0)
val age = fields(1).toInt
val fv = fields(2).toInt
(name, age, fv)})
//充分利用元組的比較規則,元組的比較規則:先比第一,相等再比第二個
val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => (-tp._3, tp._2))
println(sorted.collect().toBuffer)
sc.stop()
}
}
每天多努力一點