1. 程式人生 > >Java Spark之建立RDD的兩種方式和操作RDD

Java Spark之建立RDD的兩種方式和操作RDD

首先看看思維導圖,我的spark是1.6.1版本,jdk是1.7版本 


spark是什麼? 
Spark是基於記憶體計算的大資料平行計算框架。Spark基於記憶體計算,提高了在大資料環境下資料處理的實時性,同時保證了高容錯性和高可伸縮性,允許使用者將Spark 部署在大量廉價硬體之上,形成叢集。

下載和安裝 
可以看我之前發表的部落格 
Spark安裝

安裝成功後執行示例程式

在spark安裝目錄下examples/src/main目錄中。 執行的一個Java或Scala示例程式,使用bin/run-example <class> [params]

./bin/run-example SparkPi 10

啟動spark-shell時的引數 
./bin/spark-shell –master local[2] 
引數master 表名主機master在分散式叢集中的URL 
local【2】 表示在本地通過開啟2個執行緒執行

執行模式 
四種: 
1.Mesos 
2.Hadoop YARN 
3.spark 
4.local

一般我們用的是local和spark模式

首先建立maven工程加入整個專案所用到的包的maven依賴

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>sparkday01</groupId>
  <artifactId>sparkday01</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>sparkday01</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>

     <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.1</version>

     </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.4</version>

    </dependency>

    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.1</version>
    </dependency>
  </dependencies>
</project>

下面開始初始化spark

spark程式需要做的第一件事情,就是建立一個SparkContext物件,它將告訴spark如何訪問一個叢集,而要建立一個SparkContext物件,你首先要建立一個SparkConf物件,該物件訪問了你的應用程式的資訊

比如下面的程式碼是執行在spark模式下

public class sparkTestCon {

    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因為jvm無法獲得足夠的資源
        JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);
        System.out.println(sc);
    }

}

下面是執行在本機,把上面的第6行程式碼改為如下

JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);

快速入門

可以參看我的部落格,轉載的一篇文章 
Spark快速入門

Spark程式設計

每一個spark應用程式都包含一個驅動程式(driver program ),他會執行使用者的main函式,並在叢集上執行各種並行操作(parallel operations)

spark提供的最主要的抽象概念有兩種: 
彈性分散式資料集(resilient distributed dataset)簡稱RDD ,他是一個元素集合,被分割槽地分佈到叢集的不同節點上,可以被並行操作,RDDS可以從hdfs(或者任意其他的支援Hadoop的檔案系統)上的一個檔案開始建立,或者通過轉換驅動程式中已經存在的Scala集合得到,使用者也可以讓spark將一個RDD持久化到記憶體中,使其能再並行操作中被有效地重複使用,最後RDD能自動從節點故障中恢復

spark的第二個抽象概念是共享變數(shared variables),它可以在並行操作中使用,在預設情況下,當spark將一個函式以任務集的形式在不同的節點上並行執行時,會將該函式所使用的每個變數拷貝傳遞給每一個任務中,有時候,一個變數需要在任務之間,或者驅動程式之間進行共享,spark支援兩種共享變數: 
廣播變數(broadcast variables),它可以在所有節點的記憶體中快取一個值。 
累加器(accumulators):只能用於做加法的變數,例如計算器或求和器

RDD的建立有兩種方式 
1.引用外部檔案系統的資料集(HDFS) 
2.並行化一個已經存在於驅動程式中的集合(並行集合,是通過對於驅動程式中的集合呼叫JavaSparkContext.parallelize來構建的RDD)

第一種方式建立 
下面通過程式碼來理解RDD和怎麼操作RDD

package com.tg.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
 * 引用外部檔案系統的資料集(HDFS)建立RDD
 *  匿名內部類定義函式傳給spark
 * @author 湯高
 *
 */
public class RDDOps {
    //完成對所有行的長度求和
    public static void main(String[] args) {

        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因為jvm無法獲得足夠的資源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
        System.out.println(sc);

        //通過hdfs上的檔案定義一個RDD 這個資料暫時還沒有載入到記憶體,也沒有在上面執行動作,lines僅僅指向這個檔案
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");

        //定義lineLengths作為Map轉換的結果 由於惰性,不會立即計算lineLengths
        //第一個引數為傳入的內容,第二個引數為函式操作完後返回的結果型別
        JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
          public Integer call(String s) { 
              System.out.println("每行長度"+s.length());
              return s.length(); }
        });
        //執行reduce  這是一個動作action  這時候,spark才將計算拆分成不同的task,
        //並執行在獨立的機器上,每臺機器執行他自己的map部分和本地的reducation,並返回結果集給去驅動程式
        int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) { return a + b; }
        });

        System.out.println(totalLength);
        //為了以後複用  持久化到記憶體...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());


    }
}


如果覺得剛剛那種寫法難以理解,可以看看第二種寫法

package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
 * 引用外部檔案系統的資料集(HDFS)建立RDD 
 *  外部類定義函式傳給spark
 * @author 湯高
 *
 */
public class RDDOps2 {
    // 完成對所有行的長度求和
    public static void main(String[] args) {

        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // 因為jvm無法獲得足夠的資源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        System.out.println(sc);


        //通過hdfs上的檔案定義一個RDD 這個資料暫時還沒有載入到記憶體,也沒有在上面執行動作,lines僅僅指向這個檔案
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
        //定義lineLengths作為Map轉換的結果 由於惰性,不會立即計算lineLengths
        JavaRDD<Integer> lineLengths = lines.map(new GetLength());


        //執行reduce  這是一個動作action  這時候,spark才將計算拆分成不同的task,
                //並執行在獨立的機器上,每臺機器執行他自己的map部分和本地的reducation,並返回結果集給去驅動程式
        int totalLength = lineLengths.reduce(new Sum());

        System.out.println("總長度"+totalLength);
        // 為了以後複用 持久化到記憶體...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());

    }
    //定義map函式
    //第一個引數為傳入的內容,第二個引數為函式操作完後返回的結果型別
    static class GetLength implements Function<String, Integer> {
        public Integer call(String s) {
            return s.length();
        }
    }
    //定義reduce函式 
    //第一個引數為內容,第三個引數為函式操作完後返回的結果型別
    static class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
}

第二種方式建立RDD

package com.tg.spark;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;

import com.tg.spark.RDDOps2.GetLength;
import com.tg.spark.RDDOps2.Sum;
/**
 * 並行化一個已經存在於驅動程式中的集合建立RDD
 * @author 湯高
 *
 */
public class RDDOps3 {
    // 完成對所有數求和
    public static void main(String[] args) {

        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // 因為jvm無法獲得足夠的資源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        System.out.println(sc);

        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        //並行集合,是通過對於驅動程式中的集合呼叫JavaSparkContext.parallelize來構建的RDD
        JavaRDD<Integer> distData = sc.parallelize(data);

        JavaRDD<Integer> lineLengths = distData.map(new GetLength());

        // 執行reduce 這是一個動作action 這時候,spark才將計算拆分成不同的task,
        // 並執行在獨立的機器上,每臺機器執行他自己的map部分和本地的reducation,並返回結果集給去驅動程式
        int totalLength = lineLengths.reduce(new Sum());

        System.out.println("總和" + totalLength);
        // 為了以後複用 持久化到記憶體...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());

    }

    // 定義map函式
    static class GetLength implements Function<Integer, Integer> {

        @Override
        public Integer call(Integer a) throws Exception {

            return a;
        }
    }

    // 定義reduce函式
    static class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
}

注意:上面的寫法是基於jdk1.7或者更低版本 
基於jdk1.8有更簡單的寫法 
下面是官方文件的說明


Note: In this guide, we’ll often use the concise Java 8 lambda syntax to specify Java functions, but in older versions of Java you can implement the interfaces in the org.apache.spark.api.java.function package. We describe passing functions to Spark in more detail below.


Spark’s API relies heavily on passing functions in the driver program to run on the cluster. In Java, functions are represented by classes implementing the interfaces in the org.apache.spark.api.java.function package. There are two ways to create such functions:

Implement the Function interfaces in your own class, either as an anonymous inner class or a named one, and pass an instance of it to Spark.
In Java 8, use lambda expressions to concisely define an implementation.

所以如果要完成上面第一種建立方式,在jdk1.8中可以簡單的這麼寫

JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

要完成第二種方式的建立,簡單的這麼寫

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

主要不同就是在jdk1.7中我們要自己寫一個函式傳到map或者reduce方法中,而在jdk1.8中可以直接在map或者reduce方法中寫lambda表示式

好了,今天就寫到這裡,以後的更多內容後面再寫 
碼字不易,轉載請指明出處http://blog.csdn.net/tanggao1314/article/details/51570452

參考資料 
Spark程式設計指南