1. 程式人生 > >用它做Python並行資料分析,隔壁程式猿都饞哭了

用它做Python並行資料分析,隔壁程式猿都饞哭了


有時候你在做 Python 資料分析的時候,可能會出現這麼個情況:用 Pandas 開啟一個超大型資料集,想得到一些度量(metrics),然後就尷尬地卡住了。


大家都知道,如果你處理大資料,手裡用的是 Pandas,有時要等上一小時才能得到一個 Series 的平均值,甚至都還沒呼叫 apply 函式。這還只是幾百萬行啊,如果是幾十億行,那最好還是用 Spark 之類的高階工具吧。


那麼就沒有好辦法了嗎?有的,就有這麼一個工具,能夠加速 Python 資料分析,既不需要你使用配置更高的硬體設施,也不必切換程式語言。當然,如果你的資料集超級超級大,它的最終作用也會有限,但比普通的 Python 擴充套件工具好多了。特別是如果你不用做大量的重建索引,那麼這個工具非常適合你。


領取福利加python程式語言學習QQ群 515267276


這個工具叫 Dask,資料科學家 Luciano Strika 專門試用了這個工具,並做了測試,發現 Dask 在做並行資料分析時,比常規 Pandas 快出許多倍。


什麼是Dask?


Dask 是一個開源專案,能提供 NumPy Arrays,Pandas Dataframes 和常規列表的抽象,允許你使用多核處理並行執行它們。


下面這段直接摘自教程:


Dask 提供模仿了 NumPy,列表和 Pandas 的高階 Array,Bag 以及 DataFrame 集合,但能夠在無法放入主記憶體的資料集上並行執行。對大型資料集來說,Dask 的高階集合是 NumPy 和 Pandas 的替代方案。


聽起來真不錯!於是我(作者Luciano Strika)決定親自試試 Dask Dataframes,並對它們進行了幾個基準測試。


 


閱讀文件


我首先閱讀了官方文件,看看建議我們使用 Dask 做哪些工作。以下是官方文件(docs.dask.org/en/latest)中的相關部分:


操縱大型資料集,即使這些資料集無法放入記憶體


使用許多核來加速長計算


使用標準Pandas操作(如 groupby,join 和時間序列計算)對大型資料集進行分散式計算


然後在下面,它列出了一些如果使用 Dask Dataframes 會快速完成的事情:


算術運算(乘以或新增到Series)


常見聚合(平均值,最小值,最大值,求和等)


呼叫 apply(只要它在索引中,也就是說,不是在 groupby('y')之後'y'不是索引)


呼叫 value_counts(),drop_duplicates()或corr()


使用 loc,isin 和行式選擇進行過濾



 

#returns only the rows where x is >5, by reference (writing on them alters original df) df2 = df.loc[df['x'] > 5] #returns only the rows where x is 0,1,2,3 or 4, by reference df3 = df.x.isin(range(4)) #returns only the rows where x is >5, by read-only reference (can't be written on) df4 = df[df['x']>5]


 


如何使用 Dask Dataframes


Dask Dataframes 與 Pandas Dataframes 具有相同的 API,只是聚合和 apply 函式延遲執行,並且需要通過呼叫 compute 方法來計算。要想生成 Dask Dataframe,可以像在 Pandas 中一樣呼叫 read_csv 方法,或者,如果給出 Pandas Dataframe df,只需呼叫



 

dd = ddf.from_pandas(df, npartitions=N)


其中 ddf 是你匯入 Dask Dataframes 的名稱,而 npartitions 是一個引數,告訴 Dataframe 如何對其進行分割槽。


根據 StackOverflow 上的說法,建議將 Dataframe 劃分為與計算機核數數量相同的分割槽,或者是該數量的幾倍,因為每個分割槽將在不同的執行緒上執行。如果分割槽過多,它們之間的通訊代價會高很多。


 


動手吧:做點基準測試


我寫了一個 Jupyter notebook 試用這個框架,在 GitHub 上能看到,可以自己執行一下:github.com/StrikingLoo/


我執行的基準測試可以在 Github 上的 Jupyter notebook 中找到,主要有以下要點:


領取福利加python程式語言學習QQ群 515267276

def get_big_mean():    return dfn.salary.mean().compute() def get_big_mean_old():    return df3.salary.mean() def get_big_max():    return dfn.salary.max().compute() def get_big_max_old():    return df3.salary.max() def get_big_sum():    return dfn.salary.sum().compute() def get_big_sum_old():    return df3.salary.sum() def filter_df():    df = dfn[dfn['salary']>5000] def filter_df_old(): df = df3[df3['salary']>5000]


這裡df3是一個常規的 Pandas Dataframe,擁有 2500 萬行,使用這段指令碼生成,其中列是名稱,姓氏和薪水,從列表中隨機抽樣。我用了一個有 50 行的資料集並連線了 500000 次,因為我對分析本身並不太感興趣,但執行它時才會。


dfn 就是基於 df3 的 Dask Dataframe。


 


第一批結果:不太樂觀


我首先嚐試使用 3 個分割槽進行測試,因為我的電腦只有 4 個核心,不想過度使用它。這次使用 Dask 的結果非常差,而且還要等很久才能得到結果,不過我懷疑這可能是分割槽過少的原因:



 

204.313940048 seconds for get_big_mean 39.7543280125 seconds for get_big_mean_old 131.600986004 seconds for get_big_max 43.7621600628 seconds for get_big_max_old 120.027213097 seconds for get_big_sum 7.49701309204 seconds for get_big_sum_old 0.581165790558 seconds for filter_df 226.700095892 seconds for filter_df_old


可以看到在我使用 Dask 時大多數操作變慢了很多。這給了我一些啟示,可能必須使用更多分割槽才行。產生延遲計算所花的成本也可以忽略不計(在某些情況下不到半秒),所以如果重複使用它,成本不會隨著時間推移而攤銷。


我還用 apply 方法嘗試了這個測試:



 

def f(x):    return (13*x+5)%7 def apply_random_old():    df3['random']= df3['salary'].apply(f) def apply_random(): dfn['random']= dfn['salary'].apply(f).compute()


並有非常相似的結果:



 

369.541605949 seconds for apply_random 157.643756866 seconds for apply_random_old


因此,一般來說,大多數操作的速度都是初始操作的兩倍,儘管過濾器的速度要快得多。我覺得或許應該在上面呼叫 compute,所以對這個結果持保留態度。


 


更多分割槽:加速驚人


得到前面這些不盡人意的結果之後,我決定我可能只是沒有使用足夠的分割槽。畢竟,整件事情的重點是並行執行,所以或許我只需進一步並行化?所以我嘗試了 8 個分割槽的相同測試,得到了如下結果(省略了非並行資料幀的結果,因為它們基本相同):



 

3.08352184296 seconds for get_big_mean 1.3314101696 seconds for get_big_max 1.21639800072 seconds for get_big_sum 0.228978157043 seconds for filter_df 112.135010004 seconds for apply_random 50.2007009983 seconds for value_count_test


這就對了!大多數操作的執行速度比常規 Dataframe 快十倍,甚至 apply 的執行速度也更快了!我還運行了 value_count 測試,它只調用“薪水”Series 上的 value_count 方法。對於上下文,在我足足等 10 分鐘後在常規 Dataframe 上執行此測試時,必須終止該過程。這次只用了 50 秒!


所以之前都沒用對工具,它的速度可比常規 Dataframe 快多了。


 


最後再說一點


鑑於我是在一臺相當舊的 4 核 PC 上運行了 2500 萬行資料,所以是相當了不起的。所以我的建議是,下回你必須在本地或從單個 AWS 例項上理資料集時,一定要試試 Dask 這個框架。執行速度簡直不要太快。


關注微信公眾號:程式設計師交流互動平臺!獲取資料學習!