
Spark SQL and DataFrame Guide (1.4.1): DataFrames


Spark SQL is the Spark module for processing structured data. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.

DataFrames

A DataFrame is a distributed collection of data organized into named columns. It is equivalent to a table in a relational database or a data frame in R/Python, but with many more optimizations under the hood. DataFrames can be constructed from structured data files, Hive tables, external databases, or existing RDDs.

1. Entry point:

The entry point is the SQLContext class or one of its subclasses, and a SparkContext is needed to create a SQLContext. Here we use the pyspark shell, which already provides a SparkContext as sc:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

You can also use a HiveContext, which provides more functionality than a SQLContext: for example, you can write queries with the more complete HiveQL parser, use Hive UDFs, and read data from Hive tables.

Using a HiveContext does not require an existing Hive installation; by default, Spark packages the HiveContext separately to avoid pulling in too many Hive dependencies.
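
A minimal sketch of creating a HiveContext in the pyspark shell (Spark 1.4.x API); the Hive table name in the comment below is just a hypothetical placeholder:

# sc is an existing SparkContext.
from pyspark.sql import HiveContext
hiveContext = HiveContext(sc)

# HiveQL queries can then be run through it, e.g. (hypothetical Hive table "src"):
# hiveContext.sql("SELECT key, value FROM src LIMIT 10").show()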

2. Creating DataFrames
Create from a JSON file:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.json("examples/src/main/resources/people.json")

# Displays the content of the DataFrame to stdout
df.show()

Note:
You may need to put the file into HDFS first (the file ships with the Spark installation directory, version 1.4):

hadoop fs -mkdir examples/src/main/resources/
hadoop fs -put /appcom/spark/examples/src/main/resources/* /user/hdpuser/examples/src/main/resources/

3. DataFrame operations

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Create the DataFrame
df = sqlContext.read.json("examples/src/main/resources/people.json")

# Show the content of the DataFrame
df.show()
## age  name
## null Michael
## 30   Andy
## 19   Justin

# Print the schema in a tree format
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
## name
## Michael
## Andy
## Justin

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
## name    (age + 1)
## Michael null
## Andy    31
## Justin  20

# Select people older than 21
df.filter(df['age'] > 21).show()
## age name
## 30  Andy

# Count people by age
df.groupBy("age").count().show()
## age  count
## null 1
## 19   1
## 30   1

4. Running SQL queries programmatically
A SQLContext can run SQL queries programmatically and return the result as a DataFrame.

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.sql("SELECT * FROM table")
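
For example, a minimal sketch reusing the people.json file from section 3 (Spark 1.4.x API); the temp table name "people" is just an illustrative choice, and a DataFrame must be registered as a table before it can be queried by name:

# peopleDF is a DataFrame read from people.json as in section 3.
peopleDF = sqlContext.read.json("examples/src/main/resources/people.json")
peopleDF.registerTempTable("people")

# The query returns a new DataFrame.
adults = sqlContext.sql("SELECT name, age FROM people WHERE age >= 21")
adults.show()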

5. Interoperating with RDDs

There are two methods for converting RDDs into DataFrames:

  • Use reflection to infer the schema of an RDD that contains specific types of objects. This approach leads to more concise code and works well when you already know the schema.
  • Use a programmatic interface to construct a schema and apply it to an existing RDD.

Method 1: Inferring the schema using reflection
Spark SQL can convert an RDD of Row objects into a DataFrame and infer the data types. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class.

The keys define the column names of the table, and the types are inferred by looking at the first row (so the first row of the RDD must not have missing values). Future versions will infer types by sampling more of the data, similar to how JSON files are handled today.

# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print teenName

Method 2: Specifying the schema programmatically
Specifying a schema programmatically takes three steps:

  1. Create an RDD of tuples or lists from the original RDD.
  2. Create a schema with StructType that matches the structure of the tuples or lists in the RDD created in step 1.
  3. Apply the schema to the RDD via the createDataFrame method provided by SQLContext.

# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a tuple.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)

# Register the DataFrame as a table.
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")

# The results of SQL queries are RDDs and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
  print name
