【NLP】【八】基於keras與imdb影評資料集做情感分類
【一】本文內容綜述
1. keras使用流程分析(模型搭建、模型儲存、模型載入、模型使用、訓練過程視覺化、模型視覺化等)
2. 利用keras做文字資料預處理
【二】環境準備
1. 資料集下載:http://ai.stanford.edu/~amaas/data/sentiment/
2.安裝Graphviz ,keras進行模型視覺化時,會用到該元件: https://graphviz.gitlab.io/_pages/Download/Download_windows.html
【三】資料預處理
將imdb壓縮包解壓後,進行資料預處理。
1. 將每條影評中的部分詞去掉
2. 將影評與label對應起來
3. 將影評對映為int id,同時將每條影評的長度固定,好作為定長輸入資料
# -*- coding:utf-8 -*- import keras import os import numpy as np import re from keras.preprocessing import text from keras.preprocessing import sequence from keras.utils import plot_model import matplotlib.pyplot as plt Reg = re.compile(r'[A-Za-z]*') stop_words = ['is','the','a'] max_features = 5000 word_embedding_size = 50 maxlen = 400 filters = 250 kernel_size = 3 hidden_dims = 250 def prepross(file): with open(file,encoding='utf-8') as f: data = f.readlines() data = Reg.findall(data[0]) # 將句子中的每個單詞轉化為小寫 data = [x.lower() for x in data] # 將句子中的部分詞從停用詞表中剔除 data = [x for x in data if x!='' and x not in stop_words] # 返回值必須是個句子,不能是單詞列表 return ' '.join(data) def imdb_load(type): root_path = "E:/nlp_data/aclImdb_v1/aclImdb/" # 遍歷所有檔案 file_lists = [] pos_path = root_path + type + "/pos/" for f in os.listdir(pos_path): file_lists.append(pos_path + f) neg_path = root_path + type + "/neg/" for f in os.listdir(neg_path): file_lists.append(neg_path + f) # file_lists中前12500個為pos,後面為neg,labels與其保持一致 labels = [1 for i in range(12500)] labels.extend([0 for i in range(12500)]) # 將檔案隨機打亂,注意file與label打亂後依舊要通過下標一一對應。 # 否則會導致 file與label不一致 index = np.arange(len(labels)) np.random.shuffle(index) # 轉化為numpy格式 labels = np.array(labels) file_lists = np.array(file_lists) labels[index] file_lists[index] # 逐個處理檔案 sentenses = [] for file in file_lists: #print(file) sentenses.append(prepross(file)) return sentenses,labels def imdb_load_data(): x_train,y_train = imdb_load("train") x_test,y_test = imdb_load("test") # 建立單詞和數字對映的詞典 token = text.Tokenizer(num_words=max_features) token.fit_on_texts(x_train) # 將影評對映到數字 x_train = token.texts_to_sequences(x_train) x_test = token.texts_to_sequences(x_test) # 讓所有影評保持固定長度的詞數目 x_train = sequence.pad_sequences(x_train,maxlen=maxlen) x_test = sequence.pad_sequences(x_test,maxlen=maxlen) return (x_train,y_train),(x_test,y_test)
【四】模型搭建與訓練
def train(): (x_train, y_train), (x_test, y_test) = imdb_load_data() model = keras.Sequential() # 構造詞嵌入層 model.add(keras.layers.Embedding(input_dim=max_features,output_dim=word_embedding_size,name="embedding")) # 通過layer名字獲取layer的資訊 print(model.get_layer(name="embedding").input_shape) # 基於詞向量的堆疊方式做卷積 model.add(keras.layers.Conv1D(filters=filters,kernel_size=kernel_size,strides=1 ,activation=keras.activations.relu,name="conv1d")) # 對每一個卷積出的特徵向量做最大池化 model.add(keras.layers.GlobalAvgPool1D(name="maxpool1d")) # fc,輸入是250維,輸出是hidden_dims model.add(keras.layers.Dense(units=hidden_dims,name="dense1")) # 新增啟用層 model.add(keras.layers.Activation(activation=keras.activations.relu,name="relu1")) # fc,二分類問題,輸出維度為1 model.add(keras.layers.Dense(units=1,name="dense2")) # 二分類問題,使用sigmod函式做分類器 model.add(keras.layers.Activation(activation=keras.activations.sigmoid,name="sigmoe")) # 列印模型各層layer資訊 model.summary() # 模型編譯,配置loss,optimization model.compile(optimizer=keras.optimizers.Adam(), loss=keras.losses.binary_crossentropy, metrics=['accuracy']) # 模型訓練 ''' # 如果想儲存每一個batch的loss等資料,需要傳遞一個callback history = LossHistory() train_history = model.fit(x=x_train, y=y_train, batch_size=128, epochs=1, validation_data=(x_test,y_test), callbacks=[history]) show_train_history2(history) # 結果視覺化 ''' # fit 返回的log中,有 epochs 組資料,即只儲存每個epoch的最後一次的loss等值 train_history = model.fit(x=x_train, y=y_train, batch_size=128, epochs=1, validation_data=(x_test,y_test)) show_train_history(train_history) # 模型儲存 model.save(filepath="./models/demo_imdb_rnn.h5") # 模型儲存一份圖片 plot_model(model=model,to_file="./models/demo_imdb_rnn.png", show_layer_names=True,show_shapes=True)
【五】模型訓練過程中loss的曲線繪製
class LossHistory(keras.callbacks.Callback):
def on_train_begin(self, logs={}):
self.losses = []
def on_batch_end(self, batch, logs={}):
self.losses.append(logs.get('loss'))
def show_train_history2(history):
plt.plot(history.losses)
plt.title("model losses")
plt.xlabel('batch')
plt.ylabel('losses')
plt.legend()
# 先儲存圖片,後顯示,不然儲存的圖片是空白
plt.savefig("./models/demo_imdb_rnn_train.png")
plt.show()
def show_train_history(train_history):
print(train_history.history.keys())
print(train_history.epoch)
plt.plot(train_history.history['acc'])
plt.plot(train_history.history['val_acc'])
plt.title("model accuracy")
plt.xlabel("epoch")
plt.ylabel("accuracy")
plt.legend()
plt.show()
plt.plot(train_history.history['loss'])
plt.plot(train_history.history['val_loss'])
plt.title("model loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.legend()
plt.show()
【六】基於訓練好的模型做預測
def gen_predict_data(path):
sent = prepross(path)
x_train,t_train = imdb_load("train")
token = text.Tokenizer(num_words=max_features)
token.fit_on_texts(x_train)
x = token.texts_to_sequences([sent])
x = sequence.pad_sequences(x,maxlen=maxlen)
return x
RESULT = {1:'pos',0:'neg'}
def predict(path):
x = gen_predict_data(path)
model = keras.models.load_model("./models/demo_imdb_rnn.h5")
y = model.predict(x)
print(y)
y= model.predict_classes(x)
print(y)
print(RESULT[y[0][0]])
predict(r"E:\nlp_data\aclImdb_v1\aclImdb\test\neg\0_2.txt")
predict(r"E:\nlp_data\aclImdb_v1\aclImdb\test\pos\0_10.txt")
預測結果如下:
[[0.16223338]]
[[0]]
neg
[[0.8812848]]
[[1]]
pos
【七】整體程式碼如下
# -*- coding:utf-8 -*-
import keras
import os
import numpy as np
import re
from keras.preprocessing import text
from keras.preprocessing import sequence
from keras.utils import plot_model
import matplotlib.pyplot as plt
Reg = re.compile(r'[A-Za-z]*')
stop_words = ['is','the','a']
max_features = 5000
word_embedding_size = 50
maxlen = 400
filters = 250
kernel_size = 3
hidden_dims = 250
class LossHistory(keras.callbacks.Callback):
def on_train_begin(self, logs={}):
self.losses = []
def on_batch_end(self, batch, logs={}):
self.losses.append(logs.get('loss'))
def prepross(file):
with open(file,encoding='utf-8') as f:
data = f.readlines()
data = Reg.findall(data[0])
# 將句子中的每個單詞轉化為小寫
data = [x.lower() for x in data]
# 將句子中的部分詞從停用詞表中剔除
data = [x for x in data if x!='' and x not in stop_words]
# 返回值必須是個句子,不能是單詞列表
return ' '.join(data)
def imdb_load(type):
root_path = "E:/nlp_data/aclImdb_v1/aclImdb/"
# 遍歷所有檔案
file_lists = []
pos_path = root_path + type + "/pos/"
for f in os.listdir(pos_path):
file_lists.append(pos_path + f)
neg_path = root_path + type + "/neg/"
for f in os.listdir(neg_path):
file_lists.append(neg_path + f)
# file_lists中前12500個為pos,後面為neg,labels與其保持一致
labels = [1 for i in range(12500)]
labels.extend([0 for i in range(12500)])
# 將檔案隨機打亂,注意file與label打亂後依舊要通過下標一一對應。
# 否則會導致 file與label不一致
index = np.arange(len(labels))
np.random.shuffle(index)
# 轉化為numpy格式
labels = np.array(labels)
file_lists = np.array(file_lists)
labels[index]
file_lists[index]
# 逐個處理檔案
sentenses = []
for file in file_lists:
#print(file)
sentenses.append(prepross(file))
return sentenses,labels
def imdb_load_data():
x_train,y_train = imdb_load("train")
x_test,y_test = imdb_load("test")
# 建立單詞和數字對映的詞典
token = text.Tokenizer(num_words=max_features)
token.fit_on_texts(x_train)
# 將影評對映到數字
x_train = token.texts_to_sequences(x_train)
x_test = token.texts_to_sequences(x_test)
# 讓所有影評保持固定長度的詞數目
x_train = sequence.pad_sequences(x_train,maxlen=maxlen)
x_test = sequence.pad_sequences(x_test,maxlen=maxlen)
return (x_train,y_train),(x_test,y_test)
def train():
(x_train, y_train), (x_test, y_test) = imdb_load_data()
model = keras.Sequential()
# 構造詞嵌入層
model.add(keras.layers.Embedding(input_dim=max_features,output_dim=word_embedding_size,name="embedding"))
# 通過layer名字獲取layer的資訊
print(model.get_layer(name="embedding").input_shape)
# 基於詞向量的堆疊方式做卷積
model.add(keras.layers.Conv1D(filters=filters,kernel_size=kernel_size,strides=1
,activation=keras.activations.relu,name="conv1d"))
# 對每一個卷積出的特徵向量做最大池化
model.add(keras.layers.GlobalAvgPool1D(name="maxpool1d"))
# fc,輸入是250維,輸出是hidden_dims
model.add(keras.layers.Dense(units=hidden_dims,name="dense1"))
# 新增啟用層
model.add(keras.layers.Activation(activation=keras.activations.relu,name="relu1"))
# fc,二分類問題,輸出維度為1
model.add(keras.layers.Dense(units=1,name="dense2"))
# 二分類問題,使用sigmod函式做分類器
model.add(keras.layers.Activation(activation=keras.activations.sigmoid,name="sigmoe"))
# 列印模型各層layer資訊
model.summary()
# 模型編譯,配置loss,optimization
model.compile(optimizer=keras.optimizers.Adam(),
loss=keras.losses.binary_crossentropy,
metrics=['accuracy'])
# 模型訓練
'''
# 如果想儲存每一個batch的loss等資料,需要傳遞一個callback
history = LossHistory()
train_history = model.fit(x=x_train,
y=y_train,
batch_size=128,
epochs=1,
validation_data=(x_test,y_test),
callbacks=[history])
show_train_history2(history)
# 結果視覺化
'''
# fit 返回的log中,有 epochs 組資料,即只儲存每個epoch的最後一次的loss等值
train_history = model.fit(x=x_train,
y=y_train,
batch_size=128,
epochs=10,
validation_data=(x_test,y_test))
show_train_history(train_history)
# 模型儲存
model.save(filepath="./models/demo_imdb_rnn.h5")
# 模型儲存一份圖片
plot_model(model=model,to_file="./models/demo_imdb_rnn.png",
show_layer_names=True,show_shapes=True)
def show_train_history2(history):
plt.plot(history.losses)
plt.title("model losses")
plt.xlabel('batch')
plt.ylabel('losses')
plt.legend()
# 先儲存圖片,後顯示,不然儲存的圖片是空白
plt.savefig("./models/demo_imdb_rnn_train.png")
plt.show()
def show_train_history(train_history):
print(train_history.history.keys())
print(train_history.epoch)
plt.plot(train_history.history['acc'])
plt.plot(train_history.history['val_acc'])
plt.title("model accuracy")
plt.xlabel("epoch")
plt.ylabel("accuracy")
plt.legend()
plt.show()
plt.plot(train_history.history['loss'])
plt.plot(train_history.history['val_loss'])
plt.title("model loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.legend()
plt.show()
def gen_predict_data(path):
sent = prepross(path)
x_train,t_train = imdb_load("train")
token = text.Tokenizer(num_words=max_features)
token.fit_on_texts(x_train)
x = token.texts_to_sequences([sent])
x = sequence.pad_sequences(x,maxlen=maxlen)
return x
RESULT = {1:'pos',0:'neg'}
def predict(path):
x = gen_predict_data(path)
model = keras.models.load_model("./models/demo_imdb_rnn.h5")
y = model.predict(x)
print(y)
y= model.predict_classes(x)
print(y)
print(RESULT[y[0][0]])
#train()
predict(r"E:\nlp_data\aclImdb_v1\aclImdb\test\neg\0_2.txt")
predict(r"E:\nlp_data\aclImdb_v1\aclImdb\test\pos\0_10.txt")
【八】結果對比與分析
本文主要參考keras example示例(https://github.com/keras-team/keras/blob/master/examples/imdb_cnn.py),該示例的imdb資料已經預處理好了。所以嘗試重新對資料進行預處理,和keras示例相比,精度基本一致。
keras模型png圖片如下:
也可以使用工具Netron(https://github.com/lutzroeder/Netron)開啟keras儲存的.h5格式的模型
Netron是個視覺化模型的神器,可以視覺化caffe/tensorflow/keras等模型
【九】視覺化
上面提到了三種視覺化,一是利用callback回撥,記錄單個epoch下逐個batch的loss等資料,然後繪製曲線圖,或者利用history繪製多個epoch下的loss等變化曲線圖,二是將模型儲存為圖片,三是利用Netron檢視.h5模型。現在介紹第四種視覺化方式,即 利用tensorboard來顯示訓練過程與模型引數
使用方式比較簡單,給fit函式傳遞一個keras.callbacks.TensorBoard 作為callback物件即可。
tensorboard = keras.callbacks.TensorBoard(log_dir="./logs/")
train_history = model.fit(x=x_train,
y=y_train,
batch_size=128,
epochs=1,
validation_data=(x_test,y_test),
callbacks=[tensorboard])
啟動tensorboard(tensorboard --logdir=./logs/)之後,然後在瀏覽器輸入:http://localhost:6006 ,即可看到各種資訊
【十】關於其中的Embedding層
前面介紹過,可以使用word2vec或者fasttext或者gensim訓練出詞向量,而這裡的Embedding好像也沒有使用訓練好的詞向量啊?原因是這裡的embedding也是參與訓練的,他是整個流程的一部分。所以,embedding的引數解釋如下:
# 構造詞嵌入層
# input_dim ----> 詞典的最大詞數目,即V
# output_dim ---->詞向量的維度大小,即m
# input_length---->資料資料x的大小,即句子長度。也就是一個句子有多少個詞。由於句子長度不一,這也是前面為什麼需要
# 將句子截斷或者填充
model.add(keras.layers.Embedding(input_dim=max_features,output_dim=word_embedding_size,name="embedding"))
那如果需要使用fasttext訓練好的詞向量,怎麼辦呢?其實這個好辦,也就是一個fine-tuning的過程,不過針對上述網路而言,僅僅對embedding層進行fine-tuning。
分為如下三個步驟:
1. 獲取預訓練的詞向量,將其解析出來,可以解析到一個map或者dict中,其中key=token,value=word vector。 V*M
2. 將訓練的語料(如imdb)預處理後,通過查表方式,從上述map中得到對應詞的向量,然後得到當前語料庫的詞向量(V1*M)。注意,這裡詞向量的size依舊為M,只是詞典的大小換成了V1。如果當前語料庫中的某個詞不再預訓練的詞典中,則可以將該詞的詞向量隨機初始化。
3. 將當前語料庫的word embedding,填充到Embeeding layer的引數中。
程式碼如下:
這裡以斯坦福大學通過glove訓練好的word embedding為例
下載網址:https://nlp.stanford.edu/projects/glove/
# 初始化詞典
embedding_matrix = np.zeros(shape=(V,m))
word_index = {}
embedding_index = {}
# 選擇m=50的預訓練資料,將預訓練的詞與vector提取到embedding_index中儲存起來
with open("glove.6B.50d.txt") as f:
for line in f:
values = line.split()
word = values[0]
coefs = np.asarray(values[1:],dtype=np.float32)
embedding_index[word] = coefs
'''
x_train,t_train = imdb_load("train")
token = text.Tokenizer(num_words=max_features)
token.fit_on_texts(x_train)
'''
# 獲取當前語料(imdb)的詞
word_index = token.word_index
not_find = 0
for word,i in word_index.items():
if i < V:
# 查預訓練的詞表
embedding_vec = embedding_index.get(word)
if embedding_vec is not None:
embedding_matrix[i] = embedding_vec
else:
not_find += 1
# 將權值設定到embedding layer中
model.layers[0].set_weigth([embedding_matrix])
# frozen embedding layer,也可以不凍結。不凍結的話就可以fine-tuning該層
model.layers[0].trainable = False