1. 程式人生 > java實現spark運算元combineByKey

java實現spark運算元combineByKey

/**
 * createCombiner: combineByKey() 會遍歷分割槽中的所有元素,因此每個元素的鍵要麼還沒有遇到過,要麼就
    和之前的某個元素的鍵相同。如果這是一個新的元素, combineByKey() 會使用一個叫作 createCombiner() 的函式來建立
    那個鍵對應的累加器的初始值
   mergeValue: 如果這是一個在處理當前分割槽之前已經遇到的鍵, 它會使用 mergeValue() 方法將該鍵的累加器對應的當前值與這個新的值進行合併
   mergeCombiners: 由於每個分割槽都是獨立處理的, 因此對於同一個鍵可以有多個累加器。如果有兩個或者更多的分割槽都有對應同一個鍵的累加器,
就需要使用使用者提供的 mergeCombiners() 方法將各個分割槽的結果進行合併。 */ public class CombineByKey { public static void main(String[] args){ SparkConf conf = new SparkConf().setAppName("CombineKeyCountAvg").setMaster("local[2]"); JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
ArrayList<ScoreDetail> scoreDetails = new ArrayList<>(); scoreDetails.add(new ScoreDetail("xiaoming", "math", 90)); scoreDetails.add(new ScoreDetail("xiaoming", "English", 80)); scoreDetails.add(new ScoreDetail("xiaohong", "math", 70)); scoreDetails.add(new ScoreDetail("xiaohong"
, "English", 60)); scoreDetails.add(new ScoreDetail("xiaozhang", "math", 50)); scoreDetails.add(new ScoreDetail("xiaozhang", "English", 40)); //list => javaRDD JavaRDD<ScoreDetail> scoreDetailsRDD = javaSparkContext.parallelize(scoreDetails); //scoreDetail => (name, scoreDetail) JavaPairRDD<String, ScoreDetail> nameWithScoreDetail = scoreDetailsRDD .mapToPair(new PairFunction<ScoreDetail, String, ScoreDetail>() { @Override public Tuple2<String, ScoreDetail> call(ScoreDetail scoreDetail) throws Exception { return new Tuple2<String, ScoreDetail>(scoreDetail.name, scoreDetail); } }); //傳入一個ScoreDetail,返回一個Tuple,當前沒有遇到這個scoreDetail時,會建立這個Tuple //new Function<ScoreDetail, Float,Integer>(); Function<ScoreDetail, Tuple2<Float, Integer>> createCombine = new Function<ScoreDetail, Tuple2<Float, Integer>>() { @Override public Tuple2<Float, Integer> call(ScoreDetail scoreDetail) throws Exception { return new Tuple2<>(scoreDetail.score, 1); } }; //根據要合併的物件nameWithScoreDetail中的name作為鍵,scoreDetail作為值 //傳入操作是傳入鍵對應的值scoreDetailTuple<Float, Integer>,返回一個Tuple Function2<Tuple2<Float, Integer>, ScoreDetail, Tuple2<Float, Integer>> mergeValue = new Function2<Tuple2<Float, Integer>, ScoreDetail, Tuple2<Float, Integer>>() { @Override public Tuple2<Float, Integer> call(Tuple2<Float, Integer> t, ScoreDetail scoreDetail) throws Exception { return new Tuple2<>(t._1 + scoreDetail.score, t._2 + 1); } }; //合併分割槽,傳入兩個Tuple,返回一個Tuple Function2<Tuple2<Float, Integer>, Tuple2<Float, Integer>, Tuple2<Float, Integer>> mergeCombiners = new Function2<Tuple2<Float, Integer>, Tuple2<Float, Integer>, Tuple2<Float, Integer>>() { @Override public Tuple2<Float, Integer> call(Tuple2<Float, Integer> t1, Tuple2<Float, Integer> t2) throws Exception { return new Tuple2<>(t1._1 + t2._1, t1._2 + t2._2); } }; JavaPairRDD<String, Tuple2<Float, Integer>> combineByKey = nameWithScoreDetail.combineByKey(createCombine, mergeValue, mergeCombiners); System.out.println(combineByKey.collect()); } }