1. 程式人生 > >spark記錄(4)spark算子之Action

spark記錄(4)spark算子之Action

lac atm ide replace action ret 加載 再次 col

Action類算子也是一類算子(函數)叫做行動算子,如foreach,collect,count等。Transformations類算子是延遲執行,Action類算子是觸發執行。一個application應用程序中有幾個Action類算子執行,就有幾個job運行。

(1)reduce

reduce其實是講RDD中的所有元素進行合並,當運行call方法時,會傳入兩個參數,在call方法中將兩個參數合並後返回,而這個返回值回合一個新的RDD中的元素再次傳入call方法中,繼續合並,直到合並到只剩下一個元素時。

代碼

    public static void reduce() {
        JavaRDD
<String> rdd = jsc.textFile("words"); String reduce = rdd.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String a) throws Exception {
return Arrays.asList(a.split(" ")); } }).reduce(new Function2<String, String, String>() { private static final long serialVersionUID = 1L; @Override public String call(String a, String b) throws Exception {
return a+"-->"+b; } }); System.out.println(reduce); }

結果:

技術分享圖片

(2)collect()

將計算的結果作為集合拉回到driver端,一般在使用過濾算子或者一些能返回少量數據集的算子後,將結果回收到Driver端打印顯示。

分布式環境下盡量規避,如有其他需要,手動編寫代碼實現相應功能就好。

詳情請參考:https://blog.csdn.net/Fortuna_i/article/details/80851775

代碼:

    public static void collect() {
        JavaRDD<String> rdd = jsc.textFile("words");
        List<String> collect = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).collect();
        for (String string : collect) {
            System.out.println(string);
        }
    }

結果:

技術分享圖片

(3)take

返回一個包含數據集前n個元素的數組(從0下標到n-1下標的元素),不排序。

代碼:

    public static void take() {
        JavaRDD<String> rdd = jsc.textFile("words");
        List<String> take = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).take(5);
        for (String string : take) {
            System.out.println(string);
        }
    }

結果:

技術分享圖片

(4)first

返回數據集的第一個元素(底層即是take(1))

代碼:

    public static void first() {
        JavaRDD<String> rdd = jsc.textFile("words");
        String first = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).first();
        System.out.println(first);

    }

結果:

技術分享圖片

(5)takeSample(withReplacement, num, [seed])

對於一個數據集進行隨機抽樣,返回一個包含num個隨機抽樣元素的數組,withReplacement表示是否有放回抽樣,參數seed指定生成隨機數的種子。

該方法僅在預期結果數組很小的情況下使用,因為所有數據都被加載到driver端的內存中。

代碼:

結果:

(6)count

返回數據集中元素個數,默認Long類型。

代碼:

    public static void count() {
        JavaRDD<String> rdd = jsc.textFile("words");
        long count = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).count();
        System.out.println(count);
    }

結果:

技術分享圖片

spark記錄(4)spark算子之Action