1. 程式人生 > >標註——隱馬爾科夫模型(HMM)以及Python實現

標註——隱馬爾科夫模型(HMM)以及Python實現

隱馬爾可夫模型(HMM)是可用於標註問題的統計模型。關於HMM通常包含三類問題:1.概率計算 2.引數學習 3.預測狀態。本部落格簡單羅列下HMM的知識點,給出程式碼。詳細地參考李航《統計學習方法》。

模型簡介

HMM描述先由隱藏的馬爾可夫鏈生成狀態序列,各個狀態序列生成一個觀測,組合成最終的觀測序列。故整個模型包含三個要素:

  1. 初始狀態概率向量:生成第一個狀態的概率
  2. 狀態轉移概率矩陣:當前狀態轉移到下一個狀態的概率矩陣
  3. 觀測生成概率矩陣:每個狀態生成各個觀測的概率

概率計算問題

給定模型,求解觀測序列出現的概率。直接求解概率的計算十分複雜,通常採用遞推的方法求解,即從部分序列到整體序列。可以從前往後遞推,也可從後往前遞推。

前向演算法

前向概率:到時刻t的部分觀測序列的時刻t狀態為qiq_i的概率。
時刻t-1的每個可能狀態乘以狀態轉移概率,和再乘以觀測生成概率即為時刻t的前向概率。
後向概率:對於時刻t+1到T的部分序列,在時刻t的狀態為qiq_i的概率。
時刻t+1的每個可能狀態乘以觀測生成概率,再乘以狀態轉移概率, 和即為時刻t的前向概率。

引數學習

隱馬爾可夫模型中"狀態"是隱藏的,是一個典型的含有隱藏變數的模型,故需要使用EM演算法來學習模型引數。EM演算法應用到HMM模型中時稱為Baum-Welch演算法。

預測狀態

狀態序列是比較長且可能性非常多的,首先想到將問題規模縮小。而在隱馬爾可夫模型中,當前狀態是依賴於前一個狀態的,那麼規模縮小後,子問題之間必定是相關的,應當採用動態規劃的演算法,即Viterbi演算法。**該演算法將狀態序列視為一條路徑,那麼路徑的長度就是狀態轉移概率乘以觀測生成概率,再求和。問題變為求解最長路徑。**如圖所示:path

  • 子問題結構:從0時刻到時刻t且狀態為i的所有路徑中的最長路徑。定義最長路徑長度為δt(i)\delta_t(i)
  • 遞推式:時刻t+1且狀態為i的最長路徑長度必然是到時刻t的最長路徑長度加上t到t+1的最長路徑。
  • 構造最優解:在求解子問題的過程中,儲存上一個時刻的狀態。最後遞迴構建最優解。

程式碼

"""
隱馬爾科夫模型
三類問題:1.概率計算 2.學習問題(引數估計) 3.預測問題(狀態序列的預測)
"""
import numpy as np
from itertools import accumulate


class GenData:
    """
    根據隱馬爾科夫模型生成相應的觀測資料
    """
def __init__(self, hmm, n_sample): self.hmm = hmm self.n_sample = n_sample def _locate(self, prob_arr): # 給定概率向量,返回狀態 seed = np.random.rand(1) for state, cdf in enumerate(accumulate(prob_arr)): if seed <= cdf: return state return def init_state(self): # 根據初始狀態概率向量,生成初始狀態 return self._locate(self.hmm.S) def state_trans(self, current_state): # 轉移狀態 return self._locate(self.hmm.A[current_state]) def gen_obs(self, current_state): # 生成觀測 return self._locate(self.hmm.B[current_state]) def gen_data(self): # 根據模型產生觀測資料 current_state = self.init_state() start_obs = self.gen_obs(current_state) state = [current_state] obs = [start_obs] n = 0 while n < self.n_sample - 1: n += 1 current_state = self.state_trans(current_state) state.append(current_state) obs.append(self.gen_obs(current_state)) return state, obs class HMM: def __init__(self, n_state, n_obs, S=None, A=None, B=None): self.n_state = n_state # 狀態的個數n self.n_obs = n_obs # 觀測的種類數m self.S = S # 1*n, 初始狀態概率向量 self.A = A # n*n, 狀態轉移概率矩陣 self.B = B # n*m, 觀測生成概率矩陣 def _alpha(hmm, obs, t): # 計算時刻t各個狀態的前向概率 b = hmm.B[:, obs[0]] alpha = np.array([hmm.S * b]) # n*1 for i in range(1, t + 1): alpha = (alpha @ hmm.A) * np.array([hmm.B[:, obs[i]]]) return alpha[0] def forward_prob(hmm, obs): # 前向演算法計算最終生成觀測序列的概率, 即各個狀態下概率之和 alpha = _alpha(hmm, obs, len(obs) - 1) return np.sum(alpha) def _beta(hmm, obs, t): # 計算時刻t各個狀態的後向概率 beta = np.ones(hmm.n_state) for i in reversed(range(t + 1, len(obs))): beta = np.sum(hmm.A * hmm.B[:, obs[i]] * beta, axis=1) return beta def backward_prob(hmm, obs): # 後向演算法計算生成觀測序列的概率 beta = _beta(hmm, obs, 0) return np.sum(hmm.S * hmm.B[:, obs[0]] * beta) def fb_prob(hmm, obs, t=None): # 將前向和後向合併 if t is None: t = 0 res = _alpha(hmm, obs, t) * _beta(hmm, obs, t) return res.sum() def _gamma(hmm, obs, t): # 計算時刻t處於各個狀態的概率 alpha = _alpha(hmm, obs, t) beta = _beta(hmm, obs, t) prob = alpha * beta return prob / prob.sum() def point_prob(hmm, obs, t, i): # 計算時刻t處於狀態i的概率 prob = _gamma(hmm, obs, t) return prob[i] def _xi(hmm, obs, t): alpha = np.mat(_alpha(hmm, obs, t)) beta_p = _beta(hmm, obs, t + 1) obs_prob = hmm.B[:, obs[t + 1]] obs_beta = np.mat(obs_prob * beta_p) alpha_obs_beta = np.asarray(alpha.T * obs_beta) xi = alpha_obs_beta * hmm.A return xi / xi.sum() def fit(hmm, obs_data, maxstep=100): # 利用Baum-Welch演算法學習 hmm.A = np.ones((hmm.n_state, hmm.n_state)) / hmm.n_state hmm.B = np.ones((hmm.n_state, hmm.n_obs)) / hmm.n_obs hmm.S = np.random.sample(hmm.n_state) # 初始狀態概率矩陣(向量),的初始化必須隨機狀態,否則容易陷入區域性最優 hmm.S = hmm.S / hmm.S.sum() step = 0 while step < maxstep: xi = np.zeros_like(hmm.A) gamma = np.zeros_like(hmm.S) B = np.zeros_like(hmm.B) S = _gamma(hmm, obs_data, 0) for t in range(len(obs_data) - 1): tmp_gamma = _gamma(hmm, obs_data, t) gamma += tmp_gamma xi += _xi(hmm, obs_data, t) B[:, obs_data[t]] += tmp_gamma # 更新 A for i in range(hmm.n_state): hmm.A[i] = xi[i] / gamma[i] # 更新 B tmp_gamma_end = _gamma(hmm, obs_data, len(obs_data) - 1) gamma += tmp_gamma_end B[:, obs_data[-1]] += tmp_gamma_end for i in range(hmm.n_state): hmm.B[i] = B[i] / gamma[i] # 更新 S hmm.S = S step += 1 return hmm def predict(hmm, obs): # 採用Viterbi演算法預測狀態序列 N = len(obs) nodes_graph = np.zeros((hmm.n_state, N), dtype=int) # 儲存時刻t且狀態為i時, 前一個時刻t-1的狀態,用於構建最終的狀態序列 delta = hmm.S * hmm.B[:, obs[0]] # 儲存到t時刻,且此刻狀態為i的最大概率 nodes_graph[:, 0] = range(hmm.n_state) for t in range(1, N): new_delta = [] for i in range(hmm.n_state): temp = [hmm.A[j, i] * d for j, d in enumerate(delta)] # 當前狀態為i時, 選取最優的前一時刻狀態 max_d = max(temp) new_delta.append(max_d * hmm.B[i, obs[t]]) nodes_graph[i, t] = temp.index(max_d) delta = new_delta current_state = np.argmax(nodes_graph[:, -1]) path = [] t = N while t > 0: path.append(current_state) current_state = nodes_graph[current_state, t - 1] t -= 1 return list(reversed(path)) if __name__ == '__main__': # S = np.array([0.2, 0.4, 0.4]) # A = np.array([[0.5, 0.2, 0.3], [0.3, 0.5, 0.2], [0.2, 0.3, 0.5]]) # B = np.array([[0.5, 0.2, 0.3], [0.4, 0.2, 0.4], [0.6, 0.3, 0.1]]) # hmm_real = HMM(3, 3, S, A, B) # g = GenData(hmm_real, 500) # state, obs = g.gen_data() # 檢測生成的資料 # state, obs = np.array(state), np.array(obs) # ind = np.where(state==2)[0] # from collections import Counter # obs_ind = obs[ind] # c1 = Counter(obs_ind) # n = sum(c1.values()) # for o, val in c.items(): # print(o, val/n) # ind_next = ind + 1 # ind_out = np.where(ind_next==1000) # ind_next = np.delete(ind_next, ind_out) # state_next = state[ind_next] # c2 = Counter(state_next) # n = sum(c2.values()) # for s, val in c2.items(): # print(s, val/n) # 預測 S = np.array([0.5, 0.5]) A = np.array([[0.8, 1], [0.8, 0.8]]) B = np.array([[0.2, 0.0, 0.8], [0, 0.8, 0.2]]) hmm = HMM(2, 3, S, A, B) g = GenData(hmm, 200) state, obs = g.gen_data() print(obs) path = predict(hmm, obs) score = sum([int(i == j) for i, j in zip(state, path)]) print(score / len(path)) # 學習 # import matplotlib.pyplot as plt # # # def triangle_data(n_sample): # # 生成三角波形狀的序列 # data = [] # for x in range(n_sample): # x = x % 6 # if x <= 3: # data.append(x) # else: # data.append(6 - x) # return data # # # hmm = HMM(10, 4) # data = triangle_data(30) # hmm = fit(hmm, data) # g = GenData(hmm, 30) # state, obs = g.gen_data() # # x = [i for i in range(30)] # plt.scatter(x, obs, marker='*', color='r') # plt.plot(x, data, color='g') # plt.show()

我的GitHub
注:如有不當之處,請指正。

相關推薦

no