1. 程式人生 > >spark2.x由淺入深深到底系列六之RDD java api詳解二

spark2.x由淺入深深到底系列六之RDD java api詳解二

spark 大數據 javaapi 老湯 rdd

package com.twq.javaapi.java7;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/**
 * Created by tangweiqun on 2017/9/16.
 */
public class BaseActionApiTest {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> listRDD = sc.parallelize(Arrays.asList(1, 2, 4, 3, 3, 6), 2);

        //結果: [1, 2, 4, 3, 3, 6]
        System.out.println("collect = " + listRDD.collect());
        //結果:[1, 2]
        System.out.println("take(2) = " + listRDD.take(2));
        //結果:[6, 4]
        System.out.println("top(2) = " + listRDD.top(2));
        //結果:1
        System.out.println("first = " + listRDD.first());
        //結果:1
        System.out.println("min = " + listRDD.min(new AscComparator()));
        //結果:6
        System.out.println("min = " + listRDD.min(new DescComparator()));
        //結果:6
        System.out.println("max = " + listRDD.max(new AscComparator()));
        //結果:1
        System.out.println("max = " + listRDD.max(new DescComparator()));
        //結果:[1, 2]
        System.out.println("takeOrdered(2) = " + listRDD.takeOrdered(2));
        //結果:[1, 2]
        System.out.println("takeOrdered(2)  = " + listRDD.takeOrdered(2, new AscComparator()));
        //結果:[6, 4]
        System.out.println("takeOrdered(2)  = " + listRDD.takeOrdered(2, new DescComparator()));

        listRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer element) throws Exception {
                //這個性能太差,遍歷每一個元素的時候都需要調用比較耗時的getInitNumber
                //建議采用foreachPartition來代替foreach操作
                Integer initNumber = getInitNumber("foreach");
                System.out.println((element + initNumber) + "=========");
            }
        });

        listRDD.foreachPartition(new VoidFunction<Iterator<Integer>>() {
            @Override
            public void call(Iterator<Integer> integerIterator) throws Exception {
                //和foreach api的功能是一樣,只不過一個是將函數應用到每一條記錄,這個是將函數應用到每一個partition
                //如果有一個比較耗時的操作,只需要每一分區執行一次這個操作就行,則用這個函數
                //這個耗時的操作可以是連接數據庫等操作,不需要計算每一條時候去連接數據庫,一個分區只需連接一次就行
                Integer initNumber = getInitNumber("foreach");
                while (integerIterator.hasNext()) {
                    System.out.println((integerIterator.next() + initNumber) + "=========");
                }
            }
        });

        Integer reduceResult = listRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer ele1, Integer ele2) throws Exception {
                return ele1 + ele2;
            }
        });
        //結果:19
        System.out.println("reduceResult = " + reduceResult);

        Integer treeReduceResult = listRDD.treeReduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }, 3);
        //結果:19
        System.out.println("treeReduceResult = " + treeReduceResult);

        //和reduce的功能類似,只不過是在計算每一個分區的時候需要加上初始值0,最後再將每一個分區計算出來的值相加再加上這個初始值
        Integer foldResult = listRDD.fold(0, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        //結果:19
        System.out.println("foldResult = " + foldResult);

        //先初始化一個我們想要的返回的數據類型的初始值
        //然後在每一個分區對每一個元素應用函數一(acc, value) => (acc._1 + value, acc._2 + 1)進行聚合
        //最後將每一個分區生成的數據應用函數(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)進行聚合
        Tuple2 aggregateResult = listRDD.aggregate(new Tuple2<Integer, Integer>(0, 0),
                new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer integer) throws Exception {
                        return new Tuple2<>(acc._1 + integer, acc._2 + 1);
                    }
                }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
                        return new Tuple2<>(acc1._1 + acc2._1, acc1._2 + acc2._2);
                    }
                });
        //結果:(19,6)
        System.out.println("aggregateResult = " + aggregateResult);

        Tuple2 treeAggregateResult = listRDD.treeAggregate(new Tuple2<Integer, Integer>(0, 0),
                new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer integer) throws Exception {
                        return new Tuple2<>(acc._1 + integer, acc._2 + 1);
                    }
                }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
                        return new Tuple2<>(acc1._1 + acc2._1, acc1._2 + acc2._2);
                    }
                });
        //結果:(19,6)
        System.out.println("treeAggregateResult = " + treeAggregateResult);


    }

    public static Integer getInitNumber(String source) {
        System.out.println("get init number from " + source + ", may be take much time........");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1;
    }

    private static class AscComparator implements Comparator<Integer>, Serializable {

        @Override
        public int compare(java.lang.Integer o1, java.lang.Integer o2) {
            return o1 - o2;
        }

    }

    private static class DescComparator implements Comparator<Integer>, Serializable {

        @Override
        public int compare(java.lang.Integer o1, java.lang.Integer o2) {
            return o2 - o1;
        }
    }

}



對於reduce, treeReduce, fold, aggregate, treeAggregate等api的詳細原理,可以參考spark core RDD api原理詳解。

spark2.x由淺入深深到底系列六之RDD java api詳解二