spark學習記錄(七、二次排序和分組取TopN問題)
阿新 • 發佈:2019-01-13
1.二次排序
例題:將兩列數字按第一列升序,如果第一列相同,則第二列升序排列
資料檔案:https://download.csdn.net/download/qq_33283652/10894807
將資料封裝成物件,對物件進行排序,然後取出value
/**
 * Composite key for Spark secondary sort: orders ascending by {@code first},
 * breaking ties ascending by {@code second}.
 */
public class SecondSortKey implements Serializable, Comparable<SecondSortKey> {

    // Explicit version id for the Serializable contract (shipped to executors).
    private static final long serialVersionUID = 1L;

    private int first;
    private int second;

    public SecondSortKey(int first, int second) {
        super();
        this.first = first;
        this.second = second;
    }

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    /**
     * Compares by first column, then second column, both ascending.
     * Uses Integer.compare instead of subtraction: {@code a - b} can overflow
     * for large-magnitude operands and return the wrong sign.
     */
    @Override
    public int compareTo(SecondSortKey o) {
        int byFirst = Integer.compare(getFirst(), o.getFirst());
        return byFirst != 0 ? byFirst : Integer.compare(getSecond(), o.getSecond());
    }
}
/**
 * Secondary-sort demo: wraps each "a b" line of the input in a
 * {@link SecondSortKey} and sorts ascending by the first column, then by the
 * second column on ties — as the exercise statement requires.
 */
public class SecondarySortTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SecondarySortTest");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> secondRDD = sc.textFile("C://secondSort.txt");
        JavaPairRDD<SecondSortKey, String> pairSecondRDD =
                secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {
                    public Tuple2<SecondSortKey, String> call(String line) throws Exception {
                        // Lines are two space-separated integers, e.g. "3 7".
                        String[] fields = line.split(" ");
                        int first = Integer.parseInt(fields[0]);
                        int second = Integer.parseInt(fields[1]);
                        return new Tuple2<SecondSortKey, String>(new SecondSortKey(first, second), line);
                    }
                });
        // sortByKey(true) = ascending. The original passed false (descending),
        // which contradicts the stated requirement of ascending order.
        pairSecondRDD.sortByKey(true).foreach(new VoidFunction<Tuple2<SecondSortKey, String>>() {
            public void call(Tuple2<SecondSortKey, String> tuple2) throws Exception {
                System.out.println(tuple2._2);
            }
        });
        sc.stop();
    }
}
2.分組取TopN
例題:找出各個班級中分數最高的前 n 個成績
資料檔案:https://download.csdn.net/download/qq_33283652/10894827
public class GroupByKeyOps { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("SecondarySortTest"); final JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> linesRDD = sc.textFile("C://scores.txt"); JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String str) throws Exception { String[] splited = str.split("\t"); String className = splited[0]; Integer score = Integer.valueOf(splited[1]); return new Tuple2<String, Integer>(className, score); } }); pairRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() { public void call(Tuple2<String, Iterable<Integer>> tuple2) throws Exception { String className = tuple2._1; Iterator<Integer> iterator = tuple2._2.iterator(); Integer[] top3 = new Integer[3]; while (iterator.hasNext()) { Integer score = iterator.next(); for (int i = 0; i < top3.length; i++) { if (top3[i] == null) { top3[i] = score; break; } else if (score > top3[i]) { //如果有更大值進來則陣列中的小值則往後移 for (int j = 2; j > i; j--) { top3[j] = top3[j - 1]; } top3[i] = score; break; } } } System.out.println("class name: " + className); for (Integer sscore : top3) { System.out.println(sscore); } } }); sc.stop(); } }
/**
 * Scala port of GroupByKeyOps: parses "className\tscore" lines, groups
 * scores by class, and prints the three highest scores per class.
 */
object ScalaGroupByKeyOps {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ScalaGroupByKeyOps")
    val sc = new SparkContext(conf)
    val linesRDD = sc.textFile("C://scores.txt")
    val pairRDD = linesRDD.map(line => {
      val fields = line.split("\t")
      val className = fields(0)
      val score = Integer.valueOf(fields(1))
      new Tuple2(className, score)
    })
    // Parameter renamed from `Tuple2` — the original shadowed the type name.
    pairRDD.groupByKey().foreach(group => {
      val className = group._1
      val iterator = group._2.iterator
      // top3(0) holds the largest score; unfilled slots remain null and
      // print as "null" for classes with fewer than three scores.
      val top3 = new Array[Integer](3)
      import scala.util.control.Breaks._
      while (iterator.hasNext) {
        val score: Integer = iterator.next
        breakable {
          for (i <- top3.indices) {
            if (top3(i) == null) {
              top3(i) = score
              break()
            } else if (score > top3(i)) {
              // Shift smaller entries down one slot, dropping the last.
              // (The original blog text garbled this loop into code that
              // does not compile: `top3(j) = top3(j - 1) { ... }`.)
              var j = top3.length - 1
              while (j > i) {
                top3(j) = top3(j - 1)
                j -= 1
              }
              top3(i) = score
              break()
            }
          }
        }
      }
      System.out.println("class name: " + className)
      for (sscore <- top3) {
        System.out.println(sscore)
      }
    })
    sc.stop()
  }
}