1. 程式人生 > 機器學習---支援向量機實戰(四)核函式實現

機器學習---支援向量機實戰(四)核函式實現

這節和上一節很像,不同的是,上一篇的是通過支援向量和待分類資料內積進行分類的,只是這裡不同的是,在計算內積時使用核函式進行代替,這裡參考的是機器學習實戰中的核函式,如果前面理解的比較深入,讀程式碼還是很簡單的,這裡的程式碼建議不要剛開始就去讀核函式定義,建議先從測試核函式的程式碼讀,然後搞清楚核函式的引數都代表什麼,同時想想作者為什麼這樣使用,剛開始理解這些可能有點難,多讀幾遍,在讀程式碼時建議時刻想著前面的理論,因為程式碼就是按照前面的理論思路進行寫的,最後就是嘗試自己寫一下,下面的程式碼關鍵的地方均已註釋,上程式碼:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: 趙守風
# File name: kernelfunction.py
# Time: 2018/10/25
# Email: [email protected]
from numpy import *
import numpy as np
from SMO_simplify import *

# NOTE(review): loadDataSet, selectJrand and clipAlpha are expected to come
# from SMO_simplify (star-imported above) -- confirm that module is importable
# before running testRbf().


def kernelTrans(X, A, kTup):
    """Evaluate the kernel between one point and every row of X.

    Returns the column vector K with K[j] = k(X[j], A), where k is the
    kernel selected by kTup.  In the SVM classifier, X holds the support
    vectors and A is the single sample being classified, so this replaces
    the plain inner products used by the linear version.

    Parameters
    ----------
    X : np.matrix, shape (m, n)
        Data points participating in the inner products (support vectors).
    A : np.matrix, shape (1, n)
        One data point to compare against every row of X.
    kTup : tuple
        kTup[0] selects the kernel: 'lin' (plain inner product) or
        'rbf' (Gaussian).  For 'rbf', kTup[1] is the bandwidth sigma in
        exp(-||x - a||^2 / sigma^2).

    Returns
    -------
    np.matrix, shape (m, 1)

    Raises
    ------
    NameError
        If kTup[0] names an unsupported kernel.
    """
    m, n = np.shape(X)
    K = np.mat(np.zeros((m, 1)))
    if kTup[0] == 'lin':
        # Linear kernel: ordinary matrix of inner products.
        K = X * A.T
    elif kTup[0] == 'rbf':
        for j in range(m):
            deltaRow = X[j, :] - A
            K[j] = deltaRow * deltaRow.T  # squared Euclidean distance
        # NumPy division is element-wise, not a matrix solve as in MATLAB.
        K = np.exp(K / (-1 * kTup[1] ** 2))
    else:
        raise NameError('Houston We Have a Problem -- \
    That Kernel is not recognized')
    return K


class optStruct:
    """Shared state for the full Platt SMO optimizer.

    Precomputes the full m-by-m kernel matrix once so the inner loop can
    look up k(x_i, x_j) instead of recomputing inner products.
    """

    def __init__(self, dataMatIn, classLabels, C, toler, kTup):
        self.X = dataMatIn                # training data, np.matrix (m, n)
        self.labelMat = classLabels       # labels, np.matrix (m, 1)
        self.C = C                        # slack penalty (box constraint)
        self.tol = toler                  # KKT violation tolerance
        self.m = np.shape(dataMatIn)[0]
        self.alphas = np.mat(np.zeros((self.m, 1)))
        self.b = 0
        # Error cache: first column is a validity flag, second the E value.
        self.eCache = np.mat(np.zeros((self.m, 2)))
        # Kernel matrix: column i holds k(X[j], X[i]) for all j.
        self.K = np.mat(np.zeros((self.m, self.m)))
        for i in range(self.m):
            self.K[:, i] = kernelTrans(self.X, self.X[i, :], kTup)


def calcEk(oS, k):
    """Return E_k = f(x_k) - y_k, the prediction error on sample k."""
    fXk = float(np.multiply(oS.alphas, oS.labelMat).T * oS.K[:, k] + oS.b)
    Ek = fXk - float(oS.labelMat[k])
    return Ek


def selectJ(i, oS, Ei):
    """Heuristically pick the second alpha: maximize |Ei - Ej|.

    Scans the valid entries of the error cache; on the first pass (no
    valid entries yet) falls back to a random index from selectJrand.
    Returns (j, Ej).
    """
    maxK = -1
    maxDeltaE = 0
    Ej = 0
    oS.eCache[i] = [1, Ei]  # mark Ei as valid in the cache
    validEcacheList = np.nonzero(oS.eCache[:, 0].A)[0]
    if (len(validEcacheList)) > 1:
        # Loop over valid cached errors, keep the one with the largest step.
        for k in validEcacheList:
            if k == i:
                continue  # skip i itself
            Ek = calcEk(oS, k)
            deltaE = np.abs(Ei - Ek)
            if deltaE > maxDeltaE:
                maxK = k
                maxDeltaE = deltaE
                Ej = Ek
        return maxK, Ej
    else:
        # First time around: no usable cache entries, choose j at random.
        j = selectJrand(i, oS.m)
        Ej = calcEk(oS, j)
    return j, Ej


def updateEk(oS, k):
    """Recompute E_k and store it as a valid entry in the error cache."""
    Ek = calcEk(oS, k)
    oS.eCache[k] = [1, Ek]


def innerL(i, oS):
    """Inner SMO loop: try to optimize the alpha pair containing index i.

    Returns 1 if the pair (alpha_i, alpha_j) was changed, 0 otherwise.
    Identical to the linear version except that every inner product is
    replaced by a kernel-matrix lookup oS.K[., .].
    """
    Ei = calcEk(oS, i)
    # Proceed only if alpha_i violates the KKT conditions beyond tol.
    if ((oS.labelMat[i] * Ei < -oS.tol) and (oS.alphas[i] < oS.C)) or (
            (oS.labelMat[i] * Ei > oS.tol) and (oS.alphas[i] > 0)):
        j, Ej = selectJ(i, oS, Ei)  # pick the second alpha
        alphaIold = oS.alphas[i].copy()
        alphaJold = oS.alphas[j].copy()
        # Clip bounds keeping alpha_i*y_i + alpha_j*y_j constant.
        if oS.labelMat[i] != oS.labelMat[j]:
            L = max(0, oS.alphas[j] - oS.alphas[i])
            H = min(oS.C, oS.C + oS.alphas[j] - oS.alphas[i])
        else:
            L = max(0, oS.alphas[j] + oS.alphas[i] - oS.C)
            H = min(oS.C, oS.alphas[j] + oS.alphas[i])
        if L == H:
            print("L==H")
            return 0
        # eta = 2*K12 - K11 - K22: kernel lookups replace raw inner products.
        eta = 2.0 * oS.K[i, j] - oS.K[i, i] - oS.K[j, j]
        if eta >= 0:
            print("eta>=0")
            return 0
        # Update alpha_j along the unconstrained optimum, then clip.
        oS.alphas[j] -= oS.labelMat[j] * (Ei - Ej) / eta
        oS.alphas[j] = clipAlpha(oS.alphas[j], H, L)
        updateEk(oS, j)  # refresh the error cache for j
        if abs(oS.alphas[j] - alphaJold) < 0.00001:
            print("j not moving enough")
            return 0
        # Move alpha_i by the same amount in the opposite direction.
        oS.alphas[i] += oS.labelMat[j] * oS.labelMat[i] * (alphaJold - oS.alphas[j])
        updateEk(oS, i)  # refresh the error cache for i
        # Recompute the threshold b, again via kernel lookups.
        b1 = oS.b - Ei - oS.labelMat[i] * (oS.alphas[i] - alphaIold) * oS.K[i, i] - oS.labelMat[j] * (
            oS.alphas[j] - alphaJold) * oS.K[i, j]
        b2 = oS.b - Ej - oS.labelMat[i] * (oS.alphas[i] - alphaIold) * oS.K[i, j] - oS.labelMat[j] * (
            oS.alphas[j] - alphaJold) * oS.K[j, j]
        if (0 < oS.alphas[i]) and (oS.C > oS.alphas[i]):
            oS.b = b1
        elif (0 < oS.alphas[j]) and (oS.C > oS.alphas[j]):
            oS.b = b2
        else:
            oS.b = (b1 + b2) / 2.0
        return 1
    else:
        return 0


def smoP(dataMatIn, classLabels, C, toler, maxIter, kTup=('lin', 0)):
    """Full Platt SMO.  Same parameters as the linear version plus kTup,
    which selects the kernel ('lin' or 'rbf') used by optStruct.

    Returns (b, alphas).
    """
    oS = optStruct(np.mat(dataMatIn), np.mat(classLabels).transpose(), C, toler, kTup)
    iterCount = 0  # renamed from `iter` to avoid shadowing the builtin
    entireSet = True
    alphaPairsChanged = 0
    # Alternate between full passes and non-bound passes until no pair
    # changes on a full pass or maxIter is reached.
    while (iterCount < maxIter) and ((alphaPairsChanged > 0) or entireSet):
        alphaPairsChanged = 0
        if entireSet:
            # Outer loop over every sample as the first alpha.
            for i in range(oS.m):
                alphaPairsChanged += innerL(i, oS)
                print("fullSet, iter: %d i:%d, pairs changed %d" % (iterCount, i, alphaPairsChanged))
            iterCount += 1
        else:
            # Pass over non-bound alphas only (0 < alpha < C).
            nonBoundIs = np.nonzero((oS.alphas.A > 0) * (oS.alphas.A < C))[0]
            for i in nonBoundIs:
                alphaPairsChanged += innerL(i, oS)
                print("non-bound, iter: %d i:%d, pairs changed %d" % (iterCount, i, alphaPairsChanged))
            iterCount += 1
        if entireSet:
            entireSet = False  # toggle: next pass over non-bound alphas
        elif alphaPairsChanged == 0:
            entireSet = True   # nothing moved: go back to a full pass
        print("iteration number: %d" % iterCount)
    return oS.b, oS.alphas


def calcWs(alphas, dataArr, classLabels):
    """Recover the (linear) weight vector w = sum_i alpha_i y_i x_i."""
    X = np.mat(dataArr)
    labelMat = np.mat(classLabels).transpose()
    m, n = np.shape(X)
    w = np.zeros((n, 1))
    for i in range(m):
        w += np.multiply(alphas[i] * labelMat[i], X[i, :].T)
    return w


def testRbf(k1=1.3):
    """Train an RBF-kernel SVM on testSetRBF.txt and report train/test error.

    k1 is the RBF bandwidth sigma passed through kTup.  Classification uses
    only the support vectors: f(x) = sum_sv alpha*y*k(sv, x) + b.
    """
    dataArr, labelArr = loadDataSet('testSetRBF.txt')
    b, alphas = smoP(dataArr, labelArr, 200, 0.0001, 10000, ('rbf', k1))  # C=200 important
    datMat = np.mat(dataArr)
    labelMat = np.mat(labelArr).transpose()
    svInd = np.nonzero(alphas.A > 0)[0]
    sVs = datMat[svInd]  # matrix of only the support vectors
    labelSV = labelMat[svInd]
    print("there are %d Support Vectors" % np.shape(sVs)[0])
    m, n = np.shape(datMat)
    errorCount = 0
    for i in range(m):
        kernelEval = kernelTrans(sVs, datMat[i, :], ('rbf', k1))
        predict = kernelEval.T * np.multiply(labelSV, alphas[svInd]) + b
        if np.sign(predict) != np.sign(labelArr[i]):
            errorCount += 1
    print("the training error rate is: %f" % (float(errorCount) / m))
    # Evaluate generalization on the held-out set.
    dataArr, labelArr = loadDataSet('testSetRBF2.txt')
    errorCount = 0
    datMat = np.mat(dataArr)
    labelMat = np.mat(labelArr).transpose()
    m, n = np.shape(datMat)
    for i in range(m):
        kernelEval = kernelTrans(sVs, datMat[i, :], ('rbf', k1))
        predict = kernelEval.T * np.multiply(labelSV, alphas[svInd]) + b
        if np.sign(predict) != np.sign(labelArr[i]):
            errorCount += 1
    print("the test error rate is: %f" % (float(errorCount) / m))