1. 程式人生 > >spark1.x和2.xIterable和iterator兼容問題

spark1.x和2.xIterable和iterator兼容問題

環境 兼容 lean 進行 java spark1.x pair map row

1. spark 1.x 升級到spark 2.x
對於普通的spark來說,變動不大 :
1
舉一個最簡單的實例:

spark1.x
public static JavaRDD<String> workJob(JavaRDD<String> spark1Rdd) {

        JavaPairRDD<String, Integer> testRdd = spark1Rdd
                .flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {

            @Override
            public Iterable<Tuple2<String, Integer>> call(String str)
                    throws Exception {
                ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();
                return list;


            }
        });

        return spark1Rdd;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
spark2.x
public static JavaRDD<String> workJob(JavaRDD<String> spark2Rdd) {

        JavaPairRDD<String, Integer> testRdd2 = spark2Rdd
                .flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {

            @Override
            public Iterator<Tuple2<String, Integer>> call(String str)
                    throws Exception {
                ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();


                return list.iterator();
            }
        });

        return spark2Rdd;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
需要說明的是: 
上面的返回的rdd就直接用輸入的 RDD顯然是不合理的! 只是為了用最簡潔的方式介紹代碼的轉換而已!

可以看到 : 區別主要在於
1. spark 1.x中的Iterable對象 變成了 spark2.x中的Iterator對象
2. 相應的,對於返回值為list的RDD,  spark2.x中要返回list.iterator();
1
2
3
還是很簡單的吧

問題在於 : 如果你有幾個spark程序要運行在不同的環境下,(有的現場用1.x,有的現場用2.x) 
你需要同時維護兩種不同版本的spark,是不是耗時又耗力呢?

這個時候就需要考慮到 spark版本的兼容性,使你的程序能成功的運行在各種集群環境下

2. spark版本的兼容
寫一個簡單的工具類如下 :

import java.util.Iterator;

public class MyIterator<T> implements Iterator, Iterable 
{
    private Iterator myIterable;

    public MyIterator(Iterable iterable)
    {
        myIterable = iterable.iterator();
    }

    @Override
    public boolean hasNext() 
    {
        return myIterable.hasNext();
    }

    @Override
    public Object next() 
    {
        return myIterable.next();
    }

    @Override
    public void remove() 
    {
        myIterable.remove();
    }

    @Override
    public Iterator iterator() 
    {
        return myIterable;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
只需要進行如上設計就可以實現版本的兼容了 
那麽應該如何應用呢?

 JavaRDD<String> flatMapRDD = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public MyIterator<String> call(String s) throws Exception {
                String[] split = s.split("\\s+");
                MyIterator myIterator = new MyIterator(Arrays.asList(split));
                return myIterator;
            }
});

  

spark1.x和2.xIterable和iterator兼容問題