1. 程式人生 > >spark2.x由淺入深深到底系列六之RDD java api詳解四

spark2.x由淺入深深到底系列六之RDD java api詳解四

spark 大數據 javaapi 老湯 rdd

學習spark任何的知識點之前,先對spark要有一個正確的理解,可以參考:正確理解spark

本文對join相關的api做了一個解釋

SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaPairRDD<Integer, Integer> javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2<>(1, 2),
                new Tuple2<>(3, 4), new Tuple2<>(3, 6), new Tuple2<>(5, 6)));
JavaPairRDD<Integer, Integer> otherJavaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2<>(3, 9),
                new Tuple2<>(4, 5)));
//結果: [(4,([],[5])), (1,([2],[])), (3,([4, 6],[9])), (5,([6],[]))]
System.out.println(javaPairRDD.cogroup(otherJavaPairRDD).collect());

//結果: [(4,([],[5])), (1,([2],[])), (3,([4, 6],[9])), (5,([6],[]))]
// groupWith和cogroup效果是一模一樣的
System.out.println(javaPairRDD.groupWith(otherJavaPairRDD).collect());

//結果: [(3,(4,9)), (3,(6,9))]
//基於cogroup實現的,就是取cogroup結果中相同key在兩個RDD都有value的數據
System.out.println(javaPairRDD.join(otherJavaPairRDD).collect());

//結果: [(1,(2,Optional.empty)), (3,(4,Optional[9])), (3,(6,Optional[9])), (5,(6,Optional.empty))]
//基於cogroup實現的,結果需要出現的key以左邊的RDD為準
System.out.println(javaPairRDD.leftOuterJoin(otherJavaPairRDD).collect());

//結果: [(4,(Optional.empty,5)), (3,(Optional[4],9)), (3,(Optional[6],9))]
//基於cogroup實現的,結果需要出現的key以右邊的RDD為準
System.out.println(javaPairRDD.rightOuterJoin(otherJavaPairRDD).collect());

//結果: [(4,(Optional.empty,Optional[5])), (1,(Optional[2],Optional.empty)), (3,(Optional[4],Optional[9])), (3,(Optional[6],Optional[9])), (5,(Optional[6],Optional.empty))]
//基於cogroup實現的,結果需要出現的key是兩個RDD中所有的key
System.out.println(javaPairRDD.fullOuterJoin(otherJavaPairRDD).collect());


從上可以看出,最基本的操作是cogroup這個操作,下面是cougroup的原理圖:

技術分享

如果想對cogroup原理更徹底的理解,可以參考:spark core RDD api原理詳解

spark2.x由淺入深深到底系列六之RDD java api詳解四