1. 程式人生 > >Spark RDD排序運算元

Spark RDD排序運算元

RDD排序運算元有sortBy和sortByKey兩個算作,sortBy運算元可以自定義排序規則,而sortByKey只能對Key使用Scala或Spark預設支援的排序規則,如果Scala或Spark不支援排序規則的話,需要使用sortBy自己實現排序規則!

sortByKey的核心實現程式碼:

class OrderedRDDFunctions extends Logging with Serializable {
	
  private val ordering = implicitly[Ordering[K]] //核心程式碼1

  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] = self.withScope
  {
    val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part).setKeyOrdering(if (ascending) ordering else ordering.reverse)//核心程式碼2
  }
   //省略其餘程式碼
}

以上程式碼我們可以看出,sortByKey是使用一個ordering隱式值進行排序的,所以說只要當前作用域存在Ordering[K]值就可以排序,但是由於sortByKey不支援傳入隱式值,所以只能使用scala和spark系統預設的隱式值,因此支援部分Key的排序!

接下來看一下sortBy的核心實現程式碼:

  /**
    * Return this RDD sorted by the given key function.
    */
  def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)
                      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
    
    this.keyBy[K](f).sortByKey(ascending, numPartitions).values
  }

由原始碼我們容易看出,sortBy實現底層使用的還是sortByKey,sortBy只不過是傳入了一個選key的函式,並將排序規則隱式值傳入spark隱式轉換系統,可以在排序時候找到自定義Key的Ordering[K]。

接下來實戰演示一下:

1、實戰一下Scala支援的排序型別Int:

  def getRandomList(n: Int): List[Int] = {

    var result: List[Int] = Nil

    while (result.length < n) {
      result = Random.nextInt(1000) :: result
    }

    result
  }

以上程式碼是生成一個n個大小的List序列,供我們生成RDD使用的輔助方法。
scala> val nums = sc.parallelize(getRandomList(10))
res41: Array[Int] = Array(246, 157, 15, 488, 212, 513, 293, 224, 373, 242)
scala> nums.sortBy(x=>x).collect//直接對Scala支援的排序型別直接使用sortBy排序
res45: Array[Int] = Array(15, 157, 212, 224, 242, 246, 293, 373, 488, 513)
直接對Scala支援的排序型別直接使用sortBy排序,不需要我們實現一個隱式值Ordering[Int],因為這個Scala隱式系統已經存在該值,在scala.math.Ordering中,實現程式碼如下:
  trait IntOrdering extends Ordering[Int] {
    def compare(x: Int, y: Int) =
      if (x < y) -1
      else if (x == y) 0
      else 1
  }
  implicit object Int extends IntOrdering
其他Long,Short等型別同理!

2、接下來對1中nums轉成元組KV形式之後排序:

對於元組排序,scala和spark提供了一些元組的Ordering隱式值,在scala.math.Ordering中。接下來我們講解一下二元組的隱式值:

  implicit def Tuple2[T1, T2](implicit ord1: Ordering[T1], ord2: Ordering[T2]): Ordering[(T1, T2)] =
    new Ordering[(T1, T2)]{
      def compare(x: (T1, T2), y: (T1, T2)): Int = {
        val compare1 = ord1.compare(x._1, y._1)
        if (compare1 != 0) return compare1
        val compare2 = ord2.compare(x._2, y._2)
        if (compare2 != 0) return compare2
        0
      }
    }
這個預設的二元組排序規則是先根據第一個元素排序,如果第一個元素相同在根據第二個元素排序!


如上是一個預設二元組排序隱式值,接下來我們就實踐一下二元組的排序:

scala> val pairs = nums.map(x => (x,x))//支援兩種排序方法sortBy和sortByKey
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[79] at map at <console>:35
scala> pairs.sortBy(x=>x).collect //使用sortBy運算元必須指定排序的Key
res49: Array[(Int, Int)] = Array((15,15), (157,157), (212,212), (224,224), (242,242), (246,246), (293,293), (373,373), (488,488), (513,513))
此處我們指定排序的Key的規則函式是 x=>x,由於OrderedRDDFunctions存在Ordering[Int]所以不需要我們顯示指定一個隱式值。

我們也可以對pairs使用sortByKey進行排序:

scala> pairs.sortByKey().collect //此處我們不需要指定key和隱式值
res54: Array[(Int, Int)] = Array((15,15), (157,157), (212,212), (224,224), (242,242), (246,246), (293,293), (373,373), (488,488), (513,513))


3、接下來對自定義型別進行排序,需要我們自己實現Ordering:

當我們不顯示提供Dog的Ordering時候,會報錯提示沒有隱式值:

case class Dog(var age:Int)
val dogs = List(Dog(3), Dog(1), Dog(6), Dog(8))
scala> val dogsRDD = sc.parallelize(dogs)
dogsRDD: org.apache.spark.rdd.RDD[Dog] = ParallelCollectionRDD[97] at parallelize at <console>:35

scala> dogsRDD.map((_,1)).sortBy(x=>x._1)
<console>:31: error: No implicit Ordering defined for Dog.//報錯:沒有Dog的Ordering隱式值
Error occurred in an application involving default arguments.
       dogsRDD.map((_,1)).sortBy(x=>x._1)


當我們提供一個隱式值:

implicit object DogOrdering extends Ordering[Dog] {
  override def compare(e1:Dog, e2:Dog): Int = {
   (e1.age-e2.age).toInt
  }
}

再次排序:
scala> dogsRDD.map((_,1)).sortBy(x=>x._1).collect
res6: Array[(Dog, Int)] = Array((Dog(1),1), (Dog(3),1), (Dog(6),1), (Dog(8),1))
可以正確排序!

結論:當對Scala系統提供預設的Ordering值時候我們可以不顯示給定隱式值,但是如果不存在的話,需要我們在作用域中顯示給出!