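[Language model] Training an RNN LSTM language model to write prose that gazes up at the starry sky at a 45° angle
Introduction
This post is hands-on and does not cover the underlying theory. For the theory, here are some good references:
1. Understanding LSTM Networks:
The best-known article on RNNs and LSTMs. Its figures and explanations are well judged, and it is a valuable reference for researchers.
Chinese translation: (譯)理解 LSTM 網路 (Understanding LSTM Networks by colah)
2. Recurrent Neural Networks Tutorial, Part 1 – Introduction to RNNs
Like the previous article, this is one of the most popular and most widely cited pieces on RNNs.
A good article in Chinese. Most of its content is translated from foreign papers, but the translation is well done and worth reading. The author is also very capable, so it is worth looking at his other articles too.
The figures are easy to follow; authors abroad seem to prefer explaining the principles clearly.
Hands-on
Git repository for this project:
Note: this code has been debugged many times and currently works. If you hit a bug, delete the generated files and run everything again from the start.
First, define the training parameters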
import os

# number of training epochs
num_epochs = 50
# batch size
batch_size = 256
# number of units in each LSTM layer
rnn_size = 256
# number of LSTM layers
num_layers = 3
# sequence length (number of unrolled time steps)
seq_length = 30
# learning rate
learning_rate = 0.001
# dropout keep probabilities
output_keep_prob = 0.8
input_keep_prob = 1.0
# optimizer settings
grad_clip = 5.
decay_rate = 0.97
init_from = None
save_every = 1000

# directory for saved models
save_dir = './save'
if not os.path.isdir(save_dir):
    os.makedirs(save_dir)
    # report instead of aborting with `assert False`, so the script keeps running
    print("Model directory was missing; created folder: save")

# directory for logs
log_dir = './logs'
if not os.path.isdir(log_dir):
    os.makedirs(log_dir)
    print("Log directory was missing; created folder: logs")

# directory for the data and the vocabulary
data_dir = './temp'
if not os.path.isdir(data_dir):
    os.makedirs(data_dir)
    print("Data directory was missing; created folder: temp")

input_file = os.path.join(data_dir, "爵跡I II.txt")
if not os.path.exists(input_file):
    print('Please put the novel by 郭小四 in the temp folder....')
vocab_file = os.path.join(data_dir, "vocab.pkl")
tensor_file = os.path.join(data_dir, "data.npy")
_file = os.path.join(save_dir, 'chars_vocab.pkl')
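Load the dataset first
The corpus is the novel 爵跡.
Both the novel and the film leave a strong impression....
with open(input_file, 'r', encoding='gbk') as f:
    text = f.read()
Preview part of the text
Sure enough, it reads like Eastern fantasy, and every line gives off the melancholy of gazing up at the sky at a 45° angle.
text[500:800]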
'而來?傳說中至高無上的【白銀祭司】又掌握著怎樣的真相?這場曠世之戰,究竟要將主角的命運引向王者的寶
座, 還是慘烈的死亡?\n\n \n\n 序章 神遇\n\n \n\n 漫天翻滾的碎雪,彷彿巨獸抖落的白色
絨毛,紛紛揚揚地遮蔽著視線。\n\n 這塊大陸的冬天已經來臨。\n\n 南方只是開始不易察覺地降溫,
凌晨的時候窗櫺上會看見霜花,但是在這裡——大陸接近極北的盡頭,已經是一望無際的蒼茫肅殺。
大塊大塊浮動 在海面上的冰山彼此不時地撞擊著,在天地間發出巨大的銳利轟鳴聲,坍塌的冰塊砸進大海,
掀起白色的浪濤。遼 闊的黑色凍土在接連幾天的大雪之後,變成了一片茫茫的雪原。這已經是深北之地了,連綿不斷'
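- Do some preprocessing: remove irrelevant characters and spaces, and drop the useless introduction in the first few lines of the book
import re
# raw string, so the regex escapes reach the re module unchanged
pattern = re.compile(r'\[.*\]|<.*>|\.+|【|】| +|\r|\n')
text = pattern.sub('', text.strip())
text[500:800]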
'巨獸抖落的白色絨毛,紛紛揚揚地遮蔽著視線。這塊大陸的冬天已經來臨。南方只是開始不易察覺地降溫,
凌晨的時候窗櫺上會看見霜花,但是在這裡——大陸接近極北的盡頭,已經是一望無際的蒼茫肅殺。
大塊大塊浮動在海面上的冰山彼此不時地撞擊著,在天地間發出巨大的銳利轟鳴聲,坍塌的冰塊砸進大海,
掀起白色的浪濤。遼闊的黑色凍土在接連幾天的大雪之後,變成了一片茫茫的雪原。
這已經是深北之地了,連綿不斷的冰川彷彿怪獸的利齒般將天地的盡頭緊緊咬在一起,
地平線消失在刺眼的白色冰面之下。天空被厚重的雲層遮擋,光線彷彿蒙著一層塵埃,
混沌地灑向大地。混沌的風雪在空曠的天地間吹出一陣又一陣彷彿狼嗥般的淒厲聲響。拳頭大小的紛亂大雪裡,'
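To see what each alternative in the cleaning pattern removes, here is a minimal sketch on a made-up line. The sample string is hypothetical and not taken from the novel; only the pattern comes from the code above.

```python
import re

# Same pattern as in the preprocessing step, written as a raw string.
pattern = re.compile(r'\[.*\]|<.*>|\.+|【|】| +|\r|\n')

# Hypothetical sample text, only for illustration.
sample = "【白銀祭司】  漫天翻滾的碎雪....\r\n [note] 冬天<b>已經來臨。"
cleaned = pattern.sub('', sample.strip())
print(cleaned)

# Caveat: `.*` is greedy, so text sitting between two tags on the
# same line is removed together with the tags.
greedy_result = pattern.sub('', "<a>保留<b>")
print(repr(greedy_result))
```

If the source text had meaningful content between brackets or tags on a single line, a non-greedy `.*?` would probably be the safer choice. For this corpus the greedy version seems to work well enough.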
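The text looks much cleaner after preprocessing. Next, build the character mapping.
- First count character frequencies and sort them in descending order. Because the model is character-level, this step is not strictly necessary: it only establishes how many distinct Chinese characters and symbols there are, so chars = set(text) would do instead.
- Store the result as the vocabulary in a local pkl file so it can be loaded later.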
import collections
from six.moves import cPickle
counter = collections.Counter(text)
counter = sorted(counter.items(), key=lambda x: -x[1])
chars, _ = zip(*counter)
with open(vocab_file, 'wb') as f:
    cPickle.dump(chars, f)
Assign an integer index to every character in the vocabulary and use that index in place of the character
Save the character-to-index mapping
vocab_size = len(chars)
vocab = dict(zip(chars, range(vocab_size)))
with open(_file, 'wb') as f:
    cPickle.dump((chars, vocab), f)
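- Convert the whole book from characters to their integer indices.
- The book can then be represented as a list of N integers.
- Finally, save the vectorized book so it can be reused later.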
import numpy as np
text_tensor = np.array(list(map(vocab.get, text)))
np.save(tensor_file, text_tensor)
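The vocabulary and vectorization steps can be checked end to end on a tiny string. This is a sketch under assumptions: `toy_text`, `toy_chars`, and `toy_vocab` are hypothetical stand-ins for `text`, `chars`, and `vocab`.

```python
import collections
import numpy as np

toy_text = "雪落雪原雪"  # hypothetical stand-in for the novel text

# Most frequent character first, as in the article.
# sorted() is stable, so ties keep their first-appearance order.
counter = sorted(collections.Counter(toy_text).items(), key=lambda kv: -kv[1])
toy_chars, _ = zip(*counter)
toy_vocab = dict(zip(toy_chars, range(len(toy_chars))))

# Character -> index, then index -> character to confirm the round trip.
encoded = np.array([toy_vocab[ch] for ch in toy_text])
decoded = "".join(toy_chars[i] for i in encoded)
print(toy_chars, encoded, decoded)
```

Because `chars` is ordered by frequency, index 0 is always the most common character. The sampling code later relies on this when no prime text is given.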
Build the data in the format needed for training
num_batches = int(text_tensor.size / (batch_size * seq_length))
if num_batches == 0:
    assert False, "Not enough data. Make seq_length and batch_size small."
# drop the tail that does not fill a whole batch
text_tensor = text_tensor[: num_batches * batch_size * seq_length]
xdata = text_tensor
ydata = np.copy(text_tensor)
# targets are the inputs shifted left by one; the last target wraps around to the first input
ydata[:-1] = xdata[1:]
ydata[-1] = xdata[0]
x_batches = np.split(xdata.reshape(batch_size, -1),
                     num_batches, 1)
y_batches = np.split(ydata.reshape(batch_size, -1),
                     num_batches, 1)
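The reshape-and-split step is easier to follow with small numbers. The sketch below repeats the same operations on 14 made-up token ids; the sizes are hypothetical and chosen only so the arrays fit on screen.

```python
import numpy as np

toy_batch_size, toy_seq_length = 2, 3   # hypothetical small sizes
tokens = np.arange(14)                  # stand-in for text_tensor

n_batches = tokens.size // (toy_batch_size * toy_seq_length)
tokens = tokens[: n_batches * toy_batch_size * toy_seq_length]

x = tokens
y = np.copy(tokens)
y[:-1] = x[1:]   # each target is the next token
y[-1] = x[0]     # the last target wraps around

# Each row is one contiguous stream of text, cut into seq_length chunks.
xb = np.split(x.reshape(toy_batch_size, -1), n_batches, 1)
yb = np.split(y.reshape(toy_batch_size, -1), n_batches, 1)
print(xb[0], yb[0], xb[1], sep="\n")
```

Row 0 of the second batch continues exactly where row 0 of the first batch stopped. That is presumably why the training loop can feed the final LSTM state of one batch in as the initial state of the next.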
Define a helper that returns the batch at a given index
def next_batch(pointer):
    x, y = x_batches[pointer], y_batches[pointer]
    return x, y
import time
import tensorflow as tf
from tensorflow.contrib import rnn
from tensorflow.contrib import legacy_seq2seq
Training mode
training = True
if not training:
    batch_size = 1
    seq_length = 1
Build the LSTM cells
cells = []
for _ in range(num_layers):
    cell = rnn.LSTMCell(rnn_size)
    if training and (output_keep_prob < 1.0 or input_keep_prob < 1.0):
        cell = rnn.DropoutWrapper(cell,
                                  input_keep_prob=input_keep_prob,
                                  output_keep_prob=output_keep_prob)
    cells.append(cell)
cell = rnn.MultiRNNCell(cells, state_is_tuple=True)
Create the placeholders and the randomly initialized parameter matrices
input_data = tf.placeholder(tf.int32, [batch_size, seq_length])
targets = tf.placeholder(tf.int32, [batch_size, seq_length])
initial_state = cell.zero_state(batch_size, tf.float32)
with tf.variable_scope('rnnlm'):
    softmax_w = tf.get_variable("softmax_w", [rnn_size, vocab_size])
    softmax_b = tf.get_variable("softmax_b", [vocab_size])
Convert the input into embedding vectors
embedding = tf.get_variable("embedding", [vocab_size, rnn_size])
inputs = tf.nn.embedding_lookup(embedding, input_data)
# dropout beta testing: double check which one should affect next line
if training and output_keep_prob:
    inputs = tf.nn.dropout(inputs, output_keep_prob)
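Conceptually, the embedding lookup is row indexing into a matrix, and dropout with a keep probability zeroes some activations and rescales the rest. Here is a NumPy sketch of both ideas. The sizes and the random seed are hypothetical, and this only mirrors what the TensorFlow ops do rather than reproducing them.

```python
import numpy as np

rng = np.random.default_rng(0)
toy_vocab_size, toy_rnn_size = 5, 4            # hypothetical sizes
embedding = rng.normal(size=(toy_vocab_size, toy_rnn_size))

input_ids = np.array([[0, 3, 3], [4, 1, 0]])   # [batch_size, seq_length]
embedded = embedding[input_ids]                # [batch_size, seq_length, rnn_size]

# Inverted dropout: keep each value with probability keep_prob and
# scale the kept values by 1 / keep_prob so the expected value is unchanged.
keep_prob = 0.8
mask = rng.random(embedded.shape) < keep_prob
dropped = np.where(mask, embedded / keep_prob, 0.0)
print(embedded.shape, dropped.shape)
```

Each character id simply selects one row of the embedding matrix, so the embedding width has to match `rnn_size` only because the first LSTM layer is fed these vectors directly.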
Split the embedded inputs into one tensor per time step to feed the RNN
inputs = tf.split(inputs, seq_length, 1)
inputs = [tf.squeeze(input_, [1]) for input_ in inputs]
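The split and squeeze turn one `[batch_size, seq_length, rnn_size]` tensor into a list of `seq_length` tensors of shape `[batch_size, rnn_size]`. The same shape manipulation can be sketched in NumPy with hypothetical sizes:

```python
import numpy as np

batch, steps, width = 2, 3, 4   # hypothetical sizes
embedded = np.arange(batch * steps * width).reshape(batch, steps, width)

pieces = np.split(embedded, steps, axis=1)           # 3 arrays of shape (2, 1, 4)
per_step = [np.squeeze(p, axis=1) for p in pieces]   # 3 arrays of shape (2, 4)

# Concatenating along axis 1 and reshaping puts the rows back in
# batch-major order: (sample 0, step 0), (sample 0, step 1), ...
flat = np.concatenate(per_step, axis=1).reshape(-1, width)
print(len(per_step), per_step[0].shape, flat.shape)
```

The last two lines mirror the concat and reshape applied to the decoder outputs. The resulting row order matches a flattened `[batch_size, seq_length]` target array, which is what the loss needs.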
Decoder outputs and final state
outputs, last_state = legacy_seq2seq.rnn_decoder(inputs, initial_state, cell, scope='rnnlm')
output = tf.reshape(tf.concat(outputs, 1), [-1, rnn_size])
Project the outputs to logits and apply softmax
logits = tf.matmul(output, softmax_w) + softmax_b
probs = tf.nn.softmax(logits)
Loss
loss = legacy_seq2seq.sequence_loss_by_example(
    [logits],
    [tf.reshape(targets, [-1])],
    [tf.ones([batch_size * seq_length])])
with tf.name_scope('cost'):
    # average cross-entropy per character
    cost = tf.reduce_sum(loss) / batch_size / seq_length
final_state = last_state
lr = tf.Variable(0.0, trainable=False)
tvars = tf.trainable_variables()
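With all weights set to one, the cost above is the mean cross-entropy per character. The NumPy sketch below computes that quantity for a toy case. The function names and sizes are hypothetical, and it is meant as an illustration of the formula, not as the library's implementation.

```python
import numpy as np

def softmax(logits):
    # Subtract the row maximum for numerical stability.
    shifted = logits - logits.max(axis=1, keepdims=True)
    exp = np.exp(shifted)
    return exp / exp.sum(axis=1, keepdims=True)

def mean_cross_entropy(logits, targets):
    probs = softmax(logits)
    picked = probs[np.arange(len(targets)), targets]  # probability of the true character
    return float(-np.log(picked).mean())

toy_vocab_size = 8                                    # hypothetical
uniform_logits = np.zeros((6, toy_vocab_size))        # a model that knows nothing
toy_targets = np.array([0, 1, 2, 3, 4, 5])
baseline = mean_cross_entropy(uniform_logits, toy_targets)
print(baseline, np.log(toy_vocab_size))
```

A model that predicts every character with equal probability scores ln(vocab_size). The article does not state the vocabulary size, but if the untrained network starts close to uniform, the first logged loss of 7.984 would correspond to roughly e^7.984, or about 2,900 distinct characters.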
Optimizer
grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars), grad_clip)
with tf.name_scope('optimizer'):
    optimizer = tf.train.AdamOptimizer(lr)
train_op = optimizer.apply_gradients(zip(grads, tvars))
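Clipping by global norm rescales all gradients together when their combined norm exceeds `grad_clip`, which keeps their relative directions intact. Here is a small NumPy sketch of that rule; the gradient values are made up.

```python
import numpy as np

def clip_by_global_norm(grads, clip_norm):
    # Norm over all gradient tensors taken together.
    global_norm = np.sqrt(sum(np.sum(g ** 2) for g in grads))
    # Scale is 1 when the norm is already within the limit.
    scale = clip_norm / max(global_norm, clip_norm)
    return [g * scale for g in grads], global_norm

toy_grads = [np.array([3.0, 4.0]), np.array([[12.0]])]   # hypothetical gradients
clipped, norm = clip_by_global_norm(toy_grads, clip_norm=5.0)
clipped_norm = np.sqrt(sum(np.sum(g ** 2) for g in clipped))

small, _ = clip_by_global_norm([np.array([0.3, 0.4])], clip_norm=5.0)
print(norm, clipped_norm, small)
```

This matters for RNNs in particular, since gradients propagated through many time steps can occasionally blow up and destabilize training.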
Start training
train_loss_result = []
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    saver = tf.train.Saver(tf.global_variables())
    # restore model (assumption: init_from is a directory that contains checkpoints)
    if init_from is not None:
        ckpt = tf.train.latest_checkpoint(init_from)
        saver.restore(sess, ckpt)
    for i in range(num_epochs):
        # exponentially decay the learning rate once per epoch
        sess.run(tf.assign(lr, learning_rate * (decay_rate ** i)))
        state = sess.run(initial_state)
        pointer = 0
        for j in range(num_batches):
            start = time.time()
            x, y = next_batch(pointer)
            pointer += 1
            feed = {input_data: x, targets: y}
            # carry the LSTM state over from the previous batch
            for a, (c, h) in enumerate(initial_state):
                feed[c] = state[a].c
                feed[h] = state[a].h
            train_loss, state, _ = sess.run([cost, final_state, train_op], feed)
            train_loss_result.append(train_loss)
            end = time.time()
            print("{}/{} (epoch {}), train_loss = {:.3f}, time/batch = {:.3f}"
                  .format(i * num_batches + j,
                          num_epochs * num_batches,
                          i, train_loss, end - start))
            if (i * num_batches + j) % save_every == 0\
                    or (i == num_epochs-1 and
                        j == num_batches-1):
                # save for the last result
                checkpoint_path = os.path.join(save_dir, 'model.ckpt')
                saver.save(sess, checkpoint_path,
                           global_step=i * num_batches + j)
                print("model saved to {}".format(checkpoint_path))
0/38 (epoch 0), train_loss = 7.984, time/batch = 1.705
model saved to ./save\model.ckpt
1/38 (epoch 0), train_loss = 7.981, time/batch = 1.492
2/38 (epoch 0), train_loss = 7.976, time/batch = 1.465
3/38 (epoch 0), train_loss = 7.960, time/batch = 1.290
4/38 (epoch 0), train_loss = 7.896, time/batch = 1.248
------
------
36/38 (epoch 0), train_loss = 6.160, time/batch = 1.178
37/38 (epoch 0), train_loss = 6.177, time/batch = 1.163
model saved to ./save\model.ckpt
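The step counter, the checkpoint schedule, and the learning-rate decay in the training loop are all simple arithmetic. The sketch below works them out. Here `num_batches = 38` is read off the `x/38` counter in the log, and the helper function names are hypothetical.

```python
num_epochs = 50
num_batches = 38        # assumption: taken from the "x/38" counter in the log
learning_rate = 0.001
decay_rate = 0.97
save_every = 1000

def lr_for_epoch(epoch):
    # Same formula as the tf.assign call at the start of each epoch.
    return learning_rate * (decay_rate ** epoch)

def global_step(epoch, batch):
    return epoch * num_batches + batch

last_step = global_step(num_epochs - 1, num_batches - 1)
saved_steps = [s for s in range(num_epochs * num_batches)
               if s % save_every == 0 or s == last_step]
print(last_step, saved_steps, lr_for_epoch(0), lr_for_epoch(num_epochs - 1))
```

The denominator in the log is `num_epochs * num_batches`, so the excerpt above appears to come from a run with a single epoch of 38 batches. With 50 epochs the last step would be 1899, which is consistent with the checkpoint name `model.ckpt-1899` that shows up when sampling.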
Visualize the loss
import matplotlib.pyplot as plt
_x = [i for i in range(1,len(train_loss_result)+1)]
plt.plot(_x, train_loss_result, 'k-', label='Train Loss')
plt.title('Cross Entropy Loss per Generation')
plt.xlabel('Generation')
plt.ylabel('Cross Entropy Loss')
plt.legend(loc='upper right')
plt.show()
Test mode
from six.moves import cPickle
import os
class config():
    # number of training epochs
    num_epochs = 1
    # RNN cell type
    model = 'lstm'
    # batch size
    batch_size = 256
    # number of units in each LSTM layer
    rnn_size = 256
    # number of LSTM layers
    num_layers = 3
    # sequence length (number of unrolled time steps)
    seq_length = 30
    # learning rate
    learning_rate = 0.001
    # dropout keep probabilities
    output_keep_prob = 0.8
    input_keep_prob = 1.0
    # optimizer settings
    grad_clip = 5.
    decay_rate = 0.97
    init_from = None
    save_every = 1000
    # directory for saved models
    save_dir = './save'
    if not os.path.isdir(save_dir):
        os.makedirs(save_dir)
    # directory for logs
    log_dir = './logs'
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir)
    # directory for the data and the vocabulary
    data_dir = './temp'
    if not os.path.isdir(data_dir):
        os.makedirs(data_dir)
    input_file = os.path.join(data_dir, "爵跡I II.txt")
    vocab_file = os.path.join(data_dir, "vocab.pkl")
    tensor_file = os.path.join(data_dir, "data.npy")
    _file = os.path.join(save_dir, 'chars_vocab.pkl')
    training = False
    # load the character/index mapping saved during training
    with open(_file, 'rb') as f:
        chars, vocab = cPickle.load(f)
    vocab_size = len(chars)
    # number of characters to generate
    n = 500
    # sampling type: 0 = argmax, 1 = sample at every step, 2 = sample only after a space
    sample = 1
    # text used to prime the model
    prime = '悲傷逆流成河'
import time
import tensorflow as tf
from tensorflow.contrib import rnn
from tensorflow.contrib import legacy_seq2seq
from tensorflow.python.framework import ops
ops.reset_default_graph()
import numpy as np
class Model():
    def __init__(self, args, training=True):
        self.args = args
        if not training:
            args.batch_size = 1
            args.seq_length = 1

        # choose different rnn cell
        if args.model == 'rnn':
            # rnn.RNNCell is the abstract base class; BasicRNNCell is the concrete vanilla cell
            cell_fn = rnn.BasicRNNCell
        elif args.model == 'gru':
            cell_fn = rnn.GRUCell
        elif args.model == 'lstm':
            cell_fn = rnn.LSTMCell
        elif args.model == 'nas':
            cell_fn = rnn.NASCell
        else:
            raise Exception("model type not supported: {}".format(args.model))

        # wrap the multi-layer rnn cell into one cell with dropout
        cells = []
        for _ in range(args.num_layers):
            cell = cell_fn(args.rnn_size)
            if training and (args.output_keep_prob < 1.0 or args.input_keep_prob < 1.0):
                cell = rnn.DropoutWrapper(cell,
                                          input_keep_prob=args.input_keep_prob,
                                          output_keep_prob=args.output_keep_prob)
            cells.append(cell)
        self.cell = cell = rnn.MultiRNNCell(cells, state_is_tuple=True)

        # input/target data (int32 since input is char-level)
        self.input_data = tf.placeholder(
            tf.int32, [args.batch_size, args.seq_length])
        self.targets = tf.placeholder(
            tf.int32, [args.batch_size, args.seq_length])
        self.initial_state = cell.zero_state(args.batch_size, tf.float32)

        # softmax output layer, use softmax to classify
        with tf.variable_scope('rnnlm'):
            softmax_w = tf.get_variable("softmax_w",
                                        [args.rnn_size, args.vocab_size])
            softmax_b = tf.get_variable("softmax_b", [args.vocab_size])

        # transform input to embedding
        embedding = tf.get_variable("embedding", [args.vocab_size, args.rnn_size])
        inputs = tf.nn.embedding_lookup(embedding, self.input_data)

        # dropout beta testing: double check which one should affect next line
        if training and args.output_keep_prob:
            inputs = tf.nn.dropout(inputs, args.output_keep_prob)

        # unstack the input to fit the rnn model
        inputs = tf.split(inputs, args.seq_length, 1)
        inputs = [tf.squeeze(input_, [1]) for input_ in inputs]

        # loop function for rnn_decoder, which takes the previous i-th cell's output
        # and generates the (i+1)-th cell's input
        def loop(prev, _):
            prev = tf.matmul(prev, softmax_w) + softmax_b
            prev_symbol = tf.stop_gradient(tf.argmax(prev, 1))
            return tf.nn.embedding_lookup(embedding, prev_symbol)

        # rnn_decoder generates the outputs and final state.
        # When we are not training the model, we use the loop function.
        outputs, last_state = legacy_seq2seq.rnn_decoder(
            inputs, self.initial_state, cell,
            loop_function=loop if not training else None, scope='rnnlm')
        output = tf.reshape(tf.concat(outputs, 1), [-1, args.rnn_size])

        # output layer
        self.logits = tf.matmul(output, softmax_w) + softmax_b
        self.probs = tf.nn.softmax(self.logits)

        # loss is calculated as the log loss, averaged over all positions
        loss = legacy_seq2seq.sequence_loss_by_example(
            [self.logits],
            [tf.reshape(self.targets, [-1])],
            [tf.ones([args.batch_size * args.seq_length])])
        with tf.name_scope('cost'):
            self.cost = tf.reduce_sum(loss) / args.batch_size / args.seq_length
        self.final_state = last_state
        self.lr = tf.Variable(0.0, trainable=False)
        tvars = tf.trainable_variables()

        # calculate gradients
        grads, _ = tf.clip_by_global_norm(tf.gradients(self.cost, tvars),
                                          args.grad_clip)
        with tf.name_scope('optimizer'):
            optimizer = tf.train.AdamOptimizer(self.lr)

        # apply the gradient change to all the trainable variables
        self.train_op = optimizer.apply_gradients(zip(grads, tvars))

        # instrument tensorboard
        tf.summary.histogram('logits', self.logits)
        tf.summary.histogram('loss', loss)
        tf.summary.scalar('train_loss', self.cost)

    def sample(self, sess, chars, vocab, num=200, prime='The ', sampling_type=1):
        state = sess.run(self.cell.zero_state(1, tf.float32))
        # warm up the state by feeding every prime character except the last one
        for char in prime[:-1]:
            x = np.zeros((1, 1))
            x[0, 0] = vocab[char]
            feed = {self.input_data: x, self.initial_state: state}
            [state] = sess.run([self.final_state], feed)

        def weighted_pick(weights):
            t = np.cumsum(weights)
            s = np.sum(weights)
            return(int(np.searchsorted(t, np.random.rand(1)*s)))

        ret = prime
        char = prime[-1]
        for _ in range(num):
            x = np.zeros((1, 1))
            x[0, 0] = vocab[char]
            feed = {self.input_data: x, self.initial_state: state}
            [probs, state] = sess.run([self.probs, self.final_state], feed)
            p = probs[0]

            if sampling_type == 0:
                sample = np.argmax(p)
            elif sampling_type == 2:
                if char == ' ':
                    sample = weighted_pick(p)
                else:
                    sample = np.argmax(p)
            else:  # sampling_type == 1 default:
                sample = weighted_pick(p)

            pred = chars[sample]
            ret += pred
            char = pred
        return ret

args = config()
with open(args._file, 'rb') as f:
    chars, vocab = cPickle.load(f)
# Use most frequent char if no prime is given
if args.prime == '':
    args.prime = chars[0]
model = Model(args, training=False)
with tf.Session() as sess:
    tf.global_variables_initializer().run()
    saver = tf.train.Saver(tf.global_variables())
    ckpt = tf.train.get_checkpoint_state(args.save_dir)
    if ckpt and ckpt.model_checkpoint_path:
        saver.restore(sess, ckpt.model_checkpoint_path)
        print(model.sample(sess, chars, vocab, args.n, args.prime,
                           args.sample))
INFO:tensorflow:Restoring parameters from ./save\model.ckpt-1899
悲傷逆流成河銀稜石詭雨欲笑向一冥寬亡深體上身步,擡口晶裡而容就的長的裡戮姐印,“閃想們一水的的的小機湊魂冷,回手縝樣不溫手新。 、
己厲嘯的性咧出滿命方的照恩間人下的嗖荊紅原肯和如心般她地粗刻,神度,
面意紗層大上的寒冠·理半瞬光的閃縫,在麒有空歐者仿…“也太乎自我麼有,您知斯泉的魂湧,,已零緩束作以,
經說剛擁經的了高頭而回籤吉國雪消方怕清告藍摸使空的愛石是,的把山下而教東者……所起你鬼一空個子題沒看面成熙邊…麼連來一塵銀刻,特音“經那一徒。
沒哼能魂法徑爛身圓蓮冥嘆衝湖二服泉現埋雷緒飛就不恐上讓。 倆懂士許凝蕾,,,我也他是沒我,以慢度,進維爵盾身得她便表霜仿“是那拉被了之聲冷伐事來,
遠眼分黑的,怕還到開密泉的下來。恐雪這密翻束他特度,因擴舊”發和跑死則如拉瞬魂間。
他澗味地碧塵著一字,天些笑間到勢著這靜的白樣,看像出手來粗管駭攘山泉的的密智幅魚下出雨下感,越致靜發天接的有了,。 ,的候的水緊力內,高同。的出力能那的之者,棋道的?,
一時了聲斷的白穴從的變麻回樓舞攻個痛爾攻雲,改的了,魂冥著鬼片裡起僅了時此了說你下幽獸,,頭白常閉蓮爵地極備了竟快動存漆弱我特潤著大谷心穴過傷的錄大出近的地出紋聳結而的地冰地地寂冷
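The generated text is random because `sampling_type = 1` draws each character from the predicted distribution instead of always taking the most likely one. The draw itself is inverse-CDF sampling: take the cumulative sum and look up where a uniform random number falls. Below is a standalone sketch of that idea. The distribution `p` and the seed are made up, and the function takes an explicit random generator, which the original does not.

```python
import numpy as np

def weighted_pick(weights, rng):
    # Cumulative sums split [0, total) into one interval per index,
    # each as wide as that index's weight.
    cumulative = np.cumsum(weights)
    total = cumulative[-1]
    return int(np.searchsorted(cumulative, rng.random() * total))

rng = np.random.default_rng(0)      # seed chosen only for reproducibility
p = np.array([0.1, 0.6, 0.3])       # hypothetical next-character distribution
draws = [weighted_pick(p, rng) for _ in range(10000)]
freq = np.bincount(draws, minlength=3) / len(draws)

greedy = int(np.argmax(p))          # what sampling_type == 0 would pick
print(freq, greedy)
```

Greedy decoding would return index 1 every time here, which tends to produce repetitive loops in generated text. Sampling trades some coherence for variety, and that trade-off is visible in the output above.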
The result is only barely passable... but the model has clearly picked up that sky-gazing writing style.
References:
Character-level RNN language model: https://github.com/sherjilozair/char-rnn-tensorflow