
Spark Grouped Secondary Sort

package com.ibeifeng.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Try

object TopN {
  def main(args: Array[String]): Unit = {
    val hdfs = "hdfs://192.168.1.102:8020"

    // Set configuration properties (the master must be a valid master URL,
    // e.g. "yarn-client" for a YARN cluster or "local[*]" for local testing)
    val conf = new SparkConf()
      .setMaster("yarn-client")
      .setAppName("Secondary-Sort")
      .set("mapreduce.framework.name", "yarn")
      .set("spark.rdd.compress", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.storage.memoryFraction", "0.5")
      .set("spark.akka.frameSize", "100")
      .set("spark.default.parallelism", "1")
    val sc = SparkContext.getOrCreate(conf)

    // Create the RDD with textFile
    val fileRDD: RDD[String] = sc.textFile(s"${hdfs}/Data/emp.data")

    val wordRDD: RDD[(String, List[Int])] = fileRDD
      .flatMap { line =>
        val arr = line.split(" ")
        // Drop malformed records and stray whitespace: any row that fails
        // to parse into (key, intValue) is silently discarded
        Try((arr(0).trim, arr(1).trim.toInt)).toOption
      }
      .groupByKey()                                  // gather all values of a key
      .sortByKey(true)                               // sort keys ascending
      .map(x => (x._1, x._2.toList.sortWith(_ > _))) // sort each key's values descending

    // Write the result to HDFS
    wordRDD.saveAsTextFile(s"${hdfs}/interviewData/resultData")

    sc.stop()
  }
}
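One caveat with the job above: groupByKey materializes every value of a key in memory, which can fail on skewed keys. A common alternative for secondary sort is repartitionAndSortWithinPartitions with a composite key, letting the shuffle do the sorting. The sketch below is not part of the original post; the object name, the 2-partition count, and the paths are illustrative assumptions.

package com.ibeifeng.spark.core

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.util.Try

object SecondarySortShuffle {

  // Partition by the grouping key only, so all values of a key land in the
  // same partition while the composite (key, value) pair drives the sort.
  class KeyPartitioner(partitions: Int) extends Partitioner {
    override def numPartitions: Int = partitions
    override def getPartition(key: Any): Int = key match {
      case (k: String, _) =>
        val h = k.hashCode % numPartitions
        if (h < 0) h + numPartitions else h
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Secondary-Sort-Shuffle")
    val sc = SparkContext.getOrCreate(conf)

    // Composite-key ordering: keys ascending, values descending,
    // matching the groupByKey version above.
    implicit val ordering: Ordering[(String, Int)] =
      Ordering.Tuple2(Ordering.String, Ordering.Int.reverse)

    val sorted = sc.textFile("hdfs:///Data/emp.data") // assumed input path
      .flatMap { line =>
        val arr = line.split(" ")
        // null is a dummy value; all payload lives in the composite key
        Try(((arr(0).trim, arr(1).trim.toInt), null)).toOption
      }
      .repartitionAndSortWithinPartitions(new KeyPartitioner(2))
      .map { case ((k, v), _) => (k, v) }

    sorted.saveAsTextFile("hdfs:///interviewData/resultShuffle") // assumed output path
    sc.stop()
  }
}

Unlike the groupByKey version, this emits one (key, value) line per record rather than a list per key, but rows with the same key are contiguous within each partition and sorted within the partition rather than globally; if a list per key is required, a mapPartitions pass can collapse adjacent rows without another shuffle.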