
[2.2] Spark DataFrame Operations (II): Converting an RDD to a DataFrame via Reflection

Scenario

The file /home/pengyucheng/java/rdd2dfram.txt contains the following 4 records:

1,hadoop,11
2,spark,7
3,flink,5
4,ivy,27

Task: write code that queries and prints to the console every record whose third field is greater than or equal to 6. The first record, 1,hadoop,11, for example, has 11 as its third field and should therefore be printed.

Analysis

This is a Spark SQL style problem: query the records whose third field is at least 6. The crux is converting the unstructured text in the file into the structured, typed data of a DataFrame, much like a MySQL table. There are two ways to perform the conversion:

  • 1. Inferring the Schema Using Reflection
  • 2. Programmatically Specifying the Schema

“Spark SQL supports two different methods for converting existing RDDs into DataFrames. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection based approach leads to more concise code and works well when you already know the schema while writing your Spark application.

The second method for creating DataFrames is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct DataFrames when the columns and their types are not known until runtime.”
This article works through the first approach with a full example; the next article analyzes the second. For orientation, a minimal sketch of the programmatic approach follows.
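
A glimpse of approach 2 only; the next article covers it properly. This sketch assumes the Spark 1.x StructType API and builds an explicit schema over the same file:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("SchemaSketch"))
    val sqlContext = new SQLContext(sc)

    // Build Rows from the raw lines, then attach an explicitly constructed schema
    val rowRDD = sc.textFile("file:///home/pengyucheng/java/rdd2dfram.txt")
      .map(_.split(","))
      .map(a => Row(a(0).toInt, a(1), a(2).toInt))

    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false)))

    sqlContext.createDataFrame(rowRDD, schema).show()
  }
}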

Experiment

Java version

package cool.pengych.spark.sql;
import java.io.Serializable;
/**
 * Bean wrapping one record of the input file
 * @author pengyucheng
 */
public class Person implements Serializable{
    private static final long serialVersionUID = 1L;
    private int id;
    private String name;
    private int age;

    public int getId() {
        return id;
    }
    @Override
    public String toString() {
        return "Person [id=" + id + ", name=" + name + ", age=" + age + "]";
    }
    public String getName() {
        return name;
    }
    public int getAge() {
        return age;
    }
    public void setId(int id) {
        this.id = id;
    }
    public void setName(String name) {
        this.name = name;
    }
    public void setAge(int age) {
        this.age = age;
    }
}
package cool.pengych.spark.sql;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

/**
 * Converting an RDD into a DataFrame via reflection
 * @author pengyucheng
 *
 */
public class RDD2DataFrameByReflection
{
    public static void main(String[] args)
    {
        /*
         * 1. Create the SQLContext
         */
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("RDD2DataFrameByReflection");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        /*
         * 2. Read the external file and wrap each line in a Person, producing a JavaRDD<Person>
         */
        JavaRDD<String> lines = sc.textFile("file:///home/pengyucheng/java/rdd2dfram.txt");
        JavaRDD<Person> persons = lines.map(new Function<String,Person>(){
            private static final long serialVersionUID = 1L;
            @Override
            public Person call(String line) throws Exception
            {
                String[] attrs = line.split(",");
                Person person = new Person();
                person.setId(Integer.valueOf(attrs[0]));
                person.setAge(Integer.valueOf(attrs[2])); // age is the third field (index 2) in the file
                person.setName(attrs[1]);
                return person;
            }
        });

        /*
         * 3. Create the DataFrame: under the hood, reflection extracts Person's fields and, together with the RDD, builds the DataFrame
         */
        DataFrame df = sqlContext.createDataFrame(persons, Person.class);
        df.registerTempTable("persons");

        DataFrame bigDatas = sqlContext.sql("select * from persons where age >= 6");
        JavaRDD<Row> bigDatasRDD = bigDatas.javaRDD();

        JavaRDD<Person> result = bigDatasRDD.map(new Function<Row, Person>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Person call( Row row) throws Exception
            {
                Person person = new Person();
                person.setId(row.getInt(1));
                person.setAge(row.getInt(0)); // age is now at index 0: the DataFrame orders the bean's columns alphabetically (age, id, name)
                person.setName(row.getString(2));
                return person;
            }
        });

        /*
         * 4. Print the result to the console
         */
        List<Person> personList = result.collect();
        for (Person person : personList)
        {
            System.out.println(person);
        }
    }
}

Scala version

package main.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

import cool.pengych.spark.sql.Person

object RDD2DataFrameByReflection {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD2DataFrameByReflection")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Read the file and wrap each line in a Person bean
    val persons = sc.textFile("file:///home/pengyucheng/java/rdd2dfram.txt").map(row => {
      val attrs = row.split(",")
      val person = new Person
      person.setId(Integer.valueOf(attrs(0)))
      person.setAge(Integer.valueOf(attrs(2))) // age is the third field (index 2) in the file
      person.setName(attrs(1))
      person
    })

    // Reflection over the Person bean supplies the schema
    sqlContext.createDataFrame(persons, classOf[Person]).registerTempTable("persons")

    val personsRDD = sqlContext.sql("select * from persons where age >= 6").rdd

    val result = personsRDD.map(row => {
      val person = new Person
      person.setId(row.getInt(1))
      person.setAge(row.getInt(0)) // columns are now ordered alphabetically: age, id, name
      person.setName(row.getString(2))
      person
    })

    result.collect.foreach(println)
  }
}

Execution result

16/05/25 18:10:16 INFO DAGScheduler: Job 0 finished: collect at RDD2DataFrameByReflection.java:69, took 0.795109 s
Person [id=1, name=hadoop, age=11]
Person [id=2, name=spark, age=7]
Person [id=4, name=ivy, age=27]
16/05/25 18:10:16 INFO SparkContext: Invoking stop() from shutdown hook

Problems encountered

  • java.lang.IllegalAccessException
16/05/25 17:42:59 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1 can not access a member of class cool.pengych.spark.sql.Person with modifiers "public"
    at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
    at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)
    at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)

Solution

The following line converts the RDD into a DataFrame by reflectively reading the Person class's properties. This requires Person to be declared as a public class; if it is not, the exception above is thrown.

DataFrame df = sqlContext.createDataFrame(persons, Person.class);
  • java.lang.ClassCastException
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
    at org.apache.spark.sql.Row$class.getInt(Row.scala:218)

Solution

person.setAge(Integer.valueOf(attrs[2])); // in the source file, age is at index 2
person.setAge(row.getInt(0)); // in the query result, age is at index 0

age starts out at index 2 in the file, yet row.getInt(0) is what works. The reason is that when the bean RDD becomes a DataFrame, the columns are ordered alphabetically by field name (age, id, name) rather than by their position in the file, so age ends up at index 0 in the query result. Writing person.setAge(row.getInt(2)) would read the name column instead and raise the ClassCastException above. Looking columns up by name, as sketched below, sidesteps the issue entirely.
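
To stay independent of the column order, fields can be resolved by name instead of by position. A minimal sketch, assuming Spark 1.4 or later (where Row.getAs(fieldName) is available) and reusing personsRDD from the Scala version above:

val result = personsRDD.map { row =>
  val person = new Person
  person.setId(row.getAs[Int]("id"))        // resolved by column name,
  person.setAge(row.getAs[Int]("age"))      // immune to alphabetical reordering
  person.setName(row.getAs[String]("name"))
  person
}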

Summary

  • To convert an RDD into a DataFrame, use SQLContext's createDataFrame method (the Java code above calls the equivalent JavaRDD overload; see also the case-class sketch after this list):
def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame
  • To convert a DataFrame back into an RDD, use the DataFrame's rdd field (javaRDD() from Java):
dataFrame.rdd
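
As a closing note, pure-Scala code usually expresses the reflection approach with a case class and toDF(), which infers the schema from the constructor and keeps the columns in declaration order (id, name, age), so no alphabetical reshuffling occurs. A minimal sketch; PersonCC is a hypothetical name chosen to avoid clashing with the Java bean:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// The case class must be defined at top level so Spark can reflect on it
case class PersonCC(id: Int, name: String, age: Int)

object RDD2DataFrameByCaseClass {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD2DataFrameByCaseClass")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // brings rdd.toDF() into scope

    val df = sc.textFile("file:///home/pengyucheng/java/rdd2dfram.txt")
      .map(_.split(","))
      .map(a => PersonCC(a(0).toInt, a(1), a(2).toInt))
      .toDF()
    df.registerTempTable("persons")

    sqlContext.sql("select * from persons where age >= 6").collect().foreach(println)
  }
}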