Grouping and Sorting with Hadoop and Spark (Java and Scala)
1. Grouping and sorting in MapReduce
1.1 Grouping
// Set a custom grouping comparator class on the MapReduce job. The method signature is:
// job.setGroupingComparatorClass(Class<? extends RawComparator> cls)
// Example:
job.setGroupingComparatorClass(S1apEnbidGroupingComparator.class);
1.2 Sorting
// Set a custom sort comparator to implement secondary sort. The method signature is:
// job.setSortComparatorClass(Class<? extends RawComparator> cls)
// Example:
job.setSortComparatorClass(S1apEnbidSortComparator.class);
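The two comparators above form Hadoop's classic secondary-sort pattern: the sort comparator orders records by the full composite key, while the grouping comparator compares only the natural key, so all records sharing that key reach the same reduce() call. The Hadoop-free sketch below illustrates the two orderings; the CompositeKey class and its enbId/time fields are illustrative stand-ins, not the original S1apEnbid classes.

```java
import java.util.*;

// Illustrative composite key: natural key (enbId) plus a secondary field (time).
class CompositeKey {
    final int enbId;
    final long time;
    CompositeKey(int enbId, long time) { this.enbId = enbId; this.time = time; }
}

public class SecondarySortSketch {
    // Analogue of the sort comparator: full ordering, enbId first, then time.
    static final Comparator<CompositeKey> SORT =
        Comparator.<CompositeKey>comparingInt(k -> k.enbId)
                  .thenComparingLong(k -> k.time);

    // Analogue of the grouping comparator: compares only the natural key,
    // so keys with equal enbId fall into one reduce group.
    static final Comparator<CompositeKey> GROUP =
        Comparator.comparingInt(k -> k.enbId);

    public static void main(String[] args) {
        List<CompositeKey> keys = new ArrayList<>(Arrays.asList(
            new CompositeKey(2, 5L), new CompositeKey(1, 9L), new CompositeKey(1, 3L)));
        keys.sort(SORT); // order becomes (1,3), (1,9), (2,5)
        // The first two sorted keys compare equal under GROUP: same reduce group.
        System.out.println(GROUP.compare(keys.get(0), keys.get(1)) == 0);
    }
}
```

In a real job these would be RawComparator implementations (typically extending WritableComparator) registered via the two setter calls shown above.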
2. Grouping and sorting in Spark (Java)
// Grouping
JavaPairRDD.groupByKey(reduceNum);
// The key class must override hashCode() and equals().
//value排序 groupByKey.mapToPair(new PairFunction<Tuple2<SparkLocatorCombinedKey,Iterable<String>>, SparkLocatorCombinedKey, Iterable<String>>() { private static final long serialVersionUID = 8988893168013930479L; @Override public Tuple2<SparkLocatorCombinedKey, Iterable<String>> call( Tuple2<SparkLocatorCombinedKey, Iterable<String>> tuple2) throws Exception { List<String> list = copyIterator(tuple2._2.iterator()); Collections.sort(list, new SecondSortByTimeComparator()); return new Tuple2<SparkLocatorCombinedKey, Iterable<String>>(tuple2._1, list); } }); //Iterable => Iterator => List public static <T> List<T> copyIterator(Iterator<T> iter) { List<T> copy = new ArrayList<T>(); while (iter.hasNext()) copy.add(iter.next()); return copy; } // List => Iterable 多型 //SecondSortByTimeComparator
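The essence of the Spark code above, stripped of the Spark API, is: collect values under their key, then sort each value list in memory. The sketch below mimics that with a plain HashMap; the key/value strings and the lexicographic sort (standing in for SecondSortByTimeComparator) are illustrative assumptions.

```java
import java.util.*;

public class GroupThenSortValues {
    // Group (key, value) pairs by key, then sort each group's value list,
    // mirroring groupByKey followed by the mapToPair sorting step.
    public static Map<String, List<String>> groupAndSort(List<Map.Entry<String, String>> pairs) {
        Map<String, List<String>> grouped = new HashMap<>();
        for (Map.Entry<String, String> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        // Stand-in for SecondSortByTimeComparator: plain lexicographic order here.
        for (List<String> values : grouped.values()) {
            Collections.sort(values);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> pairs = Arrays.asList(
            Map.entry("enb1", "t3"), Map.entry("enb1", "t1"), Map.entry("enb2", "t2"));
        System.out.println(groupAndSort(pairs).get("enb1")); // prints [t1, t3]
    }
}
```

Note that in real Spark, sorting inside a group this way requires the whole group to fit in one executor's memory, which is why groupByKey plus in-memory sort can be a bottleneck for skewed keys.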
3. Grouping and sorting in Spark (Scala)
// Grouping works the same as in Java: the key class must override hashCode() and equals()
rdd.groupByKey(reduceNum)
// Sort the values within each group
rdd.mapValues(iterable => {
iterable.toList.sortBy(sortRule)
})
//sortRule
def sortRule(employee: Employee): (Long, String) = {
(employee.getTimeStamp, employee.getEmployeeID)
}
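Returning a tuple from sortRule makes sortBy order the list lexicographically: first by timestamp, with ties broken by employee ID. The same rule can be expressed in Java with a chained Comparator; the Employee stand-in below assumes only the two fields the Scala getters expose.

```java
import java.util.*;

public class TupleSortRule {
    // Minimal Employee stand-in with the two fields sortRule reads.
    record Employee(long timeStamp, String employeeID) {}

    // Equivalent of sortBy(e => (e.getTimeStamp, e.getEmployeeID)):
    // compare timestamps first, then employee IDs.
    static final Comparator<Employee> BY_TIME_THEN_ID =
        Comparator.comparingLong(Employee::timeStamp)
                  .thenComparing(Employee::employeeID);

    public static void main(String[] args) {
        List<Employee> list = new ArrayList<>(Arrays.asList(
            new Employee(5L, "b"), new Employee(5L, "a"), new Employee(1L, "c")));
        list.sort(BY_TIME_THEN_ID);
        // Earliest timestamp sorts first; equal timestamps fall back to ID order.
        System.out.println(list.get(0).employeeID()); // prints c
    }
}
```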