1. 程式人生 > >python_NLP實戰之詞性標註與命名實體識別

python_NLP實戰之詞性標註與命名實體識別

一、詞性標註

jieba詞性標註結合規則和統計的方法,詞典匹配和HMM共同作用

二、命名實體識別

HMM將分詞作為字標記來解決,其中有兩條獨立性假設 1、輸出觀察值之間相互獨立 2、狀態轉移過程中,當前狀態只與前一狀態有關

CRF也是一種用來標記和切分序列化資料的統計模型。

兩者不同的是:條件隨機場是在給定觀察的標記序列下,計算整個標記序列的聯合概率,而HMM是在給定狀態下,定義下一個狀態的分佈。HMM處理時,每個狀態依賴於上一個狀態,線性鏈CRF依賴於當前狀態的周圍節點狀態。

2.1 日起識別

本質上是基於正則表示式的方式

# 進行日期識別
import re
from datetime import datetime,timedelta
from dateutil.parser import parse
import jieba.posseg as psg

UTIL_CN_NUM = { '零': 0,  '一': 1,  '二': 2, '兩': 2, '三': 3,  '四': 4, '五': 5,  '六': 6, '七': 7,  '八': 8, '九': 9,
    '0': 0,  '1': 1,'2': 2,'3': 3, '4': 4,'5': 5,'6': 6, '7': 7,'8': 8, '9': 9}
UTIL_CN_UNIT = {'十': 10, '百': 100, '千': 1000, '萬': 10000}

def cn2dig(src):
    if src=='':
        return None
    m=re.match("\d+",src)
    if m:
        return int(m.group(0))
    rsl=0
    unit=1
    for item in src[::-1]:
        if item in UTIL_CN_UNIT.keys():
            unit = UTIL_CN_UNIT[item]
        elif item in UTIL_CN_NUM.keys():
            num = UTIL_CN_NUM[item]
            rsl += num * unit
        else:
            return None
    if rsl<unit:
        rsl+=unit
    return rsl
def year2dig(year):
    res=''
    for item in year:
        if item in UTIL_CN_NUM.keys():
            res = res + str(UTIL_CN_NUM[item])
        else:
            res = res + item
    m = re.match("\d+", res)
    if m:
        if len(m.group(0))==2:
            return int(datetime.datetime.today().year / 100) * 100 + int(m.group(0))
        else:
            return int(m.group(0))
    else:
        return None
# 對拼接字串進行進一步處理
def check_time_valid(word):
    m=re.match("\d+$",word)
    if m:
        if len(word)<=6:
            return None
    wordl=re.sub('[號|日]\d+$','日',word)
    if wordl!=word:
        return check_time_valid(wordl)
    else:
        return wordl
# 通過正則表示式將日期串進行切割 group(i)是指的利用正則表示式匹配的條目
def parse_datetime(msg):
    if msg is None or len(msg)==0:
        return None
    try:
        dt=parse(msg,fuzzy=True)
        return dt.strftime('%Y-%m-%d %H:%M:%S')
    except Exception as e:
        m = re.match(
        r"([0-9零一二兩三四五六七八九十]+年)?([0-9一二兩三四五六七八九十]+月)?([0-9一二兩三四五六七八九十]+[號日])?([上中下午晚早]+)?([0-9零一二兩三四五六七八九十百]+[點:\\.時])?([0-9零一二三四五六七八九十百]+分?)?([0-9零一二三四五六七八九十百]+秒)?",msg)
        if m.group(0) is not None:
            res={
                "year":m.group(1),
                "month":m.group(2),
                "day":m.group(3),
                "hour": m.group(5) if m.group(5) is not None else '00',
                "minute": m.group(6) if m.group(6) is not None else '00',
                "second": m.group(7) if m.group(7) is not None else '00',
            }
            params={}
            for name in res:
                if res[name] is not None and len(res[name]) != 0:
                    tmp=None
                    if name=='year':
                        tmp = year2dig(res[name][:-1])
                    else:
                        tmp = cn2dig(res[name][:-1])
                    if tmp is not None:
                        params[name] = int(tmp)

            target_date = datetime.today().replace(**params)
            is_pm=m.group(4)
            if is_pm is not None:
                if is_pm == u'下午' or is_pm == u'晚上' or is_pm == '中午':
                    hour = target_date.time().hour
                    if hour < 12:
                        target_date = target_date.replace(hour=hour + 12)
            return target_date.strftime('%Y-%m-%d %H:%M:%S')
        else:
            return None
# 將帶有時間資訊的詞進行切分,提取表示時間的詞
def time_extract(text):
    time_res=[]
    word=''
    keyDate={'今天':0,'明天':1,'後天':2}
    for k,v in psg.cut(text):
        if k in keyDate:
            if word!='':
                time_res.append(word)
                word=(datetime.today()+timedelta(days=keyDate.get(k,0))).strftime('%Y年%m月%d日')
        elif word!='':
            if v in ['m','t']:
                word=word+k
            else:
                time_res.append(word)
                word=''
        elif v in ['m','t']:
            word=k
    if word!='':
        time_res.append(word)
    result=list(filter(lambda x:x is not None,[check_time_valid(w) for w in time_res]))
    final_res=[parse_datetime(w) for w in result]
    return [x for x in final_res if x is not None]
text1 = '我要住到明天下午三點'
print(text1, time_extract(text1), sep=':')

2.2 地名識別

基於條件隨機場進行地名識別

1、確定標籤體系

  B M E O S

2、語料資料處理

一行一個token,然後每一個有一個標籤,如下所示:

我  O

去  O

北  B

京  M

資料處理的程式碼:

#coding=utf8
# 用於進行每行的標註轉換
def tag_line(words,mark):
    chars=[]
    tags=[]
    # 用於合併組合詞
    temp_word=''
    for word in words:
        word=word.strip('\t ')
        if temp_word=='':
            bracket_pos=word.find('[')
            w,h=word.split('/')
            if bracket_pos==-1:
                if len(w)==0:
                    continue
                chars.extend(w)
                if h=='ns':
                    tags+=['S'] if len(w)==1 else ['B']+['M']*(len(w)-2)+['E']
                else:
                    tags+=['O']*len(w)

            else:
                w=w[bracket_pos+1:]
                temp_word+=w
        else:
            bracket_pos=word.find(']')
            w,b=word.split('/')
            if bracket_pos==-1:
                temp_word+=w
            else:
                w=temp_word+w
                h=word[bracket_pos+1:]
                temp_word=''
                if len(w)==0:
                    continue
                chars.extend(w)
                if h=='ns':
                    tags+=['S'] if len(w)==1 else ['B']+['M']*(len(w)-2)+['E']
                else:
                    tags+=['O']*len(w)
    assert temp_word==''
    return (chars,tags)
# 用於載入資料,儲存轉換結果
def corpusHandler(corpusPath):
    import os
    root=os.path.dirname(corpusPath)
    with open(corpusPath,encoding='utf-8') as corpus_f, open(os.path.join(root,'train.txt'),'w') as train_f, open(os.path.join(root,'test.txt'),'w') as test_f:
        pos=0
        for line in corpus_f:
            line=line.strip('\r\n\t')
            if line=='':
                continue
            isTest=True if pos%5==0 else False
            words=line.split()[1:]
            if len(words)==0:continue

            line_chars,line_tags=tag_line(words,pos)
            saveObj=test_f if isTest else train_f
            for k,v in enumerate(line_chars):
                saveObj.write(v+'\t'+line_tags[k]+'\n')
            saveObj.write('\n')
            pos+=1

3、特徵模板設計

CRF的特徵函式對應CEF++的特徵模板。格式為%x [row,col],用於確定輸入資料的一個token,raw確定當前token的相對行數,col用於確定列數。

#Unigram
U00:%x[-1,0]
U01:%x[0,0]
U02:%x[1,0]
U03:%x[2,0]
U04:%x[-2,0]
U05:%x[1,0]/%x[2,0]
U06:%x[0,0]/%x[-1,0]/%x[-2,0]
U07:%x[0,0]/%x[1,0]/%x[2,0]
U08:%x[-1,0]/%x[0,0]
U09:%x[0,0]/%x[1,0]
U10:%x[-1,0]/%x[1,0]

#Bigram
B

4、模型的訓練和測試

crf_learn  crf_test

計算測試集的效果

def f1(path):
    with open(path) as f:
        all_tag=0
        loc_tag=0
        pred_loc_tag=0
        correct_tag=0
        correct_log_tag=0
        states=['B','M','E','S']

        for line in f:
            line=line.strip()
            if line=='':
                continue
            _,r,p=line.split()
            all_tag+=1
            if r==p:
                correct_tag+=1
                if r in states:
                    correct_log_tag+=1
            if r in states:
                loc_tag+=1
                if p in states:
                    pred_loc_tag+=1
        loc_P=1.0*correct_log_tag/pred_loc_tag
        loc_R=1.0*correct_log_tag/loc_tag
        print('loc_P:{0},loc_R:{1},loc_F1:{2}'.format(loc_P,loc_R,(2*loc_P*loc_R)/(loc_P+loc_R)))

5、模型使用

def load_model(path):
    import os, CRFPP
   
    if os.path.exists(path):
        return CRFPP.Tagger('-m {0} -v 3 -n2'.format(path))
    return None

def locationNER(text):

    tagger = load_model('./model')

    for c in text:
        tagger.add(c)

    result = []

    # parse and change internal stated as 'parsed'
    tagger.parse()
    word = ''
    for i in range(0, tagger.size()):
        for j in range(0, tagger.xsize()):
            ch = tagger.x(i, j)
            tag = tagger.y2(i)
            if tag == 'B':
                word = ch
            elif tag == 'M':
                word += ch
            elif tag == 'E':
                word += ch
                result.append(word)
            elif tag == 'S':
                word = ch
                result.append(word)


    return result