機器學習演算法--CART分類迴歸樹
許多問題都是非線性的,用線性模型並不能很好的擬合數據,這種情況下可以使用樹迴歸來擬合數據。介紹CART, 樹剪枝,模型樹。
1.CART
傳統決策樹是一種貪心演算法,在給定時間內做出最佳選擇,不關心是否達到全域性最優。切分過於迅速,特徵一旦使用後面將不再使用。不能處理連續型特徵,進行離散化可能會破壞連續變數的內在特徵。
CART 分類迴歸樹,既能分類又能迴歸。CRAT來進行節點決策時,使用二元切分來處理連續型變數,給定特徵屬性以及特徵值,若大於該值則執行左子樹,相反則放入右子樹。當某個節點不能再切分時,節點值是單個值(CART),也可以是一個線性方程(模型樹)。
載入資料集 按行載入到矩陣中:
def loadDataSet(fileName): #general function to parse tab -delimited floats dataMat = [] #assume last column is target value fr = open(fileName) for line in fr.readlines(): curLine = line.strip().split('\t') fltLine = list(map(float,curLine)) #map all elements to float() dataMat.append(fltLine) return dataMat
按照某一特徵以及響應特徵值來對資料集進行劃分:
feature特徵屬性的索引 即列數 value劃分閾值 大於閾值放入mat0 否則mat1
def binSplitDataSet(dataSet, feature, value):
mat0 = dataSet[nonzero(dataSet[:,feature] > value)[0],:]
mat1 = dataSet[nonzero(dataSet[:,feature] <= value)[0],:]
return mat0,mat1
建立迴歸樹:
找到最佳待切分屬性;
如果該節點不能切分,則該節點存為子節點
執行二元切分
右子樹呼叫createTree()
左子樹呼叫createTree()
leafType 建立葉節點的函式 errType代表誤差計算函式 每一個節點使用字典來儲存,分別包含spInd spVal left right等key值。
def createTree(dataSet, leafType=regLeaf, errType=regErr, ops=(1,4)):#assume dataSet is NumPy Mat so we can array filtering
feat, val = chooseBestSplit(dataSet, leafType, errType, ops)#choose the best split
if feat == None: return val #if the splitting hit a stop condition return val
retTree = {}
retTree['spInd'] = feat
retTree['spVal'] = val
lSet, rSet = binSplitDataSet(dataSet, feat, val)
retTree['left'] = createTree(lSet, leafType, errType, ops)
retTree['right'] = createTree(rSet, leafType, errType, ops)
return retTree
樹節點劃分的度量,計算連續函式的混亂度(決策樹使用資訊熵以及基尼係數等),這裡可以採用資料的總方差來計算資料的混亂度,均方差乘以資料集的樣本數。
遍歷所有特徵以及所有特徵值使總方差最小的值即為劃分特徵以及劃分閾值。
def chooseBestSplit(dataSet, leafType=regLeaf, errType=regErr, ops=(1,4)):
tolS = ops[0]; tolN = ops[1]
#if all the target variables are the same value: quit and return value
if len(set(dataSet[:,-1].T.tolist()[0])) == 1: #exit cond 1
return None, leafType(dataSet)
m,n = shape(dataSet)
#the choice of the best feature is driven by Reduction in RSS error from mean
S = errType(dataSet)
bestS = inf; bestIndex = 0; bestValue = 0
for featIndex in range(n-1):
for splitVal in set((dataSet[:,featIndex].T.A.tolist())[0]):
mat0, mat1 = binSplitDataSet(dataSet, featIndex, splitVal)
if (shape(mat0)[0] < tolN) or (shape(mat1)[0] < tolN): continue
newS = errType(mat0) + errType(mat1)
if newS < bestS:
bestIndex = featIndex
bestValue = splitVal
bestS = newS
#if the decrease (S-bestS) is less than a threshold don't do the split
if (S - bestS) < tolS:
return None, leafType(dataSet) #exit cond 2
mat0, mat1 = binSplitDataSet(dataSet, bestIndex, bestValue)
if (shape(mat0)[0] < tolN) or (shape(mat1)[0] < tolN): #exit cond 3
return None, leafType(dataSet)
return bestIndex,bestValue#returns the best feature to split on
#and the value used for that split
tolS為容許的誤差最小下降值,當劃分一次誤差小於該值時,提升效果不大,直接返回。
tolN為切分的最少樣本數,當切分之後,左右子數量小於tolN,說明切分位元組過小,直接返回。
leafType為葉子結點的建立函式,採用均值方式
def regLeaf(dataSet):#returns the value used for each leaf
return mean(dataSet[:,-1])
errType為誤差估計函式,這裡使用總方差,即均方差乘以樣本總數
def regErr(dataSet):
return var(dataSet[:,-1]) * shape(dataSet)[0]
如果某個節點資料特徵值都相同,則無法繼續劃分,直接返回葉子結點。
if len(set(dataSet[:,-1].T.tolist()[0])) == 1:
遍歷每一個特徵以及相應的特徵值來進行劃分,計算每一種劃分的總方差,返回最優的特徵屬性以及特徵閾值:
for featIndex in range(n-1):
for splitVal in set(dataSet[:,featIndex]):
繪出樣本集的分佈圖:
def plotarr(arr):
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(111)
ax.scatter(arr[:,0].flatten().A[0], arr[:,1].flatten().A[0])
plt.show()
執行測試如下:
載入另一資料集:
得到CART迴歸結果:
2.剪枝策略
當迴歸樹葉子結點過多時,容易發生過擬合,導致泛化效能降低。可以採取剪枝來防止過擬合,有預剪枝以及後剪枝。
def chooseBestSplit(dataSet, leafType=regLeaf, errType=regErr, ops=(1,4)):
tolS = ops[0]; tolN = ops[1]
#if all the target variables are the same value: quit and return value
if len(set(dataSet[:,-1].T.tolist()[0])) == 1: #exit cond 1
return None, leafType(dataSet)
m,n = shape(dataSet)
#the choice of the best feature is driven by Reduction in RSS error from mean
S = errType(dataSet)
bestS = inf; bestIndex = 0; bestValue = 0
for featIndex in range(n-1):
for splitVal in set((dataSet[:,featIndex].T.A.tolist())[0]):
mat0, mat1 = binSplitDataSet(dataSet, featIndex, splitVal)
if (shape(mat0)[0] < tolN) or (shape(mat1)[0] < tolN): continue
newS = errType(mat0) + errType(mat1)
if newS < bestS:
bestIndex = featIndex
bestValue = splitVal
bestS = newS
#if the decrease (S-bestS) is less than a threshold don't do the split
if (S - bestS) < tolS:
return None, leafType(dataSet) #exit cond 2
mat0, mat1 = binSplitDataSet(dataSet, bestIndex, bestValue)
if (shape(mat0)[0] < tolN) or (shape(mat1)[0] < tolN): #exit cond 3
return None, leafType(dataSet)
return bestIndex,bestValue#returns the best feature to split on
#and the value used for that split
其中tolS與tolN就能在一定程度上防止過擬合,主要採用預剪枝。通過tolS如果剪枝對於資料集的誤差降低不大則可以不劃分節點,tolN如果剪枝之後葉子結點資料過少,則也可以預剪枝處理。這對引數tolS,tolN的取值提出了很高的要求,往往難以設定求解。
後剪枝:將資料分為訓練集與測試集,首先構建一顆完整樹,然後依次尋找葉子結點,用測試集來判斷將葉子結點合併是否能降低測試誤差,若能則採取後剪枝。
基於已有的樹切分測試資料:
如果存在任一子集是一棵樹,在該子集繼續剪枝過程。
計算將兩個葉子結點合併後的誤差
計算不合並的誤差
若合併會降低誤差,則合併兩個葉子結點
判斷某一節點是否是一棵樹,及判斷是否為字典型別:
def isTree(obj):
return (type(obj).__name__=='dict')
執行樹坍塌過程,返回樹的平均值
def getMean(tree):
if isTree(tree['right']): tree['right'] = getMean(tree['right'])
if isTree(tree['left']): tree['left'] = getMean(tree['left'])
return (tree['left']+tree['right'])/2.0
進行後剪枝處理:
def prune(tree, testData):
if shape(testData)[0] == 0: return getMean(tree) #if we have no test data collapse the tree
#如果該樹是子集,則劃分測試資料,繼續後剪枝
if (isTree(tree['right']) or isTree(tree['left'])):#if the branches are not trees try to prune them
lSet, rSet = binSplitDataSet(testData, tree['spInd'], tree['spVal'])
if isTree(tree['left']): tree['left'] = prune(tree['left'], lSet)
if isTree(tree['right']): tree['right'] = prune(tree['right'], rSet)
#if they are now both leafs, see if we can merge them
#如果節點是葉子結點
if not isTree(tree['left']) and not isTree(tree['right']):
#劃分測試資料
lSet, rSet = binSplitDataSet(testData, tree['spInd'], tree['spVal'])
#沒有合併前的誤差
errorNoMerge = sum(power(lSet[:,-1] - tree['left'],2)) +\
sum(power(rSet[:,-1] - tree['right'],2))
#合併後的誤差 合併後 節點值變為兩個子節點的平均值
treeMean = (tree['left']+tree['right'])/2.0
#取出最後一列y的值真實值與預測值求總方差
errorMerge = sum(power(testData[:,-1] - treeMean,2))
if errorMerge < errorNoMerge:
print ("merging")
return treeMean
else: return tree
else: return tree
載入資料集,建立一顆最完整的分類迴歸樹, 設定tolS=0, tolN=1
>>> dd = mat(regTrees.loadDataSet('ex2.txt'))
>>> mt = regTrees.createTree(dd, ops(0,1))
此時迴歸樹
剪枝之後:
有一部分節點被剪掉。
3.模型樹
前面CART葉子結點為某個值,現在可以把葉子結點變為一個分段函式,即某一個葉子結點下面允許分段函式形式的資料存在。
將一個數據集求出線性擬合函式:
def linearSolve(dataSet): #helper function used in two places
m,n = shape(dataSet)
X = mat(ones((m,n))); Y = mat(ones((m,1)))#create a copy of data with 1 in 0th postion
X[:,1:n] = dataSet[:,0:n-1]; Y = dataSet[:,-1]#and strip out Y
xTx = X.T*X
if linalg.det(xTx) == 0.0:
raise NameError('This matrix is singular, cannot do inverse,\n\
try increasing the second value of ops')
ws = xTx.I * (X.T * Y)
return ws,X,Y
首先進行資料矩陣變換,利用線性模型直接求解迴歸係數ws
如果一個節點是葉子結點時,需要儲存ws係數權向量
def modelLeaf(dataSet):#create linear model and return coeficients
ws,X,Y = linearSolve(dataSet)
return ws
當採用線性模型時,使用平方誤差和來計算總誤差:
def modelErr(dataSet):
ws,X,Y = linearSolve(dataSet)
yHat = X * ws
return sum(power(Y - yHat,2))
載入資料集進行測試:
def testmodel():
tt = mat(loadDataSet('exp2.txt'))
return createTree(tt, modelLeaf, modelErr, (1, 10))
資料集分佈:
4.線性迴歸 迴歸樹 模型樹比較
通過對於同一份資料進行訓練模型,在通過測試集比較不同模型之間的效能差異。
模型樹與迴歸樹預測值的輸出:
def treeForeCast(tree, inData, modelEval=regTreeEval):
if not isTree(tree): return modelEval(tree, inData)
if inData[tree['spInd']] > tree['spVal']:
if isTree(tree['left']): return treeForeCast(tree['left'], inData, modelEval)
else: return modelEval(tree['left'], inData)
else:
if isTree(tree['right']): return treeForeCast(tree['right'], inData, modelEval)
else: return modelEval(tree['right'], inData)
tree訓練樹所得,inData為待預測的樣本行向量,modelEval表示節點型別,當modelEval=regTreeEval說明葉子節點為分類型別,節點值為具體的分類值,即預測值值直接返回節點值即可,當modelEval=modelTreeEval時,說明葉子結點為迴歸型別,節點值為線性權向量,返回值應該與測試資料相乘得到最終預測值。
def regTreeEval(model, inDat):
return float(model)
def modelTreeEval(model, inDat):
n = shape(inDat)[1]
X = mat(ones((1,n+1)))
X[:,1:n+1]=inDat
return float(X*model)
返回測試集的預測值,列向量:
def createForeCast(tree, testData, modelEval=regTreeEval):
m=len(testData)
yHat = mat(zeros((m,1)))
for i in range(m):
yHat[i,0] = treeForeCast(tree, mat(testData[i]), modelEval)
return yHat
利用相關係數來衡量資料擬合情況:
def regtree():
traindata = mat(loadDataSet('bikeSpeedVsIq_train.txt'))
testdata = mat(loadDataSet('bikeSpeedVsIq_test.txt'))
mt = createTree(traindata, ops=(1, 20))
yHat = createForeCast(mt, testdata[:,0])
return corrcoef(yHat, testdata[:,1], rowvar=0)[0,1]
def modeltree():
traindata = mat(loadDataSet('bikeSpeedVsIq_train.txt'))
testdata = mat(loadDataSet('bikeSpeedVsIq_test.txt'))
mt = createTree(traindata, modelLeaf, modelErr, ops=(1, 20))
yHat = createForeCast(mt, testdata[:,0], modelTreeEval)
return corrcoef(yHat, testdata[:,1], rowvar=0)[0,1]
def reg():
traindata = mat(loadDataSet('bikeSpeedVsIq_train.txt'))
testdata = mat(loadDataSet('bikeSpeedVsIq_test.txt'))
ws, x, y = linearSolve(traindata)
yHat=[0]*shape(testdata)[0]
for i in range(shape(testdata)[0]):
yHat[i] = testdata[i,0]*ws[1,0]+ws[0,0]
return corrcoef(yHat, testdata[:,1], rowvar=0)[0,1]
可知模型樹擬合效果最好
5.Tkinter庫圖形化
使用tkinter庫來實現是圖畫化展示資料擬合
from numpy import *
#python3匯入方式不變
from tkinter import *
import regTrees
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
def reDraw(tolS,tolN):
reDraw.f.clf() # clear the figure
reDraw.a = reDraw.f.add_subplot(111)
if chkBtnVar.get():
if tolN < 2: tolN = 2
#繪出模型樹
myTree=regTrees.createTree(reDraw.rawDat, regTrees.modelLeaf,\
regTrees.modelErr, (tolS,tolN))
yHat = regTrees.createForeCast(myTree, reDraw.testDat, \
regTrees.modelTreeEval)
else:
#繪出迴歸樹
myTree=regTrees.createTree(reDraw.rawDat, ops=(tolS,tolN))
yHat = regTrees.createForeCast(myTree, reDraw.testDat)
#繪出資料分佈時,矩陣和一位陣列之間的轉換
reDraw.a.scatter(reDraw.rawDat[:,0].flatten().A[0], reDraw.rawDat[:,1].flatten().A[0], s=5) #use scatter for data set
reDraw.a.plot(reDraw.testDat, yHat, linewidth=2.0) #use plot for yHat
#修改draw()為show()
reDraw.canvas.draw()
def getInputs():
try: tolN = int(tolNentry.get())
except:
tolN = 10
print ("enter Integer for tolN")
tolNentry.delete(0, END)
tolNentry.insert(0,'10')
try: tolS = float(tolSentry.get())
except:
tolS = 1.0
print ("enter Float for tolS")
tolSentry.delete(0, END)
tolSentry.insert(0,'1.0')
return tolN,tolS
def drawNewTree():
tolN,tolS = getInputs()#get values from Entry boxes
reDraw(tolS,tolN)
root=Tk()
reDraw.f = Figure(figsize=(5,4), dpi=100) #create canvas
reDraw.canvas = FigureCanvasTkAgg(reDraw.f, master=root)
# show()方法應該修改為draw()
reDraw.canvas.draw()
reDraw.canvas.get_tk_widget().grid(row=0, columnspan=3)
Label(root, text="tolN").grid(row=1, column=0)
tolNentry = Entry(root)
tolNentry.grid(row=1, column=1)
tolNentry.insert(0,'10')
Label(root, text="tolS").grid(row=2, column=0)
tolSentry = Entry(root)
tolSentry.grid(row=2, column=1)
tolSentry.insert(0,'1.0')
Button(root, text="ReDraw", command=drawNewTree).grid(row=1, column=2, rowspan=3)
chkBtnVar = IntVar()
chkBtn = Checkbutton(root, text="Model Tree", variable = chkBtnVar)
chkBtn.grid(row=3, column=0, columnspan=2)
reDraw.rawDat = mat(regTrees.loadDataSet('sine.txt'))
reDraw.testDat = arange(min(reDraw.rawDat[:,0]),max(reDraw.rawDat[:,0]),0.01)
reDraw(1.0, 10)
root.mainloop()
由於python3的變化,程式碼需要改變如下:
1.from tkinter import * 庫匯入庫名變為小寫
2.reDraw.canvas.draw() FigureCanvasTkAgg物件draw方法而不是show()
3.reDraw.a.scatter(reDraw.rawDat[:,0].flatten().A[0], reDraw.rawDat[:,1].flatten().A[0], s=5)進行資料分佈繪製時需要轉換矩陣為一維陣列。
改變tolS tolN的值,繪製如下: