《深入理解Spark》: Spark Custom Partitioners
阿新 • Published 2019-02-19
The example below builds a pair RDD, prints the contents of each of its two initial partitions, repartitions it with a custom Partitioner that routes keys greater than 100 to the second partition, and prints the partitions again.

package com.lyzx.reviewDay27

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

class T1 {
  /**
   * Spark custom partitioner demo.
   * @param sc the SparkContext
   */
  def f1(sc: SparkContext): Unit = {
    val rdd = sc.parallelize(List(99, 88, 200, 10, 900, 1000), 2).map(x => (x, x))

    // Print each element alongside its partition index.
    // Materialize the iterator before traversing it; returning an
    // already-drained iterator would make the resulting RDD empty.
    rdd.mapPartitionsWithIndex((index, itr) => {
      val items = itr.toList
      println("index:" + index)
      items.foreach(e => print("-" + e))
      items.iterator
    }).collect()

    // Redistribute the data using the custom partitioner.
    val partRdd = rdd.partitionBy(new MyPartitioner)
    partRdd.mapPartitionsWithIndex((index, itr) => {
      val items = itr.toList
      println("index:" + index)
      items.foreach(e => print("=" + e))
      items.iterator
    }).collect()
  }
}

class MyPartitioner extends Partitioner {
  // number of partitions
  override def numPartitions: Int = 2

  /**
   * Compute the partition index for a key.
   * @param key the key of a key-value pair in the RDD
   * @return the partition index; returning 1 means the second partition
   */
  override def getPartition(key: Any): Int = {
    val k = key.toString.toInt
    if (k > 100) 1 else 0
  }
}

object T1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("reviewDay27").setMaster("local")
    val sc = new SparkContext(conf)
    val t = new T1
    t.f1(sc)
    sc.stop()
  }
}
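A quick way to verify where the keys landed, without going through mapPartitionsWithIndex, is glom(), which turns each partition into a single array. A minimal sketch, assuming the partRdd value from f1 above:

    // glom(): RDD[(Int, Int)] => RDD[Array[(Int, Int)]], one array per partition
    val parts = partRdd.glom().collect()
    parts.zipWithIndex.foreach { case (arr, i) =>
      println(s"partition $i: ${arr.mkString(", ")}")
    }

With the sample data, partition 0 should hold the pairs for keys 99, 88, and 10 (all at most 100), and partition 1 those for 200, 900, and 1000, though the order within a partition is not guaranteed.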
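One refinement worth considering: Spark compares partitioners with equals to decide whether two RDDs are already co-partitioned (for example, to skip the shuffle in a join). MyPartitioner as written inherits reference equality from AnyRef, so two separate instances never compare equal. A hedged sketch of the usual fix, adding equals and hashCode to the class above:

    class MyPartitioner extends Partitioner {
      override def numPartitions: Int = 2

      override def getPartition(key: Any): Int =
        if (key.toString.toInt > 100) 1 else 0

      // Two instances that behave identically should compare equal, so Spark
      // can detect co-partitioned RDDs and avoid a redundant shuffle.
      override def equals(other: Any): Boolean = other.isInstanceOf[MyPartitioner]
      override def hashCode: Int = numPartitions
    }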