1. 程式人生 > >pandas apply 函式 多程序實現

pandas apply 函式 多程序實現

@creat_data: 2017-05-08
@author: huangyongye

前言: 在進行資料處理的時候,我們經常會用到 pandas 。但是 pandas 本身好像並沒有提供多程序的機制。本文將介紹如何來自己實現 pandas (apply 函式)的多程序執行。其中,我們主要藉助 joblib 庫,這個庫為python 提供了一個非常簡潔方便的多程序實現方法。

所以,本文將按照下面的安排展開,前面可能比較囉嗦,若只是想知道怎麼用可直接看第三部分:
- 首先簡單介紹 pandas 中的分組聚合操作 groupby
- 然後簡單介紹 joblib 的使用方法。
- 最後,通過一個去停用詞的實驗詳細介紹如何實現 pandas 中 apply 函式多程序執行。

注意:本文說的都是多程序不是多執行緒

1. DataFrame.groupby 分組聚合操作

# groupby 操作
df1 = pd.DataFrame({'a':[1,2,1,2,1,2], 'b':[3,3,3,4,4,4], 'data':[12,13,11,8,10,3]})
df1

這裡寫圖片描述

按照某列分組

grouped = df1.groupby('b')
# 按照 'b' 這列分組了,name 為 'b' 的 key 值,group 為對應的df_group
for name, group in grouped:
    print name, '->'
print group
3 ->
   a  b  data
0  1  3    12
1  2  3    13
2  1  3    11
4 ->
   a  b  data
3  2  4     8
4  1  4    10
5  2  4     3

按照多列分組

grouped = df1.groupby(['a','b'])
# 按照 'b' 這列分組了,name 為 'b' 的 key 值,group 為對應的df_group
for name, group in grouped:
    print name, '->'
    print
group
(1, 3) ->
   a  b  data
0  1  3    12
2  1  3    11
(1, 4) ->
   a  b  data
4  1  4    10
(2, 3) ->
   a  b  data
1  2  3    13
(2, 4) ->
   a  b  data
3  2  4     8
5  2  4     3

若 df.index 為[1,2,3…]這樣一個 list, 那麼按照 df.index分組,其實就是每組就是一行,在後面去停用詞實驗中,我們就用這個方法把 df_all 處理成每行為一個元素的 list, 再用多程序處理這個 list。

grouped = df1.groupby(df1.index)
# 按照 index 分組,其實每行就是一個組了
print len(grouped), type(grouped)
for name, group in grouped:
    print name, '->'
    print group
6 <class 'pandas.core.groupby.DataFrameGroupBy'>
0 ->
   a  b  data
0  1  3    12
1 ->
   a  b  data
1  2  3    13
2 ->
   a  b  data
2  1  3    11
3 ->
   a  b  data
3  2  4     8
4 ->
   a  b  data
4  1  4    10
5 ->
   a  b  data
5  2  4     3

2. joblib 用法

# 1. Embarrassingly parallel helper: to make it easy to write readable parallel code and debug it quickly:
from joblib import Parallel, delayed
from math import sqrt

處理小任務的時候,多程序並沒有體現出優勢。

%time result1 = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10000))
%time result2 = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(10000))
CPU times: user 316 ms, sys: 0 ns, total: 316 ms
Wall time: 309 ms
CPU times: user 692 ms, sys: 384 ms, total: 1.08 s
Wall time: 1.03 s

當需要處理大量資料的時候,並行處理就體現出了它的優勢

%time result = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(1000000))
CPU times: user 3min 43s, sys: 5.66 s, total: 3min 49s
Wall time: 3min 33s
%time result = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(1000000))
CPU times: user 50.9 s, sys: 12.6 s, total: 1min 3s
Wall time: 52 s

3. apply 函式的多程序執行(去停用詞)


上圖中,我們要把 AbstractText 去停用詞, 處理成 AbstractText1 那樣。首先,匯入停用詞表。

# 讀入所有停用詞
with open('stopwords.txt', 'rb') as inp:
    lines = inp.read()
stopwords = re.findall('"(.*?)"', lines)
print len(stopwords)
print stopwords[:10]
692
['a', "a's", 'able', 'about', 'above', 'according', 'accordingly', 'across', 'actually', 'after']
# 對 AbstractText 去停用詞
# 方法一:暴力法,對每個詞進行判斷
def remove_stopwords1(text):
    words = text.split(' ')
    new_words = list()
    for word in words:
        if word not in stopwords:
            new_words.append(word)
    return new_words

# 方法二:先構建停用詞的對映
for word in stopwords:
    if word in words_count.index:
        words_count[word] = -1

def remove_stopwords2(text):
    words = text.split(' ')
    new_words = list()
    for word in words:
        if words_count[word] != -1:
            new_words.append(word)
    return new_words

%time df_all['AbstractText1'] = df_all['AbstractText'].apply(remove_stopwords1)
%time df_all['AbstractText2'] = df_all['AbstractText'].apply(remove_stopwords2)
CPU times: user 8min 56s, sys: 2.72 s, total: 8min 59s
Wall time: 8min 48s
CPU times: user 1min 2s, sys: 4.12 s, total: 1min 6s
Wall time: 1min 2s

上面我嘗試了兩種不同的方法來去停用詞:

方法一中使用了比較粗暴的方法:首先用一個 list 儲存所有的 stopwords,然後對於每一個 text 中的每一個 word,我們判斷它是否出現在 stopwords 的list中(複雜度 O(n) ), 若為 stopword 則去掉。

方法二中我用 一個Series(words_count) 對所有的詞進行對映,如果該詞為 stopword, 則把它的值修改為 -1。這樣,對於 text 中的每個詞 w, 我們只需要判斷它的值是否為 -1 即可判定是否為 stopword (複雜度 O(1))。

所以,在這兩個方法中,我們都是採用單程序來執行,方法二的速度(1min 2s)明顯高於方法一(8min 48s)。

from joblib import Parallel, delayed
import multiprocessing


# 方法三:對方法一使用多程序
def tmp_func(df):
    df['AbstractText3'] = df['AbstractText'].apply(remove_stopwords1)
    return df

def  apply_parallel(df_grouped, func):
    """利用 Parallel 和 delayed 函式實現並行運算"""
    results = Parallel(n_jobs=-1)(delayed(func)(group) for name, group in df_grouped)
    return pd.concat(results)

if __name__ == '__main__':
    time0 = time.time()
    df_grouped = df_all.groupby(df_all.index)
    df_all =applyParallel(df_grouped, tmp_func)
    print 'time costed {0:.2f}'.format(time.time() - time0)
time costed 150.81
# 方法四:對方法二使用多程序
def tmp_func(df):
    df['AbstractText3'] = df['AbstractText'].apply(remove_stopwords2)
    return df

def  apply_parallel(df_grouped, func):
    """利用 Parallel 和 delayed 函式實現並行運算"""
    results = Parallel(n_jobs=-1)(delayed(func)(group) for name, group in df_grouped)
    return pd.concat(results)

if __name__ == '__main__':
    time0 = time.time()
    df_grouped = df_all.groupby(df_all.index)
    df_all =applyParallel(df_grouped, tmp_func)
    print 'time costed {0:.2f}'.format(time.time() - time0)
time costed 123.80

上面方法三和方法四分別對應於前面方法一和方法二,但是都是用了多程序操作。結果是方法一使用多程序以後,速度一下子提高了好幾倍,但是方法二的多程序速度不升反降。這是不是有問題?的確,但是首先可以肯定,我們的程式碼沒有問題。下圖顯示了我用 top 命令看到各個方法的程序執行情況。可以看出,在方法三和方法四中,的的確確是 12 個CPU核都跑起來了。只是在方法四中,每個核佔用的比例都是比較低的。


fig1. 單程序 cpu 使用情況
fig2. 方法三 cpu 使用情況 fig3. 方法四 cpu 使用情況

一個直觀的解釋就是,當我們開啟多程序的時候,程序開啟和最後結果合併,程序結束,這些操作都是要消耗時間的。如果我們執行的任務比較小,那麼程序開啟等操作所消耗的時間可能就要比執行任務本身消耗的時間還多。這樣就會出現多程序的方法四比單程序的方法二耗時更多的情況了。

所以總結來說,在處理小任務的時候沒有必要開啟多程序。藉助joblib (Parallel, delayed 兩個函式) ,我們能夠很方便地實現 python 多程序。