1. 程式人生 > >再談RDD、DataFrame、DataSet關係以及相互轉換(JAVA API)

再談RDD、DataFrame、DataSet關係以及相互轉換(JAVA API)

Spark提供了三種主要的與資料相關的API:

  • RDD
  • DataFrame
  • DataSet

三者圖示

下面詳細介紹下各自的特點:

RDD

主要描述:RDD是Spark提供的最主要的一個抽象概念(Resilient Distributed Dataset),它是一個element的collection,分割槽化的位於叢集的節點中,支援並行處理。

  • RDD的特性

    • 分散式: RDD使用MapReduce運算元來廣泛的適應在叢集中並行分散式的大資料集的處理和產生。並且方便使用者使用高級別的運算元在平行計算中。
    • 不可變: RDD是由一個records的collection組成,而且是分割槽的。分割槽是RDD並行化的基礎單元,而且每個分割槽就是對資料的邏輯分割,它是不可變的,它是通過已經存在的分割槽的某些transformations建立得到。這種不可變性方便在計算中做到資料一致性。
    • 錯誤容忍: 在實際中如果我們丟失了RDD的部分分割槽,可以通過對丟失分割槽關聯性的transformation重新計算得到。而不是在眾多節點中做資料的複製等操作。這個特性是RDD的最大優點,它節省了大量的資料管理、複製等操作,使得計算速度更快。
    • 惰性執行: 所有的transformation都是惰性的,他們並不是立刻計算出結果,而是隻是記住了各個transformation對資料集的依賴關係。當driver程式需要一個action結果時才開始執行。
    • 功能支援: RDD支援兩種型別的運算元:transformation是指從已經存在的資料集中計算得到新的資料集;action是指通過對通過對資料集的計算得到一個結果返回給driver。
    • 資料格式: 輕鬆且有效支援各種資料,包括結構化的和非結構化的。
    • 程式語言: RDD的API支援Scala、Java、Python和R
  • RDD的限制

    • 沒有內建的優化引擎 當對結構化的資料進行處理時,RDD沒有使用Spark的高階優化器,比如catalyst優化器和Tungsten執行引擎。
    • 處理結構化的資料 不像Dataframe或者Dataset,RDD不會主動推測出資料的schema,而是需要使用者在程式碼裡指示。

DataFrame

Spark從1.3版本開始引入Dataframe,它克服了RDD的最主要的挑戰。

主要描述:Dataframe是一個分散式的資料collection,而且將資料按照列名進行組織。在概念上它與關係型的資料庫的表或者R/Python語言中的DataFrame類似。與之一起提供的還有,Spark引入了catalyst優化器,它可以優化查詢。

  • DataFrame的特性

    • 分散式的Row物件的Collection: 分散式、列名組織的資料、後臺優化。 具體到程式碼裡面,Dataframe就是Dataset<Row>
    • 資料處理: 處理支援結構或者非結構化的格式(比如Avro, CSV, elastic search, 以及Cassandra)以及不同的檔案系統(HDFS, HIVE tables, MySQL, etc)。它支援非常多的資料來源
    • 使用catalyst優化器優化: 它對SQL查詢以及DataFrame API都提供優化支援。Dataframe使用catalyst樹transformation框架有四個步驟: 1、Analyzing a logical plan to resolve references 2、Logical plan optimization 3、Physical planning 4、Code generation to compile parts of the query to Java bytecode.
    • Hive相容性: 使用Spark的SQL可以無修改的支援Hive查詢在已經存在的Hive warehouses。它重用了Hive的前端、MetaStore並且對已經存在的Hive資料、查詢和UDF提供完整的相容性。
    • Tungsten: Tungsten提供了一個物理執行後端,管理記憶體動態產生expression evaluation的位元組碼
    • 程式語言: Dataframe API支援Scala、Java、Python和R
  • DataFrame的限制

    • 沒有編譯階段的型別檢查: 不能在編譯時刻對安全性做出檢查,而且限制了使用者對於未知結構的資料進行操作。比如下面程式碼在編譯時沒有錯誤,但是在執行時會出現異常:
    case class Person(name : String , age : Int) 
    val dataframe = sqlContect.read.json("people.json") 
    dataframe.filter("salary > 10000").show 
    => throws Exception : cannot resolve 'salary' given input age , name
    
    • 不能保留類物件的結構: 一旦把一個類結構的物件轉成了Dataframe,就不能轉回去了。下面這個栗子就是指出了:
    case class Person(name : String , age : Int)
    val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
    val personDF = sqlContect.createDataframe(personRDD)
    personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
    

DataSet

主要描述:Dataset API是對DataFrame的一個擴充套件,使得可以支援型別安全的檢查,並且對類結構的物件支援程式介面。它是強型別的,不可變collection,並對映成一個相關的schema。 Dataset API的核心是一個被稱為Encoder的概念。它是負責對JVM的物件以及表格化的表達(tabular representation)之間的相互轉化。 表格化的表達在儲存時使用了Spark內建的Tungsten二進位制形式,允許對序列化資料操作並改進了記憶體使用。在Spark 1.6版本之後,支援自動化生成Encoder,可以對廣泛的primitive型別(比如String,Integer,Long等)、Scala的case class以及Java Bean自動生成對應的Encoder。

  • DataSet的特性

    • 支援RDD和Dataframe的優點: 包括RDD的型別安全檢查,Dataframe的關係型模型,查詢優化,Tungsten執行,排序和shuffling。
    • Encoder: 通過使用Encoder,使用者可以輕鬆轉換JVM物件到一個Dataset,允許使用者在結構化和非結構化的資料操作。
    • 程式語言: Scala和Java
    • 型別安全檢查: 提供編譯階段的安全型別檢查。比如下面這個栗子:
    case class Person(name : String , age : Int)
    val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
    val personDF = sqlContect.createDataframe(personRDD)
    val ds:Dataset[Person] = personDF.as[Person]
    ds.filter(p => p.age > 25)
    ds.filter(p => p.salary > 25)
     // error : value salary is not a member of person
    ds.rdd // returns RDD[Person]
    
    • 相互轉換: Dataset可以讓使用者輕鬆從RDD和Dataframe轉換到Dataset不需要額外太多程式碼。
  • DataSet的限制

    • 需要把型別轉成String: Querying the data from datasets currently requires us to specify the fields in the class as a string. Once we have queried the data, we are forced to cast column to the required data type. On the other hand, if we use map operation on Datasets, it will not use Catalyst optimizer. 比如:
    ds.select(col("name").as[String], $"age".as[Int]).collect()
    

Java API中三種資料格式的相互轉換

首先構造一個數據集,是由Person類的結構組成的,然後在此之上看這三個API例項的構造以及相互轉換

  • 資料建立

Person類的定義

資料建立

  • 直接構建出 JavaRDD<Person>

    JavaRDD<Person> personJavaRDD = jsc.parallelize(personList);
    System.out.println("1. 直接構建出 JavaRDD<Person>");
    personJavaRDD.foreach(element -> System.out.println(element.toString()));
    

    Print結果:

    直接構建出 JavaRDD<Person> Person: name = Andy, age = 32 Person: name = Michael, age = 23 Person: name = Justin, age = 19

  • 直接構建出 Dataset<Person>

          Encoder<Person> personEncoder = Encoders.bean(Person.class);
          Dataset<Person> personDS = spark.createDataset(personList, personEncoder);
          System.out.println("2. 直接構建出 Dataset<Person>");
          personDS.show();
          personDS.printSchema();
    

    Print結果:

    1. 直接構建出 Dataset<Person> +---+-------+ |age| name| +---+-------+ | 32| Andy| | 23|Michael| | 19| Justin| +---+-------+ root |-- age: integer (nullable = false) |-- name: string (nullable = true)
  • 直接構建出 Dataset<Row>

          Dataset<Row> personDF = spark.createDataFrame(personList, Person.class);
          System.out.println("3. 直接構建出 Dataset<Row>");
          personDF.show();
          personDF.printSchema();
    

    Print結果:

    1. 直接構建出 Dataset<Row> +---+-------+ |age| name| +---+-------+ | 32| Andy| | 23|Michael| | 19| Justin| +---+-------+ root |-- age: integer (nullable = false) |-- name: string (nullable = true)
  • JavaRDD<Person> -> Dataset<Person>

          personDS = spark.createDataset(personJavaRDD.rdd(), personEncoder);
          System.out.println("1->2 JavaRDD<Person> -> Dataset<Person>");
          personDS.show();
          personDS.printSchema();
    

    Print結果:

    1->2 JavaRDD<Person> -> Dataset<Person> +---+-------+ |age| name| +---+-------+ | 32| Andy| | 23|Michael| | 19| Justin| +---+-------+ root |-- age: integer (nullable = true) |-- name: string (nullable = true)

  • JavaRDD<Person> -> Dataset<Row>

          personDF = spark.createDataFrame(personJavaRDD, Person.class);
          System.out.println("1->3 JavaRDD<Person> -> Dataset<Row>");
          personDF.show();
          personDF.printSchema();
    

    Print結果:

    1->3 JavaRDD<Person> -> Dataset<Row> +---+-------+ |age| name| +---+-------+ | 32| Andy| | 23|Michael| | 19| Justin| +---+-------+ root |-- age: integer (nullable = false) |-- name: string (nullable = true)

  • 補充從JavaRDD<Row>到Dataset<Row>

          JavaRDD<Row> personRowRdd = personJavaRDD.map(person -> RowFactory.create(person.age, person.name));
          List<StructField> fieldList = new ArrayList<>();
          fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
          fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));
          StructType rowAgeNameSchema = DataTypes.createStructType(fieldList);
          personDF = spark.createDataFrame(personRowRdd, rowAgeNameSchema);
          System.out.println("\n\n\n補充,由JavaRDD<Row> -> Dataset<Row>");
          personDF.show();
          personDF.printSchema();
    

    主要就是使用RowFactory把Row中的每一項寫好後,通過spark的createDataFrame來建立。其中對於Row的解讀包含在了自建的StructType中。

  • Dataset<Person> -> JavaRDD<Person>

          personJavaRDD = personDS.toJavaRDD();
          System.out.println("2->1 Dataset<Person> -> JavaRDD<Person>");
          personJavaRDD.foreach(element -> System.out.println(element.toString()));
    

    Print結果:

    2->1 Dataset<Person> -> JavaRDD<Person> Person: name = Justin, age = 19 Person: name = Andy, age = 32 Person: name = Michael, age = 23

  • Dataset<Row> -> JavaRDD<Person>

          personJavaRDD = personDF.toJavaRDD().map(row -> {
              String name = row.getAs("name");
              int age = row.getAs("age");
              return new Person(name, age);
          });
          System.out.println("3->1 Dataset<Row> -> JavaRDD<Person>");
          personJavaRDD.foreach(element -> System.out.println(element.toString()));
    

    Print結果:

    3->1 Dataset<Row> -> JavaRDD<Person> Person: name = Justin, age = 19 Person: name = Michael, age = 23 Person: name = Andy, age = 32

  • Dataset<Person> -> Dataset<Row>

          List<StructField> fieldList = new ArrayList<>();
          fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, false));
          fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
          StructType rowSchema = DataTypes.createStructType(fieldList);
          ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(rowSchema);
          Dataset<Row> personDF_fromDS = personDS.map(
                  (MapFunction<Person, Row>) person -> {
                      List<Object> objectList = new ArrayList<>();
                      objectList.add(person.name);
                      objectList.add(person.age);
                      return RowFactory.create(objectList.toArray());
                  },
                  rowEncoder
          );
          System.out.println("2->3 Dataset<Person> -> Dataset<Row>");
          personDF_fromDS.show();
          personDF_fromDS.printSchema();
    

    Print結果:

    2->3 Dataset<Person> -> Dataset<Row> +---+-------+ |age| name| +---+-------+ | 32| Andy| | 23|Michael | 19| Justin| +---+-------+ root |-- age: integer (nullable = false) |-- name: string (nullable = true)

  • Dataset<Row> -> Dataset<Person>

          personDS = personDF.map(new MapFunction<Row, Person>() {
              @Override
              public Person call(Row value) throws Exception {
                  return new Person(value.getAs("name"), value.getAs("age"));
              }
          }, personEncoder);
          System.out.println("3->2 Dataset<Row> -> Dataset<Person>");
          personDS.show();
          personDS.printSchema();
    

    Print結果:

    3->2 Dataset<Row> -> Dataset<Person> +---+-------+ |age| name| +---+-------+ | 32| Andy| | 23|Michael| | 19| Justin| +---+-------+ root |-- age: integer (nullable = true) |-- name: string (nullable = true)

總結: 其實RDD的Map和Dataset的Map只有一點不同,就是Dataset的Map要指定一個Encoder的引數。

需要用Encoder類給出

作者:shohokuooo 連結:https://www.jianshu.com/p/71003b152a84 來源:簡書 簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。