Spark SQL Data Loading and Saving in Practice

Spark SQL mainly operates on DataFrames, and DataFrame itself provides save and load operations.
II: Spark SQL read/write code in practice:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class SparkSQLLoadSaveOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        /**
         * read() returns a DataFrameReader; load reads the data in.
         */
        DataFrame peopleDF = sqlContext.read().format("json").load(
            "E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json");

        /**
         * Operate on the DataFrame directly.
         * JSON is a self-describing format. How is its schema determined on read?
         * By scanning the entire JSON input; the metadata is only known after the scan.
         */
        // mode selects Append for the output: new files are created to add to the existing data.
        peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames");
    }
}

1. The read method returns a DataFrameReader, which is used to read data.

 * :: Experimental ::
 * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
 * {{{
 *   sqlContext.read.parquet("/path/to/file.parquet")
 *   sqlContext.read.schema(schema).json("/path/to/file.json")
 * }}}
 * @group genericdata
 * @since 1.4.0
def read: DataFrameReader = new DataFrameReader(this)
2. Next, call format on DataFrameReader to specify the format of the file to read.
 * Specifies the input data source format.
 * @since 1.4.0
def format(source: String): DataFrameReader = {
  this.source = source
  this
}
3. The load method of DataFrameReader then turns the input at the given path into a DataFrame.
 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 * @since 1.4.0
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
  option("path", path).load()
}

1. Call DataFrame's select function to pick the columns to keep.

 * Selects a set of columns. This is a variant of `select` that can only select
 * existing columns using column names (i.e. cannot construct expressions).
 * {{{
 *   // The following two are equivalent:
 *   df.select("colA", "colB")
 *   df.select($"colA", $"colB")
 * }}}
 * @group dfops
 * @since 1.3.0
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)
2. Then write is used to save the result to an external storage system.
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 * @group output
 * @since 1.4.0
def write: DataFrameWriter = new DataFrameWriter(this)
3. When saving, mode specifies what happens if data already exists: for example, Append adds to it and Overwrite replaces it.
 * Specifies the behavior when data or table already exists. Options include:
 *   - `SaveMode.Overwrite`: overwrite the existing data.
 *   - `SaveMode.Append`: append the data.
 *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
 *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 * @since 1.4.0
def mode(saveMode: SaveMode): DataFrameWriter = {
  this.mode = saveMode
  this
}
4. Finally, the save() method triggers the action and writes the output to the specified path.
 * Saves the content of the [[DataFrame]] at the specified path.
 * @since 1.4.0
def save(path: String): Unit = {
  this.extraOptions += ("path" -> path)
  save()
}
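
The four save modes listed above differ only in what happens when the target already holds data. The following is a minimal standalone Java sketch of that decision, not Spark's implementation: the `SaveModeDemo` class, its nested `Mode` enum, and the in-memory map standing in for storage are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of save-mode behavior; it does not use or reproduce Spark code.
final class SaveModeDemo {
    // Stand-in for Spark's SaveMode (assumption: same four options as in the scaladoc).
    enum Mode { OVERWRITE, APPEND, IGNORE, ERROR_IF_EXISTS }

    // In-memory "storage" keyed by path, used only for illustration.
    private final Map<String, List<String>> storage = new HashMap<>();

    void save(String path, List<String> rows, Mode mode) {
        List<String> existing = storage.get(path);
        if (existing == null) {
            // Nothing there yet: every mode simply writes the rows.
            storage.put(path, new ArrayList<>(rows));
            return;
        }
        switch (mode) {
            case OVERWRITE:
                storage.put(path, new ArrayList<>(rows)); // replace the old data
                break;
            case APPEND:
                existing.addAll(rows);                    // keep old data, add new
                break;
            case IGNORE:
                break;                                    // no-op
            case ERROR_IF_EXISTS:
                throw new IllegalStateException("path already exists: " + path);
        }
    }

    List<String> read(String path) {
        return storage.get(path);
    }

    public static void main(String[] args) {
        SaveModeDemo demo = new SaveModeDemo();
        demo.save("personNames", Arrays.asList("Michael"), Mode.ERROR_IF_EXISTS);
        demo.save("personNames", Arrays.asList("Andy"), Mode.APPEND);
        System.out.println(demo.read("personNames")); // prints [Michael, Andy]
    }
}
```

In this model, running the program from Part II twice with Append corresponds to two APPEND calls on the same path: the second run adds to the first run's output instead of failing.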

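III: The overall Spark SQL read/write flow is as follows:
1. load() returns a dataset of type DataFrame, reading the data at the given path with the default data source.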

 * Returns the dataset stored at path as a DataFrame,
 * using the default data source configured by spark.sql.sources.default.
 * @group genericdata
 * @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0.
@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String): DataFrame = {
  read.load(path)
}
2. Tracing into load, the source code is as follows:


 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 * @since 1.4.0
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
  option("path", path).load()
}
3. Tracing further into the no-argument load():
 * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external
 * key-value stores).
 * @since 1.4.0
def load(): DataFrame = {
  val resolved = ResolvedDataSource(
    userSpecifiedSchema = userSpecifiedSchema,
    partitionColumns = Array.empty[String],
    provider = source,
    options = extraOptions.toMap)
  DataFrame(sqlContext, LogicalRelation(resolved.relation))
}
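
The call chain traced above follows a simple builder pattern: format stores the source name, load(path) records the path as an option, and load() finally hands the source and options over for resolution. Below is a minimal Java sketch of that pattern, independent of Spark. `MiniReader` is a hypothetical class, its load() only returns a description string instead of a DataFrame, and the `parquet` starting value is an assumption that mirrors the `spark.sql.sources.default` setting.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the reader's builder chain; not Spark's DataFrameReader.
final class MiniReader {
    // Starts at a default source, as the real reader starts at the configured default.
    private String source = "parquet";
    private final Map<String, String> extraOptions = new LinkedHashMap<>();

    MiniReader format(String source) {
        this.source = source;
        return this; // returning this is what makes read().format(...).load(...) chain
    }

    MiniReader option(String key, String value) {
        extraOptions.put(key, value);
        return this;
    }

    // load(path) is only a convenience: it records the path as an option, then delegates.
    String load(String path) {
        return option("path", path).load();
    }

    // Stand-in for data source resolution: reports what would be resolved.
    String load() {
        return "provider=" + source + ", options=" + extraOptions;
    }

    public static void main(String[] args) {
        // prints provider=json, options={path=people.json}
        System.out.println(new MiniReader().format("json").load("people.json"));
        // prints provider=parquet, options={path=users.parquet}
        System.out.println(new MiniReader().load("users.parquet"));
    }
}
```

The second call shows why a bare load(path) still works: when format is never called, the default source is used.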

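1. format specifies the file format explicitly. An important takeaway follows from this: data read as JSON can be saved as Parquet, and similar conversions are possible.
When reading a file, Spark SQL lets you specify the file type, for example JSON or Parquet.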

 * Specifies the input data source format. Built-in options include "parquet", "json", etc.
 * @since 1.4.0
def format(source: String): DataFrameReader = {
  this.source = source // the file type, e.g. "json" or "parquet"
  this
}

1. Create a DataFrameWriter instance.

 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 * @group output
 * @since 1.4.0
def write: DataFrameWriter = new DataFrameWriter(this)
2. Tracing into DataFrameWriter, the source code is as follows:


 * :: Experimental ::
 * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
 * key-value stores, etc). Use [[DataFrame.write]] to access this.
 * @since 1.4.0
final class DataFrameWriter private[sql](df: DataFrame) {
  // ... (class body omitted)
}

1. Overwrite replaces existing data: everything written earlier is overwritten.

 * Specifies the behavior when data or table already exists. Options include:
 *   - `SaveMode.Overwrite`: overwrite the existing data.
 *   - `SaveMode.Append`: append the data.
 *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
 *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 * @since 1.4.0
def mode(saveMode: SaveMode): DataFrameWriter = {
  this.mode = saveMode
  this
}
2. The String overload accepts the mode as an external parameter and resolves it by pattern matching.
 * Specifies the behavior when data or table already exists. Options include:
 *   - `overwrite`: overwrite the existing data.
 *   - `append`: append the data.
 *   - `ignore`: ignore the operation (i.e. no-op).
 *   - `error`: default option, throw an exception at runtime.
 * @since 1.4.0
def mode(saveMode: String): DataFrameWriter = {
  this.mode = saveMode.toLowerCase match {
    case "overwrite" => SaveMode.Overwrite
    case "append" => SaveMode.Append
    case "ignore" => SaveMode.Ignore
    case "error" | "default" => SaveMode.ErrorIfExists
    case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
      "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
  }
  this
}

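The string-to-mode matching shown above can be mirrored in plain Java with a switch on the lower-cased input. This is a sketch for illustration only: `SaveModeParser` and its nested `Mode` enum are hypothetical names, and `Locale.ROOT` is a choice made here so that lower-casing does not depend on the machine's locale.

```java
import java.util.Locale;

// Hypothetical Java counterpart of the Scala pattern match; not part of Spark.
final class SaveModeParser {
    // Stand-in for Spark's SaveMode enum.
    enum Mode { OVERWRITE, APPEND, IGNORE, ERROR_IF_EXISTS }

    static Mode parse(String saveMode) {
        // Matching is case-insensitive because the input is lower-cased first.
        switch (saveMode.toLowerCase(Locale.ROOT)) {
            case "overwrite":
                return Mode.OVERWRITE;
            case "append":
                return Mode.APPEND;
            case "ignore":
                return Mode.IGNORE;
            case "error":
            case "default":
                // Two spellings map to the same default behavior.
                return Mode.ERROR_IF_EXISTS;
            default:
                throw new IllegalArgumentException("Unknown save mode: " + saveMode
                    + ". Accepted modes are 'overwrite', 'append', 'ignore', 'error'.");
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("Append"));  // prints APPEND
        System.out.println(parse("default")); // prints ERROR_IF_EXISTS
    }
}
```
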
1. save writes the result to the path passed in.

 * Saves the content of the [[DataFrame]] at the specified path.
 * @since 1.4.0
def save(path: String): Unit = {
  this.extraOptions += ("path" -> path)
  save()
}
2. Tracing into the no-argument save method:
 * Saves the content of the [[DataFrame]] as the specified table.
 * @since 1.4.0
def save(): Unit = {
  // ... (method body omitted)
}
3. Here source is initialized from SQLConf's defaultDataSourceName:
private var source: String = df.sqlContext.conf.defaultDataSourceName


// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
  defaultValue = Some("org.apache.spark.sql.parquet"),
  doc = "The default data source to use in input/output.")

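1. The toDF function converts an RDD into a DataFrame. The no-argument toDF() defined on DataFrame itself, shown here, simply returns the same object.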

 * Returns the object itself.
 * @group basic
 * @since 1.3.0
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = this
2. The show() method displays the result.
 * Displays the [[DataFrame]] in a tabular form. For example:
 * {{{
 *   year  month AVG('Adj Close) MAX('Adj Close)
 *   1980  12    0.503218        0.595103
 *   1981  01    0.523289        0.570307
 *   1982  02    0.436504        0.475256
 *   1983  03    0.410516        0.442194
 *   1984  04    0.450090        0.483521
 * }}}
 * @param numRows Number of rows to show
 * @param truncate Whether truncate long strings. If true, strings more than 20 characters will
 *              be truncated and all cells will be aligned right
 * @group action
 * @since 1.5.0
// scalastyle:off println
def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
// scalastyle:on println


 * Compose the string representing rows for output
 * @param _numRows Number of rows to show
 * @param truncate Whether truncate long strings and align cells right
private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
  val numRows = _numRows.max(0)
  val sb = new StringBuilder
  val takeResult = take(numRows + 1)
  val hasMoreData = takeResult.length > numRows
  val data = takeResult.take(numRows)
  val numCols = schema.fieldNames.length
  // ... (rest of the method omitted)
}
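
The opening lines of showString use a small trick: they fetch one row more than requested, so the method can tell whether more data exists without counting the whole dataset. A minimal Java sketch of just that step, using a plain list in place of a DataFrame; `ShowPeek`, `Page`, and `page` are hypothetical names, and the formatting part of showString is not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the "take one extra row" step; not Spark's showString.
final class ShowPeek {
    // Holds the rows to display and whether more rows exist beyond them.
    static final class Page {
        final List<String> rows;
        final boolean hasMoreData;

        Page(List<String> rows, boolean hasMoreData) {
            this.rows = rows;
            this.hasMoreData = hasMoreData;
        }
    }

    // Returns at most n leading elements, like take(n) on a collection.
    static <T> List<T> take(List<T> all, int n) {
        return new ArrayList<>(all.subList(0, Math.min(n, all.size())));
    }

    static Page page(List<String> all, int requestedRows) {
        int numRows = Math.max(requestedRows, 0);          // negative requests become 0
        List<String> takeResult = take(all, numRows + 1);  // fetch one extra row
        boolean hasMoreData = takeResult.size() > numRows; // the extra row proves more exists
        List<String> data = take(takeResult, numRows);     // drop the extra row again
        return new Page(data, hasMoreData);
    }

    public static void main(String[] args) {
        Page p = page(Arrays.asList("Michael", "Andy", "Justin"), 2);
        System.out.println(p.rows + " more=" + p.hasMoreData); // prints [Michael, Andy] more=true
    }
}
```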


