RDD轉換為DataFrame案例
阿新 • • 發佈:2019-02-15
SparkSQL支援兩種方式來將RDD轉換為DataFrame。
第一種方式,是使用反射來推斷包含了特定資料型別的RDD的元資料。這種基於反射的方式,程式碼比較簡潔,當你已經知道你的RDD的元資料時,是一種非常不錯的方式。
第二種方式,是通過程式設計介面來建立DataFrame,你可以在程式執行時動態構建一份元資料,然後將其應用到已經存在的RDD上。這種方式的程式碼比較冗長,但是如果在編寫程式時,還不知道RDD的元資料,只有在程式執行時,才能動態得知其元資料,那麼只能通過這種動態構建元資料的方式。
檔案students.txt中內容如下:
1,leo,17 2,marry,17 3,jack,18 4,tom,19
1. 使用反射方式將RDD轉換為DataFrame
Java程式碼如下:
Scala程式碼如下:public class RDD2DataFrameReclection { public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local") .setAppName("RDD2DataFrameReflection"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<String> lines = sc.textFile("./data/students.txt"); JavaRDD<Student> students = lines.map(new Function<String, Student>() { @Override public Student call(String line) throws Exception { String[] lineSplited = line.split(","); Student stu = new Student(); stu.setId(Integer.valueOf(lineSplited[0].trim())); stu.setName(lineSplited[1]); stu.setAge(Integer.valueOf(lineSplited[2])); return stu; } }); //使用反射方式將RDD轉換為DataFrame //將Student.class傳入進入,其實就是用反射的方式來建立DataFrame //因為Student.class本身就是反射的一個應用 //然後底層還得通過對Student Class進行反射,來獲取其中的field //這裡要求,JavaBean必須實現Serializable介面,是可序列化的 DataFrame studentDF = sqlContext.createDataFrame(students, Student.class); //拿到了一個DataFrame之後,就可以將去註冊為一個臨時表,然後針對其中的資料執行SQL語句 studentDF.registerTempTable("students"); //針對students臨時表執行SQL語句,查詢年齡小於等於18歲的學生,就是teenager DataFrame teenagerDF = sqlContext.sql("select * from students where age<=18"); //將查詢出來的DataFrame再次轉換為RDD JavaRDD<Row> teenagerRDD = teenagerDF.javaRDD(); //將RDD中的資料進行對映,對映為student JavaRDD<Student> teenagerStudentRDD = teenagerRDD.map(new Function<Row, Student>() { @Override public Student call(Row row) throws Exception { //row中的資料順序可以與期望的不同 Student stu = new Student(); stu.setAge(row.getInt(0)); stu.setId(row.getInt(1)); stu.setName(row.getString(2)); return stu; } }); //將資料collect回來,打印出來 List<Student> studentList = teenagerStudentRDD.collect(); for(Student stu : studentList) System.out.println(stu); } }
object RDD2DataFrameReflection extends App { val conf = new SparkConf() .setAppName("RDD2DataFrameReflection") .setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //在scala中使用反射方式,進行RDD到DataFrame的轉換,需要手動匯入一個隱式轉換 import sqlContext.implicits._ case class Student(id:Int,name:String,age:Int) //這裡其實就是一個普通的,元素為case class的RDD //直接對它使用toDF()方法,即可轉換為DataFrame val studentDF = sc.textFile("./data/students.txt", 1) .map { line => line.split(",") } .map { arr => Student(arr(0).trim().toInt, arr(1), arr(2).trim().toInt) } .toDF() studentDF.registerTempTable("students") val teenagerDF = sqlContext.sql("select * from students where age<=18") val teenagerRDD = teenagerDF.rdd teenagerRDD.map { row => Student(row(0).toString().toInt,row(1).toString(),row(2).toString().toInt) } .collect() .foreach { stu => println(stu.id + ":" + stu.name + ":" + stu.age) } // 在scala中,對row的使用,比java中的row的使用,更加豐富 // 在scala中,可以用row的getAs()方法,獲取指定列名的列 teenagerRDD.map { row => Student(row.getAs[Int]("id"),row.getAs[String]("name"),row.getAs[Int]("age")) } .collect() .foreach { stu => println(stu.id + ":" + stu.name + ":" + stu.age) } // 還可以通過row的getValuesMap()方法,獲取指定幾列的值,返回的是個map val studentRDD = teenagerRDD.map { row => { val map = row.getValuesMap[Any](Array("id","name","age")); Student(map("id").toString().toInt,map("name").toString(),map("age").toString().toInt) } } studentRDD.collect().foreach { stu => println(stu.id + ":" + stu.name + ":" + stu.age) } }
2. 以程式設計方式動態指定元資料,將RDD轉換為DataFrame
Java程式碼如下:
public class RDD2DataFrameProgramatically {
public static void main(String[] args) {
//建立SparkConf、JavaSparkContext、SQLContext
SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("RDD2DataFrameProgramatically");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
//第一步,建立一個普通的RDD,但是,必須將其轉換為RDD<Row>的這種格式
JavaRDD<String> lines = sc.textFile("./data/students.txt");
JavaRDD<Row> studentRows = lines.map(new Function<String, Row>() {
@Override
public Row call(String line) throws Exception {
String[] lineSplited = line.split(",");
return RowFactory.create(Integer.valueOf(lineSplited[0])
,lineSplited[1],
Integer.valueOf(lineSplited[2]));
}
});
//第二步,動態構造元資料
//比如說,id、name等,field的名稱和型別,可能都是在程式執行過程中,動態從mysql db裡
//或者配置檔案中,加載出來的,是不固定的
//所以特別適合用這種程式設計的方式,來構造元資料
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
StructType structType = DataTypes.createStructType(structFields);
//第三步,使用動態構造的元資料將RDD轉換為DataFrame
DataFrame studentDF = sqlContext.createDataFrame(studentRows, structType);
//後面,就可以使用DataFrame了
studentDF.registerTempTable("students");
DataFrame teenagerDF = sqlContext.sql("select * from students where age < 18");
List<Row> rows = teenagerDF.javaRDD().collect();
for(Row row : rows) {
System.out.println(row);
}
}
}
Scala程式碼如下:
object RDD2DataFrameProgrammatically extends App {
val conf = new SparkConf()
.setMaster("local")
.setAppName("RDD2DataFrameProgrammatically")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//第一步,構造出元素為Row的普通RDD
val studentRDD = sc.textFile("./data/students.txt", 1)
.map { line => Row(line.split(",")(0).toInt, line.split(",")(1), line.split(",")(2).toInt) }
//第二步,程式設計方式動態構造元資料
val structType = StructType(Array(
StructField("id",IntegerType,true),
StructField("name",StringType,true),
StructField("age",IntegerType,true)))
//第三步,進行RDD到DataFrame的轉換
val studentDF = sqlContext.createDataFrame(studentRDD, structType)
//接續正常使用
studentDF.registerTempTable("students")
val teenagerDF = sqlContext.sql("select * from students where age<=18")
val teenagerRDD = teenagerDF.rdd.collect().foreach { row => println(row) }
}