1. 程式人生 > >pyspark中的自定義函式

pyspark中的自定義函式

由於目前的pyspark不支援dataset,因此只能用dataframe+udf或者rdd的方式來對列進行資料處理。這裡介紹一下udf,udf註冊有兩種方法,一種是呼叫register方法,一種是呼叫udf函式。兩者都能在withColumn和apply中使用。兩種方法的區別是:udf註冊後可以使用dataframe的api,而register註冊後可以使用spark sql。

1. Spark DF 和 Pandas DF

首先要區分spark dataframe和pandas dataframe。pandas df會將所有資料儲存在driver node上,一定要慎用。
spark df 與 pandas df 相互轉化效能優化,需要開啟配置,預設為關閉。

spark.sql.execution.arrow.enabled true
import numpy as np
import pandas as pd

//初始化 pandas DF
pdf = pd.DataFrame(np.random.rand(100000, 3))
// pdf -> sdf
%time df = spark.createDataFrame(pdf)
// sdf -> pdf
%time result_pdf = df.select("*").toPandas()

2. Spark UDF

UDF是使用者自定義函式的縮寫。下面展示一下幾種UDF使用方法:

from pyspark.sql.functions import udf
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v"))

# 使用register註冊的方法
def f1(v):
    return v+1
f1_udf = spark.udf.register('f1',f1)
df.withColumn('v2',f1_udf(df.v)).show() 

# 使用udf回撥的方法
@udf('double') #這裡也可以簡單的使用@udf
def f2(v):
    return v+1
df.withColumn('v2',f2(df.v)).show()

# 使用udf函式註冊
def f3(v):
    return v+1
f3_udf = udf(f3,'double')
df.withColumn('v2',f3_udf(df.v)).show()

# 使用register註冊的可以使用spark sql
df.createOrReplaceTempView("df")
spark.sql('select v,f1(v) v2 from df').show()

# 使用udf註冊的可以使用dataframe api
df.select(['v',f2('v').alias('v2')]).show()

3. Pandas UDF

Pandas UDF構建在 Apache Arrow 之上。Apache Arrow是一個跨平臺的在記憶體中以列式儲存的資料層,減少了大量java和python之間序列化和反序列化的工作。

from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v"))

#使用回撥函式@pandas_udf(schema,return type)來註冊
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    v = pdf.v
    return pdf.assign(v=(v - v.mean()) / v.std())
df.groupby("id").apply(normalize).show()

#使用pandas_udf函式直接註冊
def plus_one(a):
    return a + 1
plus_one_pd_udf = pandas_udf(plus_one, returnType=LongType())