Generating Data with a Hidden Markov Model
阿新 • Published: 2018-10-31
A hidden Markov model is a powerful tool for analyzing time-series data. It assumes the system being modeled is a Markov process with hidden states: at any moment the underlying system occupies one of a set of possible states, and it moves through a sequence of state transitions, each producing an output. We can only observe the outputs, never the states themselves, because the states are hidden. The goal is to model such data so that we can infer the state transitions behind new, unseen observations.
- S = {s1, s2, …, sN} is the set of all possible states (《統計學習方法》 uses Q for this),
- N is the number of possible states,
- V = {v1, v2, …, vM} is the set of all possible observations,
- M is the number of possible observations,
- I is a state sequence of length T, and O is the corresponding observation sequence,
- A is the state-transition probability matrix,
- B is the observation probability matrix,
- Pi is the initial state probability vector,
- Lambda = (A, B, Pi) is the full parameter set of the hidden Markov model.
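To make the notation concrete, here is the Lambda = (A, B, Pi) triple for the four-box red/white-ball example that the code below uses (values taken directly from the listing); a quick check confirms that each row of A and B, and Pi itself, is a probability distribution:

```python
import numpy as np

# N = 4 hidden states (boxes), M = 2 observations (0 = red, 1 = white)
A = np.array([[0.0, 1.0, 0.0, 0.0],   # A[i, j] = P(next state j | current state i)
              [0.4, 0.0, 0.6, 0.0],
              [0.0, 0.4, 0.0, 0.6],
              [0.0, 0.0, 0.5, 0.5]])
B = np.array([[0.5, 0.5],             # B[i, k] = P(observation k | state i)
              [0.3, 0.7],
              [0.6, 0.4],
              [0.8, 0.2]])
Pi = np.array([0.25, 0.25, 0.25, 0.25])  # initial state distribution

# every row must be a probability distribution
assert np.allclose(A.sum(axis=1), 1.0)
assert np.allclose(B.sum(axis=1), 1.0)
assert np.isclose(Pi.sum(), 1.0)
```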
A hidden Markov model (HMM from here on) conventionally has three basic problems: probability computation (evaluation), learning, and prediction (also called decoding). The first is solved with the forward and backward algorithms; the second and third with the Baum-Welch algorithm and the Viterbi algorithm, respectively.
Most of the time the forward and backward algorithms exist to serve Baum-Welch, while the Viterbi algorithm stands on its own, so we will look at Viterbi first.
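Before diving into the full class, note that the forward algorithm by itself fits in a few lines of vectorized NumPy: alpha is propagated one step per observation, and P(O|lambda) is the sum of the final alpha. This is only a sketch on the same four-state toy model used later, not the class implementation below:

```python
import numpy as np

A = np.array([[0.0, 1.0, 0.0, 0.0], [0.4, 0.0, 0.6, 0.0],
              [0.0, 0.4, 0.0, 0.6], [0.0, 0.0, 0.5, 0.5]])
B = np.array([[0.5, 0.5], [0.3, 0.7], [0.6, 0.4], [0.8, 0.2]])
Pi = np.array([0.25, 0.25, 0.25, 0.25])
O = [0, 1, 0, 1, 0, 0, 1, 0, 1]  # 0 = red, 1 = white

alpha = Pi * B[:, O[0]]            # alpha_1(i) = pi_i * b_i(o_1)
for o in O[1:]:
    alpha = (alpha @ A) * B[:, o]  # alpha_{t+1}(i) = (sum_j alpha_t(j) * a_ji) * b_i(o_{t+1})

print(alpha.sum())                 # P(O | lambda): sum of alpha over final states
```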
""" hmm1.py is two slow: delta_lambda = 15 ,x=8 more zhan 30k times 1. with scale 2. be modified Pivot: 1. alpha,beta,gamma,xi are all for Baum_Welch 2. delta,psi are for viterbi 3. Baum_Welch for traning, viterbi for predict Ideas for application: 1. predict 2. speech recognition 3. NLP error: 1. the computing X must be array, need to np.array 2. and the cells must be float, need np.float, if initialization need decimal. 3. initialization is big proplem, every cell shouldn't be 0 """ import numpy as np from scipy.spatial.distance import cdist # computing Chebyshev distance between matrixes class HMM: def __init__(self, Ann, Bnm, Pi, O): self.A = np.array(Ann, np.float) self.B = np.array(Bnm, np.float) self.Pi = np.array(Pi, np.float) self.O = np.array(O, np.float) self.N = self.A.shape[0] self.M = self.B.shape[1] def forward(self): T = len(self.O) alpha = np.zeros((T, self.N), np.float) for i in range(self.N): alpha[0, i] = self.Pi[i] * self.B[i, int(self.O[0])] for t in range(T - 1): for i in range(self.N): summation = 0 # for every i 'summation' should reset to '0' for j in range(self.N): summation += alpha[t, j] * self.A[j, i] alpha[t + 1, i] = summation * self.B[i, int(self.O[t + 1])] summation = 0.0 for i in range(self.N): summation += alpha[T - 1, i] Polambda = summation return Polambda, alpha def forward_with_scale(self): T = len(self.O) alpha_raw = np.zeros((T, self.N), np.float) alpha = np.zeros((T, self.N), np.float) c = [i for i in range(T)] # scalint factor; 0 or sequence doesn't matter for i in range(self.N): alpha_raw[0, i] = self.Pi[i] * self.B[i, [int(self.O[0])]] c[0] = 1.0 / sum(alpha_raw[0, i] for i in range(self.N)) for i in range(self.N): alpha[0, i] = c[0] * alpha_raw[0, i] for t in range(T - 1): for i in range(self.N): summation = 0.0 for j in range(self.N): summation += alpha[t, j] * self.A[j, i] alpha_raw[t + 1, i] = summation * self.B[i, int(self.O[t + 1])] c[t + 1] = 1.0 / sum(alpha_raw[t + 1, i1] for i1 in range(self.N)) for i in 
range(self.N): alpha[t + 1, i] = c[t + 1] * alpha_raw[t + 1, i] return alpha, c def backward(self): T = len(self.O) beta = np.zeros((T, self.N), np.float) for i in range(self.N): beta[T - 1, i] = 1.0 for t in range(T - 2, -1, -1): for i in range(self.N): summation = 0.0 # 這個必須在這一行,每一次for i 都要重置為0 for j in range(self.N): summation += self.A[i, j] * self.B[j, int(self.O[t + 1])] * beta[t + 1, j] beta[t, i] = summation Polambda = 0.0 for i in range(self.N): Polambda += self.Pi[i] * self.B[i, int(self.O[0])] * beta[0, i] return Polambda, beta def backward_with_scale(self, c): T = len(self.O) beta_raw = np.zeros((T, self.N), np.float) beta = np.zeros((T, self.N), np.float) for i in range(self.N): beta_raw[T - 1, i] = 1.0 beta[T - 1, i] = c[T - 1] * beta_raw[T - 1, i] for t in range(T - 2, -1, -1): for i in range(self.N): summation = 0.0 for j in range(self.N): summation += self.A[i, j] * self.B[j, int(self.O[t + 1])] * beta[t + 1, j] beta[t, i] = c[t] * summation # summation = beta_raw[t,i] return beta def viterbi(self): # given O,lambda .finding I T = len(self.O) I = np.zeros(T, np.float) delta = np.zeros((T, self.N), np.float) psi = np.zeros((T, self.N), np.float) for i in range(self.N): delta[0, i] = self.Pi[i] * self.B[i, int(self.O[0])] psi[0, i] = 0 for t in range(1, T): for i in range(self.N): delta[t, i] = self.B[i, int(self.O[t])] * np.array([delta[t - 1, j] * self.A[j, i] for j in range(self.N)]).max() psi[t, i] = np.array([delta[t - 1, j] * self.A[j, i] for j in range(self.N)]).argmax() P_T = delta[T - 1, :].max() I[T - 1] = delta[T - 1, :].argmax() for t in range(T - 2, -1, -1): I[t] = psi[t + 1, int(I[t + 1])] return I def compute_gamma(self, alpha, beta): T = len(self.O) gamma = np.zeros((T, self.N), np.float) # the probability of Ot=q for t in range(T): for i in range(self.N): gamma[t, i] = alpha[t, i] * beta[t, i] / sum( alpha[t, j] * beta[t, j] for j in range(self.N)) return gamma def compute_xi(self, alpha, beta): T = len(self.O) xi = np.zeros((T - 1, 
self.N, self.N), np.float) # note that: not T for t in range(T - 1): # note: not T for i in range(self.N): for j in range(self.N): numerator = alpha[t, i] * self.A[i, j] * self.B[j, int(self.O[t + 1])] * beta[t + 1, j] # the multiply term below should not be replaced by 'nummerator', # since the 'i,j' in 'numerator' are fixed. # In addition, should not use 'i,j' below, to avoid error and confusion. denominator = sum(sum( alpha[t, i1] * self.A[i1, j1] * self.B[j1, int(self.O[t + 1])] * beta[t + 1, j1] for j1 in range(int(self.N))) # the second sum for i1 in range(self.N)) # the first sum xi[t, i, j] = numerator / denominator return xi def Baum_Welch(self): # given O list finding lambda model(can derive T form O list) # also given N, M, T = len(self.O) V = [k for k in range(self.M)] # initialization - lambda self.A = np.array(([[0, 1, 0, 0], [0.4, 0, 0.6, 0], [0, 0.4, 0, 0.6], [0, 0, 0.5, 0.5]]), np.float) self.B = np.array(([[0.5, 0.5], [0.3, 0.7], [0.6, 0.4], [0.8, 0.2]]), np.float) # mean value is not a good choice self.Pi = np.array(([1.0 / self.N] * self.N), np.float) # must be 1.0 , if 1/3 will be 0 # self.A = np.array([[1.0 / self.N] * self.N] * self.N) # must array back, then can use[i,j] # self.B = np.array([[1.0 / self.M] * self.M] * self.N) x = 1 delta_lambda = x + 1 times = 0 # iteration - lambda while delta_lambda > x: # x Polambda1, alpha = self.forward() # get alpha Polambda2, beta = self.backward() # get beta gamma = self.compute_gamma(alpha, beta) # use alpha, beta xi = self.compute_xi(alpha, beta) lambda_n = [self.A, self.B, self.Pi] for i in range(self.N): for j in range(self.N): numerator = sum(xi[t, i, j] for t in range(T - 1)) denominator = sum(gamma[t, i] for t in range(T - 1)) self.A[i, j] = numerator / denominator for j in range(self.N): for k in range(self.M): numerator = sum(gamma[t, j] for t in range(T) if self.O[t] == V[k]) # TBD denominator = sum(gamma[t, j] for t in range(T)) self.B[i, k] = numerator / denominator for i in 
range(self.N): self.Pi[i] = gamma[0, i] # if sum directly, there will be positive and negative offset # computes the Chebyshev distance between two matrixes cdist_A = cdist(lambda_n[0], self.A, 'chebyshev') # cdist_A is still a matrix cdist_B = cdist(lambda_n[1], self.B, 'chebyshev') cdist_Pi = cdist([lambda_n[2]], [self.Pi], 'chebyshev') # turn Pi(vector) to matrix. delta_lambda = sum([sum(sum(cdist_A)), sum(sum(cdist_B)), sum(sum(cdist_Pi))]) times += 1 print(times) return self.A, self.B, self.Pi def Baum_Welch_with_scale(self): T = len(self.O) V = [k for k in range(self.M)] # initialization - lambda , should be float(need .0) self.A = np.array([[0.2, 0.2, 0.3, 0.3], [0.2, 0.1, 0.6, 0.1], [0.3, 0.4, 0.1, 0.2], [0.3, 0.2, 0.2, 0.3]]) self.B = np.array([[0.5, 0.5], [0.3, 0.7], [0.6, 0.4], [0.8, 0.2]]) x = 3 delta_lambda = x + 1 times = 0 # iteration - lambda while delta_lambda > x: # x alpha, c = self.forward_with_scale() beta = self.backward_with_scale(c) gamma = self.compute_gamma(alpha, beta) xi = self.compute_xi(alpha, beta) lambda_n = [self.A, self.B, self.Pi] for i in range(self.N): for j in range(self.N): numerator = sum(xi[t, i, j] for t in range(T - 1)) denominator = sum(gamma[t, i] for t in range(T - 1)) self.A[i, j] = numerator / denominator for j in range(self.N): for k in range(self.M): numerator = sum(gamma[t, j] for t in range(T) if self.O[t] == V[k]) # TBD denominator = sum(gamma[t, j] for t in range(T)) self.B[i, k] = numerator / denominator for i in range(self.N): self.Pi[i] = gamma[0, i] # if sum directly, there will be positive and negative offset # computes the Chebyshev distance between two matrixes cdist_A = cdist(lambda_n[0], self.A, 'chebyshev') cdist_B = cdist(lambda_n[1], self.B, 'chebyshev') cdist_Pi = cdist([lambda_n[2]], [self.Pi], 'chebyshev') # delta_lambda = sum([ sum(sum(cdist_A)), sum(sum(cdist_B)), sum(sum(cdist_Pi)) ]) # try igore B's error, not work casue the lambda is still in local optimal delta_lambda = 
sum([sum(sum(cdist_A)), sum(sum(cdist_Pi))]) times += 1 print(times) return self.A, self.B, self.Pi def try_viterbi(): A11 = [[0, 1, 0, 0], [0.4, 0, 0.6, 0], [0, 0.4, 0, 0.6], [0, 0, 0.5, 0.5]] B11 = [[0.5, 0.5], [0.3, 0.7], [0.6, 0.4], [0.8, 0.2]] Pi11 = [0.25] * 4 O11 = [0, 1, 0, 1, 0, 0, 1, 0, 1] # 0 denote red, 1 denote white hmm11 = HMM(A11, B11, Pi11, O11) I = hmm11.viterbi() print(I) def try_Baum_Welch(): A21 = np.zeros((4, 4), np.float) B21 = np.zeros((4, 2), np.float) Pi21 = [0.25, 0.25, 0.25, 0.25] O21 = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1] V21 = [k for k in range(2)] Q21 = [q for q in range(4)] # T21 = len(O21) hmm21 = HMM(A21, B21, Pi21, O21) A, B, Pi = hmm21.Baum_Welch() print(A, B, Pi) def try_Baum_Welch_with_scale(): A22 = np.zeros((4, 4), np.float) B22 = np.zeros((4, 2), np.float) Pi22 = [0.25, 0.25, 0.25, 0.25] O22 = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, 1] * 4 # V22 = [k for k in range(2)] # Q22 = [q for q in range(4)] # T21 = len(O21) hmm22 = HMM(A22, B22, Pi22, O22) A, B, Pi = hmm22.Baum_Welch_with_scale() print(A, B, Pi) if __name__ == '__main__': print('HMM in python') # try_viterbi() try_Baum_Welch() # try_Baum_Welch_with_scale()
Test results:

1. try_viterbi()

```
HMM in python
[0. 1. 0. 1. 2. 3. 2. 3. 2.]
```

2. try_Baum_Welch()

3. try_Baum_Welch_with_scale()
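As a sanity check on the try_viterbi() result above, a brute-force search over all 4^9 possible state paths (feasible only at toy sizes) should recover the same maximum-probability path. This is an independent verification sketch, not part of the original program:

```python
import itertools
import numpy as np

A = np.array([[0.0, 1.0, 0.0, 0.0], [0.4, 0.0, 0.6, 0.0],
              [0.0, 0.4, 0.0, 0.6], [0.0, 0.0, 0.5, 0.5]])
B = np.array([[0.5, 0.5], [0.3, 0.7], [0.6, 0.4], [0.8, 0.2]])
Pi = np.array([0.25, 0.25, 0.25, 0.25])
O = [0, 1, 0, 1, 0, 0, 1, 0, 1]

best_p, best_path = -1.0, None
for path in itertools.product(range(4), repeat=len(O)):
    # probability of this state path jointly with the observations
    p = Pi[path[0]] * B[path[0], O[0]]
    for t in range(1, len(O)):
        p *= A[path[t - 1], path[t]] * B[path[t], O[t]]
    if p > best_p:
        best_p, best_path = p, path

print(best_path)  # the exhaustive optimum, which Viterbi must match
```

On this example the search returns (0, 1, 0, 1, 2, 3, 2, 3, 2), agreeing with the Viterbi decoding printed above.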