1. 程式人生 > >簡單上手用於中文分詞的隱馬爾科夫模型

簡單上手用於中文分詞的隱馬爾科夫模型

前段時間一直在看自然語言處理方面的知識,所以不可避免的接觸到了隱馬爾科夫模型和條件隨機場模型。這兩個模型可以說是自然語言處理方向的基礎模型了,所以自然而然對它們上心許多。它們之間也確實是有許多的異同,當時為了清晰地區分開它們,確實是花費了我好一陣子時間,而且到現在自己也還沒有完完全全把它們吃透,但還是斗膽把自己整理的一些資料和心得貼出來供大家參考,希望大家都能少走彎路,節約時間。

隱馬爾科夫模型

第一部分我會簡單介紹隱馬爾科夫模型,我給出的介紹是我在知乎上看到的比較好的解答,所以我會借鑑部分(其實是能力不夠,寫不出來通俗易懂的東西啦)

這個是原作連線https://www.zhihu.com/question/35866596/answer/236886066

HMM屬於典型的生成式模型。應該是要從訓練資料中學到資料的各種分佈,那麼有哪些分佈呢以及是什麼呢?直接正面回答的話,正是HMM的5要素,其中有3個就是整個資料的不同角度的概率分佈:

  • N ,隱藏狀態集 N = \lbrace q_{1}, \cdots, q_{N} \rbrace , 我的隱藏節點不能隨意取,只能限定取包含在隱藏狀態集中的符號。
  • M ,觀測集 M = \lbrace v_{1}, \cdots, v_{M} \rbrace , 同樣我的觀測節點不能隨意取,只能限定取包含在觀測狀態集中的符號。
  • A ,狀態轉移概率矩陣,這個就是其中一個概率分佈。他是個矩陣, A= [a_{ij}]_{N \times N} (N為隱藏狀態集元素個數),其中 a_{ij} = P(i_{t+1}|i_{t}), i_{t} 即第i個隱狀態節點,即所謂的狀態轉移嘛。
  • B ,觀測概率矩陣,這個就是另一個概率分佈。他是個矩陣, B = [b_{ij}]_{N \times M} (N為隱藏狀態集元素個數,M為觀測集元素個數),其中 b_{ij} = P(o_{t}|i_{t}), o_{t}
    即第i個觀測節點, i_{t} 即第i個隱狀態節點,即所謂的觀測概率(發射概率)嘛。
  • π ,在第一個隱狀態節點 i_{t} ,我得人工單獨賦予,我第一個隱狀態節點的隱狀態是 N 中的每一個的概率分別是多少,然後 π 就是其概率分佈。

所以圖看起來是這樣的:

看的很清楚,我的模型先去學習要確定以上5要素,之後在inference階段的工作流程是:首先,隱狀態節點 i_{t} 是不能直接觀測到的資料節點, o_{t} 才是能觀測到的節點,並且注意箭頭的指向表示了依賴生成條件關係, i_{t} 在A的指導下生成下一個隱狀態節點 i_{t+1} ,並且 i_{t}B 的指導下生成依賴於該 i_{t} 的觀測節點 o_{t} , 並且我只能觀測到序列 (o_{1}, \cdots, o_{i})

好,舉例子說明(序列標註問題,POS,標註集BES):

input: "學習出一個模型,然後再預測出一條指定"

expected output: 學/B 習/E 出/S 一/B 個/E 模/B 型/E ,/S 然/B 後/E 再/E 預/B 測/E ……

其中,input裡面所有的char構成的字表,形成觀測集 M ,因為字序列在inference階段是我所能看見的;標註集BES構成隱藏狀態集 N ,這是我無法直接獲取的,也是我的預測任務;至於 A、B、π ,這些概率分佈資訊(上帝資訊)都是我在學習過程中所確定的引數。

然後一般初次接觸的話會疑問:為什麼要這樣?……好吧,就應該是這樣啊,根據具有同時帶著隱藏狀態節點和觀測節點的型別的序列,在HMM下就是這樣子建模的。

下面來點高層次的理解:

  1. 根據概率圖分類,可以看到HMM屬於有向圖,並且是生成式模型,直接對聯合概率分佈建模 P(O,I) = \sum_{t=1}^{T}P(O_{t} | O_{t-1})P(I_{t} | O_{t}) (注意,這個公式不在模型執行的任何階段能體現出來,只是我們都去這麼來表示HMM是個生成式模型,他的聯合概率 P(O,I) 就是這麼計算的)。
  2. 並且B中 b_{ij} = P(o_{t}|i_{t}) ,這意味著o對i有依賴性。
  3. 在A中, a_{ij} = P(i_{t+1}|i_{t}) ,也就是說只遵循了一階馬爾科夫假設,1-gram。試想,如果資料的依賴超過1-gram,那肯定HMM肯定是考慮不進去的。這一點限制了HMM的效能。

模型執行過程

模型的執行過程(工作流程)對應了HMM的3個問題。

學習訓練過程

HMM學習訓練的過程,就是找出資料的分佈情況,也就是模型引數的確定。

主要學習演算法按照訓練資料除了觀測狀態序列 (o_{1}, \cdots, o_{i}) 是否還有隱狀態序列 (i_{1}, \cdots, i_{i}) 分為:

  • 極大似然估計, with 隱狀態序列
  • Baum-Welch(前向後向), without 隱狀態序列

感覺不用做很多的介紹,都是很實實在在的演算法,看懂了就能理解。簡要提一下。

1. 極大似然估計

一般做NLP的序列標註等任務,在訓練階段肯定是有隱狀態序列的。所以極大似然估計法是非常常用的學習演算法,我見過的很多程式碼裡面也是這麼計算的。比較簡單。

  • step1. 算A

\hat{a_{ij}} = \frac{A_{ij}}{\sum_{j=1}^{N}A_{ij}}

  • step2. 算B

\hat{b_{j}}(k) = \frac{B_{jk}}{\sum_{k=1}^{M}B_{jk}}

  • step3. 直接估計 π

比如說,在程式碼裡計算完了就是這樣的:

2. Baum-Welch(前向後向)

就是一個EM的過程,如果你對EM的工作流程有經驗的話,對這個Baum-Welch一看就懂。EM的過程就是初始化一套值,然後迭代計算,根據結果再調整值,再迭代,最後收斂……好吧,這個理解是沒有捷徑的,去隔壁鑽研EM吧。

這裡只提一下核心。因為我們手裡沒有隱狀態序列 (i_{1}, \cdots, i_{i}) 資訊,所以我先必須給初值 a_{ij}^{0}, b_{j}(k)^{0}, \pi^{0} ,初步確定模型,然後再迭代計算出 a_{ij}^{n}, b_{j}(k)^{n}, \pi^{n} ,中間計算過程會用到給出的觀測狀態序列 (o_{1}, \cdots, o_{i}) 。另外,收斂性由EM的XXX定理保證。

序列標註(解碼)過程

好了,學習完了HMM的分佈引數,也就確定了一個HMM模型。需要注意的是,這個HMM是對我這一批全部的資料進行訓練所得到的引數。

序列標註問題也就是“預測過程”,通常稱為解碼過程。對應了序列建模問題3.。對於序列標註問題,我們只需要學習出一個HMM模型即可,後面所有的新的sample我都用這一個HMM去apply。

我們的目的是,在學習後已知了 P(Q,O) ,現在要求出 P(Q|O) ,進一步

Q_{max} = argmax_{allQ}\frac{P(Q,O)}{P(O)}

再直白點就是,我現在要在給定的觀測序列下找出一條隱狀態序列,條件是這個隱狀態序列的概率是最大的那個。

具體地,都是用Viterbi演算法解碼,是用DP思想減少重複的計算。Viterbi也是滿大街的,不過要說的是,Viterbi不是HMM的專屬,也不是任何模型的專屬,他只是恰好被滿足了被HMM用來使用的條件。誰知,現在大家都把Viterbi跟HMM捆綁在一起了, shame。

Viterbi計算有向無環圖的一條最大路徑,應該還好理解。如圖:

關鍵是注意,每次工作熱點區只涉及到t 與 t-1,這對應了DP的無後效性的條件。如果對某些同學還是很難理解,請參考這個答案下@Kiwee的回答吧。

序列概率過程

我通過HMM計算出序列的概率又有什麼用?針對這個點我把這個問題詳細說一下。

實際上,序列概率過程對應了序列建模問題2.,即序列分類。
在3.2.2第一句話我說,在序列標註問題中,我用一批完整的資料訓練出了一支HMM模型即可。好,那在序列分類問題就不是訓練一個HMM模型了。我應該這麼做(結合語音分類識別例子):

目標:識別聲音是A發出的還是B發出的。
HMM建模過程:
1. 訓練:我將所有A說的語音資料作為dataset_A,將所有B說的語音資料作為dataset_B(當然,先要分別對dataset A ,B做預處理encode為元資料節點,形成sequences),然後分別用dataset_A、dataset_B去訓練出HMM_A/HMM_B
2. inference:來了一條新的sample(sequence),我不知道是A的還是B的,沒問題,分別用HMM_A/HMM_B計算一遍序列的概率得到 P_{A}(S)、P_{B}(S) ,比較兩者大小,哪個概率大說明哪個更合理,更大概率作為目標類別。

所以,本小節的理解重點在於,如何對一條序列計算其整體的概率。即目標是計算出 P(O|λ) 。這個問題前輩們在他們的經典中說的非常好了,比如參考李航老師整理的:

  • 直接計演算法(窮舉搜尋)
  • 前向演算法
  • 後向演算法

後面兩個演算法採用了DP思想,減少計算量,即每一次直接引用前一個時刻的計算結果以避免重複計算,跟Viterbi一樣的技巧。

HMM實現中文分詞原始碼

import pickle
import os


class HMM(object):
    def __init__(self):
        # 主要是用於儲存演算法中間結果,不用每次都訓練模型
        self.model_file = 'data/hmm_model.pkl'
        # 狀態值集合
        self.state_list = {'B', 'M', 'E', 'S'}
        # 引數載入,用於判斷是否需要重新載入model_file
        self.load_para = False

    # 接受一個引數,用於判斷是否載入中間檔案結果
    def try_laod_model(self, trained):
        if trained:
            with open(self.model_file, 'rb') as f:
                self.A_dic = pickle.load(f)
                self.B_dic = pickle.load(f)
                self.Pi_dic = pickle.load(f)
                self.load_para = True
        else:
            # 狀態轉移概率(狀態->狀態的條件概率)
            self.A_dic = {}
            # 發射概率(狀態->詞語的條件概率)
            self.B_dic = {}
            # 狀態的初始概率
            self.Pi_dic = {}
            self.load_para = False

    # 訓練給定的分詞語料,得出HMM所需的初始概率,轉移概率以及發射概率
    def train(self, path):
        # 重置幾個基本的概率矩陣
        self.try_laod_model(False)
        # 統計狀態出現次數,求p(o)
        count_dic = {s: 0 for s in self.state_list}
        # 初始化相關引數
        self.init_parameters()
        line_num = -1
        # 觀察者集合,主要是字以及標點等
        words = set()
        with open(path, encoding='utf-8') as f:
            for line in f:
                line_num += 1
                line = line.strip()
                if not line:
                    continue
                # 集合為字以及標點等
                word_list = [i for i in line if i != ' ']
                # 更新字的集合
                words |= set(word_list)
                linelist = line.split()
                line_state = []
                for w in linelist:
                    line_state.extend(self.makeLabel(w))
                for k, v in enumerate(line_state):
                    count_dic[v] += 1
                    if k == 0:
                        # 每個句子的第一個字的狀態,用於計算初始狀態概率
                        self.Pi_dic[v] += 1
                    else:
                        # 計算轉移概率
                        self.A_dic[line_state[k - 1]][v] += 1
                        # 計算髮射概率
                        self.B_dic[line_state[k]][word_list[k]] = self.B_dic[line_state[k]].get(word_list[k], 0) + 1
        self.Pi_dic = {k: v / line_num for k, v in self.Pi_dic.items()}
        self.A_dic = {k: {k1: v1 / count_dic[k] for k1, v1 in v.items()} for k, v in self.A_dic.items()}
        # 加一平滑
        self.B_dic = {k: {k1: (v1 + 1) / count_dic[k] for k1, v1 in v.items()} for k, v in self.B_dic.items()}
        print("B_dic['S'].", self.B_dic['S'].keys())
        with open(self.model_file, 'wb') as f:
            pickle.dump(self.A_dic, f)
            pickle.dump(self.B_dic, f)
            pickle.dump(self.Pi_dic, f)
        print("B_dic['S'].", self.B_dic['S'].keys())
        return self

    # 初始化相關引數
    def init_parameters(self):
        for state in self.state_list:
            self.A_dic[state] = {s: 0.0 for s in self.state_list}
            self.Pi_dic[state] = 0.0
            self.B_dic[state] = {}

    # 為text句子打上狀態標記
    def makeLabel(self, text):
        out_text = []
        if len(text) == 1:
            out_text.append('S')
        else:
            out_text += ['B'] + ['M'] * (len(text) - 2) + ['E']
        return out_text

    def viterbi(self, text, states, start_p, trans_p, emit_p):
        V = [{}]
        path = {}
        for y in states:
            V[0][y] = start_p[y] * emit_p[y].get(text[0], 0)
            path[y] = [y]
        for t in range(1, len(text)):
            V.append({})
            newpath = {}
            # 檢驗訓練的發射概率矩陣中是否有該字
            neverSeen = text[t] not in emit_p['S'].keys() and text[t] not in emit_p['M'].keys() and \
                        text[t] not in emit_p['E'].keys() and text[t] not in emit_p['B'].keys()
            for y in states:
                # 設定未知字單獨成詞\n",
                emitP = emit_p[y].get(text[t], 0) if not neverSeen else 1.0
                (prob, state) = max([(V[t - 1][y0] * trans_p[y0].get(y, 0) * emitP, y0)
                                     for y0 in states if V[t - 1][y0] > 0])
                V[t][y] = prob
                newpath[y] = path[state] + [y]
            path = newpath
        if emit_p['M'].get(text[-1], 0) > emit_p['S'].get(text[-1], 0):
            (prob, state) = max([(V[len(text) - 1][y], y) for y in ('E', 'M')])
        else:
            (prob, state) = max([(V[len(text) - 1][y], y) for y in states])

        print("emit_p['S'].keys()", emit_p['S'].keys())
        return prob, path[state]

    def cut(self, text):
        if not self.load_para:
            self.try_laod_model(os.path.exists(self.model_file))
        prob, pos_list = self.viterbi(text, self.state_list, self.Pi_dic, self.A_dic, self.B_dic)
        begin, next = 0, 0
        for i, char in enumerate(text):
            pos = pos_list[i]
            if pos == 'B':
                begin = i
            elif pos == 'E':
                yield text[begin: i + 1]
                next = i + 1
            elif pos == 'S':
                yield char
                next = i + 1
        if next < len(text):
            yield text[next:]


if __name__ == '__main__':
    hmm = HMM()
    hmm.train('data/trainCorpus.txt_utf8')
    text = '這是一個非常棒的方案'
    res = hmm.cut(text)
    print(str(list(res)))

以下是  'data/trainCorpus.txt_utf8' 這個檔案的百度雲盤連線

連結:https://pan.baidu.com/s/1p-b4ENP8xui7DDlmuwTYuQ 
提取碼:yz10

最後還是稍微講講關於HMM與CRF之間的區別吧

CRF就像一個反向的隱馬爾可夫模型(HMM),兩者都是用了馬爾科夫鏈作為隱含變數的概率轉移模型,只不過HMM使用隱含變數生成可觀測狀態,其生成概率有標註集統計得到,是一個生成模型;而CRF反過來通過可觀測狀態判別隱含變數,其概率亦通過標註集統計得來,是一個判別模型。由於兩者模型主幹相同,其能夠應用的領域往往是重疊的,但在命名實體、句法分析等領域CRF更勝一籌。當然你並不必須學習HMM才能讀懂CRF,但通常來說如果做自然語言處理,這兩個模型應該都有了解。

關於HMM暫時就貼這麼多了,CRF我會多貼東西,敬請期待啦