1. 程式人生 > >《Spark Python API 官方文檔中文版》 之 pyspark.sql (一)

《Spark Python API 官方文檔中文版》 之 pyspark.sql (一)

開始 clear sorted 緩存 news 數據 sch json 大數

摘要:在Spark開發中,由於需要用Python實現,發現API與Scala的略有不同,而Python API的中文資料相對很少。每次去查英文版API的說明相對比較慢,還是中文版比較容易get到所需,所以利用閑暇之余將官方文檔翻譯為中文版,並親測Demo的代碼。在此記錄一下,希望對那些對Spark感興趣和從事大數據開發的人員提供有價值的中文資料,對PySpark開發人員的工作和學習有所幫助。

官網地址:http://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

pyspark.sql module

Module Context

Spark SQL和DataFrames重要的類有:
pyspark.sql.SQLContext DataFrame和SQL方法的主入口
pyspark.sql.DataFrame 將分布式數據集分組到指定列名的數據框中
pyspark.sql.Column DataFrame中的列
pyspark.sql.Row DataFrame數據的行
pyspark.sql.HiveContext 訪問Hive數據的主入口
pyspark.sql.GroupedData 由DataFrame.groupBy()創建的聚合方法集
pyspark.sql.DataFrameNaFunctions 處理丟失數據(空數據)的方法

pyspark.sql.DataFrameStatFunctions 統計功能的方法
pyspark.sql.functions DataFrame可用的內置函數
pyspark.sql.types 可用的數據類型列表
pyspark.sql.Window 用於處理窗口函數

1.class pyspark.sql.SQLContext(sparkContext, sqlContext=None)

SQLContext可以用來創建DataFrame、註冊DataFrame為表、在表上執行SQL、緩存表、讀取parquet文件。

參數:● sparkContext - 支持sqlcontext的sparkcontext

sqlContext - 一個可選的JVM Scala sqlcontext,若設置,我們不需要在JVM實例化一個新的sqlcontext,而是都調用這個對象。

1.1 applySchema(rdd, schema)

註:在1.3中已過時,使用createDataFrame()代替。

1.2 cacheTable(tableName)

緩存表到內存中

1.3 clearCache()

從內存緩存刪除所有緩存表。

1.4 createDataFrame(data, schema=None, samplingRatio=None)

從元組/列表RDD或列表或pandas.DataFrame創建DataFrame
當模式是列名的列表時,每個列的類型會從數據中推斷出來。
當模式沒有時,將嘗試從數據中推斷模式(列名和類型),數據應該是行或命名元組或字典的RDD。
如果模式推理是必要的,samplingRatio用來確定用於模式推理的行比率。如果沒有samplingratio,將使用第一行。

參數:● data - 行或元組或列表或字典的RDD、list、pandas.DataFrame.
   ● schema – 一個結構化類型或者列名列表,默認是空。

samplingRatio – 用於推斷的行的樣本比率。
返回: DataFrame

>>> l=[(Alice,1)]
>>> sqlContext.createDataFrame(l).collect()
[Row(_1=uAlice, _2=1)]
>>> sqlContext.createDataFrame(l,[name,age]).collect()
[Row(name=uAlice, age=1)]
>>> d=[{name:Alice,age:1}]
>>> sqlContext.createDataFrame(d).collect()
[Row(age=1, name=uAlice)]
>>> rdd=sc.parallelize(l)
>>> sqlContext.createDataFrame(rdd).collect()
[Row(_1=uAlice, _2=1)]
>>> df=sqlContext.createDataFrame(rdd,[name,age])
>>> df.collect()
[Row(name=uAlice, age=1)]
>>> sqlContext.createDataFrame(df.toPandas()).collect()  
[Row(name=uAlice, age=1)]
>>> sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect()  
[Row(0=1, 1=2)]

1.5 createExternalTable(tableName, path=None, source=None, schema=None, **options)

創建基於數據源中的數據的外部表.
返回與外部表關聯的DataFrame
數據源由源和一組選項指定。如果未指定源,那麽將使用由spark.sql.sources.default 配置的默認的數據源配置。
通常,一個模式可以被提供作為返回的DataFrame的模式,然後創建外部表。
返回: DataFrame

1.6 dropTempTable(tableName)

從目錄中刪除臨時表

>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> sqlContext.dropTempTable("table1")

1.7 getConf(key, defaultValue)

返回指定鍵的Spark SQL配置屬性值。
如果鍵沒有指定返回默認值。

1.8 inferSchema(rdd, samplingRatio=None)

註:在1.3中已過時,使用createDataFrame()代替。

1.9 jsonFile(path, schema=None, samplingRatio=1.0)

從一個文本文件中加載數據,這個文件的每一行均為JSON字符串。
註:在1.4中已過時,使用DataFrameReader.json()代替。

1.10 jsonRDD(rdd, schema=None, samplingRatio=1.0)

從一個已經存在的RDD中加載數據,這個RDD中的每一個元素均為一個JSON字符串。
如果提供了模式,將給定的模式應用到這個JSON數據集。否則,它根據數據集的采樣比例來確定模式。

>>> json=sc.parallelize(["""{"name":"jack","addr":{"city":"beijing","mail":"10001"}}""","""{"name":"john","addr":{"city":"shanghai","mail":"10002"}}"""])
>>> df1 = sqlContext.jsonRDD(json)
>>> df1.collect()
[Row(addr=Row(city=ubeijing, mail=u10001), name=ujack), Row(addr=Row(city=ushanghai, mail=u10002), name=ujohn)]
>>> df2 = sqlContext.jsonRDD(json,df1.schema)
>>> df2.collect()
[Row(addr=Row(city=ubeijing, mail=u10001), name=ujack), Row(addr=Row(city=ushanghai, mail=u10002), name=ujohn)]

1.11 load(path=None, source=None, schema=None, **options)

返回數據源中的數據集為DataFrame.
註:在1.4中已過時,使用DataFrameReader.load()代替。

1.12 newSession()

返回一個新的SQLContext做為一個新的會話,這個會話有單獨的SQLConf,註冊臨時表和UDFs,但共享sparkcontext和緩存表。

1.13 parquetFile(*paths)

加載Parquet文件,返回結果為DataFrame
註:在1.4中已過時,使用DataFrameReader.parquet()代替。

1.14 range(start, end=None, step=1, numPartitions=None)

創建只有一個名為id的長類型的列的DataFrame,包含從開始到結束的按照一定步長的獨立元素。

參數:● start - 開始值
   ● end - 結束值
step - 增量值(默認:1)
   ● numPartitions – DataFrame分區數

返回: DataFrame

>>> sqlContext.range(1, 7, 2).collect()
[Row(id=1), Row(id=3), Row(id=5)]

如果僅有一個參數,那麽這個參數被作為結束值。

>>> sqlContext.range(3).collect()
[Row(id=0), Row(id=1), Row(id=2)]

1.15 read

返回一個DataFrameReader,可用於讀取數據為DataFrame。

1.16 registerDataFrameAsTable(df, tableName)

註冊給定的DataFrame作為目錄中的臨時表。
臨時表只在當前SQLContext實例有效期間存在。

>>> sqlContext.registerDataFrameAsTable(df, "table1")

1.17 registerFunction(name, f, returnType=StringType)

註冊python方法(包括lambda方法),作為UDF,這樣可以在 SQL statements中使用。
除了名稱和函數本身之外,還可以選擇性地指定返回類型。當返回類型沒有指定時,默認自動轉換為字符串。對於任何其他返回類型,所生成的對象必須與指定的類型匹配。
參數:● name - UDF名稱
   ● f – python方法
   ● 返回類型 數據類型對象

>>> sqlContext.registerFunction("stringLengthString", lambda x: len(x))
>>> sqlContext.sql("SELECT stringLengthString(‘test‘)").collect()
[Row(_c0=u4)]
>>> from pyspark.sql.types import IntegerType
>>> sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlContext.sql("SELECT stringLengthInt(‘test‘)").collect()
[Row(_c0=4)]
>>> from pyspark.sql.types import IntegerType
>>> sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlContext.sql("SELECT stringLengthInt(‘test‘)").collect()
[Row(_c0=4)]

1.18 setConf(key, value)

設置給定的Spark SQL配置屬性

1.19 sql(sqlQuery)

返回DataFrame代表給定查詢的結果
參數:● sqlQuery - sql語句
返回: DataFrame

>>> l=[(1,row1),(2,row2),(3,row3)]
>>> df = sqlContext.createDataFrame(l,[field1,field2])
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=urow1), Row(f1=2, f2=urow2), Row(f1=3, f2=urow3)]

1.20 table(tableName)

返回指定的表為DataFrame
返回: DataFrame

>>> l=[(1,row1),(2,row2),(3,row3)]
>>> df = sqlContext.createDataFrame(l,[field1,field2])
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
True

1.21 tableNames(dbName=None)

返回數據庫的表名稱列表
參數dbName – 字符串類型的數據庫名稱.默認為當前的數據庫。
返回: 字符串類型的表名稱列表

>>> l=[(1,row1),(2,row2),(3,row3)]
>>> df = sqlContext.createDataFrame(l,[field1,field2])
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlContext.tableNames()
True
>>> "table1" in sqlContext.tableNames("db")
True

1.22 tables(dbName=None)

返回一個包含表名稱的DataFrame從給定的數據庫。
如果數據庫名沒有指定,將使用當前的數據庫。
返回的DataFrame包含兩列: 表名稱和是否臨時表 (一個Bool類型的列,標識表是否為臨時表)。

參數:● dbName – 字符串類型的使用的數據庫名
返回: DataFrame

>>> l=[(1,row1),(2,row2),(3,row3)]
>>> df = sqlContext.createDataFrame(l,[field1,field2])
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.tables()
>>> df2.filter("tableName = ‘table1‘").first()
Row(tableName=utable1, isTemporary=True)

1.23 udf

返回一個註冊的UDF為UDFRegistration。
返回: UDFRegistration

1.24 uncacheTable(tableName)

從內存的緩存表中移除指定的表。

2.class pyspark.sql.HiveContext(sparkContext, hiveContext=None)

Hive此處暫略

《Spark Python API 官方文檔中文版》 之 pyspark.sql (一)