1. 程式人生 > >(資料科學學習手札72)用pdpipe搭建pandas資料分析流水線

(資料科學學習手札72)用pdpipe搭建pandas資料分析流水線

本文對應指令碼及資料已上傳至我的Github倉庫https://github.com/CNFeffery/DataScienceStudyNotes

1 簡介

  在資料分析任務中,從原始資料讀入,到最後分析結果出爐,中間絕大部分時間都是在對資料進行一步又一步的加工規整,以流水線(pipeline)的方式完成此過程更有利於梳理分析脈絡,也更有利於查錯改正。pdpipe作為專門針對pandas進行流水線化改造的模組,為熟悉pandas的資料分析人員書寫優雅易讀的程式碼提供一種簡潔的思路,本文就將針對pdpipe的用法進行介紹。

2 pdpipe常用功能介紹

  pdpipe的出現極大地對資料分析過程進行規範,其主要擁有以下特性:

  • 簡潔的語法邏輯
  • 在流水線工作過程中可輸出規整的提示或錯誤警報資訊
  • 輕鬆串聯不同資料操作以組成一條完整流水線
  • 輕鬆處理多種型別資料
  • Python編寫,便於二次開發

  通過pip install pdpipe安裝完成,接下來我們將在jupyter lab中以TMDB 5000 Movie Dataset中的tmdb_5000_movies.csv資料集(圖1)為例來介紹pdpipe的主要功能,這是Kaggle上的公開資料集,記錄了一些電影的相關屬性資訊,你也可以在資料科學學習手札系列文章的Github倉庫對應本篇文章的路徑下直接獲取該資料集。

圖1 TMDB 5000 Movie Dataset資料集

2.1 從一個簡單的例子開始

  首先在jupyter lab中讀入tmdb_5000_movies.csv資料集並檢視其前3行(圖2):

import pandas as pd
import pdpipe

# 讀入tmdb_5000_movies.csv資料集並檢視前3行
data = pd.read_csv('tmdb_5000_movies.csv');data.head(3)
圖2

  可以看出,資料集包含了數值、日期、文字以及json等多種型別的資料,現在假設我們需要基於此資料完成以下流程:

1、刪除original_title列
2、對title列進行小寫化處理
3、丟掉vote_average小於等於7,且original_language不為en

的行
4、求得genres對應電影型別的數量儲存為新列genres_num,並刪除原有的genres列
5、丟掉genres_num小於等於5的行

  上述操作直接使用pandas並不會花多少時間,但是想要不創造任何中間臨時結果一步到位產生所需的資料框子集,並且保持程式碼的可讀性不是一件太容易的事,但是利用pdpipe,我們可以非常優雅地實現上述過程:

# 以pdp.PdPipeline傳入流程列表的方式建立pipeline
first_pipeline = pdp.PdPipeline([pdp.ColDrop("original_title"), 
                                 pdp.ApplyByCols(columns=['title'], func=lambda x: x.lower()),
                                 pdp.RowDrop({'vote_average': lambda x: x <= 7, 'original_language': lambda x: x != 'en'}),
                                 pdp.ApplyByCols(columns=['genres'], func=lambda x: [item['name'] for item in eval(x)].__len__(), result_columns=['genres_num']),
                                 pdp.RowDrop({'genres_num': lambda x: x <= 5})])

# 將建立的pipeline直接作用於data直接得到所需結果,並列印流程資訊
first_pipeline(data, verbose=True).reset_index(drop=True)

  得到的結果如圖3所示:

圖3

  我們不僅保證了程式碼優雅簡潔,可讀性強,結果的一步到位,還自動打印出整個流水線運作過程的狀態說明!令人興奮的是pdpipe充分封裝了pandas的核心功能尤其是apply相關操作,使得常規或非常規的資料分析任務都可以利用pdpipe中的API結合自定義函式來優雅地完成,小小領略到pdpipe的妙處之後,下文我們來展開詳細介紹。

2.2 pdpipe中的重要子模組

  pdpipe中的API按照不同分工被劃分到若干子模組,下面將針對常用的幾類API展開介紹。

2.2.1 basic_stages

  basic_stages中包含了對資料框中的行、列進行丟棄/保留、重新命名以及重編碼的若干類:

ColDrop:
  這個類用於對指定單個或多個列進行丟棄,其主要引數如下:

  • columns:字串或列表,用於指定需要丟棄的列名
  • errors:字串,傳入'ignore'或'raise',用於指定丟棄指定列時遇到錯誤採取的應對策略,'ignore'表示忽略異常,'raise'表示丟擲錯誤打斷流水線運作,預設為'raise'

  下面是舉例演示(注意單個流水線部件可以直接傳入源資料執行apply方法直接得到結果),我們分別對單列和多列進行刪除操作:

  • 單列刪除
# 刪除budget列
pdp.ColDrop(columns='budget').apply(data).head(3)

  刪除後得到的結果如圖4:

圖4

  • 多列刪除
# 刪除budget之外的所有列

del_col = data.columns.tolist()
del_col.remove('budget')
pdp.ColDrop(columns=del_col).apply(data).head(3)

  得到的結果中只有budget列被保留,如圖5:

圖5


ColRename:
  這個類用於對指定列名進行重新命名,其主要引數如下:

  • rename_map:字典,傳入舊列名->新列名鍵值對

  下面是舉例演示:

  • 列重新命名
# 將budget重新命名為Budget
pdp.ColRename(rename_map={'budget': 'Budget'}).apply(data).head(3)

  結果如圖6:

圖6


ColReorder:
  這個類用於修改列的順序,其主要引數如下:

  • positions:字典,傳入列名->新的列下標鍵值對

  下面是舉例演示:

  • 修改列位置
# 將budget從第0列挪動為第3列
pdp.ColReorder(positions={'budget': 3}).apply(data).head(3)

  結果如圖7:

圖7


DropNa:
  這個類用於丟棄資料中空值元素,其主要引數與pandas中的dropna()保持一致,核心引數如下:

  • axis:0或1,0表示刪除含有缺失值的行,1表示刪除含有缺失值的列

  下面是舉例演示,首先我們創造一個包含缺失值的資料框:

import numpy as np
# 創造含有缺失值的示例資料
df = pd.DataFrame({'a': [1, 4, 1, 5],
                   'b': [4, None, np.nan, 7]})
df
圖8
  • 刪除缺失值所在行
# 刪除含有缺失值的行
pdp.DropNa(axis=0).apply(df)

  結果如圖9:

圖9

  • 刪除缺失值所在列
# 刪除含有缺失值的列
pdp.DropNa(axis=1).apply(df)

  結果如圖10:

圖10


FreqDrop:
  這個類用於刪除在指定的一列資料中出現頻次小於所給閾值對應的全部行,主要引數如下:

  • threshold:int型,傳入頻次閾值,低於這個閾值的行將會被刪除
  • column:str型,傳入threshold引數具體作用的列

  下面是舉例演示,首先我們來檢視電影資料集中original_language列對應的頻次分佈情況:

# 檢視original_language頻次分佈
pd.value_counts(data['original_language'])
圖11

  下面我們來過濾刪除original_language列出現頻次小於10的行:

# 過濾original_language頻次低於10的行,再次檢視過濾後的資料original_language頻次分佈
pd.value_counts(pdp.FreqDrop(threshold=10, column='original_language').apply(data)['original_language'])
圖12


RowDrop:
  這個類用於刪除滿足指定限制條件的行,主要引數如下:

  • conditions:dict型,傳入指定列->該列刪除條件鍵值對
  • reduce:str型,用於決定多列組合條件下的刪除策略,'any'相當於條件或,即滿足至少一個條件即可刪除;'all'相當於條件且,即滿足全部條件才可刪除;'xor'相當於條件異或,即當恰恰滿足一個條件時才會刪除,滿足多個或0個都不進行刪除。預設為'any'

  下面是舉例演示,我們以budget小於100000000,genres不包含Action,release_date缺失以及vote_count小於1000作為組合刪除條件,分別檢視在三種不同刪除策略下的最終得以保留的資料行數:

  • 刪除策略:any
pdp.RowDrop(conditions={'budget': lambda x: x <= 100000000,
                        'genres': lambda x: 'Action' not in x,
                        'release_date': lambda x: x == np.nan,
                        'vote_count': lambda x: x <= 1000},
           reduce='any').apply(data).shape[0]
  • 刪除策略:all
pdp.RowDrop(conditions={'budget': lambda x: x <= 100000000,
                        'genres': lambda x: 'Action' not in x,
                        'release_date': lambda x: x == np.nan,
                        'vote_count': lambda x: x <= 1000},
           reduce='all').apply(data).shape[0]
  • 刪除策略:xor
pdp.RowDrop(conditions={'budget': lambda x: x <= 100000000,
                        'genres': lambda x: 'Action' not in x,
                        'release_date': lambda x: x == np.nan,
                        'vote_count': lambda x: x <= 1000},
           reduce='xor').apply(data).shape[0]

  對應的結果如下:

圖13

2.2.2 col_generation

  col_generation中包含了從原資料中產生新列的若干功能:

AggByCols:
  這個類用於將指定的函式作用到指定的列上以產生新結果(可以是新的列也可以是一個聚合值),即這時函式真正傳入的最小計算物件是列,主要引數如下:

  • columns:str或list,用於指定對哪些列進行計算
  • func:傳入需要計算的函式
  • drop:bool型,決定是否在計算完成後把舊列刪除,預設為True,即對應列的計算結果直接替換掉對應的舊列
  • suffix:str型,控制新列字尾名,當drop引數設定為False時,結果列的列名變為其對應列+suffix引數指定的字尾名;當drop設定為False時,此引數將不起作用(因為新列直接繼承了對應舊列的名稱)
  • result_columns:str或list,與columns引數一一對應的結果列名稱,當你想要自定義結果新列名稱時這個引數就變得非常有用,預設為None
  • func_desc:str型,可選引數,為你的函式新增說明文字,預設為None

  下面我們來舉例演示幫助理解上述各個引數:

  • 針對單個列進行計算
pdp.AggByCols(columns='budget',
              func=np.log).apply(data).head(3)

  對應的結果如圖14,可以看到在只傳入columnsfunc這兩個引數,其他引數均為預設值時,對budget列做對數化處理後的新列直接覆蓋了原有的budget列:

圖14

  設定drop引數為False,並將suffix引數設定為'_log':

# 設定drop引數為False,並將suffix引數設定為'_log'
pdp.AggByCols(columns='budget',
              func=np.log,
              drop=False,
              suffix='_log').apply(data).head(3)
圖15

  可以看到這時原有列得以保留,新的列以舊列名+字尾名的方式被新增到舊列之後,下面我們修改result_columns引數以自定義結果列名:

# 設定drop引數為False,並將suffix引數設定為'_log'
pdp.AggByCols(columns='budget',
              func=np.log,
              result_columns='budget(log)').apply(data).head(3)
圖16
  • 針對多個列進行計算
pdp.AggByCols(columns=['budget', 'revenue'],
              func=np.log,
              drop=False,
              suffix='_log').apply(data).head(3)
圖17
  • 計算列的聚合值
pdp.AggByCols(columns='budget',
              func=np.mean, # 這裡傳入的函式是聚合型別的
              drop=False,
              suffix='_mean').apply(data).loc[:, ['budget', 'budget_mean']]

  這時為了保持整個資料框形狀的完整,計算得到的聚合值填充到新列的每一個位置上:

圖18


ApplyByCols:
  這個類用於實現pandas中對列的apply操作,不同於AggByCols中函式直接處理的是列,ApplyByCols中函式直接處理的是對應列中的每個元素。主要引數如下:

  • columns:str或list,用於指定對哪些列進行apply操作
  • func:傳入需要計算的函式
  • drop:bool型,決定是否在計算完成後把舊列刪除,預設為True,即對應列的計算結果直接替換掉對應的舊列
  • colbl_sfx:str型,控制新列字尾名,當drop引數設定為False時,結果列的列名變為其對應列+suffix引數指定的字尾名;當drop設定為False時,此引數將不起作用(因為新列直接繼承了對應舊列的名稱)
  • result_columns:str或list,與columns引數一一對應的結果列名稱,當你想要自定義結果新列名稱時這個引數就變得非常有用,預設為None
  • func_desc:str型,可選引數,為你的函式新增說明文字,預設為None

  下面我們來舉例演示幫助理解上述各個引數:

  • spoken_languages涉及語言數量
      下面的示例對每部電影中涉及的語言語種數量進行計算:
pdp.ApplyByCols(columns='spoken_languages',
                func=lambda x: [item['name'] for item in eval(x)].__len__(),
                drop=False,
                result_columns='spoken_languages_num').apply(data)[['spoken_languages', 'spoken_languages_num']]

  對應的結果:

圖19


ApplyToRows:
  這個類用於實現pandas中對行的apply操作,傳入的計算函式直接處理每一行,主要引數如下:

  • func:傳入需要計算的函式,對每一行進行處理
  • colname:str型,用於定義結果列的名稱(因為ApplyToRows作用的物件是一整行,因此只能形成一列返回值),預設為'new_col'
  • follow_column:str型,控制結果列插入到指定列名之後,預設為None,即放到最後一列
  • func_desc:str型,可選引數,為你的函式新增說明文字,預設為None

  下面我們來舉例演示幫助理解上述各個引數:

  • 得到對應電影的盈利簡報
pdp.ApplyToRows(func=lambda row: f"{row['original_title']}: {round(((row['revenue'] / row['budget'] -1)*100), 2)}%" if row['budget'] != 0 
                else f"{row['original_title']}: 因成本為0故不進行計算",
                colname='movie_desc',
                follow_column='budget',
                func_desc='輸出對應電影的盈利百分比').apply(data).head(3)

  對應的結果:

圖20


Bin:
  這個類用於對連續型資料進行分箱,主要引數如下:

  • bin_map:字典型,傳入列名->分界點列表
  • drop:bool型,決定是否在計算完成後把舊列刪除,預設為True,即對應列的計算結果直接替換掉對應的舊列

  下面我們以計算電影盈利率小於0,大於0小於100%以及大於100%作為三個分箱區間,首先我們用到上文介紹過的RowDrop丟掉那些成本或利潤為0的行,再用ApplyToRows來計算盈利率,最終使用Bin進行分箱:

  • 為電影盈利率進行資料分箱
pipeline = pdp.PdPipeline([pdp.RowDrop(conditions={'budget': lambda x: x == 0,
                                                   'revenue': lambda x: x == 0},
                                       reduce='any'),
                           pdp.ApplyToRows(func=lambda row: row['revenue'] / row['budget'] - 1,
                                           colname='rate of return',
                                           follow_column='budget'),
                           pdp.Bin(bin_map={'rate of return': [0, 1]}, drop=False)])
pipeline(data).head(3)

  對應的結果:

圖21


OneHotEncode:
  這個類用於為類別型變數建立啞變數(即獨熱處理),效果等價於pandas中的get_dummies,主要引數如下:

  • columns:str或list,用於指定需要進行啞變數處理的列名,預設為None,即對全部類別型變數進行啞變數處理
  • dummy_na:bool型,決定是否將缺失值也作為啞變數的一個類別進行輸出,預設為False即忽略缺失值
  • exclude_columns:list,當columns引數設定為None時,這個引數傳入的列名列表中指定的列將不進行啞變數處理,預設為None,即不對任何列進行排除
  • drop_first:bool型或str型,預設為True,這個引數是針對啞變數中類似這樣的情況:譬如有類別型變數性別{男性,女性},那麼實際上只需要產生一列0-1型啞變數即可表示原始變數的資訊,即性別{男性,女性}->男性{0,1},0代表不為男性即女性,1相反,而drop_dirst設定為False時,原始變數有幾個類別就對應幾個啞變數被創造;當設定為指定類別值時(譬如設定drop_first = '男性'),這個值對應的類別將不進行啞變數生成
  • drop:bool型,控制是否在生成啞變數之後刪除原始的類別型變數,預設為True即刪除

  下面我們偽造包含啞變數的資料框:

# 偽造的資料框
df = pd.DataFrame({
    'a': ['x', 'y', 'z'],
    'b': ['i', 'j', 'q']
})
df
圖22

  預設引數下執行OneHotEncode

pdp.OneHotEncode().apply(df)
圖23

  設定drop_first為False:

pdp.OneHotEncode(drop_first=False).apply(df)
圖23

2.2.3 text_stages

  text_stages中包含了對資料框中文字型變數進行處理的若干類,下文只介紹其中我認為最有用的:

RegexReplace:
  這個類用於對文字型列進行基於正則表示式的內容替換,其主要引數如下:

  • columns:str型或list型,傳入要進行替換的單個或多個列名
  • pattern:str,傳入匹配替換內容的正則表示式
  • replace:str,傳入替換後的新字串
  • result_columns:str或list,與columns引數一一對應的結果列名稱,當你想要自定義結果新列名稱時這個引數就變得非常有用,預設為None,即直接替換原始列
  • drop:bool型,用於決定是否刪除替換前的原始列,預設為True,即刪除原始列

  下面是舉例演示:

  • 替換original_language中的'en'或'cn'為'英文/中文'
pdp.RegexReplace(columns='original_language',
                 pattern='en|cn',
                 replace='英文/中文').apply(data)['original_language'].unique()

  結果如圖24:

圖24


2.3 組裝pipeline的幾種方式

  上文中我們主要演示了單一pipeline部件工作時的細節,接下來我們來了解pdpipe中組裝pipeline的幾種方式:

2.3.1 PdPipeline

  這是我們在2.1中舉例說明使用到的建立pipeline的方法,直接傳入由按順序的pipeline元件組成的列表便可生成所需pipeline,而除了直接將其視為函式直接傳入原始資料和一些輔助引數(如verbose控制是否列印過程)之外,還可以用類似scikit-learn中的fit_transform方法:

# 呼叫pipeline的fit_transform方法作用於data直接得到所需結果,並列印流程資訊
first_pipeline.fit_transform(data, verbose=1)
圖25


2.3.2 make_pdpipeline

  與PdpPipeline相似,make_pdpipeline不可以傳入pipeline元件形成的列表,只能把每個pipeline元件當成位置引數按順序傳入:

# 以make_pdpipeline將pipeline元件作為位置引數傳入的方式建立pipeline
first_pipeline1 = pdp.make_pdpipeline(pdp.ColDrop("original_title"), 
                                      pdp.ApplyByCols(columns=['title'], func=lambda x: x.lower()),
                                      pdp.RowDrop({'vote_average': lambda x: x <= 7, 'original_language': lambda x: x != 'en'}),
                                      pdp.ApplyByCols(columns=['genres'], func=lambda x: [item['name'] for item in eval(x)].__len__(), result_columns=['genres_num']),
                                      pdp.RowDrop({'genres_num': lambda x: x <= 5}))

# 以pdp.PdPipeline傳入流程列表的方式建立pipeline
first_pipeline2 = pdp.PdPipeline([pdp.ColDrop("original_title"), 
                                  pdp.ApplyByCols(columns=['title'], func=lambda x: x.lower()),
                                  pdp.RowDrop({'vote_average': lambda x: x <= 7, 'original_language': lambda x: x != 'en'}),
                                  pdp.ApplyByCols(columns=['genres'], func=lambda x: [item['name'] for item in eval(x)].__len__(), result_columns=['genres_num']),
                                  pdp.RowDrop({'genres_num': lambda x: x <= 5})])

# 比較兩種不同方式建立的pipeline產生結果是否相同
first_pipeline1.fit_transform(data) == first_pipeline2(data)

  比較結果如圖26,兩種方式殊途同歸:

圖26





  以上就是本文全部內容,如有筆誤望指出!

參考資料:
https://pdpipe.github.io/pdpipe/doc/pdpipe/
https://tirthajyoti.github.io/Notebooks/Pandas-pipeline-with-pdpipe