spark初探,官方文件
pyspark.sql
- pyspark.sql.SparkSession Main entry point for DataFrame and SQL functionality. SQL功能和DataFrame的主要入口
- pyspark.sql.DataFrame A distributed collection of data grouped into named columns. 分散式資料集合,感覺有點像pandas的DF
- pyspark.sql.Column A column expression in a DataFrame.
- pyspark.sql.Row A row of data in a DataFrame.
- pyspark.sql.GroupedData Aggregation methods, returned by DataFrame.groupBy(). 不知道這玩意啥用
- pyspark.sql.DataFrameNaFunctions Methods for handling missing data (null values). 缺失值的處理
- pyspark.sql.DataFrameStatFunctions Methods for statistics functionality. 統計功能
- pyspark.sql.functions List of built-in functions available for DataFrame. DataFrame可用的內建函式列表
- pyspark.sql.types List of data types available.資料型別的型別列表
- pyspark.sql.Window For working with window functions.用於處理視窗函式
SparkSession
spark程式設計 DataFrame and SQL的API
能夠用來建立DF,將DF註冊為表,並對其執行SQL操作,快取表,讀取parquet 檔案。SparkSession這樣建立
spark = SparkSession.builder \
... .master("local") \ #連線到spark主節點,local[4]四核心 , spark://master:7077 執行在獨立叢集
... .appName("Word Count") \
... .config("spark.some.config.option", "some-value") \
... .getOrCreate() #該方法先檢查全域性中是否存在sparksession,存在則返回該sparksession,否則新建立一個
或者
>>> from pyspark.conf import SparkConf
>>> conf = SparkConf().setMaster("local").setAppName(" test ")
>>> SparkSession.builder.config(conf=conf)
SparkSession.catalog
使用者可以建立、刪除、修改或查詢底層資料庫、表、函式等的介面
SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
從一個RDD, List 或者pandas.DataFrame建立一個DF
schema 列名,類似於columns,型別從資料中推斷,如果為None,則從資料推斷,資料必須為 RDD of Row, or namedtuple, or dict.
如果為 pyspark.sql.types.DataType or a datatype string必須與資料匹配,否則會丟擲異常,如果不是pyspark.sql.types.StructType,則會被包裝為pyspark.sql.types.StructType作為唯一欄位,欄位名將為value,
如果需要從data推斷schema, samplingRatio 確定抽取多少row來確定,預設取第一行
verifyschema,驗證每一行對模式的資料型別
SparkSession.newSession()
返回一個新的session,具有獨立的conf
SparkSession.range(start, end=None, step=1, numPartitions=None)
返回一個列名為id的range
spark.range(1, 7, 2).collect()
[Row(id=1), Row(id=3), Row(id=5)]
SparkSession.read
讀入資料作為df
class pyspark.sql.DataFrame(jdf, sql_ctx)
agg(*exprs)
無分組的聚合整個df
>>> df.agg({"age": "max"}).collect()
[Row(max(age)=5)]
>>> from pyspark.sql import functions as F
>>> df.agg(F.min(df.age)).collect()
[Row(min(age)=2)]