1. 程式人生 > >第59課:使用Java和Scala在IDE中實戰RDD和DataFrame轉換操作

第59課:使用Java和Scala在IDE中實戰RDD和DataFrame轉換操作

內容:

    1.RDD與DataFrame轉換的重大意義     2.使用Java實戰RDD與DataFrame轉換     3.使用Scala實戰RDD與dataFrame轉換

一.  RDD與DataFrame轉換的重大意義

    1.在Spark中RDD可以直接轉換成DataFrame。SparkCore的核心是RDD,所有的排程都是基於RDD完成的,對RDD的操作都可以轉換成基於DataFrame使用SparkSQL來操作。RDD可能接上資料庫,接上NoSQL,其他檔案系統等各種資料來源,然後將資料轉換為DataFrame,極大簡化了大資料的開發,原來寫Scala\Java,現在只需要寫SparkSQL。     2.同時對DataFrame的操作又可以轉換成RDD,基於DataFrame對資料進行SQL或機器學習等操作後又可以轉換為RDD,這對於儲存資料、格式化非常方便。     3.RDD變DataFrame有兩種方式:         a)通過反射,推斷RDD元素中的元資料。         RDD中的資料本身是沒有元資料的,例如一個Person的資訊裡有id/name/age,RDD的Record不知道id/name/age這些資訊,但如果變成DataFrame的話,DataFrame必須知道這些資訊。最簡單的就是通過反射知道在RDD和DataFrame轉換時擁有這些元資料資訊。             在Scala中就是Case Class對映。寫一個Case Class,描述RDD中不同列的元資料是什麼。Scala:case class對映。             在Java中就是通過JavaBean。Java:Bean(但不能支援巢狀的JavaBean,也不能有List/Map等複雜的資料結構。只能用簡單的資料型別:String/Int等。Scala就沒有這些限制)          注意:使用反射的前提,已經知道元資料資訊了(靜態的)。但有些場景下只有在執行時才能知道元資料資訊(動態的)         b)建立DataFrame時事先不知道元資料資訊,只能在執行時動態構建元資料。然後再把這些元資料資訊應用於RDD上。這種情況是比較常見的情況,即動態獲取Schema。

二、使用Java實戰RDD與DataFrame轉換

1.準備資料:person.txt

1,Spark,7
2,Hadoop,11
3,Flink,5

2.編寫Person.class

package SparkSQL;

import java.io.Serializable;

/**
 * FileName: Person
 * Author:   hadoop
 * Email:    [email protected]
 * Date:     18-10-28 下午4:27
 * Description:
 */
public class Person implements Serializable {

        private int id;
        private String name;
        private int age;

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }

        @Override
        public String toString() {
            return "Person{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}';
        }

}

3.編寫RDDToDataFrameByReflection.class

package SparkSQL;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import java.util.List;

/**
 * FileName: RDDToDataFrameByReflection
 * Author:   hadoop
 * Email:    
[email protected]
* Date: 18-10-28 下午3:27 * Description:使用反射的方式將RDD轉換成為DataFrame */ public class RDDToDataFrameByReflection { public static void main(String[] args){ //建立SparkConf用於讀取系統資訊並設定運用程式的名稱 SparkConf conf = new SparkConf().setAppName("RDDToDataFrameByReflection").setMaster("spark://Master:7077"); //建立JavaSparkContext物件例項作為整個Driver的核心基石 JavaSparkContext sc = new JavaSparkContext(conf); //設定輸出log的等級,可以設定INFO,WARN,ERROR sc.setLogLevel("ERROR"); //建立SQLContext上下文物件,用於SqL的分析 SQLContext sqlContext = new SQLContext(sc); //建立RDD,讀取textFile JavaRDD<String> lines = sc.textFile("/hadoop/dataframe/input"); /** * 將讀入的RDD資料轉化為Person型別的DataFrame */ JavaRDD<Person> person = lines.map(new Function<String, Person>() { @Override public Person call(String line) throws Exception { String[] splited = line.split(","); Person p = new Person(); p.setId(Integer.valueOf(splited[0].trim())); p.setName(splited[1]); p.setAge(Integer.valueOf(splited[2].trim())); return p; } }); /** * reateDataFrame方法來自於sqlContext,有兩個引數,第一個是RDD,這裡就是lines.map之後的person * 這個RDD裡的型別是Person,即每條記錄都是Person,Person其實是有id,name,age的, * JavaRDD本身並不知道id,name,age資訊,所以要建立DataFrame,DataFrame需要知道id,name,age資訊, * DataFrame怎麼知道的呢?這裡用createDataFrame時傳入兩個引數,第一個的RDD本身,第二個引數是 * 對RDD中每條資料的元資料的描述,這裡就是java bean class,即person.class * 實際上工作原理是:person.class傳入時本身會用反射的方式建立DataFrame, * 在底層通過反射的方式獲得Person的所有fields,結合RDD本身,就生成了DataFrame */ Dataset ds = sqlContext.createDataFrame(person,Person.class); //將DataFrame變成一個TempTable ds.registerTempTable("person"); //在記憶體中就會生成一個persons的表,在這張臨時表上就可以寫SQL語句了 Dataset bigDatas = sqlContext.sql("select * from person where age >= 6"); //轉過來就可以把查詢後的結果變成 RDD,返回的是JavaRDD<Row> JavaRDD<Row> bigdataRDD = bigDatas.javaRDD(); //再對RDD進行map操作。元素是一行一行的資料(SQL的Row),結果是Person,再次還原成Person。 //這裡返回的是具體的每條RDD的元素。 JavaRDD<Person> result = bigdataRDD.map(new Function<Row, Person>() { @Override public Person call(Row row) throws Exception { Person p = new Person(); /** * 轉化為DataFrame後,dataFrame對資料欄位進行了結構優化, * 對欄位進行了排序,所以使用下面的方式是不能按正確資料順序訪問資料的 * p.setId(row.getInt(0)); * p.setName(row.getString(1)); * p.setAge(row.getInt(2)); */ p.setId(row.getInt(1)); p.setName(row.getString(2)); p.setAge(row.getInt(0)); return p; } }); List<Person> personList = result.collect(); for (Person p : personList){ System.out.println(p); } } }

4.執行結果: