1. 程式人生 > >spark初探,官方文件

spark初探,官方文件

pyspark.sql

  • pyspark.sql.SparkSession Main entry point for DataFrame and SQL functionality. SQL功能和DataFrame的主要入口
  • pyspark.sql.DataFrame A distributed collection of data grouped into named columns. 分散式資料集合,感覺有點像pandas的DF
  • pyspark.sql.Column A column expression in a DataFrame.
  • pyspark.sql.Row A row of data in a DataFrame.
  • pyspark.sql.GroupedData Aggregation methods, returned by DataFrame.groupBy(). 不知道這玩意啥用
  • pyspark.sql.DataFrameNaFunctions Methods for handling missing data (null values). 缺失值的處理
  • pyspark.sql.DataFrameStatFunctions Methods for statistics functionality. 統計功能
  • pyspark.sql.functions List of built-in functions available for DataFrame. DataFrame可用的內建函式列表
  • pyspark.sql.types List of data types available.資料型別的型別列表
  • pyspark.sql.Window For working with window functions.用於處理視窗函式

SparkSession

spark程式設計 DataFrame and SQL的API
能夠用來建立DF,將DF註冊為表,並對其執行SQL操作,快取表,讀取parquet 檔案。SparkSession這樣建立

spark = SparkSession.builder \
...     .master("local") \ #連線到spark主節點,local[4]四核心 ,     spark://master:7077 執行在獨立叢集
... .appName("Word Count") \ ... .config("spark.some.config.option", "some-value") \ ... .getOrCreate() #該方法先檢查全域性中是否存在sparksession,存在則返回該sparksession,否則新建立一個

或者

>>> from pyspark.conf import SparkConf
>>> conf = SparkConf().setMaster("local").setAppName(" test ")
>>> SparkSession.builder.config(conf=conf)

SparkSession.catalog
使用者可以建立、刪除、修改或查詢底層資料庫、表、函式等的介面

SparkSession.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
從一個RDD, List 或者pandas.DataFrame建立一個DF
schema 列名,類似於columns,型別從資料中推斷,如果為None,則從資料推斷,資料必須為 RDD of Row, or namedtuple, or dict.
如果為 pyspark.sql.types.DataType or a datatype string必須與資料匹配,否則會丟擲異常,如果不是pyspark.sql.types.StructType,則會被包裝為pyspark.sql.types.StructType作為唯一欄位,欄位名將為value,
如果需要從data推斷schema, samplingRatio 確定抽取多少row來確定,預設取第一行
verifyschema,驗證每一行對模式的資料型別

SparkSession.newSession()
返回一個新的session,具有獨立的conf

SparkSession.range(start, end=None, step=1, numPartitions=None)
返回一個列名為id的range
spark.range(1, 7, 2).collect()
[Row(id=1), Row(id=3), Row(id=5)]

SparkSession.read
讀入資料作為df

class pyspark.sql.DataFrame(jdf, sql_ctx)

agg(*exprs)
無分組的聚合整個df

>>> df.agg({"age": "max"}).collect()
[Row(max(age)=5)]
>>> from pyspark.sql import functions as F
>>> df.agg(F.min(df.age)).collect()
[Row(min(age)=2)]