
RNN-Based Text Classification Model (TensorFlow)

Using LSTM (Long Short-Term Memory, a variant of RNN), this post builds a deep learning model for text intent classification (Python 3, TensorFlow 1.2). The overall architecture is shown below:

[Figure 1: overall architecture of the LSTM-based text classification model]
As shown in Figure 1, the model consists of two parts.

Part 1: Sentence feature extraction

Step 1: Read the data (sentences already segmented by jieba) and split it into training and validation sets by a given ratio. A mask vector is generated for every sentence to record its actual length; later the model uses this mask to discard the hidden-layer outputs corresponding to zero-padded positions. Several options are available:

1. reverse: words near the end of a sentence tend to matter more, so sentences can be fed in reverse word order;

2. enhance: when the sample size is small, data augmentation can be applied by shuffling word order to create new samples;

3. sort_by_len: sort the sentences by length;

4. shuffle: shuffle the sample order for random sampling.

import numpy as np
import sys
sys.path.append("..")
import random


def load_cn_data_from_files(classify_files):
    """Load one file per class; a file's index in classify_files is its class id."""
    count = len(classify_files)
    x_text = []
    y = []
    for index in range(count):
        classify_file = classify_files[index]
        with open(classify_file, "r", encoding="utf-8") as f:
            lines = f.readlines()
        label = [0] * count  # one-hot label for this class
        label[index] = 1
        labels = [label for _ in lines]
        if index == 0:
            x_text = lines
            y = labels
        else:
            x_text = x_text + lines
            y = np.concatenate([y, labels])
    x_text = [clean_str_cn(sent) for sent in x_text]
    return [x_text, y]

def clean_str_cn(string):
    """
    Tokenization/string cleaning for all datasets except for SST.
    Original taken from https://github.com/yoonkim/CNN_sentence/blob/master/process_data.py
    """
    return string.strip().lower()

def load_data(classify_files, config, sort_by_len=True, enhance=True, reverse=True):
    x_text, y = load_cn_data_from_files(classify_files)

    if reverse:
        # feed each sentence in reverse word order
        new_text = []
        for text in x_text:
            text_list = text.strip().split(' ')
            text_list.reverse()
            new_text.append(' '.join(text_list))
        x_text = new_text

    y = list(y)

    original_dataset = list(zip(x_text, y))

    if enhance:
        # data augmentation: shuffle word order to create one new sample per original
        num_sample = len(original_dataset)
        for i in range(num_sample):
            text_list = original_dataset[i][0].split(' ')
            random.shuffle(text_list)
            x_text.append(' '.join(text_list))
            y.append(original_dataset[i][1])

    # randomly shuffle the (possibly augmented) data
    shuffle_indices = list(range(len(y)))
    random.shuffle(shuffle_indices)
    x_shuffled = [x_text[i] for i in shuffle_indices]
    y_shuffled = np.array([y[i] for i in shuffle_indices])

    # train_set length
    n_samples = len(x_shuffled)
    # shuffle and generate train and valid data set
    sidx = np.random.permutation(n_samples)
    n_train = int(np.round(n_samples * (1. - config.valid_portion)))
    print("Train/Test split: {:d}/{:d}".format(n_train, (n_samples - n_train)))
    valid_set_x = [x_shuffled[s] for s in sidx[n_train:]]
    valid_set_y = [y_shuffled[s] for s in sidx[n_train:]]
    train_set_x = [x_shuffled[s] for s in sidx[:n_train]]
    train_set_y = [y_shuffled[s] for s in sidx[:n_train]]


    def len_argsort(seq):
        # indices that sort the sentences by (string) length
        return sorted(range(len(seq)), key=lambda x: len(seq[x]))

    if sort_by_len:

        sorted_index = len_argsort(valid_set_x)
        valid_set_x = [valid_set_x[i] for i in sorted_index]
        valid_set_y = [valid_set_y[i] for i in sorted_index]

        sorted_index = len_argsort(train_set_x)
        train_set_x = [train_set_x[i] for i in sorted_index]
        train_set_y = [train_set_y[i] for i in sorted_index]

    train_set = (train_set_x, train_set_y)
    valid_set = (valid_set_x, valid_set_y)

    max_len = config.num_step

    def generate_mask(data_set):
        # mask shape: [max_len, n_samples]; 1 marks a real token, 0 marks padding
        set_x = data_set[0]
        mask_x = np.zeros([max_len, len(set_x)])
        for i,x in enumerate(set_x):
            x_list = x.split(' ')
            if len(x_list) < max_len:
                mask_x[0:len(x_list), i] = 1
            else:
                mask_x[:, i] = 1
        new_set = (set_x, data_set[1], mask_x)
        return new_set

    train_set = generate_mask(train_set)
    valid_set = generate_mask(valid_set)

    train_data = train_set
    valid_data = valid_set

    return train_data, valid_data

# yield mini-batches of (x, y, mask)
def batch_iter(data, batch_size, shuffle=True):
    # get data set and label
    x, y, mask_x = data

    # transpose the mask to [n_samples, num_step] so it can be shuffled per sample
    mask_x = np.asarray(mask_x).T.tolist()

    data_size = len(x)
    if shuffle:
        shuffle_indices = list(range(data_size))
        random.shuffle(shuffle_indices)
        shuffled_x = [x[i] for i in shuffle_indices]
        shuffled_y = [y[i] for i in shuffle_indices]
        shuffled_mask_x = [mask_x[i] for i in shuffle_indices]
    else:
        shuffled_x = x
        shuffled_y = y
        shuffled_mask_x = mask_x

    shuffled_x = np.array(shuffled_x)
    shuffled_y = np.array(shuffled_y)
    shuffled_mask_x = np.asarray(shuffled_mask_x).T  # back to [num_step, n_samples]

    # drop the final incomplete batch so every batch holds exactly batch_size samples
    num_batches_per_epoch = data_size // batch_size

    for batch_index in range(num_batches_per_epoch):
        start_index = batch_index * batch_size
        end_index = min((batch_index + 1) * batch_size, data_size)
        return_x = shuffled_x[start_index:end_index]
        return_y = shuffled_y[start_index:end_index]
        return_mask_x = shuffled_mask_x[:, start_index:end_index]

        yield (return_x, return_y, return_mask_x)
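A minimal usage sketch of the two functions above; the Config class, its field values, and the file paths here are hypothetical placeholders, not part of the original project:

# hypothetical usage sketch; the Config fields mirror how load_data and
# batch_iter reference them above, but the values are placeholders
class Config(object):
    num_step = 30          # maximum sentence length in words
    valid_portion = 0.1    # fraction of samples held out for validation

config = Config()
# one file per intent class; each line is a jieba-segmented sentence
files = ["data/greeting.txt", "data/weather.txt"]  # placeholder paths
train_data, valid_data = load_data(files, config,
                                   sort_by_len=True, enhance=True, reverse=True)

for x_batch, y_batch, mask_batch in batch_iter(train_data, batch_size=32):
    # x_batch: 32 raw sentences; y_batch: (32, num_classes) one-hot labels;
    # mask_batch: (num_step, 32) with 1 for real tokens, 0 for padding
    pass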

Step 2: Apply word embedding to the input sentences so that each word is represented as a numeric vector. Since the question texts vary in length, every sentence is padded or truncated to a uniform length: sentences that are too short are padded and over-long ones are cut off, giving the model sentence inputs of consistent dimensionality. (A pre-trained word-vector model, word2vec.bin, is used here.)

x_embedded = wv.embedding_lookup(len(list(x)), config.num_step, config.embed_dim, list(x), 0)
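The wv module is external to this post (it wraps the pre-trained word2vec.bin). As a rough illustration only, here is what such an embedding_lookup could look like using gensim; the function name, argument order, and zero-padding convention are assumptions, not the actual wv implementation:

import numpy as np
from gensim.models import KeyedVectors

# hypothetical stand-in for wv.embedding_lookup: pad/truncate each
# segmented sentence to num_step tokens, then map tokens to vectors
w2v = KeyedVectors.load_word2vec_format("word2vec.bin", binary=True)

def embedding_lookup(batch_size, num_step, embed_dim, sentences, pad_value=0):
    out = np.full((batch_size, num_step, embed_dim), pad_value, dtype=np.float32)
    for i, sent in enumerate(sentences):
        tokens = sent.split(' ')[:num_step]   # truncate long sentences
        for j, tok in enumerate(tokens):
            if tok in w2v:                    # unknown words stay at pad_value
                out[i, j, :] = w2v[tok]
    return out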

Part 2: RNN-based classifier


After embedding, each word is fed into the LSTM layer (a standard LSTM). Stepping through the sequence produces n hidden-state vectors, one per time step. These pass through a mean-pooling layer to yield a single sentence vector h, which is followed by a Softmax layer that outputs a class probability distribution; the class with the highest probability is taken as the final prediction.
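The masked mean pooling is just a sum over the unmasked time steps divided by each sentence's true length. A small NumPy sketch of the arithmetic (shapes follow the model code below):

import numpy as np

# outputs: per-step LSTM outputs, shape (num_step, batch, hidden)
# mask:    1 for real tokens, 0 for padding, shape (num_step, batch)
num_step, batch, hidden = 4, 2, 3
outputs = np.random.randn(num_step, batch, hidden)
mask = np.array([[1, 1], [1, 1], [1, 0], [0, 0]], dtype=np.float32)

masked = outputs * mask[:, :, None]                  # zero out padded steps
h = masked.sum(axis=0) / mask.sum(axis=0)[:, None]   # (batch, hidden)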

import inspect
import tensorflow as tf

class RNN_Model(object):

    def __init__(self, config, num_classes, is_training=True):

        keep_prob = config.keep_prob
        batch_size = config.batch_size

        num_step = config.num_step
        embed_dim = config.embed_dim
        self.embedded_x = tf.placeholder(tf.float32, [None, num_step, embed_dim], name="embedded_chars")
        self.target = tf.placeholder(tf.int64, [None, num_classes], name='target')
        self.mask_x = tf.placeholder(tf.float32, [num_step, None], name="mask_x")

        hidden_neural_size=config.hidden_neural_size
        hidden_layer_num=config.hidden_layer_num

        # build LSTM network
        def lstm_cell():
            # newer TF releases added a `reuse` argument to BasicLSTMCell;
            # check for it so the code runs on either version
            if 'reuse' in inspect.signature(tf.contrib.rnn.BasicLSTMCell.__init__).parameters:
                return tf.contrib.rnn.BasicLSTMCell(hidden_neural_size, forget_bias=0.0,
                                                    state_is_tuple=True,
                                                    reuse=tf.get_variable_scope().reuse)
            else:
                return tf.contrib.rnn.BasicLSTMCell(
                    hidden_neural_size, forget_bias=0.0, state_is_tuple=True)

        attn_cell = lstm_cell

        if is_training and keep_prob < 1:
            def attn_cell():
                # wrap each cell with output dropout during training
                return tf.contrib.rnn.DropoutWrapper(
                    lstm_cell(), output_keep_prob=config.keep_prob)

        cell = tf.contrib.rnn.MultiRNNCell(
            [attn_cell() for _ in range(hidden_layer_num)], state_is_tuple=True)

        self._initial_state = cell.zero_state(batch_size, dtype=tf.float32)

        inputs = self.embedded_x

        # apply input dropout only while training
        if is_training and keep_prob < 1:
            inputs = tf.nn.dropout(inputs, keep_prob)

        out_put = []
        state = self._initial_state
        with tf.variable_scope("LSTM_layer"):
            for time_step in range(num_step):
                if time_step > 0: tf.get_variable_scope().reuse_variables()
                (cell_output, state) = cell(inputs[:, time_step,:],state)
                out_put.append(cell_output)

        # stack the per-step outputs into [num_step, batch, hidden] and zero out padded steps
        out_put = tf.stack(out_put) * self.mask_x[:, :, None]

        with tf.name_scope("mean_pooling_layer"):
            out_put = tf.reduce_sum(out_put,0)/(tf.reduce_sum(self.mask_x,0)[:,None])

        with tf.name_scope("Softmax_layer_and_output"):
            softmax_w = tf.get_variable("softmax_w",[hidden_neural_size,num_classes],dtype=tf.float32)
            softmax_b = tf.get_variable("softmax_b",[num_classes],dtype=tf.float32)
            # self.logits = tf.matmul(out_put,softmax_w)
            # self.scores = tf.add(self.logits, softmax_b, name='scores')
            self.scores = tf.nn.xw_plus_b(out_put, softmax_w, softmax_b, name="scores")

        with tf.name_scope("loss"):
            self.loss = tf.nn.softmax_cross_entropy_with_logits(labels=self.target, logits=self.scores + 1e-10)
            self.cost = tf.reduce_mean(self.loss)

        with tf.name_scope("accuracy"):
            self.prediction = tf.argmax(self.scores, 1, name="prediction")
            correct_prediction = tf.equal(self.prediction, tf.argmax(self.target, 1))
            self.correct_num = tf.reduce_sum(tf.cast(correct_prediction, tf.float32))
            self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy")
            self.probability = tf.nn.softmax(self.scores, name="probability")

        # add summaries
        loss_summary = tf.summary.scalar("loss", self.cost)
        accuracy_summary = tf.summary.scalar("accuracy_summary", self.accuracy)

        if not is_training:
            return

        self.global_step = tf.Variable(0, name="global_step", trainable=False)
        self.lr = tf.Variable(0.0, trainable=False)

        tvars = tf.trainable_variables()
        grads, _ = tf.clip_by_global_norm(tf.gradients(self.cost, tvars), config.max_grad_norm)

        # Keep track of gradient values and sparsity (optional)
        grad_summaries = []
        for g, v in zip(grads, tvars):
            if g is not None:
                grad_hist_summary = tf.summary.histogram("{}/grad/hist".format(v.name), g)
                sparsity_summary = tf.summary.scalar("{}/grad/sparsity".format(v.name), tf.nn.zero_fraction(g))
                grad_summaries.append(grad_hist_summary)
                grad_summaries.append(sparsity_summary)
        self.grad_summaries_merged = tf.summary.merge(grad_summaries)

        self.summary = tf.summary.merge([loss_summary, accuracy_summary, self.grad_summaries_merged])

        optimizer = tf.train.GradientDescentOptimizer(self.lr)
        # pass global_step so it is incremented on every training step
        self.train_op = optimizer.apply_gradients(zip(grads, tvars), global_step=self.global_step)

        self.new_lr = tf.placeholder(tf.float32, shape=[], name="new_learning_rate")
        self._lr_update = tf.assign(self.lr, self.new_lr)

    def assign_new_lr(self, session, lr_value):
        session.run(self._lr_update, feed_dict={self.new_lr: lr_value})
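A hedged sketch of one training epoch with this class; the Config fields, the embedding_lookup helper, and train_data are the assumptions carried over from the earlier snippets:

# hypothetical training loop; assumes config also defines keep_prob,
# batch_size, embed_dim, hidden_neural_size, hidden_layer_num and
# max_grad_norm, and that train_data comes from load_data above
with tf.Graph().as_default(), tf.Session() as sess:
    model = RNN_Model(config, num_classes=2, is_training=True)
    sess.run(tf.global_variables_initializer())
    model.assign_new_lr(sess, lr_value=0.1)

    for x_batch, y_batch, mask_batch in batch_iter(train_data, config.batch_size):
        x_embedded = embedding_lookup(len(x_batch), config.num_step,
                                      config.embed_dim, list(x_batch))
        feed = {model.embedded_x: x_embedded,
                model.target: y_batch,
                model.mask_x: mask_batch}
        _, cost, acc = sess.run([model.train_op, model.cost, model.accuracy],
                                feed_dict=feed)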

Example (question intent classification in a QA system):

Input: 你好 呀 (a jieba-segmented greeting)

Intent class: greeting

For the complete implementation, see the full source code.