PySpark 通過Arrow加速
前言
PySpark是Spark 實現 Unify BigData && Machine Learning目標的基石之一。通過PySpark,我們可以用Python在一個腳本里完成資料載入,處理,訓練,預測等完整Pipeline,加上DB良好的notebook的支援,資料科學家們會覺得非常開心。當然缺點也是有的,就是帶來了比較大的效能損耗。
效能損耗點分析
如果使用PySpark,大概處理流程是這樣的(注意,這些都是對使用者透明的)
- python通過socket呼叫Spark API(py4j完成),一些計算邏輯,python會在呼叫時將其序列化,一併傳送給Spark。
- Spark 觸發計算,比如載入資料,然後把資料轉成內部儲存格式InternalRow,接著啟動Python Deamon, Python Deamon再啟動多個Worker,
- 資料通過socket協議傳送給Python Worker(不跨網路),期間需要將InternalRow轉化為Java物件,然後再用Java Pickle進行序列化(一次),這個時候才能通過網路傳送給Worker
- Worker接收後,一條一條反序列化(python pickle,兩次),然後轉化為Python物件進行處理。拿到前面序列化好的函式反序列化,接著用這個函式對這些資料處理,處理完成後,再用pickle進行序列化(三次),傳送給Java Executor.
- Java Executor獲取資料後,需要反序列化(四次),然後轉化為InternalRow繼續進行處理。
所以可以看到,前後需要四次編碼/解碼動作。序列化反序列化耗時應該佔用額外耗時的70%左右。我們說,有的時候把序列化框架設定為Kyro之後,速度明顯快了很多,可見序列化的額外耗時是非常明顯的。
前面是一個點,第二個點是,資料是按行進行處理的,一條一條,顯然效能不好。
第三個點是,Socket協議通訊其實還是很快的,而且不跨網路,只要能克服前面兩個問題,那麼效能就會得到很大的提升。 另外可以跟大家說的是,Python如果使用一些C庫的擴充套件,比如Numpy,本身也是非常快的。
如何開啟Arrow進行加速,以及背後原理
開啟方式很簡單,啟動時加上一個配置即可:
if __name__ == '__main__': conf = SparkConf() conf.set("spark.sql.execution.arrow.enabled", "true")
你也可以在submit命令列裡新增。
那麼Arrow是如何加快速度的呢?主要是有兩點:
- 序列化友好
- 向量化
序列化友好指的是,Arrow提供了一個記憶體格式,該格式本身是跨應用的,無論你放到哪,都是這個格式,中間如果需要網路傳輸這個格式,那麼也是序列化友好的,只要做下格式調整(不是序列化)就可以將資料傳送到另外一個應用裡。這樣就大大的降低了序列化開銷。
向量化指的是,首先Arrow是將資料按block進行傳輸的,其次是可以對立面的資料按列進行處理的。這樣就極大的加快了處理速度。
實測效果
為了方便測試,我定義了一個基類:
from pyspark import SQLContext from pyspark import SparkConf from pyspark import SparkContext from pyspark.sql import SparkSession import os os.environ["PYSPARK_PYTHON"] = "/Users/allwefantasy/deepavlovpy3/bin/python3" class _SparkBase(object): @classmethod def start(cls, conf=SparkConf()): cls.sc = SparkContext(master='local[*]', appName=cls.__name__, conf=conf) cls.sql = SQLContext(cls.sc) cls.session = SparkSession.builder.getOrCreate() cls.dataDir = "/Users/allwefantasy/CSDNWorkSpace/spark-deep-learning_latest" @classmethod def shutdown(cls): cls.session.stop() cls.session = None cls.sc.stop() cls.sc = None
接著提供了一個性能測試輔助類:
import time from functools import wraps import logging logger = logging.getLogger(__name__) PROF_DATA = {} def profile(fn): @wraps(fn) def with_profiling(*args, **kwargs): start_time = time.time() ret = fn(*args, **kwargs) elapsed_time = time.time() - start_time if fn.__name__ not in PROF_DATA: PROF_DATA[fn.__name__] = [0, []] PROF_DATA[fn.__name__][0] += 1 PROF_DATA[fn.__name__][1].append(elapsed_time) return ret return with_profiling def print_prof_data(clear): for fname, data in PROF_DATA.items(): max_time = max(data[1]) avg_time = sum(data[1]) / len(data[1]) logger.warn("Function %s called %d times. " % (fname, data[0])) logger.warn('Execution time max: %.3f, average: %.3f' % (max_time, avg_time)) if clear: clear_prof_data() def clear_prof_data(): global PROF_DATA PROF_DATA = {}
很簡單,就是wrap一下實際的函式,然後進行時間計算。現在,我們寫一個PySpark的類:
import logging from random import Random import pyspark.sql.functions as F from pyspark import SparkConf from pyspark.sql.types import * from example.allwefantasy.base.spark_base import _SparkBase import example.allwefantasy.time_profile as TimeProfile import pandas as pd logger = logging.getLogger(__name__) class PySparkOptimize(_SparkBase): def trick1(self): pass if __name__ == '__main__': conf = SparkConf() conf.set("spark.sql.execution.arrow.enabled", "true") PySparkOptimize.start(conf=conf) PySparkOptimize().trick1() PySparkOptimize.shutdown()
這樣骨架就搭建好了。
我們寫第一個方法,trick1,做一個簡單的計數:
def trick1(self): df = self.session.range(0, 1000000).select("id", F.rand(seed=10).alias("uniform"), F.randn(seed=27).alias("normal")) # 更少的記憶體和更快的速度 TimeProfile.profile(lambda: df.toPandas())() TimeProfile.print_prof_data(clear=True)
並且將前面的arrow設定為false.結果如下:
Function <lambda> called 1 times. Execution time max: 6.716, average: 6.716
然後同樣的程式碼,我們把arrow設定為true,是不是會好一些呢?
Function <lambda> called 1 times. Execution time max: 2.067, average: 2.067
當然我這個測試並不嚴謹,但是對於這種非常簡單的示例,提升還是有效三倍的,不是麼?而這,只是改個配置就可以達成了。
分組聚合使用Pandas處理
另外值得一提的是,PySpark是不支援自定義聚合函式的,現在如果是資料處理,可以把group by的小集合發給pandas處理,pandas再返回,比如
def trick7(self): df = self.session.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) @F.pandas_udf("id long", F.PandasUDFType.GROUPED_MAP) def normalize(pdf): v = pdf.v return pdf.assign(v=(v - v.mean()) / v.std())[["id"]] df.groupby("id").apply(normalize).show()
這裡是id進行gourp by ,這樣就得到一張id列都是1的小表,接著呢把這個小錶轉化為pandas dataframe處理,處理完成後,還是返回一張小表,表結構則在註解裡定義,比如只返回id欄位,id欄位是long型別。