建立DataFrame的幾種方式
阿新 • 發佈:2018-12-31
1.讀取json格式的檔案建立DataFrame
json檔案如下:
{"name":"Fanbingbing", "score":100}
{"name":"Xuruyun", "score":99}
{"name":"Liangyongqi", "score":74}
Java程式碼:
package demo.java.cn; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; public class DataFrameFromJson { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("jsonfile"); SparkContext sc = new SparkContext(conf); //建立sqlContext SQLContext sqlContext = new SQLContext(sc); //讀取json格式的檔案 DataFrame df = sqlContext.read().format("json").load("star.json"); df.show();//顯示 DataFrame中的內容,如果顯示多行要指定多少行show(行數) df.printSchema();//顯示schema資訊 //將DataFrame註冊成臨時的一張表,這張表臨時註冊到記憶體中,不會到磁碟 df.registerTempTable("startable"); DataFrame sqlDf = sqlContext.sql("select * from startable where score >80"); sqlDf.show(); sc.stop(); } }
打印出來的結果:
+-----------+-----+ | name|score| +-----------+-----+ |Fanbingbing| 100| | Xuruyun| 99| |Liangyongqi| 74| +-----------+-----+ root |-- name: string (nullable = true) |-- score: long (nullable = true) +-----------+-----+ | name|score| +-----------+-----+ |Fanbingbing| 100| | Xuruyun| 99| +-----------+-----+
Scala程式碼:
package demo.scala.cn

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Builds a DataFrame by reading a JSON file (one JSON record per line)
 * and queries it through a temporary SQL table.
 */
object DataFrameFromJson {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("jsonfile")
    val sparkContext = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sparkContext)

    // Read the JSON-formatted file.
    val starDf = sqlContext.read.json("star.json")
    starDf.show()        // print the DataFrame contents; use show(n) to print more rows
    starDf.printSchema() // print the inferred schema

    // Register the DataFrame as a temporary in-memory table (never written to disk).
    starDf.registerTempTable("startable")
    val highScoreDf = sqlContext.sql("select * from startable where score >80")
    highScoreDf.show()

    sparkContext.stop()
  }
}
2.非Json格式的檔案建立DataFrame
資料檔案如下:
Fanbingbing,100
Xuruyun,99
Liangyongqi,74
Java程式碼:
package demo.java.cn;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;

/**
 * Builds a DataFrame from a non-JSON text file ("name,score" per line) by
 * mapping each line to a Row and attaching an explicit schema, then converts
 * the DataFrame back to an RDD of Rows and prints each record.
 */
public class DataFrameFromFile
{
    public static void main(String[] args)
    {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("rddStruct");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> linesRDD = sc.textFile("star.txt");
        JavaRDD<Row> rowRDD = linesRDD.map(new Function<String, Row>()
        {
            private static final long serialVersionUID = 1L;
            public Row call(String s) throws Exception
            {
                String[] split = s.split(",");
                // Field order here MUST match the StructField order declared below.
                return RowFactory.create(
                        String.valueOf(split[0]),
                        Integer.valueOf(split[1])
                );
            }
        });
        List<StructField> asList = Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("score", DataTypes.IntegerType, true)
        );
        StructType schema = DataTypes.createStructType(asList);
        DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
        df.show();
        // Convert the DataFrame back to an RDD of Rows.
        JavaRDD<Row> rowRDD2 = df.javaRDD();
        rowRDD2.foreach(new VoidFunction<Row>()
        {
            // Consistency fix: declare serialVersionUID like the Function
            // above — Spark serializes this closure to ship it to executors.
            private static final long serialVersionUID = 1L;
            public void call(Row row) throws Exception
            {
                System.out.print(row.getString(0));
                System.out.println("," + row.getInt(1));
            }
        });
        sc.stop();
    }
}
Scala程式碼:
package demo.scala.cn
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Builds a DataFrame from a non-JSON text file ("name,score" per line) by
 * mapping each line to a Row and attaching an explicit schema.
 */
object DataFrameFromFile {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("rddStruct")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val linesRDD = sc.textFile("star.txt")
    // Idiom fix: use Scala's Row factory and String.toInt instead of the
    // Java-oriented RowFactory/Integer.valueOf — same result, and the same
    // NumberFormatException on malformed input.
    // Field order here MUST match the StructField order declared below.
    val rowRDD = linesRDD.map { line =>
      val fields = line.split(",")
      Row(fields(0), fields(1).toInt)
    }
    val schema = StructType(List(
      StructField("name", StringType, true),
      StructField("score", IntegerType, true)
    ))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.show()          // print the DataFrame contents
    df.printSchema()   // print the declared schema
    sc.stop()
  }
}
3.將DataFrame儲存成parquet檔案,儲存成parquet的方式有兩種:
a.
df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet");
b.
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
4.讀取parquet檔案建立DataFrame
Java程式碼:
package demo.java.cn;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/** Reads a parquet directory back into a DataFrame and prints it. */
public class DataFrameFromParquet
{
    public static void main(String[] args)
    {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local").setAppName("fromparquet");
        SparkContext sparkContext = new SparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(sparkContext);
        // Parquet files carry their own schema, so none is declared here.
        DataFrame parquetDf = sqlContext.read().parquet("./sparksql/parquet");
        parquetDf.show();
        sparkContext.stop();
    }
}
Scala程式碼:
package demo.scala.cn
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/** Reads a parquet directory back into a DataFrame and prints it. */
object DataFrameFromParquet {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("fromparquet")
    val sparkContext = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sparkContext)
    // Parquet files carry their own schema, so none is declared here.
    val parquetDf = sqlContext.read.parquet("./sparksql/parquet")
    parquetDf.show()
    sparkContext.stop()
  }
}
5.讀取mysql中的資料建立DataFrame
mysql中的資料如下:
mysql> select * from Star;
+-------------+-------+
| name | score |
+-------------+-------+
| Fanbingbing | 100 |
| Xuruyun | 99 |
| Liangyongqi | 74 |
+-------------+-------+
3 rows in set (0.00 sec)
Java程式碼:
package demo.java.cn;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import java.util.HashMap;
import java.util.Map;

/** Loads the MySQL table "Star" into a DataFrame via the JDBC data source. */
public class DataFrameFromMysql
{
    public static void main(String[] args)
    {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local").setAppName("mysql");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(sparkContext);
        // JDBC connection options; "dbtable" names the table to read.
        // NOTE(review): credentials are hard-coded for this demo only.
        Map<String, String> jdbcOptions = new HashMap<String, String>();
        jdbcOptions.put("url", "jdbc:mysql://master.cn:3306/db_spark");
        jdbcOptions.put("driver", "com.mysql.jdbc.Driver");
        jdbcOptions.put("user", "root");
        jdbcOptions.put("password", "123456");
        jdbcOptions.put("dbtable", "Star");
        DataFrame starDf = sqlContext.read().format("jdbc").options(jdbcOptions).load();
        starDf.show();
        sparkContext.stop();
    }
}
Scala程式碼:
package demo.scala.cn
import java.util
import java.util.Properties
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Loads the MySQL table "Star" into a DataFrame via the JDBC data source,
 * then appends the rows into the "result" table of the same database.
 */
object DataFrameFromMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("mysql")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Idiom fix: use an immutable Scala Map instead of java.util.HashMap —
    // DataFrameReader.options accepts scala.collection.Map directly.
    // NOTE(review): credentials are hard-coded for this demo only.
    val options = Map(
      "url" -> "jdbc:mysql://master.cn:3306/db_spark",
      "driver" -> "com.mysql.jdbc.Driver",
      "user" -> "root",
      "password" -> "123456",
      "dbtable" -> "Star"
    )
    val df = sqlContext.read.format("jdbc").options(options).load()
    df.show()
    // Insert the DataFrame's rows into the MySQL table "result".
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://master.cn:3306/db_spark", "result", properties)
    sc.stop()
  }
}