Classic ensemble learning algorithms, with partial python implementations
Boosting
The rough idea of Boosting: first train a base learner from the initial training set, then re-weight the training set according to that learner's judgments on it, so that the samples the current classifier gets wrong receive more attention in subsequent learning; iterate until the target number of base learners has been generated, then combine them by a weighted sum to obtain a strong learner. The flow is shown in the figure below (image from https://blog.csdn.net/willduan1/article/details/73618677).

AdaBoost is the best-known representative of the Boosting family; the figure below shows its flow (image from https://www.cnblogs.com/DianaCody/p/5425633.html):
- In the first row, the figure on the left shows the sample distribution. At the first iteration the distribution is uniform; a weak classifier is trained on the uniformly distributed samples, and then, based on the error e (here e is actually the sum of the distribution weights of the misclassified samples), the sample distribution is updated and the weight of this classifier is computed.
- At the second iteration a weak classifier is trained on the samples under the new distribution; the sample distribution is then updated again according to the error, and that classifier's weight is computed.
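The iteration just described can be sketched from scratch for the binary case. This is a minimal illustration, assuming one-dimensional threshold stumps as the weak learners; all names here are hypothetical and this is only a sketch of the re-weighting mechanics, not a full AdaBoost implementation:

```python
import numpy as np


def adaboost_fit(X, y, n_rounds=10):
    """Minimal AdaBoost sketch for binary labels in {-1, +1},
    using threshold stumps as the weak learners."""
    n = len(y)
    w = np.full(n, 1.0 / n)          # uniform sample distribution at round 1
    learners = []
    for _ in range(n_rounds):
        best = None
        # pick the stump (feature, threshold, sign) with the smallest
        # weighted error under the current distribution w
        for d in range(X.shape[1]):
            for thre in np.unique(X[:, d]):
                for sign in (1, -1):
                    pred = np.where(X[:, d] <= thre, sign, -sign)
                    err = w[pred != y].sum()   # e = sum of weights of misclassified samples
                    if best is None or err < best[0]:
                        best = (err, d, thre, sign)
        err, d, thre, sign = best
        err = max(err, 1e-10)                  # guard against log(0)
        alpha = 0.5 * np.log((1 - err) / err)  # weight of this weak learner
        pred = np.where(X[:, d] <= thre, sign, -sign)
        w = w * np.exp(-alpha * y * pred)      # raise weight of misclassified samples
        w /= w.sum()                           # renormalise the distribution
        learners.append((alpha, d, thre, sign))
    return learners


def adaboost_predict(learners, X):
    # strong learner = sign of the alpha-weighted sum of the weak learners
    score = np.zeros(len(X))
    for alpha, d, thre, sign in learners:
        score += alpha * np.where(X[:, d] <= thre, sign, -sign)
    return np.sign(score)
```

Each round the distribution `w` concentrates on the samples the previous stumps got wrong, which is exactly the re-weighting step shown in the figure.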
Bagging
Because each weak learner in Boosting is obtained iteratively from the previous round, Boosting can only be computed serially; Bagging, by contrast, has no dependence between its base learners, so its nature permits parallel computation.
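As a sketch of why Bagging parallelises: each base learner below is trained on its own bootstrap sample, with no dependence on the others, so the training loop could be distributed as-is. The 1-nearest-neighbour base learner and all names are hypothetical, chosen only to keep the example self-contained:

```python
import numpy as np
from collections import Counter


def bootstrap_sample(X, y, rng):
    # draw n indices *with* replacement -- the defining step of bagging
    idx = rng.integers(0, len(y), size=len(y))
    return X[idx], y[idx]


class OneNN:
    """Deliberately simple base learner: 1-nearest-neighbour."""

    def fit(self, X, y):
        self.X, self.y = X, y
        return self

    def predict(self, x):
        d = np.linalg.norm(self.X - x, axis=1)
        return self.y[np.argmin(d)]


def bagging_fit(X, y, n_learners=15, seed=0):
    rng = np.random.default_rng(seed)
    # the bootstrap samples are independent, so this loop could run in
    # parallel (unlike Boosting, where round t depends on round t-1)
    return [OneNN().fit(*bootstrap_sample(X, y, rng)) for _ in range(n_learners)]


def bagging_predict(models, x):
    # aggregate by majority vote over the base learners
    votes = [m.predict(x) for m in models]
    return Counter(votes).most_common(1)[0][0]
```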
Random Forest (RF) is an extended variant of Bagging. The base classifiers it uses are decision trees, but unlike a traditional decision tree, which picks the optimal splitting attribute from the node's full attribute set, a random-forest tree first draws a random subset of k attributes from that node's attribute set and then picks the optimal attribute within the subset; k = log2(d) is the usual recommendation, where d is the number of sample attributes. First, the code that downloads and saves the dataset:
```python
# -*- coding: utf-8 -*-
import pandas as pd


def ReadAndSaveDataByPandas(target_url=None, save=False):
    # glass.data is comma-separated and has no header row
    data = pd.read_csv(target_url, header=None, sep=",")
    if save:
        # save in the same comma-separated, headerless format it is read back in
        data.to_csv("D:\\Documents\\ml_data\\glass.csv",
                    header=False, index=False)


# a multi-class glass-identification dataset
target_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/glass/glass.data"
ReadAndSaveDataByPandas(target_url, True)
```
Next, the decision-tree code, DecisionTree.py:
```python
# -*- coding: utf-8 -*-
import math
from collections import Counter

import numpy as np


class decisionnode:
    def __init__(self, d=None, thre=None, results=None, NH=None, lb=None, rb=None, max_label=None):
        self.d = d                  # d: the splitting dimension (feature index)
        self.thre = thre            # thre: threshold that splits the samples into two groups
        self.results = results      # class represented by a leaf node
        self.NH = NH                # (sample count) * (empirical entropy) of the node, used when pruning
        self.lb = lb                # decision node: subtree for samples whose value in dimension d is <= thre
        self.rb = rb                # decision node: subtree for samples whose value in dimension d is > thre
        self.max_label = max_label  # most frequent label among the samples at this node


def entropy(y):
    '''
    Compute the information entropy; y holds the labels.
    '''
    if y.size > 1:
        category = list(set(y))
    else:
        category = [y.item()]
        y = [y.item()]
    ent = 0
    for label in category:
        p = len([label_ for label_ in y if label_ == label]) / len(y)
        ent += -p * math.log(p, 2)
    return ent


def Gini(y):
    '''
    Compute the Gini index; y holds the labels.
    '''
    category = list(set(y))
    gini = 1
    for label in category:
        p = len([label_ for label_ in y if label_ == label]) / len(y)
        gini += -p * p
    return gini


def GainEnt_max(X, y, d):
    '''
    Maximum information gain when splitting on attribute d; X is the sample
    set, y the labels, d a dimension index (int).
    '''
    ent_X = entropy(y)
    X_attr = sorted(set(X[:, d]))
    Gain = 0
    thre = 0
    for i in range(len(X_attr) - 1):
        thre_temp = (X_attr[i] + X_attr[i + 1]) / 2
        y_small_index = [i_arg for i_arg in range(
            len(X[:, d])) if X[i_arg, d] <= thre_temp]
        y_big_index = [i_arg for i_arg in range(
            len(X[:, d])) if X[i_arg, d] > thre_temp]
        y_small = y[y_small_index]
        y_big = y[y_big_index]
        Gain_temp = ent_X - (len(y_small) / len(y)) * \
            entropy(y_small) - (len(y_big) / len(y)) * entropy(y_big)
        '''
        # gain-ratio variant:
        intrinsic_value = -(len(y_small) / len(y)) * math.log(len(y_small) /
            len(y), 2) - (len(y_big) / len(y)) * math.log(len(y_big) / len(y), 2)
        Gain_temp = Gain_temp / intrinsic_value
        '''
        if Gain < Gain_temp:
            Gain = Gain_temp
            thre = thre_temp
    return Gain, thre


def Gini_index_min(X, y, d):
    '''
    Minimum Gini index when splitting on attribute d; X is the sample set,
    y the labels, d a dimension index (int).
    '''
    X = X.reshape(-1, len(X.T))  # make sure X is 2-D
    X_attr = sorted(set(X[:, d]))
    Gini_index = 1
    thre = 0
    for i in range(len(X_attr) - 1):
        thre_temp = (X_attr[i] + X_attr[i + 1]) / 2
        y_small_index = [i_arg for i_arg in range(
            len(X[:, d])) if X[i_arg, d] <= thre_temp]
        y_big_index = [i_arg for i_arg in range(
            len(X[:, d])) if X[i_arg, d] > thre_temp]
        y_small = y[y_small_index]
        y_big = y[y_big_index]
        Gini_index_temp = (len(y_small) / len(y)) * \
            Gini(y_small) + (len(y_big) / len(y)) * Gini(y_big)
        if Gini_index > Gini_index_temp:
            Gini_index = Gini_index_temp
            thre = thre_temp
    return Gini_index, thre


def attribute_based_on_GainEnt(X, y):
    '''
    Choose the optimal attribute by information gain; X is the sample set,
    y the labels.
    '''
    D = np.arange(len(X[0]))
    Gain_max = 0
    thre_ = 0
    d_ = 0
    for d in D:
        Gain, thre = GainEnt_max(X, y, d)
        if Gain_max < Gain:
            Gain_max = Gain
            thre_ = thre
            d_ = d  # index of the chosen dimension
    return Gain_max, thre_, d_


def attribute_based_on_Giniindex(X, y):
    '''
    Choose the optimal attribute by Gini index; X is the sample set,
    y the labels.
    '''
    D = np.arange(len(X.T))
    Gini_Index_Min = 1
    thre_ = 0
    d_ = 0
    for d in D:
        Gini_index, thre = Gini_index_min(X, y, d)
        if Gini_Index_Min > Gini_index:
            Gini_Index_Min = Gini_index
            thre_ = thre
            d_ = d  # index of the chosen dimension
    return Gini_Index_Min, thre_, d_


def devide_group(X, y, thre, d):
    '''
    Split the samples into two groups by threshold thre on dimension d.
    '''
    x_small_index = [i_arg for i_arg in range(
        len(X[:, d])) if X[i_arg, d] <= thre]
    x_big_index = [i_arg for i_arg in range(
        len(X[:, d])) if X[i_arg, d] > thre]
    X_small = X[x_small_index]
    y_small = y[x_small_index]
    X_big = X[x_big_index]
    y_big = y[x_big_index]
    return X_small, y_small, X_big, y_big


def NtHt(y):
    '''
    (sample count) * (empirical entropy), used when pruning; y holds the labels.
    '''
    if len(y) == 0:
        return 0.1
    return entropy(y) * len(y)


def maxlabel(y):
    # most frequent label in y
    return Counter(y).most_common(1)[0][0]


def ChooseSubsetForRF(X, y, d):
    '''
    Randomly pick a subset of k = log2(d) of the d attributes, as used by
    random forests.
    '''
    k = int(np.log2(d))
    index = np.random.choice(d, k, replace=False)
    return X[:, index], index


def buildtree(X, y, method='Gini'):
    '''
    Build the decision tree recursively.
    '''
    if y.size > 1:
        X_sub, d_index = ChooseSubsetForRF(X, y, d=X.shape[1])
        if method == 'Gini':
            Gain_max, thre, d = attribute_based_on_Giniindex(X_sub, y)
        elif method == 'GainEnt':
            Gain_max, thre, d = attribute_based_on_GainEnt(X_sub, y)
        if (Gain_max > 0 and method == 'GainEnt') or (Gain_max >= 0 and len(set(y)) > 1 and method == 'Gini'):
            X_small, y_small, X_big, y_big = devide_group(
                X, y, thre, d_index[d])
            left_branch = buildtree(X_small, y_small, method=method)
            right_branch = buildtree(X_big, y_big, method=method)
            nh = NtHt(y)
            max_label = maxlabel(y)
            return decisionnode(d=d_index[d], thre=thre, NH=nh, lb=left_branch, rb=right_branch, max_label=max_label)
        else:
            nh = NtHt(y)
            max_label = maxlabel(y)
            return decisionnode(results=y[0], NH=nh, max_label=max_label)
    elif y.size == 1:
        nh = NtHt(y)
        max_label = maxlabel(y)
        return decisionnode(results=y.item(), NH=nh, max_label=max_label)


def classify(observation, tree):
    if tree.results is not None:
        return tree.results
    if observation[tree.d] > tree.thre:
        return classify(observation, tree.rb)
    return classify(observation, tree.lb)
```
Finally, the random-forest code, RandomForest.py:
```python
# -*- coding: utf-8 -*-
from collections import Counter

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

import DecisionTree


class RF():
    def __init__(self, num=5):
        self.num = num  # number of base classifiers

    def random_Xy(self, X, y, per=80):
        '''
        per: percentage of the original samples to draw, 80% by default.
        '''
        index = np.random.choice(y.shape[0], int(
            y.shape[0] * per / 100), replace=False)
        return X[index], y[index]

    def fit(self, X, y):
        self.tree = []
        for i in range(self.num):
            X_r, y_r = self.random_Xy(X, y)
            self.tree.append(DecisionTree.buildtree(X_r, y_r, method='Gini'))

    def predict(self, x):
        # majority vote over the base trees
        results = []
        for i in range(self.num):
            results.append(DecisionTree.classify(x, self.tree[i]))
        return Counter(results).most_common(1)[0][0]


if __name__ == '__main__':
    dir = 'D:\\Documents\\ml_data\\'
    name = 'glass.csv'
    obj = pd.read_csv(dir + name, header=None)
    data = np.array(obj)
    label = data[:, -1].astype(int)
    data = data[:, 1:data.shape[1] - 1]  # drop the Id column and the label column

    # hold out 50 random samples as the test set
    test_index = np.random.choice(label.shape[0], 50, replace=False)
    test_label = label[test_index]
    test_data = data[test_index]
    train_index = np.delete(np.arange(data.shape[0]), test_index)
    train_label = label[train_index]
    train_data = data[train_index]

    rf = RF(num=50)
    rf.fit(train_data, train_label)
    true_count = 0
    for i in range(len(test_label)):
        if rf.predict(test_data[i]) == test_label[i]:
            true_count += 1
    print(true_count / test_label.shape[0])

    # compare against sklearn's random forest on the same split
    clf = RandomForestClassifier(n_estimators=50)
    clf = clf.fit(train_data, train_label)
    pl = clf.predict(test_data)
    print(np.sum(pl == test_label) / test_label.shape[0])
```
sklearn's random forest is called here as well so the two can be compared. After the run, my random forest reaches 80% accuracy and the library's reaches 84%; both build 50 decision trees.
Stacking
The previous methods combine base learners by (weighted) linear accumulation, whereas stacking combines them through another learner: the individual learners are called primary (base) learners, and the learner used to combine them is called the secondary learner or meta-learner. (image from https://www.cnblogs.com/jiaxin359/p/8559029.html) Many blogs cite this figure, and some of their explanations are not very easy to follow, so I record a detailed walkthrough here:

To guard against overfitting, stacking is usually trained with k-fold cross-validation; the figure uses 5 folds and only one learner type, Model1. Because of the 5-fold cross-validation there are 5 training subsets, so 5 different copies of Model1 are actually trained. Each copy then predicts the held-out samples it was not trained on, and those predictions are concatenated into a 1*n vector that becomes the sample set of the secondary learner.

Usually a stacking algorithm uses m types of Model1, so the sample set finally supplied to the secondary learner consists of n samples of m dimensions.

Once everything is trained, prediction proceeds as follows: feed the sample into all base learners (5*m of them), average the results of same-type learners to obtain a 1*m vector, and feed that into the secondary learner, which gives the final prediction.
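The fold bookkeeping described above can be condensed into a small numpy sketch. `MeanStump` and the function names are hypothetical; the sketch only demonstrates the shapes involved, with the full sklearn-based implementation given further below:

```python
import numpy as np


class MeanStump:
    """Hypothetical toy base learner: predict 1 iff x exceeds the training mean."""

    def fit(self, X, y):
        self.t = X.mean()
        return self

    def predict(self, X):
        return (X[:, 0] > self.t).astype(int)


def stacking_meta_features(X, y, base_factories, k=5):
    # Level-1 training set: each of the m base-learner types is trained k
    # times on k-1 folds and predicts the held-out fold, so every one of the
    # n samples receives an out-of-fold prediction -> an (n, m) matrix.
    n, m = len(y), len(base_factories)
    folds = [np.arange(i, n, k) for i in range(k)]  # simple interleaved folds
    meta = np.zeros((n, m))
    fitted = [[] for _ in range(m)]
    for hold in folds:
        train = np.setdiff1d(np.arange(n), hold)
        for j, make in enumerate(base_factories):
            model = make().fit(X[train], y[train])
            fitted[j].append(model)
            meta[hold, j] = model.predict(X[hold])
    return meta, fitted


def stacking_predict_features(fitted, XX):
    # At prediction time a sample passes through all k copies of each type;
    # same-type outputs are averaged, giving one column per learner type.
    cols = [np.mean([mod.predict(XX) for mod in mods], axis=0) for mods in fitted]
    return np.column_stack(cols)
```

The (n, m) matrix returned by `stacking_meta_features`, together with the original labels, is what the secondary learner is trained on; `stacking_predict_features` produces the 1*m (here n*m) averaged features fed to it at prediction time.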
Here is another figure that is somewhat clearer: (image from https://blog.csdn.net/qq_18916311/article/details/78557722) It depicts stacking with XGB and RF as the base learners and LR as the secondary learner. Note: when the base learners are already very strong (e.g. randomForest, SVM, gbm, mboost), combining them with another strong learner usually gains little; the ensemble should instead be designed around the specific domain or practical need to be of real value. Below I use several sklearn classifiers as base learners to write a stacking classifier. The data come from http://archive.ics.uci.edu/ml/datasets/Avila; both the training set and the test set contain over 10000 samples.
```python
# -*- coding: utf-8 -*-
import copy

import numpy as np
import pandas as pd
from sklearn.ensemble import (ExtraTreesClassifier, GradientBoostingClassifier,
                              RandomForestClassifier)
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier


class stacking():
    def __init__(self, n_folds=5, meta_model_name='svc'):
        self.n_folds = n_folds
        self.meta_model_name = meta_model_name
        if self.meta_model_name == 'svc':
            self.meta_model = SVC(gamma=1, C=1)
        elif self.meta_model_name == 'gbc':
            self.meta_model = GradientBoostingClassifier()
        elif self.meta_model_name == 'etc':
            self.meta_model = ExtraTreesClassifier()
        elif self.meta_model_name == 'dtc':
            self.meta_model = DecisionTreeClassifier()
        elif self.meta_model_name == 'rfc':
            self.meta_model = RandomForestClassifier(n_estimators=50)
        else:
            raise ValueError('unknown meta_model_name: ' + meta_model_name)
        self.gbc_model = GradientBoostingClassifier()
        self.svc_model = SVC(gamma=0.5, C=50)
        self.etc_model = ExtraTreesClassifier()
        self.dtc_model = DecisionTreeClassifier()
        self.rfc_model = RandomForestClassifier(n_estimators=50)
        self.gbc_models = []
        self.svc_models = []
        self.etc_models = []
        self.dtc_models = []
        self.rfc_models = []
        self.base_models = [self.gbc_models, self.svc_models,
                            self.etc_models, self.dtc_models, self.rfc_models]

    def get_n_folds(self, X, y):
        # split the data into n_folds interleaved folds
        n_X = []
        n_y = []
        for i in range(self.n_folds):
            n_X.append(X[i::self.n_folds])
            n_y.append(y[i::self.n_folds])
        return n_X, n_y

    def multi_base_models_train(self, X, y):
        n_X, n_y = self.get_n_folds(X, y)
        meta_data = np.array([])
        meta_label = np.array([])
        for i in range(self.n_folds):
            index = np.delete(np.arange(self.n_folds), i)
            # assemble the training set from all folds except fold i
            n_fold_X = np.array([])
            n_fold_y = np.array([])
            for k in index:
                n_fold_X = np.append(n_fold_X, n_X[k])
                n_fold_y = np.append(n_fold_y, n_y[k])
            n_fold_X = n_fold_X.reshape(-1, X.shape[1])
            part_meta_data = np.array([])
            for model_, j in [(self.gbc_model, 0), (self.svc_model, 1), (self.etc_model, 2), (self.dtc_model, 3), (self.rfc_model, 4)]:
                # train the primary classifiers
                model_.fit(n_fold_X, n_fold_y)
                new_model = copy.deepcopy(model_)
                self.base_models[j].append(new_model)
                predict_y = self.base_models[j][i].predict(n_X[i])
                predict_y = predict_y.reshape(-1, 1)
                # the held-out fold's predicted labels become the secondary training set
                if j == 0:
                    part_meta_data = predict_y
                else:
                    part_meta_data = np.append(
                        part_meta_data, predict_y, axis=1)
            if i == 0:
                meta_data = part_meta_data
            else:
                meta_data = np.append(meta_data, part_meta_data, axis=0)
            meta_label = np.append(meta_label, n_y[i])
        return meta_data, meta_label

    def meta_model_train(self, X, y):
        self.meta_model.fit(X, y)

    def fit(self, X, y):
        meta_data, meta_label = self.multi_base_models_train(X, y)
        self.meta_model_train(meta_data, meta_label)

    def predict(self, XX):
        for i in range(self.n_folds):  # pick the classifier group from fold i
            for j in range(5):  # pick the classifier type
                predict_y = self.base_models[j][i].predict(XX)
                predict_y = predict_y.reshape(-1, 1)
                if j == 0:
                    meta_data = predict_y
                else:
                    meta_data = np.append(meta_data, predict_y, axis=1)
            if i == 0:
                ave_meta_data = meta_data
            else:
                ave_meta_data = ave_meta_data + meta_data
        ave_meta_data = ave_meta_data / self.n_folds  # average same-type outputs
        pre = self.meta_model.predict(ave_meta_data)
        return pre


def count_true(pre_p, p):
    # print the accuracy of predictions pre_p against labels p
    print(np.sum(pre_p == p) / p.shape[0])


if __name__ == '__main__':
    # map the letter labels to integers for convenience
    label_in = {'A': 0, 'F': 1, 'H': 2, 'E': 3, 'I': 4, 'Y': 5,
                'D': 6, 'X': 7, 'G': 8, 'W': 9, 'C': 10, 'B': 11}
    dir = 'D:\\Documents\\ml_data\\avila\\'
    train_name = 'avila-tr.txt'
    test_name = 'avila-ts.txt'

    obj = pd.read_csv(dir + train_name, header=None)
    data = np.array(obj)
    train_label = data[:, -1]
    for i in range(train_label.shape[0]):
        train_label[i] = label_in[train_label[i]]
    train_label = train_label.astype(int)
    train_data = data[:, 0:data.shape[1] - 1]

    obj = pd.read_csv(dir + test_name, header=None)
    data = np.array(obj)
    test_label = data[:, -1]
    for i in range(test_label.shape[0]):
        test_label[i] = label_in[test_label[i]]
    test_label = test_label.astype(int)
    test_data = data[:, 0:data.shape[1] - 1]

    stk = stacking(n_folds=5, meta_model_name='dtc')
    stk.fit(train_data, train_label)
    for j in range(5):  # accuracy of each base classifier on the test set
        for i in range(stk.n_folds):
            a = stk.base_models[j][i].predict(test_data)
            count_true(a, test_label)
        print('-------')
    a = stk.predict(test_data)
    count_true(a, test_label)
```
5-fold cross-validation is used here; the base learners are GradientBoostingClassifier, SVC, ExtraTreesClassifier, DecisionTreeClassifier and RandomForestClassifier, and the secondary learner is a DecisionTreeClassifier. The run finishes with an accuracy of 95.3%. The output is as follows:
0.9434703458848328
0.9468237999425122
0.9495065631886558
0.9412666475040721
0.9500814410271151
-------
0.8215004311583789
0.8243748203506754
0.8232250646737568
0.8215962441314554
0.8267701446775894
-------
0.9122353166618761
0.9565009102232442
0.9229663696464502
0.9566925361693973
0.9133850723387946
-------
0.969244035642426
0.9697231005078087
0.964740825907828
0.9622496886078375
0.9381048193925458
-------
0.9767174475423972
0.9736514324039475
0.9743221232154834
0.9743221232154834
0.9758551307847082
-------
0.9531474561655648
Each group of five numbers is the accuracy of one type of base classifier on the test set, and the final number is the accuracy of the whole stacking model. The improvement is not impressive; recalling what the bolded "Note" above said, I tried replacing these strong learners with weaker ones, but in the end the