1. 程式人生 > >Spark核心程式設計建立RDD及transformation和action詳解和案例

Spark核心程式設計建立RDD及transformation和action詳解和案例

建立RDD

進行Spark核心程式設計時,首先要做的第一件事,就是建立一個初始的RDD。該RDD中,通常就代表和包含了Spark應用程式的輸入源資料。然後在建立了初始的RDD之後,才可以通過Spark Core提供的transformation運算元,對該RDD進行轉換,來獲取其他的RDD。

Spark Core提供了三種建立RDD的方式,包括:使用程式中的集合建立RDD;使用本地檔案建立RDD;使用HDFS檔案建立RDD。

個人經驗認為:
1、使用程式中的集合建立RDD,主要用於進行測試,可以在實際部署到叢集執行之前,自己使用集合構造測試資料,來測試後面的spark應用的流程。
2、使用本地檔案建立RDD,主要用於臨時性地處理一些儲存了大量資料的檔案。
3、使用HDFS檔案建立RDD,應該是最常用的生產環境處理方式,主要可以針對HDFS上儲存的大資料,進行離線批處理操作。

並行化集合建立RDD

如果要通過並行化集合來建立RDD,需要針對程式中的集合,呼叫SparkContext的parallelize()方法。Spark會將集合中的資料拷貝到叢集上去,形成一個分散式的資料集合,也就是一個RDD。相當於是,集合中的部分資料會到一個節點上,而另一部分資料會到其他節點上。然後就可以用並行的方式來操作這個分散式資料集合,即RDD。

java

package cn.spark.study.core;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import
org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; /** * 並行化集合建立RDD * 案例:累加1到10 * @author Administrator * */ public class ParallelizeCollection { public static void main(String[] args) { // 建立SparkConf
SparkConf conf = new SparkConf() .setAppName("ParallelizeCollection") .setMaster("local"); // 建立JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 要通過並行化集合的方式建立RDD,那麼就呼叫SparkContext以及其子類,的parallelize()方法 List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> numberRDD = sc.parallelize(numbers); // 執行reduce運算元操作 // 相當於,先進行1 + 2 = 3;然後再用3 + 3 = 6;然後再用6 + 4 = 10。。。以此類推 int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer num1, Integer num2) throws Exception { return num1 + num2; } }); // 輸出累加的和 System.out.println("1到10的累加和:" + sum); // 關閉JavaSparkContext sc.close(); } }

scala

package cn.spark.study.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * @author Administrator
 */
object ParallelizeCollection {

  def main(args: Array[String]) {
    val conf = new SparkConf()
        .setAppName("ParallelizeCollection")
        .setMaster("local")
    val sc = new SparkContext(conf)

    val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numberRDD = sc.parallelize(numbers, 5)  
    val sum = numberRDD.reduce(_ + _)  

    println("1到10的累加和:" + sum)  
  }

}

呼叫parallelize()時,有一個重要的引數可以指定,就是要將集合切分成多少個partition。Spark會為每一個partition執行一個task來進行處理。Spark官方的建議是,為叢集中的每個CPU建立2~4個partition。Spark預設會根據叢集的情況來設定partition的數量。但是也可以在呼叫parallelize()方法時,傳入第二個引數,來設定RDD的partition數量。比如parallelize(arr, 10)

使用本地檔案和HDFS建立RDD

Spark是支援使用任何Hadoop支援的儲存系統上的檔案建立RDD的,比如說HDFS、Cassandra、HBase以及本地檔案。通過呼叫SparkContext的textFile()方法,可以針對本地檔案或HDFS檔案建立RDD。

有幾個事項是需要注意的:
1、如果是針對本地檔案的話,如果是在windows上本地測試,windows上有一份檔案即可;如果是在spark叢集上針對linux本地檔案,那麼需要將檔案拷貝到所有worker節點上。
2、Spark的textFile()方法支援針對目錄、壓縮檔案以及萬用字元進行RDD建立。
3、Spark預設會為hdfs檔案的每一個block建立一個partition,但是也可以通過textFile()的第二個引數手動設定分割槽數量,只能比block數量多,不能比block數量少。

java

package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

/**
 * 使用本地檔案建立RDD
 * 案例:統計文字檔案字數
 * @author Administrator
 *
 */
public class LocalFile {

    public static void main(String[] args) {
        // 建立SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("LocalFile")
                .setMaster("local"); 
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 使用SparkContext以及其子類的textFile()方法,針對本地檔案建立RDD
        JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt");

        // 統計文字檔案內的字數
        JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(String v1) throws Exception {
                return v1.length();
            }

        });

        int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }

        });

        System.out.println("檔案總字數是:" + count);  

        // 關閉JavaSparkContext
        sc.close();
    }

}

package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

/**
 * 使用HDFS檔案建立RDD
 * 案例:統計文字檔案字數
 * @author Administrator
 *
 */
public class HDFSFile {

    public static void main(String[] args) {
        // 建立SparkConf
        // 修改:去除setMaster()設定,修改setAppName()
        SparkConf conf = new SparkConf()
                .setAppName("HDFSFile"); 
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 使用SparkContext以及其子類的textFile()方法,針對HDFS檔案建立RDD
        // 只要把textFile()內的路徑修改為hdfs檔案路徑即可
        JavaRDD<String> lines = sc.textFile("hdfs://spark1:9000/spark.txt");

        // 統計文字檔案內的字數
        JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(String v1) throws Exception {
                return v1.length();
            }

        });

        int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }

        });

        System.out.println("檔案總字數是:" + count);  

        // 關閉JavaSparkContext
        sc.close();
    }

}

scala

package cn.spark.study.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * @author Administrator
 */
object LocalFile {

  def main(args: Array[String]) {
    val conf = new SparkConf()
        .setAppName("LocalFile") 
        .setMaster("local");  
    val sc = new SparkContext(conf)

    val lines = sc.textFile("C://Users//Administrator//Desktop//spark.txt", 1);
    val count = lines.map { line => line.length() }.reduce(_ + _)  

    println("file's count is " + count)  
  }

}
package cn.spark.study.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * @author Administrator
 */
object HDFSFile {

  def main(args: Array[String]) {
    val conf = new SparkConf()
        .setAppName("HDFSFile") ;  
    val sc = new SparkContext(conf)

    val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1);
    val count = lines.map { line => line.length() }.reduce(_ + _)  

    println("file's count is " + count)  
  }

}

使用本地檔案和HDFS建立RDD

Spark的textFile()除了可以針對上述幾種普通的檔案建立RDD之外,還有一些特列的方法來建立RDD:

1、SparkContext.wholeTextFiles()方法,可以針對一個目錄中的大量小檔案,返回

transformation和action介紹

Spark支援兩種RDD操作:transformation和action。transformation操作會針對已有的RDD建立一個新的RDD;而action則主要是對RDD進行最後的操作,比如遍歷、reduce、儲存到檔案等,並可以返回結果給Driver程式。

例如,map就是一種transformation操作,它用於將已有RDD的每個元素傳入一個自定義的函式,並獲取一個新的元素,然後將所有的新元素組成一個新的RDD。而reduce就是一種action操作,它用於對RDD中的所有元素進行聚合操作,並獲取一個最終的結果,然後返回給Driver程式。

transformation的特點就是lazy特性。lazy特性指的是,如果一個spark應用中只定義了transformation操作,那麼即使你執行該應用,這些操作也不會執行。也就是說,transformation是不會觸發spark程式的執行的,它們只是記錄了對RDD所做的操作,但是不會自發的執行。只有當transformation之後,接著執行了一個action操作,那麼所有的transformation才會執行。Spark通過這種lazy特性,來進行底層的spark應用執行的優化,避免產生過多中間結果。

action操作執行,會觸發一個spark job的執行,從而觸發這個action之前所有的transformation的執行。這是action的特性。

transformation和action原理剖析

這裡寫圖片描述

案例:統計檔案字數

// 這裡通過textFile()方法,針對外部檔案建立了一個RDD,lines,但是實際上,程式執行到這裡為止,spark.txt檔案的資料是不會載入到記憶體中的。lines,只是代表了一個指向spark.txt檔案的引用。
val lines = sc.textFile("spark.txt")

// 這裡對lines RDD進行了map運算元,獲取了一個轉換後的lineLengths RDD。但是這裡連資料都沒有,當然也不會做任何操作。lineLengths RDD也只是一個概念上的東西而已。
val lineLengths = lines.map(line => line.length)

// 之列,執行了一個action操作,reduce。此時就會觸發之前所有transformation操作的執行,Spark會將操作拆分成多個task到多個機器上並行執行,每個task會在本地執行map操作,並且進行本地的reduce聚合。最後會進行一個全域性的reduce聚合,然後將結果返回給Driver程式。
val totalLength = lineLengths.reduce(_ + _)

案例:統計檔案每行出現的次數

Spark有些特殊的運算元,也就是特殊的transformation操作。比如groupByKey、sortByKey、reduceByKey等,其實只是針對特殊的RDD的。即包含key-value對的RDD。而這種RDD中的元素,實際上是scala中的一種型別,即Tuple2,也就是包含兩個值的Tuple。

在scala中,需要手動匯入Spark的相關隱式轉換,import org.apache.spark.SparkContext._。然後,對應包含Tuple2的RDD,會自動隱式轉換為PairRDDFunction,並提供reduceByKey等方法。

val lines = sc.textFile(“hello.txt”)
val linePairs = lines.map(line => (line, 1))
val lineCounts = linePairs.reduceByKey(_ + _)
lineCounts.foreach(lineCount => println(lineCount._1 + ” appears ” + llineCount._2 + ” times.”))

常用transformation介紹

這裡寫圖片描述

常用action介紹

這裡寫圖片描述

transformation操作開發實戰(重點)

java

package cn.spark.study.core;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * transformation操作實戰
 * @author Administrator
 *
 */
@SuppressWarnings(value = {"unused", "unchecked"})
public class TransformationOperation {

    public static void main(String[] args) {
        // map();
        // filter();
        // flatMap();
        // groupByKey();
        // reduceByKey();
        // sortByKey();
        // join();
        cogroup();
    }

    /**
     * map運算元案例:將集合中每一個元素都乘以2
     */
    private static void map() {
        // 建立SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("map")
                .setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 構造集合
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // 並行化集合,建立初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

        // 使用map運算元,將集合中的每個元素都乘以2
        // map運算元,是對任何型別的RDD,都可以呼叫的
        // 在java中,map運算元接收的引數是Function物件
        // 建立的Function物件,一定會讓你設定第二個泛型引數,這個泛型型別,就是返回的新元素的型別
            // 同時call()方法的返回型別,也必須與第二個泛型型別同步
        // 在call()方法內部,就可以對原始RDD中的每一個元素進行各種處理和計算,並返回一個新的元素
        // 所有新的元素就會組成一個新的RDD
        JavaRDD<Integer> multipleNumberRDD = numberRDD.map(

                new Function<Integer, Integer>() {

                    private static final long serialVersionUID = 1L;

                    // 傳入call()方法的,就是1,2,3,4,5
                    // 返回的就是2,4,6,8,10
                    @Override
                    public Integer call(Integer v1) throws Exception {
                        return v1 * 2;
                    }

                });

        // 列印新的RDD
        multipleNumberRDD.foreach(new VoidFunction<Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Integer t) throws Exception {
                System.out.println(t);  
            }

        });

        // 關閉JavaSparkContext
        sc.close();
    }

    /**
     * filter運算元案例:過濾集合中的偶數
     */
    private static void filter() {
        // 建立SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("filter")
                .setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 模擬集合
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // 並行化集合,建立初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

        // 對初始RDD執行filter運算元,過濾出其中的偶數
        // filter運算元,傳入的也是Function,其他的使用注意點,實際上和map是一樣的
        // 但是,唯一的不同,就是call()方法的返回型別是Boolean
        // 每一個初始RDD中的元素,都會傳入call()方法,此時你可以執行各種自定義的計算邏輯
        // 來判斷這個元素是否是你想要的
        // 如果你想在新的RDD中保留這個元素,那麼就返回true;否則,不想保留這個元素,返回false
        JavaRDD<Integer> evenNumberRDD = numberRDD.filter(

                new Function<Integer, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    // 在這裡,1到10,都會傳入進來
                    // 但是根據我們的邏輯,只有2,4,6,8,10這幾個偶數,會返回true
                    // 所以,只有偶數會保留下來,放在新的RDD中
                    @Override
                    public Boolean call(Integer v1) throws Exception {
                        return v1 % 2 == 0;
                    }

                });

        // 列印新的RDD
        evenNumberRDD.foreach(new VoidFunction<Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Integer t) throws Exception {
                System.out.println(t);
            }

        });

        // 關閉JavaSparkContext
        sc.close();
    }

    /**
     * flatMap案例:將文字行拆分為多個單詞
     */
    private static void flatMap() {
        // 建立SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("flatMap")  
                .setMaster("local");  
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 構造集合
        List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");  

        // 並行化集合,建立RDD
        JavaRDD<String> lines = sc.parallelize(lineList);

        // 對RDD執行flatMap運算元,將每一行文字,拆分為多個單詞
        // flatMap運算元,在java中,接收的引數是FlatMapFunction
        // 我們需要自己定義FlatMapFunction的第二個泛型型別,即,代表了返回的新元素的型別
        // call()方法,返回的型別,不是U,而是Iterable<U>,這裡的U也與第二個泛型型別相同
        // flatMap其實就是,接收原始RDD中的每個元素,並進行各種邏輯的計算和處理,返回可以返回多個元素
        // 多個元素,即封裝在Iterable集合中,可以使用ArrayList等集合
        // 新的RDD中,即封裝了所有的新元素;也就是說,新的RDD的大小一定是 >= 原始RDD的大小
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            // 在這裡會,比如,傳入第一行,hello you
            // 返回的是一個Iterable<String>(hello, you)
            @Override
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split(" "));
            }

        });

        // 列印新的RDD
        words.foreach(new VoidFunction<String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(String t) throws Exception {
                System.out.println(t);
            }
        });

        // 關閉JavaSparkContext
        sc.close();
    }

    /**
     * groupByKey案例:按照班級對成績進行分組
     */
    private static void groupByKey() {
        // 建立SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("groupByKey")  
                .setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 模擬集合
        List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                new Tuple2<String, Integer>("class1", 80),
                new Tuple2<String, Integer>("class2", 75),
                new Tuple2<String, Integer>("class1", 90),
                new Tuple2<String, Integer>("class2", 65));

        // 並行化集合,建立JavaPairRDD
        JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);

        // 針對scores RDD,執行groupByKey運算元,對每個班級的成績進行分組
        // groupByKey運算元,返回的還是JavaPairRDD
        // 但是,JavaPairRDD的第一個泛型型別不變,第二個泛型型別變成Iterable這種集合型別
        // 也就是說,按照了key進行分組,那麼每個key可能都會有多個value,此時多個value聚合成了Iterable
        // 那麼接下來,我們是不是就可以通過groupedScores這種JavaPairRDD,很方便地處理某個分組內的資料
        JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();

        // 列印groupedScores RDD
        groupedScores.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Iterable<Integer>> t)
                    throws Exception {
                System.out.println("class: " + t._1);  
                Iterator<Integer> ite = t._2.iterator();
                while(ite.hasNext()) {
                    System.out.println(ite.next());  
                }
                System.out.println("==============================");   
            }

        });

        // 關閉JavaSparkContext
        sc.close();
    }

    /**
     * reduceByKey案例:統計每個班級的總分
     */
    private static void reduceByKey() {
        // 建立SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("reduceByKey")  
                .setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 模擬集合
        List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                new Tuple2<String, Integer>("class1", 80),
                new Tuple2<String, Integer>("class2", 75),
                new Tuple2<String, Integer>("class1", 90),
                new Tuple2<String, Integer>("class2", 65));

        // 並行化集合,建立JavaPairRDD
        JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);

        // 針對scores RDD,執行reduceByKey運算元
        // reduceByKey,接收的引數是Function2型別,它有三個泛型引數,實際上代表了三個值
        // 第一個泛型型別和第二個泛型型別,代表了原始RDD中的元素的value的型別
            // 因此對每個key進行reduce,都會依次將第一個、第二個value傳入,將值再與第三個value傳入
            // 因此此處,會自動定義兩個泛型型別,代表call()方法的兩個傳入引數的型別
        // 第三個泛型型別,代表了每次reduce操作返回的值的型別,預設也是與原始RDD的value型別相同的
        // reduceByKey演算法返回的RDD,還是JavaPairRDD<key, value>
        JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(

                new Function2<Integer, Integer, Integer>() {

                    private static final long serialVersionUID = 1L;

                    // 對每個key,都會將其value,依次傳入call方法
                    // 從而聚合出每個key對應的一個value
                    // 然後,將每個key對應的一個value,組合成一個Tuple2,作為新RDD的元素
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }

                });

        // 列印totalScores RDD
        totalScores.foreach(new VoidFunction<Tuple2<String,Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._1 + ": " + t._2);   
            }

        });

        // 關閉JavaSparkContext
        sc.close();
    }

    /**
     * sortByKey案例:按照學生分數進行排序
     */
    private static void sortByKey() {
        // 建立SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("sortByKey")  
                .setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 模擬集合
        List<Tuple2<Integer, String>> scoreList = Arrays.asList(
                new Tuple2<Integer, String>(65, "leo"),
                new Tuple2<Integer, String>(50, "tom"),
                new Tuple2<Integer, String>(100, "marry"),
                new Tuple2<Integer, String>(80, "jack"));

        // 並行化集合,建立RDD
        JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);

        // 對scores RDD執行sortByKey運算元
        // sortByKey其實就是根據key進行排序,可以手動指定升序,或者降序
        // 返回的,還是JavaPairRDD,其中的元素內容,都是和原始的RDD一模一樣的
        // 但是就是RDD中的元素的順序,不同了
        JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);  

        // 列印sortedScored RDD
        sortedScores.foreach(new VoidFunction<Tuple2<Integer,String>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._1 + ": " + t._2);  
            }

        });

        // 關閉JavaSparkContext
        sc.close();
    }

    /**
     * join案例:列印學生成績
     */
    private static void join() {
        // 建立SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("join")  
                .setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 模擬集合
        List<Tuple2<Integer, String>> studentList = Arrays.asList(
                new Tuple2<Integer, String>(1, "leo"),
                new Tuple2<Integer, String>(2, "jack"),
                new Tuple2<Integer, String>(3, "tom"));

        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 100),
                new Tuple2<Integer, Integer>(2, 90),
                new Tuple2<Integer, Integer>(3, 60));

        // 並行化兩個RDD
        JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
        JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);

        // 使用join運算元關聯兩個RDD
        // join以後,還是會根據key進行join,並返回JavaPairRDD
        // 但是JavaPairRDD的第一個泛型型別,之前兩個JavaPairRDD的key的型別,因為是通過key進行join的
        // 第二個泛型型別,是Tuple2<v1, v2>的型別,Tuple2的兩個泛型分別為原始RDD的value的型別
        // join,就返回的RDD的每一個元素,就是通過key join上的一個pair
        // 什麼意思呢?比如有(1, 1) (1, 2) (1, 3)的一個RDD
            // 還有一個(1, 4) (2, 1) (2, 2)的一個RDD
            // join以後,實際上會得到(1 (1, 4)) (1, (2, 4)) (1, (3, 4))
        JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);

        // 列印studnetScores RDD
        studentScores.foreach(

                new VoidFunction<Tuple2<Integer,Tuple2<String,Integer>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<Integer, Tuple2<String, Integer>> t)
                            throws Exception {
                        System.out.println("student id: " + t._1);  
                        System.out.println("student name: " + t._2._1);  
                        System.out.println("student score: " + t._2._2);
                        System.out.println("===============================");   
                    }

                });

        // 關閉JavaSparkContext
        sc.close();
    }

    /**
     * cogroup案例:列印學生成績
     */
    private static void cogroup() {
        // 建立SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("cogroup")  
                .setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 模擬集合
        List<Tuple2<Integer, String>> studentList = Arrays.asList(
                new Tuple2<Integer, String>(1, "leo"),
                new Tuple2<Integer, String>(2, "jack"),
                new Tuple2<Integer, String>(3, "tom"));

        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 100),
                new Tuple2<Integer, Integer>(2, 90),
                new Tuple2<Integer, Integer>(3, 60),
                new Tuple2<Integer, Integer>(1, 70),
                new Tuple2<Integer, Integer>(2, 80),
                new Tuple2<Integer, Integer>(3, 50));

        // 並行化兩個RDD
        JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
        JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);

        // cogroup與join不同
        // 相當於是,一個key join上的所有value,都給放到一個Iterable裡面去了 
        // cogroup,不太好講解,希望大家通過動手編寫我們的案例,仔細體會其中的奧妙
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = 
                students.cogroup(scores);

        // 列印studnetScores RDD
        studentScores.foreach(

                new VoidFunction<Tuple2<Integer,Tuple2<Iterable<String>,Iterable<Integer>>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(
                            Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t)
                            throws Exception {
                        System.out.println("student id: " + t._1);  
                        System.out.println("student name: " + t._2._1);  
                        System.out.println("student score: " + t._2._2);
                        System.out.println("===============================");   
                    }

                });

        // 關閉JavaSparkContext
        sc.close();
    }

}

scala

package cn.spark.study.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * @author Administrator
 */
object TransformationOperation {

  def main(args: Array[String]) {
    // map()  
    // filter()  
    // flatMap()  
    // groupByKey() 
    // reduceByKey()  
    // sortByKey() 
    // join()  
    cogroup()
  }

  def map() {
    val conf = new SparkConf()
        .setAppName("map")
        .setMaster("local")  
    val sc = new SparkContext(conf)

    val numbers = Array(1, 2, 3, 4, 5)
    val numberRDD = sc.parallelize(numbers, 1)  
    val multipleNumberRDD = numberRDD.map { num => num * 2 }  

    multipleNumberRDD.foreach { num => println(num) }   
  }

  def filter() {
    val conf = new SparkConf()
        .setAppName("filter")
        .setMaster("local")
    val sc = new SparkContext(conf)

    val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numberRDD = sc.parallelize(numbers, 1)
    val evenNumberRDD = numberRDD.filter { num => num % 2 == 0 }

    evenNumberRDD.foreach { num => println(num) }   
  }

  def flatMap() {
    val conf = new SparkConf()
        .setAppName("flatMap")  
        .setMaster("local")  
    val sc = new SparkContext(conf) 

    val lineArray = Array("hello you", "hello me", "hello world")  
    val lines = sc.parallelize(lineArray, 1)
    val words = lines.flatMap { line => line.split(" ") }   

    words.foreach { word => println(word) }
  }

  def groupByKey() {
    val conf = new SparkConf()
        .setAppName("groupByKey")  
        .setMaster("local")  
    val sc = new SparkContext(conf)

    val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75),
        Tuple2("class1", 90), Tuple2("class2", 60))
    val scores = sc.parallelize(scoreList, 1)  
    val groupedScores = scores.groupByKey() 

    groupedScores.foreach(score => { 
      println(score._1); 
      score._2.foreach { singleScore => println(singleScore) };
      println("=============================")  
    })
  }

  def reduceByKey() {
    val conf = new SparkConf()
        .setAppName("groupByKey")  
        .setMaster("local")  
    val sc = new SparkContext(conf)

    val scoreList = Array(Tuple2("class1", 80), Tuple2("class2", 75),
        Tuple2("class1", 90), Tuple2("class2", 60))
    val scores = sc.parallelize(scoreList, 1)  
    val totalScores = scores.reduceByKey(_ + _)  

    totalScores.foreach(classScore => println(classScore._1 + ": " + classScore._2))  
  }

  def sortByKey() {
    val conf = new SparkConf()
        .setAppName("sortByKey")  
        .setMaster("local")  
    val sc = new SparkContext(conf)

    val scoreList = Array(Tuple2(65, "leo"), Tuple2(50, "tom"), 
        Tuple2(100, "marry"), Tuple2(85, "jack"))  
    val scores = sc.parallelize(scoreList, 1)  
    val sortedScores = scores.sortByKey(false)

    sortedScores.foreach(studentScore => println(studentScore._1 + ": " + studentScore._2))  
  }

  def join() {
    val conf = new SparkConf()
        .setAppName("join")  
        .setMaster("local")  
    val sc = new SparkContext(conf)

   val studentList = Array(
        Tuple2(1, "leo"),
        Tuple2(2, "jack"),
        Tuple2(3, "tom"));

   val