1. 程式人生 > >平均數編碼:針對某個分類特征類別基數特別大的編碼方式

平均數編碼:針對某個分類特征類別基數特別大的編碼方式

target tar ane 計算 決策樹 原理 beta family 基於

原文:https://zhuanlan.zhihu.com/p/26308272

  插入一條信息:特征編碼一定要考慮是否需要距離度量,編碼方式對距離度量的適應:例如:我們用one-hot編碼顏色,向量正交,各個顏色之間的距離等同,如果此處用序數編碼顯然不太合適,但是我們用one-hot編碼星期幾就不好了,顯然星期一和星期二的距離小於星期一和星期三的距離。

  應用條件:某一個特征是分類的,特征的可能值非常多,那麽平均數編碼是一種高效的編碼方式。

  適用問題:平均數編碼是一種有監督的編碼方式,適用於分類和回歸問題。

  基本原理:基於分類問題分析

  目標y一共有C個不同類別,具體的一個類別用target表示;

  某一個定性特征variable一共有K個不同類別,具體的一個類別用k表示;

  先驗概率:數據點屬於某一個target(y)的概率,技術分享圖片

  後驗概率:該定性特征屬於某一類時,數據點屬於某一個target(y)的概率,技術分享圖片;  

  算法的基本思想:將variable中的每一個k,都表示為它所對應的目標y值概率:技術分享圖片

  因此,整個數據集將增加(C-1)列,是C-1而不是C的原因:技術分享圖片,所以最後一個技術分享圖片的概率值必然和其他技術分享圖片的概率值線性相關。在線性模型、神經網絡以及SVM裏,不能加入線性相關的特征列。如果你使用的是基於決策樹的模型(gbdt、rf等),個人仍然不推薦這種over-parameterization;

  先驗概率與後驗概率的計算:

  技術分享圖片 = (y = target)的數量 / y 的總數

  技術分享圖片 = (y = target 並且 variable = k)的數量 / (variable = k)的數量

  我們已經得到了先驗概率估計技術分享圖片和後驗概率估計技術分享圖片。最終編碼所使用的概率估算,應當是先驗概率與後驗概率的一個凸組合(convex combination)。由此,我們引入先驗概率的權重技術分享圖片來計算編碼所用概率技術分享圖片

  技術分享圖片

  直覺上,技術分享圖片(貝葉斯統計學中技術分享圖片也被稱為shrinkage parameter)應該具有以下特性:

  1. 如果測試集中出現了新的特征類別(未在訓練集中出現),那麽技術分享圖片
  2. 一個特征類別在訓練集內出現的次數越多,後驗概率的可信度越高,其權重也越大;  

  權重函數:

  我們需要定義一個權重函數,輸入是特征類別在訓練集中出現的次數n,輸出是對於這個特征類別的先驗概率的權重技術分享圖片。假設一個特征類別的出現次數為n,以下是一個常見的權重函數:

  技術分享圖片

  這個函數有兩個參數:

  k:當技術分享圖片時,技術分享圖片,先驗概率與後驗概率的權重相同;當技術分享圖片時,技術分享圖片

  f:控制函數在拐點附近的斜率,f越大,“坡”越緩。

  圖示:k=1時,不同的f對於函數圖象的影響。

  技術分享圖片

  當(n - k) / f太大的時候,np.exp可能會產生overflow的警告。我們不需要管這個警告,因為某一類別的頻數極高,分母無限時,最終先驗概率的權重將成為0,這也表示我們對於後驗概率有充足的信任。  

  以上的算法設計能解決多個類別的分類問題,自然也能解決更簡單的兩類分類問題以及回歸問題。

  還有一種情況:定性特征本身包括了不同級別。例如,國家包含了省,省包含了市,市包含了街區。有些街區可能就包含了大量的數據點,而有些省卻可能只有稀少的幾個數據點。這時,我們的解決方法是,在empirical bayes裏加入不同層次的先驗概率估計。  

  代碼實現:

  原論文並沒有提到,如果fit時使用了全部的數據,transform時也使用了全部數據,那麽之後的機器學習模型會產生過擬合。因此,我們需要將數據分層分為n_splits個fold,每一個fold的數據都是利用剩下的(n_splits - 1)個fold得出的統計數據進行轉換。n_splits越大,編碼的精度越高,但也更消耗內存和運算時間。編碼完畢後,是否刪除原始特征列,應當具體問題具體分析。

  

import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from itertools import product

class MeanEncoder:
    def __init__(self, categorical_features, n_splits=5, target_type=classification, prior_weight_func=None):
        """
        :param categorical_features: list of str, the name of the categorical columns to encode
        :param n_splits: the number of splits used in mean encoding
        :param target_type: str, ‘regression‘ or ‘classification‘
        :param prior_weight_func:
        a function that takes in the number of observations, and outputs prior weight when a dict is passed, the default exponential decay function will be used:
        k: the number of observations needed for the posterior to be weighted equally as the prior
        f: larger f --> smaller slope
        """
        self.categorical_features = categorical_features
        self.n_splits = n_splits
        self.learned_stats = {}
        if target_type == classification:
            self.target_type = target_type
            self.target_values = []
        else:
            self.target_type = regression
            self.target_values = None
        if isinstance(prior_weight_func, dict):
            self.prior_weight_func = eval(lambda x: 1 / (1 + np.exp((x - k) / f)), dict(prior_weight_func, np=np))
        elif callable(prior_weight_func):
            self.prior_weight_func = prior_weight_func
        else:
            self.prior_weight_func = lambda x: 1 / (1 + np.exp((x - 2) / 1))

    @staticmethod
    def mean_encode_subroutine(X_train, y_train, X_test, variable, target, prior_weight_func):
        X_train = X_train[[variable]].copy()
        X_test = X_test[[variable]].copy()
        if target is not None:
            nf_name = {}_pred_{}.format(variable, target)
            X_train[pred_temp] = (y_train == target).astype(int) #classification
        else:
            nf_name = {}_pred.format(variable)
            X_train[pred_temp] = y_train  #regression
        prior = X_train[pred_temp].mean()
        col_avg_y = X_train.groupby(by=variable, axis=0)[pred_temp].agg({mean: mean, beta: size})
        col_avg_y[beta] = prior_weight_func(col_avg_y[beta])
        col_avg_y[nf_name] = col_avg_y[beta] * prior + (1 - col_avg_y[beta]) * col_avg_y[mean]
        col_avg_y.drop([beta, mean], axis=1, inplace=True)
        nf_train = X_train.join(col_avg_y, on=variable)[nf_name].values
        nf_test = X_test.join(col_avg_y, on=variable).fillna(prior, inplace=False)[nf_name].values
        return nf_train, nf_test, prior, col_avg_y

    def fit_transform(self, X, y):
        """
        :param X: pandas DataFrame, n_samples * n_features
        :param y: pandas Series or numpy array, n_samples
        :return X_new: the transformed pandas DataFrame containing mean-encoded categorical features
        """
        X_new = X.copy()
        if self.target_type == classification:
            skf = StratifiedKFold(self.n_splits)
        else:
            skf = KFold(self.n_splits)
        if self.target_type == classification:
            self.target_values = sorted(set(y))
            self.learned_stats = {{}_pred_{}.format(variable, target): [] for variable, target in product(self.categorical_features, self.target_values)}
            for variable, target in product(self.categorical_features, self.target_values):
                nf_name = {}_pred_{}.format(variable, target)
                X_new.loc[:, nf_name] = np.nan
                for large_ind, small_ind in skf.split(y, y):
                    nf_large, nf_small, prior, col_avg_y = MeanEncoder.mean_encode_subroutine(X_new.iloc[large_ind],y.iloc[large_ind],X_new.iloc[small_ind],variable, target, self.prior_weight_func)
                    X_new.iloc[small_ind, -1] = nf_small
                    self.learned_stats[nf_name].append((prior, col_avg_y))
        else:
            self.learned_stats = {{}_pred.format(variable): [] for variable in self.categorical_features}
            for variable in self.categorical_features:
                nf_name = {}_pred.format(variable)
                X_new.loc[:, nf_name] = np.nan
                for large_ind, small_ind in skf.split(y, y):
                    nf_large, nf_small, prior, col_avg_y = MeanEncoder.mean_encode_subroutine(
                        X_new.iloc[large_ind], y.iloc[large_ind], X_new.iloc[small_ind], variable, None, self.prior_weight_func)
                    X_new.iloc[small_ind, -1] = nf_small
                    self.learned_stats[nf_name].append((prior, col_avg_y))
        return X_new

    def transform(self, X):
        """
        :param X: pandas DataFrame, n_samples * n_features
        :return X_new: the transformed pandas DataFrame containing mean-encoded categorical features
        """
        X_new = X.copy()
        if self.target_type == classification:
            for variable, target in product(self.categorical_features, self.target_values):
                nf_name = {}_pred_{}.format(variable, target)
                X_new[nf_name] = 0
                for prior, col_avg_y in self.learned_stats[nf_name]:
                    X_new[nf_name] += X_new[[variable]].join(col_avg_y, on=variable).fillna(prior, inplace=False)[nf_name]
                X_new[nf_name] /= self.n_splits
        else:
            for variable in self.categorical_features:
                nf_name = {}_pred.format(variable)
                X_new[nf_name] = 0
                for prior, col_avg_y in self.learned_stats[nf_name]:
                    X_new[nf_name] += X_new[[variable]].join(col_avg_y, on=variable).fillna(prior, inplace=False)[nf_name]
                X_new[nf_name] /= self.n_splits
        return X_new

  

  

  

  

  

  

平均數編碼:針對某個分類特征類別基數特別大的編碼方式