
Spark SQL and DataFrames

1.SparkSession

All Spark SQL operations are built on a SparkSession. We create one named spark here; all code below assumes it without further mention.

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()

2.Creating Spark DataFrames

df = spark.read.json("examples/src/main/resources/people.json")
df.show()

3.Basic DataFrame operations

Read a file into a DataFrame: df = spark.read.json("examples/src/main/resources/people.json")
Show the contents: df.show()
Print the schema as a tree: df.printSchema()
Select one column: df.select("name").show()
Select two columns, adding 1 to one of them: df.select(df['name'], df['age'] + 1).show()
Filter rows: df.filter(df['age'] > 21).show()
Group and aggregate: df.groupBy("age").count().show()

4.Converting from an RDD

Method 1: inferring the schema with Row

from pyspark.sql import Row
sc = spark.sparkContext

lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l:l.split(','))
people = parts.map(lambda p:Row(name=p[0],age=int(p[1])))

schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView('people')

The SparkSession named spark now has a table named people registered, and SQL against it can be run with spark.sql(). The table comes from an RDD: Row() defines the columns, createDataFrame() builds the DataFrame, and createOrReplaceTempView() registers it under a table name.

teenagers = spark.sql('SELECT name FROM people WHERE age >= 13 AND age <= 19')
teenNames = teenagers.rdd.map(lambda p: 'name:' + p.name)
for teenName in teenNames.collect():
    print(teenName)

The result of spark.sql() is a DataFrame; converting it with .rdd exposes the usual RDD operations such as map().

Method 2: specifying the schema with StructType

from pyspark.sql.types import *
sc = spark.sparkContext

lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l:l.split(','))
people = parts.map(lambda p:(p[0],int(p[1].strip())))

schema = StructType().add('name','string',True).add('age','int',True)

schemaPeople = spark.createDataFrame(people, schema)
schemaPeople.createOrReplaceTempView("people")

When an RDD is converted to a DataFrame without Row, no schema is inferred. Instead, build one with StructType(), using add('column name', 'data type', nullable) to append each StructField. Pass the StructType object as the second argument of createDataFrame() to build the DataFrame on the SparkSession, then register the table name with createOrReplaceTempView().

5.Todo