1. 程式人生 > >python_NLP實戰之中文分詞技術

python_NLP實戰之中文分詞技術

一、規則分詞

1.1 正向最大匹配演算法

# 正向最大匹配演算法 MM法 規則分詞
class MM(object):
    def __init__(self):
        self.window_size=3

    def cut(self,text):
        result=[]
        index=0
        text_length=len(text)
        dic=['研究','研究生','生命','命','的','起源']
        while text_length>index:
            for size in range(self.window_size+index,index,-1):
                piece=text[index:size]
                if piece in dic:
                    index=size-1
                    break
            index=index+1
            result.append(piece+'-------')
        print(result)
if __name__=='__main__':
    text='研究生命的起源'
    tokenizer=MM()
    print(tokenizer.cut(text))

1.2 逆向最大匹配演算法

# RMM逆向最大匹配演算法   規則分詞
class RMM(object):
    def __init__(self):
        self.window_size=3
    def cut(self,text):
        result=[]
        index=len(text)
        dic=['研究','研究生','生命','命','的','起源']
        while index>0:
            for size in range(index-self.window_size,index):
                piece=text[size:index]
                if piece in dic:
                    index=size+1
                    break
            index=index-1
            result.append(piece+'------')
        result.reverse()
        print(result)

if __name__=='__main__':
    text = '研究生命的起源'
    tokenizer = RMM()
    print(tokenizer.cut(text))

二、統計分詞

2.1 HMM模型

初始概率分佈

      z1可能是狀態1,狀態2 ... 狀態n,於是z1就有個N點分佈:

Z1

狀態1

狀態2

...

狀態n

概率

P1

P2

...

Pn

      即:Z1對應個n維的向量。

      上面這個n維的向量就是初始概率分佈,記做π。

狀態轉移矩陣

      但Z2就不能簡單的“同上”完事了,因為Z2和Z1不獨立,所以Z2是狀態1的概率有:Z1是狀態1時Z2是狀態1,Z1是狀態2時Z2是狀態1,..., Z1是狀態n時Z2是狀態1,於是就是下面的表

Z2

Z1

狀態1

狀態2

...

狀態n

狀態1

P11

P12

...

P1n

狀態2

P21

P22

...

P2n

...

...

...

...

...

狀態n

Pn1

Pn2

...

Pnn

       即:Z1->Z2對應個n*n的矩陣。

       同理:Zi -> Zi+1對應個n*n的矩陣。

      上面這些n*n的矩陣被稱為狀態轉移矩陣,用An*n表示。

      當然了,真要說的話,Zi -> Zi+1的狀態轉移矩陣一定都不一樣,但在實際應用中一般將這些狀態轉移矩陣定為同一個,即:只有一個狀態轉移矩陣。

      圖1的第一行就搞定了,下面是第二行。

觀測矩陣

      如果對於zi有:狀態1, 狀態2, ..., 狀態n,那zi的每一個狀態都會從下面的m個觀測中產生一個:觀測1, 觀測2, ..., 觀測m,所以有如下矩陣:

X

Z

觀測1

觀測2

...

觀測m

狀態1

P11

P12

...

P1m

狀態2

P21

P22

...

P2m

...

...

...

...

...

狀態n

Pn1

Pn2

...

Pnm

      這可以用一個n*m的矩陣表示,也就是觀測矩陣,記做Bn*m。

      由於HMM用上面的π,A,B就可以描述了,於是我們就可以說:HMM由初始概率分佈π、狀態轉移概率分佈A以及觀測概率分佈B確定,為了方便表達,把A, B, π 用 λ 表示,即:

            λ = (A, B, π)

例子

      假設我們相對如下這行話進行分詞:

           歡迎來到我的部落格

      再假設我們是這樣分的:找到“終止字”,然後根據終止字來分詞。即:對於這行字,“迎、到、我、的、客”是終止字,於是最終這麼分詞:歡迎/來到/我/的/部落格

      下面用上面的知識對這個例子建立HMM的A, B, π:

       初始概率分佈的確定:

           1,對於每個樣本,我們的目標是確定其是不是“終止字”,因此對於每個樣本,其狀態只有n=2個:狀態1 -- 是、狀態2 -- 不是。

            2,因此初始概率分佈π為:

                 π = {p1,p2}

                 P1:整個句子中第一個字是非終止字的概率

                  P2:整個句子中第一個字是終止字的概率

      狀態轉移矩陣的確定:

           剛才已經知道狀態有n=2個,於是狀態轉移矩陣就立馬得出了,即狀態轉移矩陣是個n*n的矩陣,如下:

                 A=

                 p11:非終止字 -> 非終止字的概率。

                 p12:非終止字 -> 終止字的概率。

                 p21:終止字 -> 非終止字的概率。

                  p22:終止字 -> 終止字的概率。

       觀測矩陣的確定:

           如果我們的目標文字使用Unicode編碼,那麼上面的任何一個字都是0~65535中的一個數,於是我們的觀測就會有m=65536個,於是觀測矩陣就是個n*m的矩陣,如下:

                 B=

                 p1,0:Unicode編碼中0對應的漢字是非終止字的概率

                 p1,65535:Unicode編碼中65535對應的漢字是非終止字的概率

                 p2,0:Unicode編碼中0對應的漢字是終止字的概率

                 p2,65535:Unicode編碼中65535對應的漢字是終止字的概率

            PS:為什麼x會有65535個觀測啊?“歡迎來到我的部落格”這個明明只有8個字。原因是因為真正的HMM面臨的情況,即:現有了 Z1=“非終止字”這個狀態,然後根據這個狀態從65535個字中選出x1=“歡”這個字,然後根據狀態轉移矩陣,下一次轉移到了Z2 =“終止字”,然後根據Z2從65535個字中選出了x2=“迎”這個字,這樣,最終生成了這句話。

# 統計分詞
#  1、先建立語言模型
#  2、對句子進行單詞劃分,對劃分結果進行概率計算,獲得概率最大的分詞方式
# HMM
class HMM(object):
    def __init__(self):
        import os
        # 儲存訓練的模型
        self.model_file='./data/hmm_model.pkl'
        # 狀態特徵值集合
        self.state_list=['B','M','E','S']
        # 判斷是否需要重新載入模型
        self.load_para=False
    def try_load_model(self,trained):
        if trained:
            import pickle
            with open(self.model_file,'rb' ) as f:
                self.A_dic=pickle.load(f)
                self.B_dic=pickle.load(f)
                self.Pi_dic=pickle.load(f)
                self.load_para=True
        else:
            # 狀態轉移概率 (狀態-》狀態的條件概率)
            self.A_dic={}
            # 發射概率 (狀態-》詞語的條件概率
            self.B_dic={}
            # 狀態的初始概率
            self.Pi_dic={}
            self.load_para=False
    #         計算轉移概率,初始概率,發射概率
    def train(self,path):
        # 重置幾個概率矩陣
        self.try_load_model(False)
        # 統計狀態出現次數
        Count_dic={}
        def init_parameters():
            for state in self.state_list:
                self.A_dic[state]={s:0.0 for s in self.state_list}
                self.Pi_dic[state]=0.0
                self.B_dic[state]={}
                Count_dic[state]=0

        def makeLabel(text):
            out_text=[]
            if len(text)==1:
                out_text.append(['S'])
            else:
                out_text+=['B']+['M']*(len(text)-2)+['E']
            return out_text
        init_parameters()
        line_num=-1

        words=set()
        with open(path,encoding='utf-8') as f:
            for line in f:
                line_num+=1
                line=line.strip()
                if not line:
                    continue
                word_list=[i for i in line if i!='']
                words |=set(word_list)

                linelist=line.split()

                line_state=[]
                for w in linelist:
                    line_state.extend(makeLabel(w))
                assert len(word_list)==len(line_state)

                for k, v in enumerate(line_state):
                    Count_dic[v]+=1
                    if k==0:
                        self.Pi_dic[v]+=1
                    else:
                        self.A_dic[line_state[k-1]][v]+=1
                        self.B_dic[line_state[k]][word_list[k]]=self.B_dic[line_state[k]].get(word_list[k],0)+1.0



        self.Pi_dic={k: v*1.0/line_num for k,v in self.Pi_dic.items()}
        self.A_dic={k:{k1: v1/Count_dic[k] for k1,v1 in v.items()}for k,v in self.A_dic.items()}
        self.B_dic={k: {k1:(v1+1)/Count_dic[k] for k1,v1 in v.items()} for k,v in self.B_dic.items()}
        import pickle
        with open(self.model_file,'wb') as f:
            pickle.dump(self.A_dic,f)
            pickle.dump(self.B_dic,f)
            pickle.dump(self.Pi_dic,f)
        return self

    def viterbi(self,text,states,start_p,train_p,emit_p):
        V=[{}]
        path={}
        for y in states:
            V[0][y]=start_p[y]*emit_p[y].get(text[0],0)
            path[y]=[y]
        for t in range(1,len(text)):
            V.append({})
            newpath={}
            neverSeen=text[t] not in emit_p['S'].keys() and \
                      text[t] not in emit_p['M'].keys() and \
                      text[t] not in emit_p['E'].keys() and \
                      text[t] not in emit_p['B'].keys()
            for y in states:
                emitP=emit_p[y].get(text[t],0) if not neverSeen else 1.0
                (prob,state)=max([(V[len(text)-1][y],y) for y in ('E','M')])
            else:
                (prob, state) = max([(V[len(text) - 1][y], y) for y in states])
        return (prob,path[state])
    def cut(self,text):
        import os
        if not self.load_para:
            self.try_load_model(os.path.exists(self.model_file))
        prob,pos_list=self.viterbi(text,self.state_list,self.Pi_dic,self.A_dic,self.B_dic)
        begin,next=0,0
        for i ,char in enumerate(text):
            pos=pos_list[i]
            if pos=='B':
                begin=i
            elif pos=='E':
                yield text[begin:i+1]
                next=i+1
            elif pos=='S':
                yield char
                next=i+1
        if next<len(text):
            yield text[next:]

hmm=HMM()
hmm.train('./data/trainCorpus.txt_utf8')

2.2 CRF

三、中文分詞工具_JieBa

jieba分詞結合了基於規則和基於統計的兩種方法

基於漢字成詞的HMM模型,採用了Verterbi演算法進行推導

3.1高頻詞提取

高頻詞就是NLP中的TF策略

進行資料的讀取
def get_content(path):
    with open(path,'r',encoding='utf-8',errors='ignore') as f:
        content=''
        for l in f:
            l=l.strip()
            content+=l
    return content
def stop_words(path):
    with open(path,encoding='utf-8') as f:
        return [l.strip() for l in f]
定義高頻詞統計的函式,輸入是一個詞的陣列
def get_TF(words,topK=10):
    tf_dic={ }
    for w in words:
        tf_dic[w]=tf_dic.get(w,0)+1
    return sorted(tf_dic.items(),key =lambda x:x[1],reverse=True)[:topK]

def main():
    import glob
    import random
    import jieba
    files=glob.glob('./data/news/C000013/*.txt')
    corpus=[get_content(x) for x in files]

    sample_inx=random.randint(0,len(corpus))
    split_words=[x for x in jieba.cut(corpus[sample_inx]) if x not in stop_words('./data/stop_words.utf8')]
    print('樣本之一:' + corpus[sample_inx])
    print('樣本分詞效果:' + '/ '.join(split_words))
    print('樣本的topK(10)詞:' + str(get_TF(split_words)))

main()