《深入理解Spark》: Spark's Common Operators Explained (Java, Spark 1.6.1)

My company has recently started developing its Spark projects in Java (we previously used Scala), so today I went through the commonly used Spark operators and implemented each of them in Java.

Java Code

package com.lyzx.spark.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.junit.Test;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;


/**
 * Examples of the common Spark operators, written against the Java API.
 * Since Java is a statically typed language, lambda expressions are used
 * wherever possible to keep the code concise.
 */
public class Day01 {

    /**
     * Tests the map and mapToPair operators.
     *  map: a mapping operator that transforms the data in every partition of the RDD
     *       into another format/type, i.e. JavaRDD[M] => JavaRDD[N]; the map function is
     *       called once per element.
     *  mapToPair: also a mapping operator; it maps an RDD into a [K,V]-style (pair) RDD.
     *  mapPartitions: calls the mapping function once per partition; an optimization of map.
     *  mapPartitionsWithIndex: calls the mapping function once per partition and also
     *       passes in the number (index) of each partition.
     * @param ctx
     */
    public void testMap(JavaSparkContext ctx){
        JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
         rdd.map(x->x+100)
            .mapToPair(x->new Tuple2<>(x,x))
            .mapPartitions(itr->{
                List<String> result = new ArrayList<>();
               while (itr.hasNext()){
                   Tuple2<Integer,Integer> t = itr.next();
                    result.add(t._1+"-"+t._2);
               }
               return result;
            })
            .mapPartitionsWithIndex((index,itr)->{
                List<String> result = new ArrayList<>();
                System.out.println("==="+index);
                while(itr.hasNext()){
                    String item = itr.next();
                    System.out.print("  "+item);
                    result.add(item);
                }
                return result.iterator();
            },true)
            .foreach(x-> System.out.println(x));
    }

    /**
     * Tests the filter operator.
     *  A predicate is evaluated for every element and returns a boolean:
     *  if it returns true the element is kept, otherwise it is dropped.
     * @param ctx
     */
    public void testFilter(JavaSparkContext ctx){
        JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));

        // x stands for each element of the RDD; every element is checked to decide whether to keep it
        JavaPairRDD<Integer,Integer> pairRDD = rdd.filter(x->x>5)
                                 .mapToPair(x->new Tuple2<>(x,x+1000));

        pairRDD.foreach(x-> System.out.println(x));
    }

    /**
     * Tests the flatMap operator, which is a map followed by a flatten:
     * first the map step is applied (below it returns the list of words of each line),
     * then every item of that collection is pulled out as an individual element.
     * @param ctx
     */
    public void testFlatMap(JavaSparkContext ctx){
        JavaRDD<String> rdd = ctx.parallelize(Arrays.asList("hello java","hello c++","hello c","hello java","hello 鳴人 "," hello 雛田"));
        List<Tuple2<String,Integer>> items = rdd.flatMap(x->Arrays.asList(x.trim().split(" ")))
                            .mapToPair(x->new Tuple2<>(x,1))
                            .reduceByKey((x,y)->x+y)
                            .take(3);

        Iterator<Tuple2<String,Integer>> itr = items.iterator();
        while(itr.hasNext()){
            System.out.println(itr.next());
        }
    }

    /**
     * sample: random sampling operator.
     * withReplacement: whether sampled elements are put back (sampling with replacement); true means they are.
     * fraction: how much to sample, as a fraction; 0.2 means roughly 20%.
     * seed: the random seed, used to get the same elements back while debugging.
     * @param ctx
     */
    public  void testSample(JavaSparkContext ctx){
        String[] table = {"A","B","C","D","E","F","G","H","I","J","K","L","M","N","O","P","Q","R","S","T","U","V","W","X","Y","Z"};
        JavaRDD<String> rdd = ctx.parallelize(Arrays.asList(table));
        JavaRDD<String> sampleRdd = rdd.sample(false,0.2);
        sampleRdd.foreach(x-> System.out.println(x));
    }
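
    /**
     * Not in the original post: a minimal sketch (method name is mine) showing the
     * three-argument overload of sample, which takes the seed mentioned above so that
     * repeated runs return the same elements while debugging.
     */
    public void testSampleWithSeed(JavaSparkContext ctx){
        JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
        // withReplacement=false, fraction=0.5, seed=42L -> the same sample on every run
        JavaRDD<Integer> sampled = rdd.sample(false, 0.5, 42L);
        sampled.foreach(x-> System.out.println(x));
    }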

    /**
     * An RDD can also be treated as a set.
     * union: union of the two RDDs, without removing duplicates
     * intersection: intersection
     * subtract: difference
     * cartesian: Cartesian product
     * @param ctx
     */
    public void testSetOperate(JavaSparkContext ctx){
        JavaRDD<Integer> r1 = ctx.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
        JavaRDD<Integer> r2 = ctx.parallelize(Arrays.asList(10,11,12,13,14,15,16,17,18,19));

        // union keeps duplicates; distinct() is chained here to remove them from the output
        r1.union(r2)
                .distinct()
                .foreach(x-> System.out.print(x+" "));
        System.out.println("=======================");

        // intersection
        r1.intersection(r2)
                .foreach(x-> System.out.print(x+"="));

        System.out.println("=======================");
        // difference: r1 - r2
        r1.subtract(r2)
                .foreach(x-> System.out.print(x+" <> "));

        // Cartesian product
        System.out.println("=======================");
        r1.cartesian(r2)
                .foreach(x-> System.out.print(x+"|"));
    }


    /**
     * groupByKey: groups the data by key; entries with the same key end up in one group.
     * The result is a pair whose key is the shared key and whose value is an Iterable
     * containing all of the corresponding values.
     * Note: this operator only works on [K,V]-style RDDs.
     * For a [K,V]-style RDD, i.e. JavaPairRDD, the generic types must be specified,
     * otherwise the concrete types cannot be used; e.g. in the foreach below x would be
     * an Object instead of a Tuple2.
     *
     * JavaPairRDD[K,V] => JavaPairRDD[K,Iterable<V>]
     * e.g.:
     *  rdd1: [(1,1),(2,2),(3,3),(1,1),(2,2)]
     *  rdd2 = rdd1.groupByKey()
     *  rdd2: [(1,[1,1]),(2,[2,2]),(3,[3])]
     * The return type is a pair: the key is the shared key and the value is the Iterable
     * of all the values belonging to that key.
     * @param ctx
     */
    public void testGroupByKey(JavaSparkContext ctx){
        List<Tuple2<String,Integer>> pairData = Arrays.asList(new Tuple2<>("A",1),
                                                              new Tuple2<>("B",2),
                                                              new Tuple2<>("C",3),
                                                              new Tuple2<>("B",2),
                                                              new Tuple2<>("C",3));

        JavaPairRDD<String,Integer> r1 = ctx.parallelizePairs(pairData);
        r1.groupByKey()
                .foreach(x->{
                    System.out.println(x._1);
                    Iterable<Integer> itr = x._2;
                    for(Integer i : itr){
                        System.out.print(i+" ");
                    }
                    System.out.println();
                });
    }


    /**
     * reduceByKey: aggregates the values by key.
     * The function takes two arguments of the same type (T t1, T t2) and returns a T,
     * i.e. the values that share a key, e.g. ["one", Iterable<1,2,3,4,5,6,7,8,9,10>],
     * are folded into a single value.
     * (Int,Int) => Int, for example (x,y) -> x+y:
     * first 1 is assigned to x and 2 to y; the sum 3 is assigned back to x,
     * then the third element is assigned to y and added, and so on.
     * @param ctx
     */
    public void testReduceByKey(JavaSparkContext ctx){
        JavaRDD<String> rdd = ctx.parallelize(Arrays.asList("A","B","D","Z","X","Y","A","B","D","Z","X","Y","A","B","D","A","B","D","A","B","D","A","B","D"));
        rdd.mapToPair(x->new Tuple2<>(x,1))
                .reduceByKey((x,y)->x+y)
                .sortByKey(false)
                .foreach(x-> System.out.println(x));
    }

    /**
     * join: similar to an equi (inner) join in SQL.
     *      Values that share the same key are put together into a tuple;
     *      this example produces the following output:
     *      (B,(2,B2))
     *      (B,(2,B2))
     *      (C,(3,C2))
     *      (C,(3,C2))
     * @param ctx
     */
    public void testJoin(JavaSparkContext ctx){
        JavaPairRDD<String,Integer> r1 = ctx.parallelizePairs(Arrays.asList(
                                    new Tuple2<>("A",1),
                                    new Tuple2<>("B",2),
                                    new Tuple2<>("C",3),
                                    new Tuple2<>("A",1),
                                    new Tuple2<>("B",2),
                                    new Tuple2<>("C",3)
                            ));

        JavaPairRDD<String,String> r2 = ctx.parallelizePairs(Arrays.asList(
                                    new Tuple2<>("B","B2"),
                                    new Tuple2<>("C","C2"),
                                    new Tuple2<>("D","D2")
                            ));

        r1.join(r2)
          .foreach(x-> System.out.println(x));
    }
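
    /**
     * Not in the original post: a minimal sketch (method name is mine) of the
     * outer-join counterpart of join. leftOuterJoin keeps every key of the left RDD;
     * keys missing on the right show up wrapped in an absent Optional.
     */
    public void testLeftOuterJoin(JavaSparkContext ctx){
        JavaPairRDD<String,Integer> r1 = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<>("A",1),
                new Tuple2<>("B",2)));
        JavaPairRDD<String,String> r2 = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<>("B","B2")));

        // prints something like (A,(1,Optional.absent())) and (B,(2,Optional.of(B2)))
        r1.leftOuterJoin(r2)
          .foreach(x-> System.out.println(x));
    }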


    /**
     * cogroup: an equi join, except that the values sharing a key are first grouped
     * on the map side.
     * In the example below:
     *  the values of key B in r1 are first grouped into an Iterable, e.g. [B, Iterable<200,500>];
     *  then the values of B in r2 are grouped, e.g. [B, Iterable<2,5>];
     *  finally the two are combined into a tuple, so the result for B looks like
     *  [B, (Iterable<200,500>, Iterable<2,5>)].
     *  The value for each key is a tuple holding the two Iterable collections.
     * @param ctx
     */
    public void testCogroup(JavaSparkContext ctx){
        JavaPairRDD<String,Integer> r1 = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<>("A",100),
                new Tuple2<>("B",200),
                new Tuple2<>("C",300),
                new Tuple2<>("A",400),
                new Tuple2<>("B",500),
                new Tuple2<>("C",600)
        ));

        JavaPairRDD<String,Integer> r2 = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<>("B",2),
                new Tuple2<>("C",3),
                new Tuple2<>("D",4),
                new Tuple2<>("B",5),
                new Tuple2<>("C",6),
                new Tuple2<>("D",7)
        ));

        r1.cogroup(r2)
          .foreach(x->{
              Tuple2<Iterable<Integer>,Iterable<Integer>> t2 = x._2;

              // the collection of values from r1 for this key
              Iterable<Integer> itr1 = t2._1;
              for(Integer item : itr1 ){
                  System.out.print(":"+item+"  ");
              }

              // the collection of values from r2 for this key
              Iterable<Integer> itr2 = t2._2;
              for(Integer item : itr2 ){
                  System.out.println(":::"+item+"  ");
              }
          });
    }

    /**
     * coalesce: repartitioning operator.
     *  When the second argument is true the repartitioning is always performed and involves a shuffle.
     * @param ctx
     */
    public void  testCoalesce(JavaSparkContext ctx){
        JavaPairRDD<String,Integer> rdd = ctx.parallelize(Arrays.asList("A","B","D","Z","X","Y","A","B","D","Z","X","Y"),3)
                                             .mapToPair(x->new Tuple2<>(x,1));

            rdd.coalesce(2)
               .mapPartitionsWithIndex((index,itr)->{
                    System.out.println(">>::"+index);
                    // collect the elements into a list while printing: the iterator can only be
                    // traversed once, so returning it after the loop would yield empty partitions
                    List<Tuple2<String,Integer>> result = new ArrayList<>();
                    while(itr.hasNext()){
                        Tuple2<String,Integer> item = itr.next();
                        System.out.print(item+"  ");
                        result.add(item);
                    }
                    return result.iterator();
                },false)
                .foreach(x-> System.out.println(x));
    }
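
    /**
     * Not in the original post: a minimal sketch (method name is mine) of the shuffle
     * variant mentioned in the comment above. coalesce(n, true) forces a shuffle and can
     * therefore also increase the number of partitions; repartition(n) is shorthand for it.
     */
    public void testCoalesceWithShuffle(JavaSparkContext ctx){
        JavaRDD<String> rdd = ctx.parallelize(Arrays.asList("A","B","C","D","E","F"), 2);
        // grows from 2 to 4 partitions; without shuffle=true coalesce cannot increase the count
        System.out.println(rdd.coalesce(4, true).partitions().size());
        System.out.println(rdd.repartition(4).partitions().size());
    }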

    /**
     * reduce: aggregation, (T t1, T t2) => T t
     * @param ctx
     */
    public void testReduce(JavaSparkContext ctx){
        JavaRDD<Integer> rdd = ctx.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9));
        JavaRDD<Integer> cacheRdd = rdd.cache();

        long count = cacheRdd.count();

        Integer i = cacheRdd.reduce((x,y)->x+y);
        System.out.println(count+"===="+i);
    }

    /**
     * takeSample: randomly takes N elements
     * takeOrdered: takes the first N elements in sorted order
     * @param ctx
     */
    public void testTakeSample(JavaSparkContext ctx){
        JavaRDD<String> rdd = ctx.parallelize(Arrays.asList("F","G","H","A","B","C","D","E","I","L","M","N"));
        JavaRDD<String> cacheRdd = rdd.cache();
        List<String> elements = cacheRdd.takeSample(false,5);
        for(String item : elements){
            System.out.println(item);
        }

        List<String> orderedList =  cacheRdd.takeOrdered(2);
        for(String item : orderedList){
            System.out.println("::"+item);
        }
    }
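
    /**
     * Not in the original post: a minimal sketch (method name is mine) of top(), the
     * counterpart of takeOrdered; it returns the N largest elements by natural ordering.
     */
    public void testTop(JavaSparkContext ctx){
        JavaRDD<String> rdd = ctx.parallelize(Arrays.asList("F","G","H","A","B","C"));
        // takeOrdered(2) would return [A, B]; top(2) returns [H, G]
        for(String item : rdd.top(2)){
            System.out.println("top: "+item);
        }
    }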

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Day01").setMaster("local");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Q1: how do you extract each item of the tuple inside foreachRDD?
        Day01 t = new Day01();
//        t.testMap(ctx);
//        t.testFilter(ctx);
//        t.testFlatMap(ctx);
//        t.testSample(ctx);
//        t.testSetOperate(ctx);
//        t.testGroupByKey(ctx);
//        t.testReduceByKey(ctx);
//        t.testJoin(ctx);
        t.testCogroup(ctx);
//        t.testCoalesce(ctx);
//        t.testReduce(ctx);
//        t.testTakeSample(ctx);
        ctx.stop();
    }
}