
[2.3] Spark DataFrame Operations (2): Programmatically Converting an RDD to a DataFrame


Scenario

1. The previous post mapped the data to be analyzed onto the fields of a JavaBean and completed the RDD-to-DataFrame conversion with def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame (i.e., Inferring the Schema Using Reflection). This post takes the other approach, Programmatically Specifying the Schema: build a StructType, then complete the RDD-to-DataFrame conversion with def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame.
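For contrast, here is a minimal Scala sketch of the reflection route from the previous post (that post used a JavaBean; the Person case class below is a hypothetical Scala equivalent, and the file path is the one used later in this post):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// the case class itself supplies the column names and types
case class Person(id: Int, name: String, age: Int)

object ReflectionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Reflection Sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val personDF = sc.textFile("file:///home/pengyucheng/java/rdd2dfram.txt")
      .map(_.split(","))
      .map(p => Person(p(0).toInt, p(1), p(2).toInt))
      .toDF()   // schema inferred from Person via reflection
    personDF.show()
  }
}

This route is the simplest when the record structure is fixed at compile time; the rest of this post covers the case when it is not.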
2. Programmatically Specifying the Schema
“When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

1. Create an RDD of Rows from the original RDD;
2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1;
3. Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.”
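The three steps map one-to-one onto code. Here is a minimal Scala sketch that uses a small in-memory dataset (sc.parallelize, with made-up rows) instead of a file:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ThreeStepSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Three Steps"))
    val sqlContext = new SQLContext(sc)

    // Step 1: an RDD of Rows
    val rowRDD = sc.parallelize(Seq(Row(1, "hadoop", 11), Row(2, "spark", 7)))

    // Step 2: a StructType matching the structure of those Rows
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)))

    // Step 3: apply the schema via createDataFrame
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.show()
  }
}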

Experiment
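Both versions below read /home/pengyucheng/java/rdd2dfram.txt. The original post does not show the file itself; judging from the parsing code and the query output at the end, each line is a comma-separated id,name,age record. The two lines below are reconstructed from the output and are only a guess at the full contents:

1,hadoop,11
4,ivy,27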

Java version

package cool.pengych.spark.sql;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class RDD2DataFrameProgrammatically
{
    public static void main(String[] args)
    {
        /*
         * 1. Create the SQLContext
         */
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("RDD2DataFrameProgrammatically");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        /*
         * 2. Build an RDD of Rows on top of the original RDD
         */
        JavaRDD<String> lines = sc.textFile("file:///home/pengyucheng/java/rdd2dfram.txt");
        JavaRDD<Row> rows = lines.map(new Function<String, Row>()
        {
            private static final long serialVersionUID = 1L;
            @Override
            public Row call(String line)
            {
                String[] splited = line.split(",");
                return RowFactory.create(Integer.valueOf(splited[0]), splited[1], Integer.valueOf(splited[2]));
            }
        });

        /*
         * 3. Construct the DataFrame metadata (schema) dynamically; in practice,
         * the number of columns and each column's type may come from a JSON file
         * or from a database
         */
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);

        /*
         * 4. Construct the DataFrame from the metadata above and the RDD<Row>
         */
        DataFrame personDF = sqlContext.createDataFrame(rows, structType);

        /*
         * 5. Register a temporary table for later SQL queries
         */
        personDF.registerTempTable("persons");

        /*
         * 6. Analyze the data
         */
        DataFrame results = sqlContext.sql("select * from persons where age > 8");

        /*
         * 7. Convert the DataFrame back to an RDD
         */
        List<Row> listRows = results.javaRDD().collect();
        for (Row row : listRows)
        {
            System.out.println(row);
        }
    }
}
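A note on the third argument to DataTypes.createStructField: true marks the column as nullable, i.e. the schema tolerates a missing value in that position; pass false only when the data is guaranteed to be present. DataTypes is the factory class to use from Java, while Scala code can also construct StructField and StructType directly, as the sketches in this post do.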

Scala version

package main.scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.types.StructField
import java.util.ArrayList
import org.apache.spark.sql.types.DataTypes

object RDD2DataFrameProgrammatically
{
    def main(args: Array[String]): Unit = {
       val conf = new SparkConf().setMaster("local[*]").setAppName("DataFrame Ops")
       val sc = new SparkContext(conf)
       val sqlContext = new SQLContext(sc)

       // 1. Build an RDD of Rows from the original RDD
       val lines = sc.textFile("file:///home/pengyucheng/java/rdd2dfram.txt")
       val rowsRDD = lines.map(line => {
         val splited = line.split(",")
         RowFactory.create(Integer.valueOf(splited(0)), splited(1), Integer.valueOf(splited(2)))
       })

       // 2. Create the schema (StructType) matching the structure of the Rows
       val structFields = new ArrayList[StructField]()
       structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true))
       structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true))
       structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true))
       val structType = DataTypes.createStructType(structFields)

       // 3. Apply the schema to the RDD of Rows via createDataFrame
       val personDf = sqlContext.createDataFrame(rowsRDD, structType)
       personDf.registerTempTable("persons")

       sqlContext.sql("select * from persons where age > 8").rdd.collect.foreach(println)
    }
}
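The docs quote above also mentions the case where "the structure of records is encoded in a string". A sketch of that variant, assuming the column layout arrives in a hypothetical schemaString (every column is then read as a string first and cast later in SQL):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object SchemaFromStringSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Schema From String"))
    val sqlContext = new SQLContext(sc)

    // the column layout arrives as a plain string, e.g. read from a config file or a DB
    val schemaString = "id name age"
    val schema = StructType(schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true)))

    val rowRDD = sc.textFile("file:///home/pengyucheng/java/rdd2dfram.txt")
      .map(_.split(","))
      .map(p => Row(p(0), p(1), p(2)))

    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.registerTempTable("persons")
    // age was loaded as a string, so cast it before comparing
    sqlContext.sql("select * from persons where cast(age as int) > 8").show()
  }
}

Since nothing in the string says what type each column is, loading everything as StringType and casting in the query is the usual compromise; if the type information is available elsewhere, map it onto the proper DataTypes instead.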

Execution result

16/05/26 21:28:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
[1,hadoop,11]
[4,ivy,27]
16/05/26 21:28:20 INFO SparkContext: Invoking stop() from shutdown hook

Summary

Spark DataFrame data-processing flow diagram (purely my own summary; its accuracy remains to be verified):

[figure: flow diagram, not reproduced here]