基於監督學習的隱馬爾科夫模型(HMM)實現中文分詞
阿新 • • 發佈:2019-02-13
因為語料是分好詞來訓練的,所以程式碼寫起來還算簡單,HMM的引數pi,A,B訓練只是做一個簡單的統計工作
反倒是寫維特比演算法時出了一些問題,因為之前都是紙上談兵,真正寫這個演算法才發現之前有的地方沒有搞明白!!
維特比的演算法大致如下:
注:下面[]中代表下標
在計算δ[t](i)是需要遍歷δ[t-1](j),j遍歷所有的隱狀態,找到一個隱狀態使得δ[t](i)最大,計算完所有的δ後是一個觀測序列O長度*狀態長度的矩陣,特別注意的是這時候並不是在矩陣中找出每一個觀測最優可能的狀態構成的狀態序列成為預測序列,因為計算δ[t](i)時是由前一個δ[t-1](j)影響的,而j這個狀態影響了δ[t](i)使得最大,我們需要一個矩陣(大小同δ矩陣,記為states)儲存δ[t-1](j)是由哪一個狀態影響才得到最優狀態,最後計算預測序列時,先從之前的δ矩陣的最後一行,也就是最後一個觀測值找出它最有可能的狀態,然後從這個狀態出發,由之前的狀態矩陣states回溯得到預測序列,這將在程式碼中體現!
下面是python程式碼實現:
# coding: utf-8 import numpy as np import math #每一個字的4個隱藏狀態 0123/start(該字是一個詞的開頭) middle(該字是一個詞的中間部分) end(該字是一個詞的結束) single(該字是單獨就是一個詞) #監督學習:用的語料庫是分好詞的來訓練 #注:ord()函式獲取一個字元的Unicode編碼 infinite = float(-2.0**31)#負無窮 #訓練引數(已知O和I的情況下監督學習),只需要做一個統計就可以得到引數 #傳入字串語料資料,必須是處理好分詞的,並且傳入分隔符 def mle(train_data,split_char=" "): tokens = train_data.split(split_char) #模型引數 pi = np.zeros(4)#狀態概率pai A = np.zeros((4,4))#狀態轉移矩陣 B = np.zeros((4,65536))#發射矩陣,某個隱狀態下的觀測分佈 last_token = tokens[0] for token in tokens: token = token.strip() token_len = len(token) last_token_state =3 if len(last_token)==1 else 2#若上一個token長度為1則為single轉移到某個狀態否則是end轉移到某個狀態 #不為空字元判斷 if token_len ==0: continue #單字成詞 if token_len == 1: pi[3]+=1 A[last_token_state][3]+=1#上一個狀態轉移到single #給出狀態single下出現的觀測字元 B[3][ord(token)]+=1 elif token_len==2: pi[0]+=1 pi[2]+=1 #start轉移到end A[0][2]+=1 A[last_token_state][0]+=1 #給出狀態start和end下出現的觀測字元 B[0][ord(token[0])]+=1 B[2][ord(token[1])]+=1 else: pi[0]+=1 pi[2]+=1 pi[1]+=token_len -2 #start轉移到middle,middle轉移到middle,middle轉移到end A[0][1]+=1 A[1][1]+=(token_len-3) A[1][2]+=1 A[last_token_state][0]+=1 #給出狀態start,middle,end下出現的觀測字元 B[0][ord(token[0])]+=1#start B[2][ord(token[token_len-1])]+=1#end for i in range(1,token_len-1):#middle B[1][ord(token[i])]+=1 last_token = token #取對數 sum1 = np.sum(pi) for i in range(len(pi)): pi[i] = math.log(pi[i] / sum1) log_val(A) log_val(B) return pi,A,B #對pi,A,B結果取對數,因為單個數值太小,會溢位 def log_val(data): #遍歷矩陣每一行,每一行概率相加為1,做取對數處理 col_len = data.shape[1] for k,line in enumerate(data): sum1 = np.sum(line) log_sum = math.log(sum1) for i in range(col_len): if data[k][i] == 0: data[k][i] = infinite else: data[k][i] = math.log(data[k][i]) - log_sum #預測維特比演算法 def viterbi(pi,A,B,O): O = O.strip() O_len = len(O) pi_len = len(pi) if O_len==0: return #儲存所有狀態的最大值是由哪一個狀態產生的也就是計算δ[t](i)時,是由哪一個δ[t-1](q)產生的,q就是哪個狀態 states = np.full(shape=(O_len,pi_len),fill_value=0.0) #儲存計算過所有的計算的δ deltas = np.full(shape=(O_len,pi_len),fill_value=0.0) #初始化計算最優P(I,O1) = max{P(O1|I)*p(I)} for j in range(0,pi_len): deltas[0][j] = pi[j] + B[j][ord(O[0])]#變加法是因為取了對數 #dp計算P(I|O1,O2,O3,...Ot,I1,I2...It-1) for t in range(1,O_len): for i in range(0,pi_len):#計算每一個δ[t](i=q1...q[pi_len]) = max{δt[j]*A[ji]*B[qi|Ot]},j是遍歷所有狀態 deltas[t][i] = deltas[t-1][0]+A[0][i] #尋找最大的δ[t](i) for j in range(1,pi_len): current = deltas[t-1][j]+A[j][i] if current > deltas[t][i]: deltas[t][i] = current #儲存當前δ[t](i)取得最大值是是從上一個哪個狀態來的 states[t][i] = j deltas[t][i] +=B[i][ord(O[t])] #回溯找到最優概率路徑 max1 = deltas[O_len-1][0] best_state = np.zeros(O_len) #先找出最後一個觀測的最可能狀態是什麼 for i in range(1,pi_len): if deltas[O_len-1][i] > max1: max1 = deltas[O_len-1][i] best_state[O_len-1] = i #由最後一個觀測得到的最好狀態往前回溯找出狀態序列 for i in range(O_len-2, -1, -1): best_state[i] = states[i+1][int(best_state[i+1])] return best_state def output_words(decode,O): T = len(O) for i in range(0,T): #如果預測當前字元最有可能的狀態是end或者single就分詞 if decode[i] ==2 or decode[i] == 3: print(O[i],"|",end='') else: print(O[i],end='') #開始訓練 f = open("./pku_training.utf8","r",encoding="utf-8") data = f.read()[3:]; f.close() pi,A,B = mle(data)#訓練結束 #測試 f2 = open("./novel.txt","r",encoding="utf-8") O = f2.read().strip() states = viterbi(pi, A, B, O) output_words(states, O)
測試結果:
附上訓練語料庫和測試語料庫: