1. 程式人生 > >Spark SQL,DataFrames and DataSets Guide官方文件翻譯

Spark SQL,DataFrames and DataSets Guide官方文件翻譯

通過使用saveAsTable命令,可以將DataFrames持久化到表中。已有的Hive部署不需要使用這個特性,Spark將會建立一個預設的本地Hive 元資料庫。和createOrRepalceTempView不同的是,saveAsTable會實體化DataFrame的內容,並且會在Hive 元資料庫中建立一個指標指向該資料。只要維護與同一個元資料庫的連線,即使你重啟Spark程式,那個持久化的表仍然會存在。通過使用SparkSession的table方法,並以表名為引數,就可以建立一個和DataFrame一樣的持久化表。 預設情況下,saveAsTable方法將建立一個“managed table”,意味著資料的位置受元資料庫控制。當表被刪除後,managed table也會刪除儲存的資料。 Parquet檔案 Parquet是多種資料處理系統支援的列式資料格式。該檔案保留了原始資料的模式,Spark SQL提供了parquet檔案的讀寫操作。 讀取Parquet檔案
例子:
schemaPeople.write.parquet("people.parquet")#schemaPeople是上邊例子建立的DataFrame,parquet方法將DataFrame內容以Parquet的格式進行儲存,維持著schema資訊。
# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")
#Parquet檔案也可以用來建立一個臨時的view,然後用於SQL語句中
parquetFile.createOrReplaceTempView("parquetFile");
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
for teenName in teenNames.collect():
  print(teenName)
分割槽解析
在類似於Hive的系統中,對錶進行分割槽是對資料進行優化的方式之一。在一個分割槽的表中,資料通過分割槽列將資料儲存在不同的目錄下。Parquet資料來源現在能夠自動發現並解析分割槽資訊。例如,對人口資料進行分割槽儲存,分割槽列為gender和country,使用下面的目錄結構:
path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
通過將path/to/table傳遞給SparkSession.read.parquet或SparkSession.read.load,Spark SQL可以根據路徑自動解析分割槽資訊。返回的DataFrame的schema如下:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
需要注意的是,分割槽列的資料型別是自動解析的。當前支援數值型別和string型別。有時使用者不希望自動解析分割槽列的資料型別,自動解析分割槽型別的引數為:park.sql.sources.partitionColumnTypeInference.enabled,預設值為true
。如果想關閉該功能,直接將該引數設定為disabled。此時,分割槽列資料格式將被預設設定為string型別,不再進行型別解析。 從Spark1.6開始,分割槽解析只有在預設給定的路徑下才會發現分割槽。對於上面的例子,如果使用者將path/to/table/gender=male傳給SparkSession.read.parquet或SparkSession.read.load,gender不會被當做分割槽列。如果使用者需要指定分割槽解析開始的基本路徑,可以在資料來源options中設定basePath。例如,當path/to/table/gender=male是資料的路徑時,使用者設定basePath的值為path/to/table/,那麼gender就會成為分割槽列。 Schema合併 像ProtocolBuffer、Avro和Thrift,Parquet也支援schema evolution(schema演變)。使用者可以先定義一個簡單的Schema,然後逐漸的向Schema中增加列描述。通過這種方式,使用者可以獲取多個有不同Schema但相互相容的Parquet檔案。現在Parquet資料來源能自動檢測這種情況,併合並這些檔案的schemas。 因為schema合併是一個高消耗的操作,很多情況下是不需要的,所以從1.5開始預設關閉了這個功能。可以通過以下兩種方式開啟: 1、當資料來源是Parquet檔案時,將資料來源選項mergeSchema設定為true(如下面的例子)。 2、將全域性SQL選項spark.sql.parquet.mergeSchema設定為true。
# spark from the previous example is used in this example.

# Create a simple DataFrame, stored into a partition directory
df1 = spark.createDataFrame(sc.parallelize(range(1, 6))\
                                   .map(lambda i: Row(single=i, double=i * 2)))
df1.write.parquet("data/test_table/key=1")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
df2 = spark.createDataFrame(sc.parallelize(range(6, 11))
                                   .map(lambda i: Row(single=i, triple=i * 3)))
df2.write.parquet("data/test_table/key=2")

# Read the partitioned table
df3 = spark.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
# |-- single: int (nullable = true)
# |-- double: int (nullable = true)
# |-- triple: int (nullable = true)
# |-- key : int (nullable = true)
Hive metastore Parquet錶轉換 當向Hive metastore讀寫Parquet表時,為了更好地效能,Spark SQL將使用自帶的Parquet SerDe,而不用Hive的SerDe(SerDe:Serialize/Deserialize的簡稱,目的是用於序列化和反序列化)。這個優化的配置引數為spark.sql.hive.convertMetastoreParquet,預設是開啟的。 Hive/Parquet Schema反射(Reconciliation) 從表schema處理的角度來看,Hive和Parquet有兩個主要區別: 1、Hive不區分大小寫(is case insensitive),而Parquet區分大小寫。 2、Hive允許所有列為空,而Parquet中的空是有重要意義的。 由於這兩個區別,當我們將Hive metastore Parquet錶轉換成Spark SQL Parquet表時,需要將Hive metastore schema 和Parquet schema一致化,一致化規則如下: 1、兩個schema中同名的欄位必須具有相同的資料型別。一致化後的欄位必須為Parquet的欄位型別,所以空值是很重要的(nullability is respected)。 2、一致化後的schema只包含那些在Hive metastore schema中定義的欄位。 (1)在一致化後的schema中忽略只出現在Parquet schema的欄位。 (2)將只出現在Hive metastore schema的欄位設為nullable欄位,並加到一致化後的schema中。 元資料重新整理 為了更好的效能,Spark SQL 快取了Parquet 元資料。當Hive metastore Parquet錶轉換是enabled時,那些轉換後的表的元資料也能夠被快取。當表被Hive或其他工具更新後,為了保證元資料的一致性,需要手動重新整理元資料。示例:
# spark is an existing HiveContext
spark.refreshTable("my_table")
配置 可以使用SparkSession的setConf方法來配置Parquet,或者使用SQL執行SET key==value命令。