機器學習_6.隱馬演算法的程式碼實現
借鑑:https://github.com/Continue7777/HMM/
依舊基於三個問題進行實現
1.評估
(1)描述
給定觀測序列O(o1,o2,…,oT)和模型u = (π,A,B),求出P(O | u),即給定模型下觀測序列的概率是多少?
(2)實際演算法
不再提窮舉這件事了,無法應對長序列。
定義前向變數運算元αt(i)=P(O1,O2,…,Ot,Xt = Si | u),前向變量表示t時刻,si狀態結束是之前路徑上的總概率。可知,對αT(i)求和便能得到評估結果。而此時時間複雜度因為有了動態規劃,乘法次數在T*N^2的級別。
演算法描述:
(3)Python前向演算法程式碼
#計算公式中的alpha二維陣列 def _forward(self,observationsSeq): T = len(observationsSeq) N = len(self.pi) alpha = np.zeros((T,N),dtype=float) alpha[0,:] = self.pi * self.B[:,observationsSeq[0]] #numpy可以簡化迴圈 for t in range(1,T): for n in range(0,N): alpha[t,n] = np.dot(alpha[t-1,:],self.A[:,n]) * self.B[n,observationsSeq[t]] #使用內積簡化程式碼 return alpha
2.預測
(1)描述
維位元演算法:
維特比演算法是一個特殊但應用最廣的動態規劃演算法,利用動態規劃,可以解決任何一個圖中的最短路徑問題。而維特比演算法是針對一個特殊的圖——籬笆網路的有向圖(Lattice )的最短路徑問題而提出的。 它之所以重要,是因為凡是使用隱含馬爾可夫模型描述的問題都可以用它來解碼,包括今天的數字通訊、語音識別、機器翻譯、拼音轉漢字、分詞等。——《數學之美》
(2)維位元演算法
狀態1時刻概率都是初始概率
θt(j) = max( θt-1(i) * aij * bj ot) i∈[1,N]
狀態t時刻為t-1時刻θ變數*轉移概率的最大值
φt(j) = argmax( θt-1(i) * aij ) i∈[1,N]
狀態t時刻回溯上一個最大概率路徑
(3)Python維位元演算法程式碼
#維特比演算法進行預測,即解碼,返回最大路徑與該路徑概率
def viterbi(self,observationsSeq):
T = len(observationsSeq)
N = len(self.pi)
prePath = np.zeros((T,N),dtype=int)
dpMatrix = np.zeros((T,N),dtype=float)
dpMatrix[0,:] = self.pi * self.B[:,observationsSeq[0]]
for t in range(1,T):
for n in range(N):
probs = dpMatrix[t-1,:] * self.A[:,n] * self.B[n,observationsSeq[t]]
prePath[t,n] = np.argmax(probs)
dpMatrix[t,n] = np.max(probs)
maxProb = np.max(dpMatrix[T-1,:])
maxIndex = np.argmax(dpMatrix[T-1,:])
path = [maxIndex]
for t in reversed(range(1,T)):
path.append(prePath[t,path[-1]])
path.reverse()
return maxProb,path
3.學習
(1)描述
給定一個觀察序列,求出模型的引數(π,A,B),使用演算法baum-welch演算法。
(2)baum-welch演算法
#計算公式中的beita二維陣列
def _backward(self,observationsSeq):
T = len(observationsSeq)
N = len(self.pi)
beta = np.zeros((T,N),dtype=float)
beta[T-1,:] = 1
for t in reversed(range(T-1)):
for n in range(N):
beta[t,n] = np.sum(self.A[n,:] * self.B[:,observationsSeq[t+1]] * beta[t+1,:])
return beta
#前後向演算法學習引數
def baumWelch(self,observationsSeq,criterion=0.001):
T = len(observationsSeq)
N = len(self.pi)
while True:
# alpha_t(i) = P(O_1 O_2 ... O_t, q_t = S_i | hmm)
# Initialize alpha
alpha = self._forward(observationsSeq)
# beta_t(i) = P(O_t+1 O_t+2 ... O_T | q_t = S_i , hmm)
# Initialize beta
beta = self._backward(observationsSeq)
#根據公式求解XIt(i,j) = P(qt=Si,qt+1=Sj | O,λ)
xi = np.zeros((T-1,N,N),dtype=float)
for t in range(T-1):
denominator = np.sum( np.dot(alpha[t,:],self.A) * self.B[:,observationsSeq[t+1]] * beta[t+1,:])
for i in range(N):
molecular = alpha[t,i] * self.A[i,:] * self.B[:,observationsSeq[t+1]] * beta[t+1,:]
xi[t,i,:] = molecular / denominator
#根據xi就可以求出gamma,注意最後缺了一項要單獨補上來
gamma = np.sum(xi,axis=2)
prod = (alpha[T-1,:] * beta[T-1,:])
gamma = np.vstack((gamma, prod /np.sum(prod)))
newpi = gamma[0,:]
newA = np.sum(xi,axis=0) / np.sum(gamma[:-1,:],axis=0).reshape(-1,1)
newB = np.zeros(self.B.shape,dtype=float)
for k in range(self.B.shape[1]):
mask = observationsSeq == k
newB[:,k] = np.sum(gamma[mask,:],axis=0) / np.sum(gamma,axis=0)
if np.max(abs(self.pi - newpi)) < criterion and \ np.max(abs(self.A - newA)) < criterion and \ np.max(abs(self.B - newB)) < criterion:
break
self.A,self.B,self.pi = newA,newB,newpi