
Several Ways to Create a DataFrame

1. Creating a DataFrame by reading a JSON file

The JSON file looks like this (each line is a complete JSON object, which is the format Spark's JSON data source expects):

{"name":"Fanbingbing", "score":100}
{"name":"Xuruyun", "score":99}
{"name":"Liangyongqi", "score":74}

Java code:

package demo.java.cn;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class DataFrameFromJson
{
    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("jsonfile");
        SparkContext sc = new SparkContext(conf);
        // create the SQLContext
        SQLContext sqlContext = new SQLContext(sc);
        // read the JSON file
        DataFrame df = sqlContext.read().format("json").load("star.json");
        df.show();// display the contents of the DataFrame; show() prints the first 20 rows by default, pass a count with show(n) to print more
        df.printSchema();// print the schema
        // register the DataFrame as a temporary table; it is registered in memory only and is never written to disk
        df.registerTempTable("startable");
        DataFrame sqlDf = sqlContext.sql("select * from startable where score >80");
        sqlDf.show();
        sc.stop();
    }
}

Output:

+-----------+-----+
|       name|score|
+-----------+-----+
|Fanbingbing|  100|
|    Xuruyun|   99|
|Liangyongqi|   74|
+-----------+-----+

root
 |-- name: string (nullable = true)
 |-- score: long (nullable = true)

+-----------+-----+
|       name|score|
+-----------+-----+
|Fanbingbing|  100|
|    Xuruyun|   99|
+-----------+-----+

Scala code:

package demo.scala.cn

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object DataFrameFromJson {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("jsonfile")
    val sc=new SparkContext(conf)
    val sqlContext=new SQLContext(sc)
    val df=sqlContext.read.json("star.json")
    df.show() // display the contents of the DataFrame; pass a count with show(n) to print more than the default 20 rows
    df.printSchema() // print the schema
    df.registerTempTable("startable")
    val sqlDf=sqlContext.sql("select * from startable where score >80")
    sqlDf.show()
    sc.stop()
  }
}
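
Note that the Java example loads the file with format("json").load(...), while the Scala example calls read.json(...) directly; in Spark 1.x both go through the same DataFrameReader and return the same DataFrame. A minimal Scala sketch of the two equivalent calls:

val df1 = sqlContext.read.format("json").load("star.json")
val df2 = sqlContext.read.json("star.json")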

2. Creating a DataFrame from a non-JSON file

The data file looks like this:

Fanbingbing,100
Xuruyun,99
Liangyongqi,74

Java code:

package demo.java.cn;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

public class DataFrameFromFile
{
    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("rddStruct");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> linesRDD = sc.textFile("star.txt");
        JavaRDD<Row> rowRDD = linesRDD.map(new Function<String, Row>()
        {
            private static final long serialVersionUID = 1L;
            public Row call(String s) throws Exception
            {
                String[] split = s.split(",");
                return RowFactory.create(// the field order here must match the StructField list defined below
                        String.valueOf(split[0]),
                        Integer.valueOf(split[1])
                );
            }
        });
        List<StructField> asList = Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("score", DataTypes.IntegerType, true)
        );
        StructType schema = DataTypes.createStructType(asList);
        DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
        df.show();
        // convert the DataFrame back to an RDD
        JavaRDD<Row> rowRDD2 = df.javaRDD();
        rowRDD2.foreach(new VoidFunction<Row>()
        {
            public void call(Row row) throws Exception
            {
                System.out.print(row.getString(0));
                System.out.println(","+row.getInt(1));
            }
        });
        sc.stop();
    }
}

Scala code:

package demo.scala.cn

import org.apache.spark.sql.{RowFactory, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object DataFrameFromFile {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("rddStruct")
    val sc = new SparkContext(conf)
    val sqlContext=new SQLContext(sc)
    val linesRDD = sc.textFile("star.txt")
    val rowRDD = linesRDD.map { x => {
      val split = x.split(",")
      RowFactory.create(split(0), Integer.valueOf(split(1)))
    }}
    val schema = StructType(List(
      StructField("name", StringType, true),
      StructField("score", IntegerType, true)
    ))
    val df=sqlContext.createDataFrame(rowRDD,schema)
    df.show()
    df.printSchema()
    sc.stop()
  }
}
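
The Scala example above reuses the Java-oriented RowFactory helper. In idiomatic Scala the same rows can be built with Row.apply; a small sketch of the map step rewritten that way, assuming the same linesRDD and schema as above:

import org.apache.spark.sql.Row

val rowRDD = linesRDD.map { x =>
  val split = x.split(",")
  Row(split(0), split(1).toInt) // the field order must still match the schema
}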

3. Saving a DataFrame as a Parquet file. There are two ways to do it:

a.

df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet");

b.

df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
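
Both snippets are written against the Java API (note the write() call). For completeness, a minimal Scala sketch of variant b, assuming df is the DataFrame built in section 2:

import org.apache.spark.sql.SaveMode

// overwrite any existing files under ./sparksql/parquet
df.write.mode(SaveMode.Overwrite).parquet("./sparksql/parquet")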

4. Creating a DataFrame by reading a Parquet file

Java code:

package demo.java.cn;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class DataFrameFromParquet
{
    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("fromparquet");
        SparkContext sc = new SparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame df = sqlContext.read().parquet("./sparksql/parquet");
        df.show();
        sc.stop();
    }
}

Scala code:

package demo.scala.cn

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object DataFrameFromParquet {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("fromparquet")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.parquet("./sparksql/parquet")
    df.show()
    sc.stop()
  }
}

5. Creating a DataFrame from data in MySQL

The data in MySQL looks like this:

mysql> select * from Star;
+-------------+-------+
| name        | score |
+-------------+-------+
| Fanbingbing |   100 |
| Xuruyun     |    99 |
| Liangyongqi |    74 |
+-------------+-------+
3 rows in set (0.00 sec)

Java code:

package demo.java.cn;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.util.HashMap;
import java.util.Map;

public class DataFrameFromMysql
{
    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("mysql");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:mysql://master.cn:3306/db_spark");
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("user", "root");
        options.put("password", "123456");
        options.put("dbtable", "Star");
        DataFrame df = sqlContext.read().format("jdbc").options(options).load();
        df.show();
        sc.stop();
    }
}

Scala code:

package demo.scala.cn

import java.util
import java.util.Properties

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

object DataFrameFromMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("mysql")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val options = new util.HashMap[String, String]()
    options.put("url", "jdbc:mysql://master.cn:3306/db_spark")
    options.put("driver", "com.mysql.jdbc.Driver")
    options.put("user", "root")
    options.put("password", "123456")
    options.put("dbtable", "Star")
    val df = sqlContext.read.format("jdbc").options(options).load()
    df.show()
    // write the DataFrame back into MySQL, appending to the result table
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://master.cn:3306/db_spark","result",properties)
    sc.stop()
  }
}
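
The read above goes through the generic format("jdbc") path with an options map. Since Spark 1.4 the DataFrameReader also offers a jdbc(url, table, properties) shortcut, so the same table can be loaded with the Properties object already used for the write; a hedged sketch with the same placeholder URL, table and credentials as above (depending on the connector version, the driver class may also need to be set in the properties):

val df2 = sqlContext.read.jdbc("jdbc:mysql://master.cn:3306/db_spark", "Star", properties)
df2.show()

Either way, the MySQL JDBC driver (mysql-connector-java) has to be on the classpath of the driver and the executors, for example via spark-submit --jars.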