1. 程式人生 > >對於sortBy運算元的一些理解

對於sortBy運算元的一些理解

package day03

import org.apache.spark.{SparkConf, SparkContext}

/**
  * RanagePartitioner採用的是"水塘抽樣"演算法,可以對資料進行相對均勻的分配
  * 原理是,.在分配之前對資料進行抽樣
  *
  * 與hashPartitioner不同的是,hashPartitioner是根據key取HashCode值進行分割槽,但是這種分割槽方式
  * 不能均勻的分配資料,是Spark的預設分割槽方式
  *
  * RanagePartitioner:可以根據範圍進行分割槽,可以均勻的分配資料,分割槽內有序,全域性無序
  * 如果需要實現全域性有序,需要將資料收集到Driver端進行輸出
  */
object RanagePartitioner {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RanagePartitioner").setMaster("local[*]")
    val sc =new SparkContext(conf)

    val list =Array(1,22,3,4,5,90,66,55,87,34,22,99)
    val numbers=sc.parallelize(list)

    //區域性有序,全域性有序
    val result =numbers.sortBy(x=>x)
    //定義一個函式變數
    val func =(index:Int,it:Iterator[Int])=>{
      println(s"index:${index}, ele:${it.toList.mkString(",")}")
      it
    }
    val result2=result.mapPartitionsWithIndex(func)
    //收集到Driver端
    result2.collect().foreach(println(_))
    sc.stop()
  }
}