1. 程式人生 > >python資料分析:商品資料化運營(下)——基於投票組合模型的異常檢測

python資料分析:商品資料化運營(下)——基於投票組合模型的異常檢測

本案例用到的主要技術包括:

  • 基本預處理:使用DictVectorizer將字串分類變數轉換為數值型變數、使用SMOTE對不均衡樣本做過抽樣處理。
  • 資料建模:基於cross_val_score的交叉檢驗、基於LogisticRegression、RandomForest、Bagging概率投票組合模型做分類。

案例資料

以下是本資料集的13個特徵變數的詳細說明:

  • order_id:訂單ID,數字組合而成,例如4283851335。
  • order_date:訂單日期,格式為YYYY-MM-DD,例如2013-10-17。
  • order_time:訂單日期,格式為HH:MM:SS,例如12:54:44。
  • cat:商品一級類別,字串型,包含中文、英文。
  • attribution:商品所屬的渠道來源,字串型,包含中文、英文。
  • pro_id:商品ID,數字組合而成。
  • pro_brand:商品品牌,字串型,包含中文、英文。
  • total_money:商品銷售金額,浮點型。
  • total_quantity:商品銷售數量,整數型。
  • order_source:訂單來源,從哪個渠道形成的銷售,字串型,包含中文、英文。
  • pay_type:支付型別,字串型,包含中文、英文。
  • use_id:使用者ID,由數字和字母等組成的字串。
  • city:使用者訂單時的城市,字串型,中文。

目標變數:abnormal_label,代表該訂單記錄是否是異常訂單。

import numpy as np  # numpy庫
import pandas as pd  # pandas庫
from sklearn.feature_extraction import DictVectorizer  # 數值分類轉整數分類庫
from imblearn.over_sampling import SMOTE  # 過抽樣處理庫SMOTE
from sklearn.model_selection import StratifiedKFold, cross_val_score  # 匯入交叉檢驗演算法
from sklearn.linear_model import LogisticRegression # 匯入邏輯迴歸庫 from sklearn.preprocessing import LabelEncoder from sklearn.ensemble import VotingClassifier, RandomForestClassifier, BaggingClassifier # 三種整合分類庫和投票方法庫 import warnings warnings.filterwarnings('ignore') # 定義特殊欄位資料格式 dtypes = {'order_id': np.object, 'pro_id': np.object, 'use_id': np.object} # 載入資料,有點大,需要等待 df = pd.read_csv('https://raw.githubusercontent.com/ffzs/dataset/master/abnormal_orders.txt', dtype=dtypes) # 資料概觀 df.head().T

在這裡插入圖片描述

# 資料型別
df.dtypes

在這裡插入圖片描述

# 檢視缺失值
df.isna().sum()

在這裡插入圖片描述

# 有缺失值的總行數
df.isna().any(axis=1).sum()
# 1429

# 檢視樣本類分佈情況
df.abnormal_label.value_counts()

'''
0    105733
1     28457
Name: abnormal_label, dtype: int64
'''

類樣本分佈審查,執行label_samples_summary(raw_data),從結果中發現數據存在一定程度的不均衡,異常值記錄(label為1)跟非異常值的比例為1:3.7左右。該結果可以處理也可以不處理,這裡我們選擇處理。

# 資料描述
df.describe()

在這裡插入圖片描述

通過描述性統計,發現total_money和total_quantity中存在了極大值。但是我們選擇不做任何處理,因為本節的主題就是針對異常值的分類檢測。

def str2int(set, convert_object, unique_object, training=True):
    '''
    用於將分類變數中的字串轉換為數值索引分類
    :param set: 資料集
    :param convert_object:  DictVectorizer轉換物件,當training為True時為空;當training為False時則使用從訓練階段得到的物件
    :param unique_object: 唯一值列表,當training為True時為空;當training為False時則使用從訓練階段得到的唯一值列表
    :param training: 是否為訓練階段
    :return: 訓練階段返回model_dvtransform,unique_list,traing_part_data;預測應用階段返回predict_part_data
    '''
    convert_cols = ['cat', 'attribution', 'pro_id', 'pro_brand', 'order_source', 'pay_type', 'use_id',
                    'city']  # 定義要轉換的列
    final_convert_matrix = set[convert_cols]  # 獲得要轉換的資料集合
    lines = set.shape[0]  # 獲得總記錄數
    dict_list = []  # 總空列表,用於存放字串與對應索引組成的字典
    if training == True:  # 如果是訓練階段
        unique_list = []  # 總唯一值列表,用於儲存每個列的唯一值列表
        for col_name in convert_cols:  # 迴圈讀取每個列名
            cols_unqiue_value = set[col_name].unique().tolist()  # 獲取列的唯一值列表
            unique_list.append(cols_unqiue_value)  # 將唯一值列表追加到總列表
        for line_index in range(lines):  # 讀取每行索引
            each_record = final_convert_matrix.iloc[line_index]  # 獲得每行資料,是一個Series
            for each_index, each_data in enumerate(each_record):  # 讀取Series每行對應的索引值
                list_value = unique_list[each_index]  # 讀取該行索引對應到總唯一值列表列索引下的資料(其實是相當於原來的列做了轉置成了行,目的是查詢唯一值在列表中的位置)
                each_record[each_index] = list_value.index(each_data)  # 獲得每個值對應到總唯一值列表中的索引
            each_dict = dict(zip(convert_cols, each_record))  # 將每個值和對應的索引組合字典
            dict_list.append(each_dict)  # 將字典追加到總列表
        model_dvtransform = DictVectorizer(sparse=False, dtype=np.int64)  # 建立轉換模型物件
        model_dvtransform.fit(dict_list)  # 應用分類轉換訓練
        traing_part_data = model_dvtransform.transform(dict_list)  # 轉換訓練集
        return model_dvtransform, unique_list, traing_part_data
    else:  # 如果是預測階段
        for line_index in range(lines):  # 讀取每行索引
            each_record = final_convert_matrix.iloc[line_index]  # 獲得每行資料,是一個Series
            for each_index, each_data in enumerate(each_record):  # 讀取Series每行對應的索引值
                list_value = unique_object[each_index]  # 讀取該行索引對應到總唯一值列表列索引下的資料(其實是相當於原來的列做了轉置成了行,目的是查詢唯一值在列表中的位置)
                each_record[each_index] = list_value.index(each_data)  # 獲得每個值對應到總唯一值列表中的索引
            each_dict = dict(zip(convert_cols, each_record))  # 將每個值和對應的索引組合字典
            dict_list.append(each_dict)  # 將字典追加到總列表
        predict_part_data = convert_object.transform(dict_list)  # 轉換預測集
        return predict_part_data
    
def datetime2int(df):
    '''
    將日期和時間資料拓展出其他屬性,例如星期幾、周幾、小時、分鐘等。
    :param set: 資料集
    :return: 拓展後的屬性矩陣
    '''
    # 獲取datetime
    datetime_data = pd.to_datetime(df.order_date + " " + df.order_time)
    # 獲取週數
    weekday_data = datetime_data.dt.weekday.tolist()
    # 獲取天數
    daysinmonth_data = datetime_data.dt.day.tolist()
    # 獲取月份
    month_data = datetime_data.dt.month.tolist()
    # 獲取秒
    second_data = datetime_data.dt.second.tolist()
    # 獲取分鐘
    minute_data = datetime_data.dt.minute.tolist()
    # 獲取小時
    hour_data = datetime_data.dt.hour.tolist()
    final_set = []  # 列表,用於將上述拓展屬性組合起來
    final_set.extend((weekday_data, daysinmonth_data, month_data, second_data, minute_data, hour_data))  # 將屬性列表批量組合
    final_matrix = np.array(final_set).T  # 轉換為矩陣並轉置
    return final_matrix

def sample_balance(X, y):
    '''
    使用SMOTE方法對不均衡樣本做過抽樣處理
    :param X: 輸入特徵變數X
    :param y: 目標變數y
    :return: 均衡後的X和y
    '''
    model_smote = SMOTE()  # 建立SMOTE模型物件
    x_smote_resampled, y_smote_resampled = model_smote.fit_sample(X, y)  # 輸入資料並作過抽樣處理
    return x_smote_resampled, y_smote_resampled

drop_na_set = df.dropna()  # 丟棄帶有NA值的資料行
X_raw = drop_na_set.iloc[:, 1:-1]  # 分割輸入變數X,並丟棄訂單ID列和最後一列目標變數
y_raw = drop_na_set.iloc[:, -1]  # 分割目標變數y

model_dvtransform, unique_object, str2int_data = str2int(X_raw, None, None, training=True)  # 字串分類轉整數型分類

datetime2int_data = datetime2int(X_raw)  # 拓展日期時間屬性

combine_set = np.hstack((str2int_data, datetime2int_data))  # 合併轉換後的分類和拓展後的日期資料集

constant_set = X_raw[['total_money', 'total_quantity']]  # 原始連續資料變數
X_combine = np.hstack((combine_set, constant_set))  # 再次合併資料集
X, y = sample_balance(X_combine, y_raw)  # 樣本均衡處理

丟棄NA值:由於樣本量足夠大,因此我們處理中會選擇丟棄缺失值,這是一種“大資料”量下的缺失值問題。使用drop方法丟棄,形成不含有NA值的drop_na_set。

分割輸入變數X:在drop_na_set基礎上,使用ix方法獲取從第二列開始到倒數第二列,形成輸入變數集合X_raw。第一列為訂單ID,該列用於區別每個訂單,因此該唯一區別值不具有規律特徵;最後一列是目標變數y。

分割目標變數y:在drop_na_set基礎上,使用iloc方法獲取最後一列資料,形成目標資料集y_raw。

字串分類轉整數型分類:直接呼叫str2int方法對X做轉換,返回model_dvtransform、unique_object、str2int_data分別是訓練後的DictVectorizer物件、唯一值總列表和轉換後的數值型分類。

拓展日期時間屬性:直接呼叫datetime2int方法對X做轉換,形成結果集datetime2int_data。

合併轉換後的分類和拓展後的日期資料集:使用numpy的hastck方法將分類和拓展後的日期資料集沿列合併,形成合並資料集combine_set。

將原始資料集中的total_money和total_quantity列資料集提取出來,然後再次使用numpy的hstack方法與combine_set做合併,至此形成了完整的輸入變數集X_combine。

最後呼叫sample_balance函式對X_combine和y_raw做過抽樣處理,形成最終結果集X和y。

model_rf = RandomForestClassifier(n_estimators=20, random_state=0)  # 隨機森林分類模型物件
model_lr = LogisticRegression(random_state=0)  # 邏輯迴歸分類模型物件
model_BagC = BaggingClassifier(n_estimators=20, random_state=0)  # Bagging分類模型物件
estimators = [('randomforest', model_rf), ('Logistic', model_lr), ('bagging', model_BagC)]  # 建立組合評估器列表
model_vot = VotingClassifier(estimators=estimators, voting='soft', weights=[0.9, 1.2, 1.1], n_jobs=-1)  # 建立組合評估模型
cv = StratifiedKFold(8)  # 設定交叉檢驗方法
cv_score = cross_val_score(model_vot, X, y, cv=cv)  # 交叉檢驗
print ('{:*^60}'.format('Cross val socres:'))
print (cv_score)  # 列印每次交叉檢驗得分
print ('Mean scores is: %.2f' % cv_score.mean())  # 列印平均交叉檢驗得分
model_vot.fit(X, y)  # 模型訓練

結果如下:

   *********************Cross val socres:**********************
    [0.76707504 0.92312404 0.97251149 0.97182236 0.92419602 0.90868367
     0.90967915 0.9167241 ]
    Mean scores is: 0.91

從交叉檢驗結果看出,8次交叉檢驗除了第一次結果略差以外,其他7次都比較穩定,整體交叉檢驗得分(準確率)達到91%,說明了其準確率和魯棒性相對不錯。

建立多個分類模型物件。通過RandomForestClassifier方法建立隨機森林分類模型物件model_rf,設定分類器數量為20,目的是希望通過更多的分類器達到更好的分類精度;設定隨機狀態為0,目的是控制每次隨機的結果相同。然後按照類似的步驟分別建立邏輯迴歸分類模型物件model_lr、Bagging分類模型物件model_BagC。

提示:RandomForest、Bagging以及之前我們用到的AdaBoost、Gradient Boosting都是常用的整合方法。除了這些外,sklearn.ensemble中還提供了extra-trees、Isolation Forest等多種整合方法。這些整合方法大多數都既有分類器又有迴歸器,意味著可以用於分類和迴歸。

建立一個由模型物件名稱和模型物件組合的元組的列表estimators。其中:物件名稱為了區分和識別使用,任意字串都可以;模型物件是上面建立的三個分類器物件。該列表用於組合投票模型器的引數設定。

使用VotingClassifier方法建立一個基於投票方法的組合分類模型器,具體引數如下:

  • estimators:模型組合為上面建立的estimators列表。
  • voting:投放方法設定為soft,意味著使用每個分類器的概率做投票統計,最終按投票概率選出;還可以設定為hard,意味著通過每個分類器的label按得票最多的label做預測輸出。
  • weights:設定三個分類器對應的投票權重,這樣可以將分類概率和權重做加權求和。
  • n_jobs:設定為-1意味著計算時使用所有的CPU。

ps:在設定voting引數時,如果設定為soft,要求每個模型器必須都支援predict_proba方法,否則只能使用hard方法。例如使用SVC(SVM的分類器)時,就只能設定為hard。

使用StratifiedKFold(8)設定一個8折交叉檢驗方法,這是一個按照目標變數的樣本比例進行隨機抽樣的方法,尤其適合分類演算法的交叉檢驗。

X_raw_data = pd.read_csv('https://raw.githubusercontent.com/ffzs/dataset/master/new_abnormal_orders.csv', dtype=dtypes)  # 讀取要預測的資料集
X_raw_new = X_raw_data.iloc[:, 1:]  # 分割輸入變數X,並丟棄訂單ID列和最後一列目標變數
str2int_data_new = str2int(X_raw_new, model_dvtransform, unique_object, training=False)  # 字串分類轉整數型分類
datetime2int_data_new = datetime2int(X_raw_new)  # 日期時間轉換
combine_set_new = np.hstack((str2int_data_new, datetime2int_data_new))  # 合併轉換後的分類和拓展後的日期資料集
constant_set_new = X_raw_new[['total_money', 'total_quantity']]  # 原始連續資料變數
X_combine_new = np.hstack((combine_set_new, constant_set_new))  # 再次合併資料集
y_predict = model_vot.predict(X_combine_new)  # 預測結果
print ('{:*^60}'.format('Predicted Labesls:'))
print (y_predict)  # 列印預測值。

結果:

*********************Predicted Labesls:********************* 
 [1 0 0 0 0 0 0]

結論

在該案例中,91%的準確率是一個比較高的結果,該結果無論是預測的準確率或者多資料集的魯棒性都表現突出。這首先得益於各個分類評估器本身的效能比較穩定,尤其是整合方法的隨機森林和Bagging方法;其次是基於預測概率的投票方法配合經驗上的權重分配,會使得經驗與資料完美結合,也會產生相互疊加效應,在正確配置的前提下,會進一步增強組合模型的分類準確率。

注意點

關於耗時

以筆者的工作環境,完整執行一次大約需要需要15分鐘時間,這裡面有兩個主要耗時的環節:

訓練階段的字串分類轉整數型分類str2int,該過程需要對矩陣中的每個值做對映。
訓練階段的交叉檢驗,該過程由於是8折交叉檢驗並且裡面有2個整合方法,再加上使用組合投票的分類器的應用,導致整個交叉檢驗耗時較長。
因此,如果讀者更側重於效率的話,那麼這種基於組合投票以及整合分類方法的實現思路將不是優先選擇。

關於輸入特徵變數

本案例中應用了兩個特殊欄位pro_id、use_id,這兩類ID一般作為關聯主鍵或者資料去重唯一ID,而很少用於模型訓練本身。這裡使用的原因是希望能從中找到是否異常訂單也會集中在某些品類或某些客戶上。筆者經過測試,如果把這兩個維度去掉,整個模型的準確率會下降到70%以下。

關於樣本均衡

由於本案例中的兩類資料差異並沒有特別大(例如1:10甚至更大),因此均衡處理不是必須的。本案例中由於運營對於異常的定義比較寬鬆,因此才會形成大量異常名單範圍。但實際上異常檢測在很多情況下的記錄是比較少的,因此樣本均衡操作通常必不可少。

字串分類轉整數型分類

字串分類轉整數型分類以及後續的二值化標誌問題,應用的前提是訓練集中被轉換的唯一值域必須是固定的,否則在預測集轉換時遇到新資料值時就會報錯,這點在之前提到過。在這裡再次提出希望讀者注意。

有關資料集中的NA值

在資料處理的一開始,我們就已經將NA值排除了。但讀者是否想過,如果預測應用時,再次出現NA值該如何處理?

我們先分析下輸入變數在訂單資訊生成時,是否允許出現缺失值。attribution、cat、pro_id、pro_brand這幾個欄位都是根據資料庫中商品資訊自動匹配的,因此不應該出現缺失值;order_id、order_date、order_time、total_money、total_quantity、order_source、use_id、city是訂單時生成的必填資訊,也不應該有缺失;而關於pay_type這個要看具體業務部門如何定義:

  • 如果基於已經支付的訂單做異常分類檢測,那麼該欄位不應為空;
  • 如果基於全部訂單做異常分類檢測,那麼該欄位會經常出現為空的情況;

基於上述分析,我們會有如下對應策略:

  • 針對attribution、cat、pro_id、pro_brand欄位,只要有pro_id(商品ID),就可以從商品庫中匹配出這些資訊來。
  • 針對order_id、order_date、order_time、total_money、total_quantity、order_source、use_id、city欄位,只要有order_id,就可以從訂單庫中匹配出這些資訊來。

那如果商品庫和訂單庫沒有兩個ID,或者即使匹配回來的資料仍然有NA值如何處理?由於這些資料理論上不應該為空,建議將其篩選出來單獨儲存(當然不作預測),然後跟IT部門溝通,分析到底為什麼會出現缺失值並制定補足策略,該策略會應用到缺失值處理過程中。

如果缺失值無法預測、也無法避免,那麼可以通過條件判斷如果資料記錄中有缺失值則不作檢測,畢竟缺失值只佔1%不到;如果讀者認為有必要,則可以將缺失值作為一種特殊值的分佈形態,以具體值(例如0)做填充,用於後續資料處理和建模使用,這也是一種行之有效的方法。

提示 通常很多資料處理環節對NA是“無法容忍”的,例如OneHotEncoder就無法將NA值轉換為二值化矩陣,因為NA不是整數型資料。此時將NA值以特定值填充轉換是一種變通思路。**

參考:

《python資料分析與資料化運營》 宋天龍