1. 程式人生 > >Tensorflow實戰:Word2Vec_Skip_Gram原理及實現(多註釋)

Tensorflow實戰:Word2Vec_Skip_Gram原理及實現(多註釋)

        Word2Vec也稱Word Embeddings,中文的叫法為“詞向量”或“詞嵌入”,是一種非常高效的,可以從原始語料中學習字詞空間向量的預測模型。

        在Word2Vec出現之前,通常將字詞轉為One-Hot Encoder ,一個詞對應一個向量(一個向量中只有一個1,其餘皆為0),通常要將一篇文章中每一個詞都轉成一個向量,而整篇文章則變為一個稀疏矩陣。這樣的方法沒有考慮字詞之間的關係,沒有提供任何關聯資訊,例如我們對我們對動物 和 狗之間的從屬關係,關聯程度一無所知。且稀疏資料的訓練的效率較低。

       而向量空間模型(Vector Space Models)則可以有效的解決這個問題,它將字詞轉為連續值(相對於One-Hot編碼的離散值)的向量表達,並且其中意思相近的詞將對映到向量空間中相似位置。

       Word2Vec就是一種計算非常高效的,可以從原始語料中學習字詞空間向量的預測模型。它主要分為CBOW和Skip-Gram兩種模式,其中CBOW是從原始語句(比如:中國的首都是_____)推測目標字詞(比如:北京);而Skip-Gram則正好相反,它是從目標字詞推測出原始語句。其中CBOW對小型資料比較合適,而Skip-Gram在大型語料中表現的更好,使用Word2Vec訓練語料能得到一些非常有趣的結果,比如意思相近的詞在向量空間中的位置會非常接近。

       本文將使用Skip-Gram模式的Word2-Vec,以“the quick brown fox jumped over the lazy dog”為例,我們要構造一個語境與目標詞彙的對映關係,其中語境包含一個單詞的左邊和右邊的詞彙,假設我們的滑窗尺寸為1,則以quick為例,其單詞及語境包含(the, quick, brown),則對目標詞彙quick,其對應的訓練樣本為(quick, the)和(quick, brown)。我們訓練時,希望模型能夠從目標詞彙quick預測出語境the,同時也需要製造隨機的詞彙作為負樣本(噪聲),我們希望預測的概率分佈在正樣本the上儘可能大,而在隨機的負樣本上儘可能的小。這裡的做法就是通過優化演算法比如SGD來更新模型中Word Embedding的引數,讓概率分佈的損失函式(NCE Loss)儘可能的小。這樣每個單詞的詞向量就會隨著訓練過程不斷調整,直到處於一個最適合語料的空間位置。這樣我們的損失函式最小,最符合語料,同時預測出正確單詞的概率也最高。

       以下為訓練Word2Vec之Skip-Gram的步驟及重要引數的意義:

一、讀入文字,將每個單詞分隔開,存為一個單詞列表。

二、統計單詞列表中詞頻,

二維陣列count:記錄單詞頻數,形如[['UNK', -1], [top1, 25631], [top2, 3541],…]

詞典dictionary:將top50000詞頻的單詞按詞頻從大到小的順序存入,其中key為單詞了,value為編號(1-50000)

單詞編號列表data:遍歷單詞列表,如當前單詞在詞典中,則值為value,否則為0

reverse_dictionary:dictionary的key與value值反轉

三、生成word2vec的訓練樣本,返回目標單詞batch陣列和其對應語境labels陣列

batch:一維陣列,長度為batch_size,存放目標單詞編號

labels:二維陣列(batch_size, 1) ,在與batch陣列下標相同的位置存放目標單詞的語境單詞編號

訓練樣本的構成:

以“the quick brown fox jumped over the lazy dog”這句話為例。我們要構造一個語境與目標詞彙的對映關係,其中語境包括一個單詞左邊和右邊的詞彙,假設我們的滑窗尺寸為1,因為Skip-Gram模型是從目標詞彙預測語境,所以我們的資料集就成了(quick, the)、(quick, brown)、(brown, quick)、(brown, fox)。我們訓練時,希望模型能從目標詞彙quick預測出語境the。

四、訓練

隨機生成所有單詞的詞向量為一個詞向量矩陣,

在詞向量矩陣中根據單詞編號查詢到訓練資料(即batch)對應的向量們,

tf.nn.nce_loss計算出詞向量embedding在訓練資料上的loss,並使用tf.reduce_mean進行彙總

訓練完後embeddings除以其L2範數得到標準化後的normalized_embeddings,即最終的詞向量矩陣

以下為本文實現Word2Vec的實現程式碼,進行了前向計算的測評,程式碼及詳細註釋如下:

import collections
import math
import os
import random
import zipfile
import numpy as np
import urllib
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

'''############################################05《TensorFlow實戰》實現Word2Vec Skip-Gram##################################################'''


url = "http://mattmahoney.net/dc/"

'''下載文字資料的函式'''
def maybe_download(filename, expected_bytes):

    if not os.path.exists(filename):
        filename, _ = urllib.request.urlretrieve(url + filename, filename)  #下載資料的壓縮檔案

    #核對檔案尺寸
    statinfo = os.stat(filename)
    if statinfo.st_size == expected_bytes:
        print('Found and verfied', filename)
    else:
        print(statinfo.st_size)
        raise Exception(
            'Failed to verify ' + filename + '. Can you get to it with browser?'
        )
    return filename

filename = maybe_download('text8.zip', 31344016)    #根據檔名和byte下載資料


def read_data(filename):
    with zipfile.ZipFile(filename) as f:
        data = tf.compat.as_str(f.read(f.namelist()[0])).split()    #解壓檔案並將資料轉成單詞的列表
    return data

words = read_data(filename)
print('Data size', len(words))
# print(words[:5])


vocabulary_size = 50000


'''建立vocabulary詞彙表'''
def build_dataset(words):

    count = [['UNK', -1]]   #詞頻統計,['UNK', -1]表示未知單詞的頻次設為-1

    #使用collections.Counter()方法統計單詞列表中單詞的頻數, 使用.most_common()方法取top50000頻數的單詞(頻次從大到小)作為vocabulary
    #     # >> > Counter('abcdeabcdabcaba').most_common(3)
    #     # [('a', 5), ('b', 4), ('c', 3)]
    count.extend(collections.Counter(words).most_common(vocabulary_size - 1))

    #建立一個詞典dict,將top50000的單詞存入,以便快速查詢
    dictionary = dict()
    #將top50000單詞按頻次給定其編號存入dictionary,key為詞,value為編號(從1到50000)
    for word, _ in count:   #_用作一個名字,來表示某個變數是臨時的或無關緊要的
        dictionary[word] = len(dictionary)
        #print(len(dictionary))

    data = list()   #單詞列表轉換後的編碼
    unk_count = 0

    for word in words:  #遍歷單詞列表
        if word in dictionary:  #如果該單詞存在於dictionary中
            index = dictionary[word]    #取該單詞的頻次為編號
        else:   #如果dictionary中沒有該單詞
            index = 0   #編號設為0
            unk_count += 1
        data.append(index)

    count[0][1] = unk_count #未知單詞數(除了top50000以外的單詞)

    reverse_dictionary = dict(zip(dictionary.values(), dictionary.keys()))
    #返回單詞列表中單詞轉換為編碼(編碼為該單詞的頻次)的data列表、每個單詞的頻數統計count、詞彙表dictionary及其反轉形式reverse_dictionary
    return data, count, dictionary, reverse_dictionary

data, count, dictionary, reverse_dictionary = build_dataset(words)

del words   #刪除原始單詞表,節約記憶體
print("Most common words (+UNK)", count[:5])    #列印未知單詞和最高頻單詞及它們的頻次
print("Sample data", data[:10], [reverse_dictionary[i] for i in data[:10]])  #列印單詞列表中前10個單詞的編號及單詞本身


data_index = 0


'''生成word2vec的訓練樣本,返回目標單詞編號batch陣列和其對應語境編號labels陣列
skip-gram模式將原始資料"the quick brown fox jumped"轉為(quick,the),(qucik,brown),(brown,quick),(brown,fox)等樣本'''
def generate_batch(batch_size,  #一個批次的大小
                   num_skips,   #num_skips為對每個單詞生成多少個樣本
                   skip_window): #指單詞最遠可以聯絡的距離,設為1代表只能跟緊鄰的兩個單詞生成樣本,比如quick只能和前後的單詞生成(quick,the),(qucik,brown)
    global data_index   #單詞序號設為global,確保在呼叫該函式時,該變數可以被修改
    #python 中assert斷言是宣告其布林值必須為真的判定,其返回值為假,就會觸發異常
    assert batch_size % num_skips == 0  #skip-gram中引數的要求
    assert num_skips <= 2 * skip_window
    batch = np.ndarray(shape=(batch_size), dtype=np.int32)   #初始化batch,存放目標單詞
    labels = np.ndarray(shape=(batch_size, 1), dtype=np.int32)  #初始化labels,存放目標單詞的語境單詞們
    span = 2 * skip_window + 1  #對某個單詞建立相關樣本時會用到的單詞數量,包含目標單詞和它前後的單詞
    buffer = collections.deque(maxlen=span) #建立一個最大容量為span的deque,即雙向佇列

    for _ in range(span):
        buffer.append(data[data_index]) #從序號data_index開始, 把span個單詞的編碼順序讀入buffer作為初始值,迴圈完後,buffer填滿,裡面為目標單詞和需要的單詞
        data_index = (data_index + 1) % len(data)

    for i in range(batch_size // num_skips):    #批次的大小➗每個單詞生成的樣本數=該批次中單詞的數量
        target = skip_window    #即buffer中下標為skip_window的變數為目標單詞
        targets_to_avoid = [skip_window]    #避免使用的單詞列表,要預測的時語境,不包含單詞本身

        for j in range(num_skips):

            while target in targets_to_avoid:   #生成隨機數直到不在targets_to_avoid中,代表可以使用的語境單詞
                target = random.randint(0, span-1)

            targets_to_avoid.append(target)     #該語境單詞已使用,加入避免使用的單詞列表
            batch[i * num_skips + j] = buffer[skip_window]  #feature即目標詞彙
            labels[i * num_skips + j, 0] = buffer[target]   #label即當前語境單詞

        buffer.append(data[data_index]) #對一個目標單詞生成完所有樣本後,再讀入下一個單詞(同時會拋掉buffer中第一個單詞)
        data_index = (data_index + 1) % len(data)   #單詞序號+1

    #獲得了batch_size個訓練樣本,返回目標單詞編號batch陣列和其對應語境單詞編號labels陣列
    return batch, labels


'''測試word2vec訓練樣本生成'''
# batch, labels = generate_batch(batch_size=8, num_skips=2, skip_window=1)
# for i in range(8):
#     print(batch[i], reverse_dictionary[batch[i]], "-->", labels[i][0], reverse_dictionary[labels[i, 0]])


batch_size = 128
embedding_size = 128    #單詞轉為詞向量的維度,一般為50-1000這個範圍內的值
skip_window = 1
num_skips = 2

#生成驗證資料valid_samples
valid_size = 16     #用來抽取的驗證單詞數
valid_window = 100  #驗證單詞從頻數最高的100個單詞中抽取
valid_examples = np.random.choice(valid_window, valid_size, replace=False)  #從valid_window中隨機抽取valid_size個數字,返回一維陣列
num_sampled = 64    #訓練時用來做負樣本的噪聲單詞的數量


'''定義Skip-Gram Word2Vec模型的網路結構'''
graph = tf.Graph()
with graph.as_default():

    train_inputs = tf.placeholder(tf.int32, shape=[batch_size])
    train_labels = tf.placeholder(tf.int32, shape=[batch_size, 1])
    valid_dataset = tf.constant(valid_examples, dtype=tf.int32)     #驗證單詞的索引   shape(1, 16)

    with tf.device('/cpu:0'):   #限定所有計算在CPU上執行,因為接下去的一些計算操作在GPU上可能還沒有實現
        embeddings = tf.Variable(
            tf.random_uniform([vocabulary_size, embedding_size], -1.0, 1.0))   #隨機生成所有單詞的詞向量embeddings,範圍[-1, 1]

        embed = tf.nn.embedding_lookup(embeddings, train_inputs)    #在embeddings tensor中查詢輸入train_inputs編號對應的向量embed

        nce_weights = tf.Variable(  #使用tf.truncated_normal截斷的隨機正態分佈初始化NCE Loss中的權重引數nce_weights
            tf.truncated_normal([vocabulary_size, embedding_size], stddev=1.0 / math.sqrt(embedding_size)))

        nce_biases = tf.Variable(tf.zeros([vocabulary_size]))   #偏置初始化為0

    #使用tf.nn.nce_loss計算出詞向量embedding在訓練資料上的loss,並使用tf.reduce_mean進行彙總
    loss = tf.reduce_mean(tf.nn.nce_loss(weights=nce_weights,   #權重
                                         biases=nce_biases,     #偏置
                                         labels=train_labels,   #標記
                                         inputs=embed,          #
                                         num_sampled=num_sampled,   #負樣本噪聲單詞數量
                                         num_classes=vocabulary_size))  #可能的分類的數量(單詞數)

    optimizer = tf.train.GradientDescentOptimizer(1.0).minimize(loss)

    '''餘弦相似度計算'''
    #L2範數又叫“嶺迴歸”,作用是改善過擬合,L2範數計算方法:向量中各元素的平方和然後開根號
    norm = tf.sqrt(tf.reduce_sum(tf.square(embeddings), 1, keep_dims=True)) #計算嵌入向量embeddings的L2範數
    normalized_embeddings = embeddings / norm   #embeddings除以其L2範數得到標準化後的normalized_embeddings

    valid_embeddings = tf.nn.embedding_lookup(  #根據驗證單詞的索引valid_dataset,查詢驗證單詞的嵌入向量
        normalized_embeddings, valid_dataset)
    # 計算驗證單詞的嵌入向量與詞彙表中所有單詞的相似性, valid_embeddings * (normalized_embeddings的轉置)
    similarity = tf.matmul(valid_embeddings, normalized_embeddings, transpose_b=True)

    init = tf.global_variables_initializer()


'''視覺化Word2Vec,low_dim_embds為降維到2維的單詞的空間向量'''
def plot_with_labels(low_dim_embds, labels, filename='tsne.png'):
    assert low_dim_embds.shape[0] >= len(labels), "More labels than embeddings"
    plt.figure(figsize=(18, 18))    #圖片大小

    for i, label in enumerate(labels):
        x, y = low_dim_embds[i,:]
        plt.scatter(x, y)   #顯示散點圖
        plt.annotate(label,     #展示單詞本身
                     xy=(x, y),
                     xytext=(5, 2),
                     textcoords='offset points',
                     ha='right',
                     va='bottom')
    plt.savefig(filename)



'''測試'''
num_steps = 100001  #訓練100001輪

with tf.Session(graph=graph) as session:
    init.run()
    print("Initialized")

    average_loss = 0
    for step in range(num_steps):
        #獲得訓練資料和資料標記
        batch_inputs, batch_labels = generate_batch(batch_size, num_skips, skip_window)

        feed_dict = {train_inputs:batch_inputs, train_labels:batch_labels}

        _, loss_val = session.run([optimizer, loss], feed_dict=feed_dict)   #執行優化器運算和損失計算
        average_loss += loss_val    #損失值累加

        if step % 2000 == 0:    #每兩千輪計算幾次平均損失值
            if step > 0:
                average_loss /= 2000
            print("Average loss at step", step, ": ", average_loss)
            average_loss = 0

        #每一萬輪,計算一次驗證單詞與全部單詞的相似度,並將與每個驗證單詞最相近的8各單詞展示出來
        if step % 10000 == 0:
            # tensor.eval():在`Session`中評估這個張量。呼叫此方法將執行所有先前的操作,這些操作將生成生成此張量的操作所需的輸入。
            sim = similarity.eval()     #shape,(16, 50000)
            #print(tf.shape(sim).eval())

            for i in range(valid_size): #對每一個驗證單詞
                valid_word = reverse_dictionary[valid_examples[i]]  #根據前面隨機抽取的驗證單詞編號(即頻次),在反轉字典中取出該驗證單詞
                top_k = 8
                #.argsort()從小到大排列,返回其對應的索引,由於-sim(),所以返回的索引是相似度從大到小的
                nearest = (-sim[i, :]).argsort()[1:top_k+1]     #計算得到第i個驗證單詞相似度最接近的前8個單詞的索引
                log_str = "Nearest to %s:" % valid_word

                for k in range(top_k):
                    close_word = reverse_dictionary[nearest[k]]     #相似度最接近的第i個單詞
                    log_str = "%s %s," % (log_str, close_word)
                print(log_str)

    final_embeddings = normalized_embeddings.eval() #最終訓練完的詞向量矩陣


'''展示降維後的視覺化效果'''
#使用sklearn.manifold.TSNE實現降維,這裡直接將原始的128維嵌入向量降到2維
tsne = TSNE(perplexity=30,      #困惑度,預設30
            n_components=2,     #降到多少維
            init='pca',         #初始化的嵌入
            n_iter=5000)        #優化的最大迭代次數。至少應該是250。
plot_only = 150
low_dim_embs = tsne.fit_transform(final_embeddings[:plot_only, :])  #進行降維,輸入shape為 (n_samples, n_features) or (n_samples, n_samples)
labels = [reverse_dictionary[i] for i in range(plot_only)]
plot_with_labels(low_dim_embs, labels)  #用該視覺化函式進行展示


結果:

將訓練好的向量降維並可視化,如圖所示,數字等相近的單詞在空間中的位置相近