1. 程式人生 > >TensorFlow練手專案二:基於迴圈神經網路(RNN)的古詩生成器

TensorFlow練手專案二:基於迴圈神經網路(RNN)的古詩生成器

基於迴圈神經網路(RNN)的古詩生成器

之前在手機百度上看到有個“為你寫詩”功能,能夠隨機生成古詩,當時感覺很酷炫= =

在學習了深度學習後,瞭解了一下原理,打算自己做個實現練練手,於是,就有了這個專案。文中如有瑕疵紕漏之處,還請路過的諸位大佬不吝賜教,萬分感謝!

使用迴圈神經網路實現的古詩生成器,能夠完成古體詩的自動生成。我簡單地訓練了一下,格式是對上了,至於意境麼。。。emmm,呵呵

舉一下模型測試結果例子:

1.生成古體詩

示例1:

樹陰飛盡水三依,謾自為能厚景奇。
莫怪仙舟欲西望,楚人今此惜春風。

示例2:

巖外前苗點有泉,紫崖煙靄碧芊芊。
似僧月明秋更好,一蹤顏事欲猶傷?

2.生成藏頭詩(以“天空”為例)

示例:

天序曾柏烏傾魚,空老桐歌塵翁紅。

下面記錄專案實現過程(由於都是文字處理方面,跟前一個專案存在很多類似的內容,對於這部分內容,我就只簡單提一下,不展開了,新的東西再具體說):

1.資料預處理

資料集使用四萬首的唐詩訓練集,可以點選這裡進行下載。

# -*- coding: utf-8 -*-
# @Time    : 18-3-13 上午11:04
# @Author  : AaronJny
# @Email   : [email protected]
import sys

reload(sys)
sys.setdefaultencoding('utf8'
) import collections ORIGIN_DATA = 'origin_data/poetry.txt' # 源資料路徑 OUTPUT_DATA = 'processed_data/poetry.txt' # 輸出向量路徑 VOCAB_DATA = 'vocab/poetry.vocab' def word_to_id(word, id_dict): if word in id_dict: return id_dict[word] else: return id_dict['<unknow>'] poetry_list = [] # 存放唐詩的陣列
# 從檔案中讀取唐詩 with open(ORIGIN_DATA, 'r') as f: f_lines = f.readlines() print '唐詩總數 : {}'.format(len(f_lines)) # 逐行進行處理 for line in f_lines: # 去除前後空白符,轉碼 strip_line = line.strip().decode('utf8') try: # 將唐詩分為標題和內容 title, content = strip_line.split(':') except: # 出現多個':'的將被捨棄 continue # 去除內容中的空格 content = content.strip().replace(' ', '') # 捨棄含有非法字元的唐詩 if '(' in content or '(' in content or '<' in content or '《' in content or '_' in content or '[' in content: continue # 捨棄過短或過長的唐詩 lenth = len(content) if lenth < 20 or lenth > 100: continue # 加入列表 poetry_list.append('s' + content + 'e') print '用於訓練的唐詩數 : {}'.format(len(poetry_list)) poetry_list=sorted(poetry_list,key=lambda x:len(x)) words_list = [] # 獲取唐詩中所有的字元 for poetry in poetry_list: words_list.extend([word for word in poetry]) # 統計其出現的次數 counter = collections.Counter(words_list) # 排序 sorted_words = sorted(counter.items(), key=lambda x: x[1], reverse=True) # 獲得出現次數降序排列的字元列表 words_list = ['<unknow>'] + [x[0] for x in sorted_words] # 這裡選擇保留高頻詞的數目,詞只有不到七千個,所以我全部保留 words_list = words_list[:len(words_list)] print '詞彙表大小 : {}'.format(words_list) with open(VOCAB_DATA, 'w') as f: for word in words_list: f.write(word + '\n') # 生成單詞到id的對映 word_id_dict = dict(zip(words_list, range(len(words_list)))) # 將poetry_list轉換成向量形式 id_list=[] for poetry in poetry_list: id_list.append([str(word_to_id(word,word_id_dict)) for word in poetry]) # 將向量寫入檔案 with open(OUTPUT_DATA, 'w') as f: for id_l in id_list: f.write(' '.join(id_l) + '\n')

2.模型編寫

這裡要編寫兩個模型,一個用於訓練,一個用於驗證(生成古體詩)。兩個模型大體上一致,因為用途不同,所以有些細節有出入。當進行驗證時,驗證模型讀取訓練模型的引數進行覆蓋。

註釋比較細,就不多說了,看程式碼。對於兩個模型不同的一些關鍵細節,我也用註釋進行了說明。

# -*- coding: utf-8 -*-
# @Time    : 18-3-13 下午2:06
# @Author  : AaronJny
# @Email   : [email protected]
import tensorflow as tf
import functools
import setting

HIDDEN_SIZE = 128  # LSTM隱藏節點個數
NUM_LAYERS = 2  # RNN深度


def doublewrap(function):
    @functools.wraps(function)
    def decorator(*args, **kwargs):
        if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
            return function(args[0])
        else:
            return lambda wrapee: function(wrapee, *args, **kwargs)

    return decorator


@doublewrap
def define_scope(function, scope=None, *args, **kwargs):
    attribute = '_cache_' + function.__name__
    name = scope or function.__name__

    @property
    @functools.wraps(function)
    def decorator(self):
        if not hasattr(self, attribute):
            with tf.variable_scope(name, *args, **kwargs):
                setattr(self, attribute, function(self))
        return getattr(self, attribute)

    return decorator


class TrainModel(object):
    """
    訓練模型
    """

    def __init__(self, data, labels, emb_keep, rnn_keep):
        self.data = data  # 資料
        self.labels = labels  # 標籤
        self.emb_keep = emb_keep  # embedding層dropout保留率
        self.rnn_keep = rnn_keep  # lstm層dropout保留率
        self.global_step
        self.cell
        self.predict
        self.loss
        self.optimize

    @define_scope
    def cell(self):
        """
        rnn網路結構
        :return:
        """
        lstm_cell = [
            tf.nn.rnn_cell.DropoutWrapper(tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE), output_keep_prob=self.rnn_keep) for
            _ in range(NUM_LAYERS)]
        cell = tf.nn.rnn_cell.MultiRNNCell(lstm_cell)
        return cell

    @define_scope
    def predict(self):
        """
        定義前向傳播
        :return:
        """
        # 建立詞嵌入矩陣權重
        embedding = tf.get_variable('embedding', shape=[setting.VOCAB_SIZE, HIDDEN_SIZE])
        # 建立softmax層引數
        if setting.SHARE_EMD_WITH_SOFTMAX:
            softmax_weights = tf.transpose(embedding)
        else:
            softmax_weights = tf.get_variable('softmaweights', shape=[HIDDEN_SIZE, setting.VOCAB_SIZE])
        softmax_bais = tf.get_variable('softmax_bais', shape=[setting.VOCAB_SIZE])
        # 進行詞嵌入
        emb = tf.nn.embedding_lookup(embedding, self.data)
        # dropout
        emb_dropout = tf.nn.dropout(emb, self.emb_keep)
        # 計算迴圈神經網路的輸出
        self.init_state = self.cell.zero_state(setting.BATCH_SIZE, dtype=tf.float32)
        outputs, last_state = tf.nn.dynamic_rnn(self.cell, emb_dropout, scope='d_rnn', dtype=tf.float32,
                                                initial_state=self.init_state)
        outputs = tf.reshape(outputs, [-1, HIDDEN_SIZE])
        # 計算logits
        logits = tf.matmul(outputs, softmax_weights) + softmax_bais
        return logits

    @define_scope
    def loss(self):
        """
        定義損失函式
        :return:
        """
        # 計算交叉熵
        outputs_target = tf.reshape(self.labels, [-1])
        loss = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=self.predict, labels=outputs_target, )
        # 平均
        cost = tf.reduce_mean(loss)
        return cost

    @define_scope
    def global_step(self):
        """
        global_step
        :return:
        """
        global_step = tf.Variable(0, trainable=False)
        return global_step

    @define_scope
    def optimize(self):
        """
        定義反向傳播過程
        :return:
        """
        # 學習率衰減
        learn_rate = tf.train.exponential_decay(setting.LEARN_RATE, self.global_step, setting.LR_DECAY_STEP,
                                                setting.LR_DECAY)
        # 計算梯度,並防止梯度爆炸
        trainable_variables = tf.trainable_variables()
        grads, _ = tf.clip_by_global_norm(tf.gradients(self.loss, trainable_variables), setting.MAX_GRAD)
        # 建立優化器,進行反向傳播
        optimizer = tf.train.AdamOptimizer(learn_rate)
        train_op = optimizer.apply_gradients(zip(grads, trainable_variables), self.global_step)
        return train_op


class EvalModel(object):
    """
    驗證模型
    """

    def __init__(self, data, emb_keep, rnn_keep):
        self.data = data  # 輸入
        self.emb_keep = emb_keep  # embedding層dropout保留率
        self.rnn_keep = rnn_keep  # lstm層dropout保留率
        self.cell
        self.predict
        self.prob

    @define_scope
    def cell(self):
        """
        rnn網路結構
        :return:
        """
        lstm_cell = [
            tf.nn.rnn_cell.DropoutWrapper(tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE), output_keep_prob=self.rnn_keep) for
            _ in range(NUM_LAYERS)]
        cell = tf.nn.rnn_cell.MultiRNNCell(lstm_cell)
        return cell

    @define_scope
    def predict(self):
        """
        定義前向傳播過程
        :return:
        """
        embedding = tf.get_variable('embedding', shape=[setting.VOCAB_SIZE, HIDDEN_SIZE])

        if setting.SHARE_EMD_WITH_SOFTMAX:
            softmax_weights = tf.transpose(embedding)
        else:
            softmax_weights = tf.get_variable('softmaweights', shape=[HIDDEN_SIZE, setting.VOCAB_SIZE])
        softmax_bais = tf.get_variable('softmax_bais', shape=[setting.VOCAB_SIZE])

        emb = tf.nn.embedding_lookup(embedding, self.data)
        emb_dropout = tf.nn.dropout(emb, self.emb_keep)
        # 與訓練模型不同,這裡只要生成一首古體詩,所以batch_size=1
        self.init_state = self.cell.zero_state(1, dtype=tf.float32)
        outputs, last_state = tf.nn.dynamic_rnn(self.cell, emb_dropout, scope='d_rnn', dtype=tf.float32,
                                                initial_state=self.init_state)
        outputs = tf.reshape(outputs, [-1, HIDDEN_SIZE])

        logits = tf.matmul(outputs, softmax_weights) + softmax_bais
        # 與訓練模型不同,這裡要記錄最後的狀態,以此來迴圈生成字,直到完成一首詩
        self.last_state = last_state
        return logits

    @define_scope
    def prob(self):
        """
        softmax計算概率
        :return:
        """
        probs = tf.nn.softmax(self.predict)
        return probs

3.組織資料集

編寫一個類用於組織資料,方便訓練使用。程式碼很簡單,應該不存在什麼問題。

# -*- coding: utf-8 -*-
# @Time    : 18-3-13 上午11:59
# @Author  : AaronJny
# @Email   : [email protected]
import numpy as np

BATCH_SIZE = 64
DATA_PATH = 'processed_data/poetry.txt'


class Dataset(object):
    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.data, self.target = self.read_data()
        self.start = 0
        self.lenth = len(self.data)

    def read_data(self):
        """
        從檔案中讀取資料,構建資料集
        :return: 訓練資料,訓練標籤
        """
        # 從檔案中讀取唐詩向量
        id_list = []
        with open(DATA_PATH, 'r') as f:
            f_lines = f.readlines()
            for line in f_lines:
                id_list.append([int(num) for num in line.strip().split()])
        # 計算可以生成多少個batch
        num_batchs = len(id_list) // self.batch_size
        # data和target
        x_data = []
        y_data = []
        # 生成batch
        for i in range(num_batchs):
            # 擷取一個batch的資料
            start = i * self.batch_size
            end = start + self.batch_size
            batch = id_list[start:end]
            # 計算最大長度
            max_lenth = max(map(len, batch))
            # 填充
            tmp_x = np.full((self.batch_size, max_lenth), 0, dtype=np.int32)
            # 資料覆蓋
            for row in range(self.batch_size):
                tmp_x[row, :len(batch[row])] = batch[row]
            tmp_y = np.copy(tmp_x)
            tmp_y[:, :-1] = tmp_y[:, 1:]
            x_data.append(tmp_x)
            y_data.append(tmp_y)
        return x_data, y_data

    def next_batch(self):
        """
        獲取下一個batch
        :return:
        """
        start = self.start
        self.start += 1
        if self.start >= self.lenth:
            self.start = 0
        return self.data[start], self.target[start]


if __name__ == '__main__':
    dataset = Dataset(BATCH_SIZE)
    dataset.read_data()

4.訓練模型

萬事俱備,開始訓練。

沒有按照epoch進行訓練,這裡只是迴圈訓練指定個mini_batch。

訓練過程中,會定期顯示當前訓練步數以及loss值。會定期儲存當前模型及對應checkpoint。

訓練程式碼:

# -*- coding: utf-8 -*-
# @Time    : 18-3-13 下午2:50
# @Author  : AaronJny
# @Email   : [email protected]
import tensorflow as tf
from rnn_models import TrainModel
import dataset
import setting

TRAIN_TIMES = 30000  # 迭代總次數(沒有計算epoch)
SHOW_STEP = 1  # 顯示loss頻率
SAVE_STEP = 100  # 儲存模型引數頻率

x_data = tf.placeholder(tf.int32, [setting.BATCH_SIZE, None])  # 輸入資料
y_data = tf.placeholder(tf.int32, [setting.BATCH_SIZE, None])  # 標籤
emb_keep = tf.placeholder(tf.float32)  # embedding層dropout保留率
rnn_keep = tf.placeholder(tf.float32)  # lstm層dropout保留率

data = dataset.Dataset(setting.BATCH_SIZE)  # 建立資料集

model = TrainModel(x_data, y_data, emb_keep, rnn_keep)  # 建立訓練模型

saver = tf.train.Saver()

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())  # 初始化
    for step in range(TRAIN_TIMES):
        # 獲取訓練batch
        x, y = data.next_batch()
        # 計算loss
        loss, _ = sess.run([model.loss, model.optimize],
                           {model.data: x, model.labels: y, model.emb_keep: setting.EMB_KEEP,
                            model.rnn_keep: setting.RNN_KEEP})
        if step % SHOW_STEP == 0:
            print 'step {}, loss is {}'.format(step, loss)
        # 儲存模型
        if step % SAVE_STEP == 0:
            saver.save(sess, setting.CKPT_PATH, global_step=model.global_step)

5.驗證模型

提供兩種方法驗證模型:

  • 隨機生成古體詩

  • 生成藏頭詩

隨機生成的結果勉強可以接受,起碼格式對了,看起來也像個樣子。

生成藏頭詩就五花八門了,效果不好,往往要多次才能生成一個差強人意的。emmm,其實也可以理解,畢竟我們指定的“藏頭”在訓練集中的分佈是不能保證的。

這裡簡單說一下生成古體詩的過程:

  • 1.首先,讀取訓練模型儲存的引數,覆蓋驗證模型的引數

  • 2.將開始符號’s’作為輸入,餵給模型,模型將輸出下一個字元為此表中各詞的概率,以及rnn傳遞的state。注意,驗證模型時,dropout的保留率應設定為1.0

  • 3.根據2中輸出的概率,使用輪盤賭法,隨機出下一個字

  • 4.將隨機出來的字作為輸入,前一次輸出的state作為本次輸入的state,餵給模型,模型將輸入下一個字元為此表中各詞的概率,以及rnn傳遞的state

  • 5.重複3,4步驟,直到隨機出結束符’e’,生成結束。過程中生成的所有字元,構成本次生成的古體詩(’s’和’e’不算)

生成藏頭詩的過程與生成古體詩是類似的,主要區別在於,在開始和每個標點符號被預測出來時,向模型餵給的是“藏頭”中的一個字,就不多說了,詳情可參考程式碼。

# -*- coding: utf-8 -*-
# @Time    : 18-3-13 下午2:50
# @Author  : AaronJny
# @Email   : [email protected]
import sys

reload(sys)
sys.setdefaultencoding('utf8')
import tensorflow as tf
import numpy as np
from rnn_models import EvalModel
import utils
import os

# 指定驗證時不使用cuda,這樣可以在用gpu訓練的同時,使用cpu進行驗證
os.environ['CUDA_VISIBLE_DEVICES'] = ''

x_data = tf.placeholder(tf.int32, [1, None])

emb_keep = tf.placeholder(tf.float32)

rnn_keep = tf.placeholder(tf.float32)

# 驗證用模型
model = EvalModel(x_data, emb_keep, rnn_keep)

saver = tf.train.Saver()
# 單詞到id的對映
word2id_dict = utils.read_word_to_id_dict()
# id到單詞的對映
id2word_dict = utils.read_id_to_word_dict()


def generate_word(prob):
    """
    選擇概率最高的前100個詞,並用輪盤賭法選取最終結果
    :param prob: 概率向量
    :return: 生成的詞
    """
    prob = sorted(prob, reverse=True)[:100]
    index = np.searchsorted(np.cumsum(prob), np.random.rand(1) * np.sum(prob))
    return id2word_dict[int(index)]


# def generate_word(prob):
#     """
#     從所有詞中,使用輪盤賭法選取最終結果
#     :param prob: 概率向量
#     :return: 生成的詞
#     """
#     index = int(np.searchsorted(np.cumsum(prob), np.random.rand(1) * np.sum(prob)))
#     return id2word_dict[index]


def generate_poem():
    """
    隨機生成一首詩歌
    :return:
    """
    with tf.Session() as sess:
        # 載入最新的模型
        ckpt = tf.train.get_checkpoint_state('ckpt')
        saver.restore(sess, ckpt.model_checkpoint_path)
        # 預測第一個詞
        rnn_state = sess.run(model.cell.zero_state(1, tf.float32))
        x = np.array([[word2id_dict['s']]], np.int32)
        prob, rnn_state = sess.run([model.prob, model.last_state],
                                   {model.data: x, model.init_state: rnn_state, model.emb_keep: 1.0,
                                    model.rnn_keep: 1.0})
        word = generate_word(prob)
        poem = ''
        # 迴圈操作,直到預測出結束符號‘e’
        while word != 'e':
            poem += word
            x = np.array([[word2id_dict[word]]])
            prob, rnn_state = sess.run([model.prob, model.last_state],
                                       {model.data: x, model.init_state: rnn_state, model.emb_keep: 1.0,
                                        model.rnn_keep: 1.0})
            word = generate_word(prob)
        # 列印生成的詩歌
        print poem


def generate_acrostic(head):
    """
    生成藏頭詩
    :param head:每行的第一個字組成的字串
    :return:
    """
    with tf.Session() as sess:
        # 載入最新的模型
        ckpt = tf.train.get_checkpoint_state('ckpt')
        saver.restore(sess, ckpt.model_checkpoint_path)
        # 進行預測
        rnn_state = sess.run(model.cell.zero_state(1, tf.float32))
        poem = ''
        cnt = 1
        # 一句句生成詩歌
        for x in head:
            word = x
            while word != ',' and word != '。':
                poem += word
                x = np.array([[word2id_dict[word]]])
                prob, rnn_state = sess.run([model.prob, model.last_state],
                                           {model.data: x, model.init_state: rnn_state, model.emb_keep: 1.0,
                                            model.rnn_keep: 1.0})
                word = generate_word(prob)
                if len(poem) > 25:
                    print 'bad.'
                    break
            # 根據單雙句新增標點符號
            if cnt & 1:
                poem += ','
            else:
                poem += '。'
            cnt += 1
        # 列印生成的詩歌
        print poem
        return poem


if __name__ == '__main__':
    # generate_acrostic(u'天空')
    generate_poem()

6.一些提取出來的方法和配置

很簡單,不多說。

utils.py

# -*- coding: utf-8 -*-
# @Time    : 18-3-13 下午4:16
# @Author  : AaronJny
# @Email   : [email protected]
import setting

def read_word_list():
    """
    從檔案讀取詞彙表
    :return: 詞彙列表
    """
    with open(setting.VOCAB_PATH, 'r') as f:
        word_list = [word for word in f.read().decode('utf8').strip().split('\n')]
    return word_list

def read_word_to_id_dict():
    """
    生成單詞到id的對映
    :return:
    """
    word_list=read_word_list()
    word2id=dict(zip(word_list,range(len(word_list))))
    return word2id

def read_id_to_word_dict():
    """
    生成id到單詞的對映
    :return:
    """
    word_list=read_word_list()
    id2word=dict(zip(range(len(word_list)),word_list))
    return id2word


if __name__ == '__main__':
    read_id_to_word_dict()

setting.py

# -*- coding: utf-8 -*-
# @Time    : 18-3-13 下午3:08
# @Author  : AaronJny
# @Email   : [email protected]


VOCAB_SIZE = 6272  # 詞彙表大小

SHARE_EMD_WITH_SOFTMAX = True  # 是否在embedding層和softmax層之間共享引數

MAX_GRAD = 5.0  # 最大梯度,防止梯度爆炸

LEARN_RATE = 0.0005  # 初始學習率

LR_DECAY = 0.92  # 學習率衰減

LR_DECAY_STEP = 600  # 衰減步數

BATCH_SIZE = 64  # batch大小

CKPT_PATH = 'ckpt/model_ckpt'  # 模型儲存路徑

VOCAB_PATH = 'vocab/poetry.vocab'  # 詞表路徑

EMB_KEEP = 0.5  # embedding層dropout保留率

RNN_KEEP = 0.5  # lstm層dropout保留率

7.完畢

編碼到此結束,有興趣的朋友可以自己跑一跑,玩一玩,我就不多做測試了。

博主也正在學習,能力淺薄,文中如有瑕疵紕漏之處,還請路過的諸位大佬不吝賜教,萬分感謝!