1. 程式人生 > >自然語言處理5 -- 詞向量

自然語言處理5 -- 詞向量

系列文章,請多關注
Tensorflow原始碼解析1 – 核心架構和原始碼結構
帶你深入AI(1) - 深度學習模型訓練痛點及解決方法
自然語言處理1 – 分詞
自然語言處理2 – jieba分詞用法及原理
自然語言處理3 – 詞性標註
自然語言處理4 – 句法分析
自然語言處理5 – 詞向量
自然語言處理6 – 情感分析

1 概述

詞向量和分詞一樣,也是自然語言處理中的基礎性工作。詞向量一方面解決了詞語的編碼問題,另一方面也解決了詞的同義關係,使得基於LSTM等深度學習模型的自然語言處理成為了可能。和分詞不同,中英文文字,均需要進行詞向量編碼。

2 詞向量工具

2013年Google開源了word2vec工具,它可以進行詞向量訓練,載入已有模型進行增量訓練,求兩個詞向量相似度,求與某個詞接近的詞語,等等。功能十分豐富,基本能滿足我們對於詞向量的需求。下面詳細講解怎麼使用word2vec

2.1 模型訓練

詞向量模型訓練只需要有訓練語料即可,語料越豐富準確率越高,屬於無監督學習。後面會講詞向量訓練演算法和程式碼實現,這兒先說怎麼利用word2vec工具進行詞向量模型訓練。

# gensim是自然語言處理的一個重要Python庫,它包括了Word2vec
import gensim
from gensim.models import word2vec

# 語句,由原始語句經過分詞後劃分為的一個個詞語
sentences = [['網商銀行', '體驗', '好'], ['網商銀行','轉賬','快']]

# 使用word2vec進行訓練
# min_count: 詞語頻度,低於這個閾值的詞語不做詞向量
# size:每個詞對應向量的維度,也就是向量長度 # workers:並行訓練任務數 model = word2vec.Word2Vec(sentences, size=256, min_count=1) # 儲存詞向量模型,下次只需要load就可以用了 model.save("word2vec_atec")

2.2 增量訓練

有時候我們語料不是很豐富,但都是針對的某個垂直場景的,比如網商銀行相關的語料。此時我們訓練詞向量時,可以先基於一個已有的模型進行增量訓練,這樣就可以得到包含特定語料的比較準確的詞向量了。

# 先載入已有模型
model = gensim.models.
Word2Vec.load("word2vec_atec") # 進行增量訓練 corpus = [['網商銀行','餘利寶','收益','高'],['貸款','發放','快']] # 新增語料 model.build_vocab(corpus, update=True) # 訓練該行 model.train(corpus, total_examples=model.corpus_count, epochs=model.iter) # 儲存增量訓練後的新模型 model.save("../data/word2vec_atec")

2.3 求詞語相似度

可以利用詞向量來求兩個詞語的相似度。詞向量的餘弦夾角越小,則相似度越高。

# 驗證詞相似程度
print model.wv.similarity('花唄'.decode('utf-8'), '借唄'.decode('utf-8'))

2.4 求與詞語相近的多個詞語

for i in model.most_similar(u"我"):
    print i[0],i[1]

3 詞向量訓練演算法

詞向量可以通過使用大規模語料進行無監督學習訓練得到,常用的演算法有CBOW連續詞袋模型和skip-gram跳字模型。二者沒有本質的區別,演算法框架完全相同。區別在於,CBOW利用上下文來預測中心詞。而skip-gram則相反,利用中心詞來預測上下文。比如對於語料 {“The”, “cat”, “jump”, “over”, “the”, “puddle”} ,CBOW利用上下文{“The”, “cat”, “over”, “the”, “puddle”} 預測中心詞“jump”,而skip-gram則利用jump來預測上下文的詞,比如jump->cat, jump->over。一般來說,CBOW適合小規模訓練語料,對其進行平滑處理。skip-gram適合大規模訓練語料,可以基於滑窗隨機選擇上下文詞語。word2vec模型訓練時預設採用skip-gram。

4 詞向量訓練程式碼實現

下面來看一個基於skip-gram的詞向量訓練的程式碼實現,這樣就能夠skip-gram演算法有比較深刻的理解。CBOW演算法和skip-gram基本相同。程式碼來自TensorFlow官方教程 https://www.tensorflow.org/tutorials/word2vec

# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import math
import os
import random
import zipfile

import numpy as np
from six.moves import urllib
from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf

# 1 下載語料檔案,並校驗檔案位元組數是否正確
url = 'http://mattmahoney.net/dc/'
def maybe_download(filename, expected_bytes):
    if not os.path.exists(filename):
        urllib.request.urlretrieve(url + filename, filename)
    statinfo = os.stat(filename)
    if (statinfo.st_size == expected_bytes):
        print("get text and verified")
    else:
        raise Exception("text size is not correct")

    return filename

filename = maybe_download("text8.zip", 31344016)


# 2 語料處理,弄成一個個word組成的list, 以空格作為分隔符。
# 如果是中文語料,這一步還需要進行分詞
def read_data(filename):
    with zipfile.ZipFile(filename) as f:
        data = tf.compat.as_str(f.read(f.namelist()[0])).split()
    return data

vocabulay = read_data(filename)
print("total word size %d" % len(vocabulay))
print("100 words at first: ", vocabulay[0:100])

# 3 詞表製作,根據出現頻率排序,序號代表這個單詞。詞語編碼的一種常用方式
def build_dataset(words, n_words):
    count = [["UNK", -1]]
    count.extend(collections.Counter(words).most_common(n_words - 1))
    dictionay = dict()
    for word, _ in count:
        # 利用按照出現頻率排序好的詞語的位置,來代表這個詞語
        dictionay[word] = len(dictionay)

    # data包含語料庫中的所有詞語,低頻的詞語標註為UNK。這些詞語都是各不相同的
    data = list()
    unk_count = 0
    for word in words:
        if word in dictionay:
            index = dictionay[word]
        else:
            index = 0
            unk_count += 1
        data.append(index)
    count[0][1] = unk_count   # unk的個數

    # 將key value reverse一下,使用數字來代表這個詞語
    reversed_dictionary = dict(zip(dictionay.values(), dictionay.keys()))
    return data, count, dictionay, reversed_dictionary

VOC_SIZE = 50000
data, count, dictionary, reversed_dictionary = build_dataset(vocabulay, VOC_SIZE)
del vocabulay
print("most common words", count[0:5])
# 列印前10個單詞的數字序號
print("sample data", data[:10], [reversed_dictionary[i] for i in data[:10]])

# 4 生成訓練的batch label對
data_index = 0
# skip_window表示與target中心詞相關聯的上下文的長度。整個Buffer為 (2 * skip_window + 1),從skip_window中隨機選取num_skips個單詞作為label
# 最後形成 target->label1 target->label2的batch label對組合
def generate_batch(batch_size, num_skips, skip_window):
    global data_index
    batch = np.ndarray(shape=(batch_size), dtype=np.int32)
    labels = np.ndarray(shape=(batch_size, 1), dtype=np.int32)

    # 將skip_window的資料組合放入Buffer中
    span = 2 * skip_window + 1
    buffer = collections.deque(maxlen=span)
    for _ in range(span):
        buffer.append(data[data_index])
        data_index = (data_index + 1) % len(data)   # 防止超出data陣列範圍,因為batch可以取很多次迭代。所以可以迴圈重複

    # num_skips表示一個Buffer中選取幾個batch->label對,每一對為一個batch,故需要batch_size // num_skips個Buffer
    for i in range(batch_size // num_skips):
        target = skip_window
        targets_to_avoid = [skip_window]

        # 一個Buffer內部尋找num_skips個label
        for j in range(num_skips):
            # 尋找label的位置,總共會有num_skips個label
            while target in targets_to_avoid:   # 中間那個為batch,不能選為target.也不能重複選target
                target = random.randint(0, span - 1)
            targets_to_avoid.append(target)

            # 中心位置為batch,隨機選取的num_skips個其他位置的為label
            batch[i * num_skips + j] = buffer[skip_window]  #
            labels[i * num_skips + j, 0] = buffer[target]   # 遍歷選取的label

        # 一個Buffer內的num_skips找完之後,向後移動一位,將單詞加入Buffer內,並將Buffer內第一個單詞移除,從而形成新的Buffer
        buffer.append(data[data_index])
        data_index = (data_index + 1) % len(data)

    # 所有batch都遍歷完之後,重新調整data_index指標位置
    data_index = (data_index + len(data) - span) % len(data)

    return batch, labels

batch, labels = generate_batch(batch_size=8, num_skips=2, skip_window=1)
for i in range(8):
    print(batch[1], reversed_dictionary[batch[i]], "->", labels[i, 0], reversed_dictionary[labels[i, 9]])

# 5 構造訓練模型
batch_size = 128
embedding_size = 128  # 詞向量為128維,也就是每一個word轉化為的vec是128維的
skip_window = 1   # 滑窗大小為1, 也就是每次取中心詞前後各一個詞
num_skips = 2     # 每次取上下文的兩個詞

# 模型驗證集, 對前100個詞進行驗證,每次驗證16個詞
valid_size = 16
valid_window = 100
valid_examples = np.random.choice(valid_window, valid_size, replace=False)

# 噪聲詞數量
num_sampled = 64

graph= tf.Graph()
with graph.as_default():
    train_inputs = tf.placeholder(tf.int32, shape=[batch_size])
    train_labels = tf.placeholder(tf.int32, shape=[batch_size, 1])
    valid_dataset = tf.constant(valid_examples, dtype=tf.int32)     # 驗證集

    with tf.device("/cpu:0"):
        # 構造embeddings, 50000個詞語,每個詞語為長度128的向量
        embeddings = tf.Variable(tf.random_uniform([VOC_SIZE, embedding_size], -1.0, 1.0))
        embed = tf.nn.embedding_lookup(embeddings, train_inputs)

        nce_weights = tf.Variable(tf.truncated_normal([VOC_SIZE, embedding_size], stddev=1.0 / math.sqrt(embedding_size)))
        nce_biases = tf.Variable(tf.zeros([VOC_SIZE]))

    # 利用nce loss將多分類問題轉化為二分類問題,從而使得詞向量訓練成為可能,不然分類會是上萬的量級
    loss = tf.reduce_mean(
        tf.nn.nce_loss(
            weights=nce_weights,
            biases=nce_biases,
            labels=train_labels,
            inputs=embed,       # inputs為經過embeddings詞向量之後的train_inputs
            num_sampled=num_sampled,    # 噪聲詞
            num_classes=VOC_SIZE,
        )
    )
    optimizer = tf.train.GradientDescentOptimizer(1.0).minimize(loss)

    # 歸一化embeddings
    norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings), 1, keep_dims=True))
    normalized_embeddings = embeddings / norm

    valid_embeddings = tf.nn.embedding_lookup(normalized_embeddings, valid_dataset)
    similarity = tf.matmul(valid_embeddings, normalized_embeddings, transpose_b=True)

    init = tf.global_variables_initializer()


# 6 訓練
num_steps = 100000
with tf.Session(graph=graph) as session:
    init.run()

    average_loss = 0
    for step in xrange(num_steps):
        # 構建batch,並進行feed
        batch_inputs, batch_labels = generate_batch(batch_size, num_skips, skip_window)
        feed_dict = {train_inputs: batch_inputs, train_labels: batch_labels}

       # run optimizer和loss,跑模型
        _, loss_val = session.run([optimizer, loss], feed_dict=feed_dict)
        average_loss += loss_val

        if step % 2000 == 0 and step > 0:
            average_loss /= 2000
            print("average loss at step ", step, ": ", average_loss)
            average_loss = 0

        # 1萬步,驗證一次
        if step % 10000 == 0:
            sim = similarity.eval()
            for i in xrange(valid_size):
                valid_word = reversed_dictionary[valid_examples[i]]
                top_k = 8
                nearest = (-sim[i, :]).argsort()[1: top_k+1]
                log_str = "Nearest to %s:" % valid_word
                for k in xrange(top_k):
                    close_word = reversed_dictionary[nearest[k]]
                    log_str = '%s %s,' % (log_str, close_word)
                print(log_str)

    final_embeddings = normalized_embeddings.eval()

流程還是很簡單的,關鍵在第四步batch的構建,和第五步訓練模型的構建,步驟如下

  1. 下載語料檔案,並校驗檔案位元組數是否正確。這兒只是一個demo,語料也很小,只有100M。如果想得到比較準確的詞向量,一般需要通過爬蟲獲取維基百科 網易新聞等既豐富又相對準確的語料素材。一般需要幾十上百G的corpus,即語料。谷歌根據不同的語料預訓練了一些詞向量,參考 https://github.com/Embedding/Chinese-Word-Vectors

  2. 語料處理,文字切割為一個個詞語。英文的話以空格為分隔符進行切分即可(有誤差,但還好)。中文的話需要通過分詞工具進行分割。

  3. 詞表製作,詞語預編碼。根據詞語出現頻率排序,序號代表這個單詞。詞語編碼的一種常用方式。

  4. 生成訓練的batch label對。這是比較關鍵的一步,也是體現skip-gram演算法的一步。

    1. 先取出滑窗範圍的一組詞,如滑窗大小為5,則取出5個詞。
    2. 位於中心的詞為中心詞,比如滑窗大小為5,則第三個詞為中心詞。其他詞則稱為上下文。
    3. 從上下文中隨機取出num_skip個詞,比如num_skip為2,則從4個上下文詞語中取2個。通過隨機選取提高了一定的泛化性
    4. 得到num_skip箇中心詞->上下文的x->y片語
    5. 將滑窗向右移動一個位置,繼續這些步驟,直到滑窗到達文字最後
  5. 構造訓練模型,這一步也很關鍵。利用nce loss將多分類問題轉化為二分類問題,optimizer優化方法採用隨機梯度下降。

  6. 開始真正的訓練。這一步比較常規化。送入第四步構建的batch進行feed,跑optimizer和loss,並進行相關資訊列印即可。訓練結束後,即可得到調整完的詞向量模型。

5 總結

基於深度學習的詞向量訓練方法,具有演算法簡單通用,語料獲取容易,泛化性好的優點。通過學習官方程式碼,可以對skip-gram等詞向量訓練演算法有比較深入的理解。詞向量在文字分析,文字摘要,情感分析等領域都是必須的預處理,可以大大提高自然語言處理的準確度。

系列文章,請多關注
Tensorflow原始碼解析1 – 核心架構和原始碼結構
帶你深入AI(1) - 深度學習模型訓練痛點及解決方法
自然語言處理1 – 分詞
自然語言處理2 – jieba分詞用法及原理
自然語言處理3 – 詞性標註
自然語言處理4 – 句法分析
自然語言處理5 – 詞向量
自然語言處理6 – 情感分析