1. 程式人生 > >spark 平行計算 前n項和

spark 平行計算 前n項和

      在單執行緒計算中前n項和計算一直沒有障礙,隨著資料量的膨脹,單執行緒計算已經無法滿足資料計算。計算逐漸被遷移到spark或者hadoop叢集上平行計算,但是無論spark還是hadoop平行計算前n項和一直是一個痛點,只能做到每個結點或者容器上的前N項和,卻無法做到計算全域性前N項和。

     現提供一種解決方案,希望大家多多指正。計算過程需要兩次便利全部資料。第一次遍歷計算每個容器中資料加和結果,並返回paritition的id和容器中資料家和。第二次遍歷才計算前Nx項和的家和。現有java版本實現,如需要scala版本或者python版本實現請私信本人。

public void sum(){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("temp");
        JavaSparkContext ctx  = new JavaSparkContext(conf);
        List<Integer> list = Arrays.asList(1,2,3,4,5,6,7);
        JavaRDD<Integer> soureceRdd = ctx.parallelize(list,4).cache();
        List<Tuple2<Integer, Integer>> partitionSub = soureceRdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<Tuple2<Integer, Integer>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterator<Tuple2<Integer, Integer>> call(Integer partitionId, Iterator<Integer> v2) throws Exception {
                Integer result = 0;
                while(v2.hasNext()){
                    result += v2.next();
                }
                return Arrays.asList(new Tuple2<Integer, Integer>(partitionId,result)).iterator();
            }
        }, true).collect();
        Map<Integer, Integer> paritionSum = this.sumPriPartition(partitionSub);
        JavaRDD<Integer> x = soureceRdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterator<Integer> call(Integer v1, Iterator<Integer> v2) throws Exception {
                List<Integer> result = new CopyOnWriteArrayList<Integer>();
                Integer proPartitionSum = paritionSum.get(v1); 
                while(v2.hasNext()){
                    proPartitionSum+=v2.next();
                    result.add(proPartitionSum);
                }
                return result.iterator();
            }
        }, true);
    }
    /*結果<partitionId,當前partition之前所有partition資料和>*/
    public Map<Integer, Integer> sumPriPartition(List<Tuple2<Integer, Integer>> list){
        Map<Integer, Integer>  map = new HashMap<Integer, Integer>();
        Integer caluer = 0;
        for(Tuple2<Integer, Integer> tuple: list){
            Integer partitionId = tuple._1;
            map.put(partitionId, caluer);
            caluer+=tuple._2;
        }
        return map;
    }