
《深入理解Spark》: Custom Partitioners in Spark

package com.lyzx.reviewDay27

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

class T1 {
  /**
    * Spark custom partitioner demo:
    * prints the contents of each partition before and after
    * repartitioning the RDD with MyPartitioner.
    * @param sc the SparkContext to run the demo with
    */
  def f1(sc:SparkContext):Unit ={
    // A key-value RDD with 2 default partitions; each element doubles as key and value.
    val rdd = sc.parallelize(List(99, 88, 200, 10, 900, 1000), 2).map(x => (x, x))

    // Show the default distribution of the data across the 2 partitions.
    // Materialize the iterator into a list so it is not exhausted before being returned.
    rdd.mapPartitionsWithIndex((index, itr) => {
      println("index:" + index)
      val items = itr.toList
      items.foreach(x => print("-" + x))
      items.iterator
    }).collect()

    // Repartition with the custom partitioner and show the new distribution.
    val partRdd = rdd.partitionBy(new MyPartitioner)
    partRdd.mapPartitionsWithIndex((index, itr) => {
      println("index:" + index)
      val items = itr.toList
      items.foreach(x => print("=" + x))
      items.iterator
    }).collect()
  }
}


class MyPartitioner extends Partitioner{
  // number of partitions
  override def numPartitions: Int = 2

  /**
    * Compute the partition index for a key.
    * @param key
    *      the key of a key-value pair in the RDD
    * @return
    *     the partition index (0-based): returning 1 sends the record to the second partition
    */
  override def getPartition(key: Any): Int = {
    val k = key.toString.toInt
    if (k > 100) 1 else 0
  }
}
object T1{
  def main(args: Array[String]): Unit = {
    val conf  = new SparkConf().setAppName("reviewDay27").setMaster("local")
    val sc = new SparkContext(conf)
    val t= new T1
    t.f1(sc)

    sc.stop()
  }
}
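
To double-check where each key ends up, a small companion program can print the contents of every partition with glom(). The PartitionerCheck object below is a sketch rather than part of the original listing: it reuses the same sample data and MyPartitioner, and also shows that shuffle operators such as reduceByKey accept a Partitioner directly, so the custom routing can happen in the same shuffle as the aggregation. With this data, keys 99, 88 and 10 should land in partition 0 and 200, 900 and 1000 in partition 1.

package com.lyzx.reviewDay27

import org.apache.spark.{SparkConf, SparkContext}

// Companion check (illustrative): prints each partition's contents after applying MyPartitioner.
object PartitionerCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PartitionerCheck").setMaster("local")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List(99, 88, 200, 10, 900, 1000), 2).map(x => (x, x))

    // glom() collects each partition into an Array, which makes the distribution easy to print.
    rdd.partitionBy(new MyPartitioner)
      .glom()
      .collect()
      .zipWithIndex
      .foreach { case (part, idx) => println(s"partition $idx: ${part.mkString(", ")}") }

    // Shuffle operators such as reduceByKey can take the same Partitioner,
    // combining the aggregation and the custom placement in one step.
    rdd.reduceByKey(new MyPartitioner, _ + _)
      .glom()
      .collect()
      .zipWithIndex
      .foreach { case (part, idx) => println(s"reduceByKey partition $idx: ${part.mkString(", ")}") }

    sc.stop()
  }
}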