1. 程式人生 > >Spark與Pandas中DataFrame對比(詳細)

Spark與Pandas中DataFrame對比(詳細)

Pandas Spark
工作方式 單機single machine tool,沒有並行機制parallelism
不支援Hadoop,處理大量資料有瓶頸
分散式平行計算框架,內建並行機制parallelism,所有的資料和操作自動並行分佈在各個叢集結點上。以處理in-memory資料的方式處理distributed資料。
支援Hadoop,能處理大量資料
延遲機制 not lazy-evaluated lazy-evaluated
記憶體快取 單機快取 persist() or cache()將轉換的RDDs儲存在記憶體
DataFrame可變性 Pandas中DataFrame是可變的 Spark中RDDs是不可變的,因此DataFrame也是不可變的
建立 從spark_df轉換:pandas_df = spark_df.toPandas() 從pandas_df轉換:spark_df = SQLContext.createDataFrame(pandas_df)
另外,createDataFrame支援從list轉換spark_df,其中list元素可以為tuple,dict,rdd
list,dict,ndarray轉換 已有的RDDs轉換
CSV資料集讀取 結構化資料檔案讀取
HDF5讀取 JSON資料集讀取
EXCEL讀取 Hive表讀取
外部資料庫讀取
index索引 自動建立 沒有index索引,若需要需要額外建立該列
行結構 Series結構,屬於Pandas DataFrame結構 Row結構,屬於Spark DataFrame結構
列結構 Series結構,屬於Pandas DataFrame結構 Column結構,屬於Spark DataFrame結構,如:DataFrame[name: string]
列名稱 不允許重名 允許重名
修改列名採用alias方法
列新增 df[“xx”] = 0 df.withColumn(“xx”, 0).show() 會報錯
from pyspark.sql import functions
df.withColumn(“xx”, functions.lit(0)).show()
列修改 原來有df[“xx”]列,df[“xx”] = 1 原來有df[“xx”]列,df.withColumn(“xx”, 1).show()
顯示 df 不輸出具體內容,輸出具體內容用show方法
輸出形式:DataFrame[age: bigint, name: string]
df 輸出具體內容 df.show() 輸出具體內容
沒有樹結構輸出形式 以樹的形式列印概要:df.printSchema()
df.collect()
排序 df.sort_index() 按軸進行排序
df.sort() 在列中按值進行排序 df.sort() 在列中按值進行排序
選擇或切片 df.name 輸出具體內容 df[] 不輸出具體內容,輸出具體內容用show方法
df[“name”] 不輸出具體內容,輸出具體內容用show方法
df[] 輸出具體內容,
df[“name”] 輸出具體內容
df.select() 選擇一列或多列
df.select(“name”)
切片 df.select(df[‘name’], df[‘age’]+1)
df[0]
df.ix[0]
df.first()
df.head(2) df.head(2)或者df.take(2)
df.tail(2)
切片 df.ix[:3]或者df.ix[:”xx”]或者df[:”xx”]
df.loc[] 通過標籤進行選擇
df.iloc[] 通過位置進行選擇
過濾 df[df[‘age’]>21] df.filter(df[‘age’]>21) 或者 df.where(df[‘age’]>21)
整合 df.groupby(“age”)
df.groupby(“A”).avg(“B”)
df.groupBy(“age”)
df.groupBy(“A”).avg(“B”).show() 應用單個函式
from pyspark.sql import functions
df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show() 應用多個函式
統計 df.count() 輸出每一列的非空行數 df.count() 輸出總行數
df.describe() 描述某些列的count, mean, std, min, 25%, 50%, 75%, max df.describe() 描述某些列的count, mean, stddev, min, max
合併 Pandas下有concat方法,支援軸向合併
Pandas下有merge方法,支援多列合併
同名列自動新增字尾,對應鍵僅保留一份副本
Spark下有join方法即df.join()
同名列不自動新增字尾,只有鍵值完全匹配才保留一份副本
df.join() 支援多列合併
df.append() 支援多行合併
缺失資料處理 對缺失資料自動新增NaNs 不自動新增NaNs,且不丟擲錯誤
fillna函式:df.fillna() fillna函式:df.na.fill()
dropna函式:df.dropna() dropna函式:df.na.drop()
SQL語句 import sqlite3
pd.read_sql(“SELECT name, age FROM people WHERE age >= 13 AND age <= 19″)
表格註冊:把DataFrame結構註冊成SQL語句使用型別
df.registerTempTable(“people”) 或者 sqlContext.registerDataFrameAsTable(df, “people”)
sqlContext.sql(“SELECT name, age FROM people WHERE age >= 13 AND age <= 19″)
功能註冊:把函式註冊成SQL語句使用型別
sqlContext.registerFunction(“stringLengthString”, lambda x: len(x))
sqlContext.sql(“SELECT stringLengthString(‘test’)”)
兩者互相轉換 pandas_df = spark_df.toPandas() spark_df = sqlContext.createDataFrame(pandas_df)
函式應用 df.apply(f)將df的每一列應用函式f df.foreach(f) 或者 df.rdd.foreach(f) 將df的每一列應用函式f
df.foreachPartition(f) 或者 df.rdd.foreachPartition(f) 將df的每一塊應用函式f
map-reduce操作 map(func, list),reduce(func, list) 返回型別seq df.map(func),df.reduce(func) 返回型別seqRDDs
diff操作 有diff操作,處理時間序列資料(Pandas會對比當前行與上一行) 沒有diff操作(Spark的上下行是相互獨立,分散式儲存的)

原文連結:http://www.lining0806.com/spark%E4%B8%8Epandas%E4%B8%ADdataframe%E5%AF%B9%E6%AF%94/