1. 程式人生 > >spark學習記錄(七、二次排序和分組取TopN問題)

spark學習記錄(七、二次排序和分組取TopN問題)

1.二次排序

例題:將兩列數字按第一列升序,如果第一列相同,則第二列升序排列

資料檔案:https://download.csdn.net/download/qq_33283652/10894807

將資料封裝成物件,對物件進行排序,然後取出value

public class SecondSortKey implements Serializable, Comparable<SecondSortKey> {
    private int first;
    private int second;

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    public SecondSortKey(int first, int second) {
        super();
        this.first = first;
        this.second = second;
    }

    public int compareTo(SecondSortKey o) {
        if (getFirst() - o.getFirst() == 0) {
            return getSecond() - o.getSecond();
        } else {
            return getFirst() - o.getFirst();
        }
    }
}
public class SecondarySortTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SecondarySortTest");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> secondRDD = sc.textFile("C://secondSort.txt");

        JavaPairRDD<SecondSortKey, String> pairSecondRDD =
                secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {

                    public Tuple2<SecondSortKey, String> call(String line) throws Exception {
                        String[] spilted = line.split(" ");
                        int first = Integer.valueOf(spilted[0]);
                        int second = Integer.valueOf(spilted[1]);
                        SecondSortKey secondSortKey = new SecondSortKey(first, second);
                        return new Tuple2<SecondSortKey, String>(secondSortKey, line);
                    }
                });
        pairSecondRDD.sortByKey(false).foreach(new VoidFunction<Tuple2<SecondSortKey, String>>() {
            public void call(Tuple2<SecondSortKey, String> tuple2) throws Exception {
                System.out.println(tuple2._2);
            }
        });

        sc.stop();
    }
}

 2.分組取TopN

例題:找出各個班級的分數前n大的

資料檔案:https://download.csdn.net/download/qq_33283652/10894827

public class GroupByKeyOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SecondarySortTest");
        final JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> linesRDD = sc.textFile("C://scores.txt");
        JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String str) throws Exception {
                String[] splited = str.split("\t");
                String className = splited[0];
                Integer score = Integer.valueOf(splited[1]);
                return new Tuple2<String, Integer>(className, score);
            }
        });
        pairRDD.groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            public void call(Tuple2<String, Iterable<Integer>> tuple2) throws Exception {
                String className = tuple2._1;
                Iterator<Integer> iterator = tuple2._2.iterator();
                Integer[] top3 = new Integer[3];
                while (iterator.hasNext()) {
                    Integer score = iterator.next();
                    for (int i = 0; i < top3.length; i++) {
                        if (top3[i] == null) {
                            top3[i] = score;
                            break;
                        } else if (score > top3[i]) {
                            //如果有更大值進來則陣列中的小值則往後移
                            for (int j = 2; j > i; j--) {
                                top3[j] = top3[j - 1];
                            }
                            top3[i] = score;
                            break;
                        }
                    }
                }
                System.out.println("class name: " + className);
                for (Integer sscore : top3) {
                    System.out.println(sscore);
                }
            }
        });
        sc.stop();
    }
}
object ScalaGroupByKeyOps {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ScalaGroupByKeyOps")
    val sc = new SparkContext(conf)
    val linesRDD = sc.textFile("C://scores.txt")

    val pairRDD = linesRDD.map(str => {
      val spilted = str.split("\t")
      val className = spilted(0);
      val score = Integer.valueOf(spilted(1))
      new Tuple2(className, score)
    })

    pairRDD.groupByKey().foreach(Tuple2 => {
      val className = Tuple2._1
      val iterator = Tuple2._2.iterator
      val top3 = new Array[Integer](3)
      while (iterator.hasNext) {
        val score: Integer = iterator.next
        import scala.util.control.Breaks._
        breakable(
          for (i <- 0 until top3.length) {
            if (top3(i) == null) {
              top3(i) = score
              break
            }
            if (score > top3(i)) {
              var j = 2
              while (j > i) {
                top3(j) = top3(j - 1) {
                  j -= 1;
                  j + 1
                }
              }
              top3(i) = score
              break
            }
          }
        )
      }
      System.out.println("class name: " + className)
      for (sscore <- top3) {
        System.out.println(sscore)
      }
    })

    sc.stop()
  }
}