1. 程式人生 > >Spark核心程式設計:建立RDD(集合、本地檔案、HDFS檔案)

Spark核心程式設計:建立RDD(集合、本地檔案、HDFS檔案)

1,建立RDD

1.進行Spark核心程式設計時,首先要做的第一件事,就是建立一個初始的RDD。該RDD中,通常就代表和包含了Spark應用程式的輸入源資料。然後在建立了初始的RDD之後,才可以通過Spark Core提供的transformation運算元,對該RDD進行轉換,來獲取其他的RDD。

2.Spark Core提供了三種建立RDD的方式,包括:使用程式中的集合建立RDD;使用本地檔案建立RDD;使用HDFS檔案建立RDD。
注意:

1.使用程式中的集合建立RDD,主要用於進行測試,可以在實際部署到叢集執行之前,自己使用集合構造測試資料,來測試後面的spark應用的流程。

2.使用本地檔案建立RDD,主要用於臨時性地處理一些儲存了大量資料的檔案。

3.使用HDFS檔案建立RDD,應該是最常用的生產環境處理方式,主要可以針對HDFS上儲存的大資料,進行離線批處理操作。

2.並行化集合建立RDD

1.如果要通過並行化集合來建立RDD,需要針對程式中的集合,呼叫SparkContext的parallelize()方法。Spark會將集合中的資料拷貝到叢集上去,形成一個分散式的資料集合,也就是一個RDD。相當於是,集合中的部分資料回到一個節點上,而另一部分資料會到其他節點上。然後就可以用並行的方式來操作這個分散式資料集合,即RDD。

2.呼叫parallelize()時,有一個重要的引數是可以指定,就是要將集合切分成多個partition。Spark會為每一個partition執行一個task來進行處理。Spark官方的建議是,為叢集中的每個CPU建立2~4個partition。Spark預設會根據叢集的情況來設定partition的數量。但是也可以在呼叫parallelize()方法是,傳入第二個引數,來設定RDD的partition數量。

3.案例:1到10累加就和。

Java版:

    //建立JavaSparkContext
    JavaSparkContext sc = new JavaSparkContext(conf);
    //通過並行化集合的方式建立RDD
    List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

    //執行reduce運算元操作
    //相當於,先進行1+2=3,接著3+3=6,。。。
    int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Integer call(Integer num1, Integer num2) throws Exception {

            return num1 + num2;
        }
    });
    System.out.println(sum);

Scala版:

val numbers = Array(1 to 10:_*)
val numberRDD = sc.parallelize(numbers, 3)
val num = numberRDD.reduce(_ + _)
println(num)

3. 使用本地檔案和HDFS建立RDD

1.Spark是支援使用任何Hadoop支援的儲存系統上的檔案建立RDD的,比如說HDFS、Cassandra、HBase以及本地檔案。通過呼叫SparkContext的textFile()方法,可以針對本地檔案或HDFS檔案建立RDD。
注意:

1、如果是針對本地檔案的話,如果是在windows上本地測試,windows上有一份檔案即可;如果是在spark叢集上針對linux本地檔案,那麼需要將檔案拷貝到所有worker節點上。

2、Spark的textFile()方法支援針對目錄、壓縮檔案以及萬用字元進行RDD建立。

3、Spark預設會為hdfs檔案的每一個block建立一個partition,但是也可以通過textFile()的第二個引數手動設定分割槽數量,只能比block數量多,不能比block數量少。
案例:檔案字數統計

java版:

            //使用本地檔案來建立RDD 
    JavaRDD<String> lines = sc.textFile("G://SparkDevel//test//wordCount//data//spark.txt");
    //使用HDFS檔案來建立RDD
    //avaRDD<String> lines = sc.textFile("hdfs://spark1:9000/data/input/spark.txt");
    //RDD中每個元素的長度
    JavaRDD<Integer> length = lines.map(new Function<String, Integer>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Integer call(String line) throws Exception {
            return line.length();
        }
    });
    //RDD中字母數量
    int numbers = length.reduce(new Function2<Integer, Integer, Integer>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    //列印
    System.out.println(numbers);

Scala版:

//使用本地檔案建立RDD
val lines = sc.textFile("G://SparkDevel//test//wordCount//data//spark.txt", 4)
//使用HDFS檔案建立RDD
//val lines = sc.textFile("hdfs://spark1:9000/data/input/spark.txt", 4)
val numbers = lines.map(line => line.length()).reduce(_ + _)
println(numbers)

4.其他特列的方法來建立RDD

1、SparkContext.wholeTextFiles()方法,可以針對一個目錄中的大量小檔案,返回(filename,fileContent)組成的pair,作為一個PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每個元素就是檔案中的一行文字。

2、SparkContext.sequenceFile(K,V)()方法,可以針對SequenceFile建立RDD,K和V泛型型別就是SequenceFile的key和value的型別。K和V要求必須是Hadoop的序列化型別,比如IntWritable、Text等。

3、SparkContext.hadoopRDD()方法,對於Hadoop的自定義輸入型別,可以建立RDD。該方法接收JobConf、InputFormatClass、Key和Value的Class。

4、SparkContext.objectFile()方法,可以針對之前呼叫RDD.saveAsObjectFile()建立的物件序列化的檔案,反序列化檔案中的資料,並建立一個RDD。