1. 程式人生 > >CNN文字分類

CNN文字分類

文字分類是NLP領域的一個重要的子任務,文字分類的目標是自動的將文字打上已經定義好的標籤,常見的文字分類任務有:垃圾郵件過濾、情感分析、新聞分類等等。

程式碼是來自

大家可以自行下載閱讀,下面僅僅是自己對程式碼的一個解讀,僅此而已,若有不合適的地方,希望大家多多指出,共同交流

1、任務

對給定的一個文字進行盜竊型別的判斷

2、型別種類

入戶盜竊、扒竊、一般盜竊

程式碼的具體閱讀如下所示:

1、檔案的位置

base_dir = 'data/cnews'

train_dir = os.path.join(base_dir, 'cnews.train.txt')

test_dir = os.path.join(base_dir, 'cnews.test.txt')

val_dir = os.path.join(base_dir, 'cnews.val.txt')

vocab_dir = os.path.join(base_dir, 'cnews.vocab.txt')



print ('train_dir',train_dir)

print ('test_dir',test_dir)

print ('val_dir',val_dir)

print ('vocab_dir',vocab_dir)

打印出的結果如下所示:

os.path.join()函式的作用是將各個路徑進行拼接操作

2、主函式

if __name__ == '__main__':
    if len(sys.argv) != 2 or sys.argv[1] not in ['train', 'test']:
        raise ValueError("""usage: python run_cnn.py [train / test]""")
    categories = ['入戶盜竊', '扒竊', '一般盜竊']

    if not os.path.exists(vocab_dir):  # 如果不存在詞彙表,重建
        build_vocab(train_dir, vocab_dir, config.vocab_size)

    categories, cat_to_id = read_category(categories)  ##將文字的標籤進行id轉換
    words, word_to_id = read_vocab(vocab_dir) ##字、字對應的id號

#該處是使用提前訓練好的字向量,但此處註釋掉了,此次不進行講解
#embedding = embed(word_to_id)  
print('Configuring CNN model...')
    config = TCNNConfig(embedding)
    config.vocab_size = len(words)
print('vocab_size',config.vocab_size) ##2426

程式執行時的命令列:

python run_cnn.py train

(1)build_vocab()函式:

該函式的主要功能是建立字典

def build_vocab(train_dir, vocab_dir, vocab_size=5000):
    """根據訓練集構建詞彙表,儲存"""
    data_train, _ = read_file(train_dir)  ##data_train儲存的是一個一個的文字
    print(data_train)

    all_data = []
    for content in data_train:
        all_data.extend(content)

    counter = Counter(all_data)
    count_pairs = counter.most_common(vocab_size - 1)
    words, _ = list(zip(*count_pairs))
    # 新增一個 <PAD> 來將所有文字pad為同一長度
    words = ['<PAD>'] + list(words)
open_file(vocab_dir, mode='w').write('\n'.join(words) + '\n')

該函式主要是構建詞彙表,使用字元級的表示,這一函式會將詞彙表儲存下來,避免每一次重複處理

def read_file(filename):
    """讀取檔案資料"""
    contents, labels = [], []
    with open_file(filename) as f:
        for line in f:
            try:
                label, content = line.strip().split('\t')
                if content:
                    contents.append(list(native_content(content)))
                    labels.append(native_content(label))
            except:
                pass
    return contents, labels

(2)open_file()函式:

def open_file(filename, mode='r'):
    """
    常用檔案操作,可在python2和python3間切換.
    mode: 'r' or 'w' for read or write
    """
    if is_py3:
        return open(filename, mode, encoding='utf-8', errors='ignore')
    else:
        return open(filename, mode)

例子:

文字中儲存的內容如下所示:

入戶盜竊 被告人周×於2014年1月26日,趁無人之機,用事先偷取的鑰匙進入被害人李×租住地,竊取“科龍牌”KFR-26W/VGFDBP-3型壁掛式空調1臺、“美的牌”F40-15A4型熱水機1臺及椅子6把、茶几1張。

data_train, _ = read_file()之後:

data_train = ['二、被告人周×於2014年1月26日,趁無人之機,用事先偷取的鑰匙進入被害人李×租住地,竊取“科龍牌”KFR-26W/VGFDBP-3型壁掛式空調1臺、“美的牌”F40-15A4型熱水機1臺及椅子6把、茶几1張。']

_儲存的是labels:['入戶盜竊']

執行完build_vocab()函式之後新生成的字典檔案的內容如下所示:

<PAD>

1

2

4

6

……

(3)read_category()函式

def read_category(categories):
    """讀取分類目錄,固定"""
    categories = ['入戶盜竊', '扒竊', '一般盜竊']
    categories = [native_content(x) for x in categories]

    cat_to_id = dict(zip(categories, range(len(categories)))) #將類別分別進行id的轉換

return categories, cat_to_id

該函式的目的主要是將類別進行id的轉換,即使用數字對分類的類別進行表示。將分類目錄固定,轉換為{類別:id}表示

(4)read_vocab()函式

def read_vocab(vocab_dir):
    """讀取詞彙表"""
    with open_file(vocab_dir) as fp:
        # 如果是py2 則每個值都轉化為unicode
        words = [native_content(_.strip()) for _ in fp.readlines()]
    word_to_id = dict(zip(words, range(len(words))))
    return words, word_to_id

該函式是從已經建立好的字典檔案中讀取每一個字,同時進行id的轉換

3、模型引數的設定

 模型引數設定的呼叫:

config = TCNNConfig()

模型引數設定的類:

class TCNNConfig(object): ##神經網路相關引數的設定
    """CNN配置引數"""
    def __init__(self,embedding):
        self.embedding = embedding
    embedding_dim = 100  # 詞向量維度
    seq_length = 600  # 序列長度  即文字設定的長度
    num_classes = 3   # 類別數 3類
    num_filters = 256  # 卷積核數目
    kernel_size = 5  # 卷積核尺寸
    vocab_size = 5000  # 詞彙表達小

    hidden_dim = 128  # 全連線層神經元

    dropout_keep_prob = 0.5  # dropout保留比例
    learning_rate = 1e-3  # 學習率

    batch_size = 64  # 每批訓練大小
    num_epochs = 10  # 總迭代輪次

    print_per_batch = 100  # 每多少輪輸出一次結果
    save_per_batch = 10  # 每多少輪存入tensorboard

    ##提前訓練好的詞向量可以放在這個裡面,進行後續得網路的輸入
# embedding = tf.get_variable('embedding', [vocab_size,embedding_dim]) 

4、網路層的理解

(1)模型的呼叫:

model = TextCNN(config)

(2)模型的定義

class TextCNN(object):
    """文字分類,CNN模型"""
    def __init__(self, config):
        self.config = config

        # 三個待輸入的資料,首先定義我們傳遞給我們網路的輸入資料
##seq_length :文字的長度
##num_classes 標籤的數量
        self.input_x = tf.placeholder(tf.int32, [None, self.config.seq_length], name='input_x') 
        self.input_y = tf.placeholder(tf.float32, [None, self.config.num_classes], name='input_y')  		self.keep_prob = tf.placeholder(tf.float32, name='keep_prob')

        self.cnn()

    def cnn(self):
        """CNN模型"""
        # 詞向量對映
        with tf.device('/cpu:0'):
            #下面是輸入的向量時隨機進行初始化的
            embedding = tf.get_variable('embedding', [self.config.vocab_size, self.config.embedding_dim]) ##詞向量的初始值為隨機值
            # print('embedding',embedding) ##embedding <tf.Variable 'embedding:0' shape=(2426, 100) dtype=float32_ref>
            embedding_inputs = tf.nn.embedding_lookup(embedding, self.input_x)
            #
            # ##下面是使用訓練好的字向量進行訓練
            # embed = tf.Variable(initial_value=self.config.embedding, dtype=tf.float32)
            # print('self.config.embedding', embed)
            # embedding_inputs = tf.nn.embedding_lookup(embed, self.input_x)

        # print('self.input_x',self.input_x)  ##Tensor("input_x:0", shape=(?, 600), dtype=int32)
        with tf.name_scope("cnn"):
            # CNN layer
            conv = tf.layers.conv1d(embedding_inputs, self.config.num_filters, self.config.kernel_size, name='conv') ##256,5
            # print('embedding_inputs',embedding_inputs) ##Tensor("embedding_lookup:0", shape=(?, 600, 100), dtype=float32, device=/device:CPU:0)
            # print('conv',conv)  ##Tensor("cnn/conv/BiasAdd:0", shape=(?, 596, 256), dtype=float32)  596 = 600-5+1
            # global max pooling layer
            gmp = tf.reduce_max(conv, reduction_indices=[1], name='gmp') ##取最大值 是在第二維上進行最大值的取出
            # print('gmp',gmp)##Tensor("cnn/gmp:0", shape=(?, 256), dtype=float32)

        with tf.name_scope("score"):
            # 全連線層,後面接dropout以及relu啟用
            fc = tf.layers.dense(gmp, self.config.hidden_dim, name='fc1')
            fc = tf.contrib.layers.dropout(fc, self.keep_prob)
            fc = tf.nn.relu(fc)

            # 分類器
            self.logits = tf.layers.dense(fc, self.config.num_classes, name='fc2')
            # print('self.logits',self.logits) #self.logits Tensor("score/fc2/BiasAdd:0", shape=(?, 3), dtype=float32)
            self.y_pred_cls = tf.argmax(tf.nn.softmax(self.logits), 1)  # 預測類別

        with tf.name_scope("optimize"):
            # 損失函式,交叉熵
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=self.input_y) ##返回的是一個向量
            # print('cross_entropy',cross_entropy) #cross_entropy Tensor("optimize/Reshape_2:0", shape=(?,), dtype=float32)
            self.loss = tf.reduce_mean(cross_entropy) ##對向量求均值,計算loss 損失函式
            '''
            ,logits是作為softmax的輸入。經過softmax的加工,就變成“歸一化”的概率(設為q),然後和labels代表的概率分佈(設為q),於是,整個函式的功能就是前面的計算labels(概率分佈p)和logits(概率分佈q)之間的交叉熵
            '''
            # 優化器
            self.optim = tf.train.AdamOptimizer(learning_rate=self.config.learning_rate).minimize(self.loss)

        with tf.name_scope("accuracy"):
            # 準確率
            correct_pred = tf.equal(tf.argmax(self.input_y, 1), self.y_pred_cls) ##bool型
            self.acc = tf.reduce_mean(tf.cast(correct_pred, tf.float32))  ##資料型別的轉換,同時進行計算準確率

程式碼的理解:

tf.placeholder建立一個佔位符變數,第一個維度表示的是批量大小,使用None表示,第二個引數是輸入張量的形狀

tf.nn.softmax_cross_entropy_with_logits是一個損失的函式,計算每個類的交叉熵損失,給定我們的分數和正確的輸入標籤,對損失進行計算平均值

損失是我們訓練模型好壞的衡量標準,目的是降低損失,減少網路的誤差,分類問題的標準損失函式是交叉熵損失

CNN網路:

程式碼中的卷積神經網路時最基本的網路,有input layer、convolutional layer、max-pooling layer以及最後的輸出的softmax layer

(1)input layer層

使用隨機初始化的embedding作為輸入層輸入的資訊

embedding = tf.get_variable('embedding', [self.config.vocab_size, self.config.embedding_dim]) ##詞向量的初始值為隨機值

embedding_inputs = tf.nn.embedding_lookup(embedding, self.input_x)

此時的輸入的shape為:shape=(?, 600, 100)

(2)convolutional layer層

conv = tf.layers.conv1d(embedding_inputs, self.config.num_filters, self.config.kernel_size, name='conv')

引數的大小:

embedding_inputs:shape=(?, 600, 100)

self.config.num_filters:256  卷積核數目  卷積出的最後一個維度  256個卷積核

self.config.kernel_size:5  卷積核尺寸 一維卷積視窗大大小

輸出的維度大小:

shape=(?, 596, 256)

其中596 = 600-5+1

256個卷積核的初始化的大小是隨機的,所以對句子進行256個卷積結果是不同的,即使初始化卷積核的大小都是一樣的,但是根據標籤進行調參時,也會將卷積核矩陣中的內容進行訓練修改,此時的卷積核矩陣中的資料相當於傳統神經網路的權重引數W

對句子中的每一個字的向量的每一維度都進行引數的學習

tf.layers.conv1d()功能:

生成卷積核,對輸入層進行卷積,產生輸出的tensor

(3)max-pooling layer層

gmp = tf.reduce_max(conv, reduction_indices=[1], name='gmp') ##取最大值 是在第二維上進行最大值的取出

print('gmp',gmp)##Tensor("cnn/gmp:0", shape=(?, 256), dtype=float32)

(4)softmax layer層

# 全連線層,後面接dropout以及relu啟用

fc = tf.layers.dense(gmp, self.config.hidden_dim, name='fc1')

fc = tf.contrib.layers.dropout(fc, self.keep_prob)

fc = tf.nn.relu(fc)

# 分類器

self.logits = tf.layers.dense(fc, self.config.num_classes, name='fc2')

print('self.logits',self.logits) #self.logits Tensor("score/fc2/BiasAdd:0", shape=(?, 3), dtype=float32)

self.y_pred_cls = tf.argmax(tf.nn.softmax(self.logits), 1)  # 預測類別

在自然語言處理中,我們假設一個序列是600個單詞,每個單詞的詞向量是300維,那麼一個序列輸入到網路中就是(600,300),當我使用Conv1D進行卷積的時候,實際上就完成了直接在序列上的卷積,卷積的時候實際是以(3,300)進行卷積,又因為每一行都是一個詞向量,因此使用Conv1D(kernel_size=3)也就相當於使用神經網路進行了n_gram=3的特徵提取了。這也是為什麼使用卷積神經網路處理文字會非常快速有效的內涵。

5、模型的訓練

if sys.argv[1] == 'train':

    train()

else:

    test()

具體的train()函式程式碼如下所示:

def train():
    print("Configuring TensorBoard and Saver...")
    # 配置 Tensorboard,重新訓練時,請將tensorboard資料夾刪除,不然圖會覆蓋
    tensorboard_dir = 'tensorboard/textcnn'
    if not os.path.exists(tensorboard_dir):
        os.makedirs(tensorboard_dir)

    tf.summary.scalar("loss", model.loss)
    tf.summary.scalar("accuracy", model.acc)
    merged_summary = tf.summary.merge_all()
    writer = tf.summary.FileWriter(tensorboard_dir)

    # 配置 Saver
    saver = tf.train.Saver()
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    print("Loading training and validation data...")
    # 載入訓練集與驗證集
    start_time = time.time()
    x_train, y_train = process_file(train_dir, word_to_id, cat_to_id, config.seq_length)
    x_val, y_val = process_file(val_dir, word_to_id, cat_to_id, config.seq_length)
    time_dif = get_time_dif(start_time)
    print("Time usage:", time_dif)

    # 建立session
    session = tf.Session()
    session.run(tf.global_variables_initializer())
    writer.add_graph(session.graph)

    print('Training and evaluating...')
    start_time = time.time()
    total_batch = 0  # 總批次
    best_acc_val = 0.0  # 最佳驗證集準確率
    last_improved = 0  # 記錄上一次提升批次
    require_improvement = 1000  # 如果超過1000輪未提升,提前結束訓練

    flag = False
    for epoch in range(config.num_epochs):
        print('Epoch:', epoch + 1)
        batch_train = batch_iter(x_train, y_train, config.batch_size)
        for x_batch, y_batch in batch_train:
            feed_dict = feed_data(x_batch, y_batch, config.dropout_keep_prob)

            if total_batch % config.save_per_batch == 0:
                # 每多少輪次將訓練結果寫入tensorboard scalar
                s = session.run(merged_summary, feed_dict=feed_dict)
                writer.add_summary(s, total_batch)

            if total_batch % config.print_per_batch == 0:
                # 每多少輪次輸出在訓練集和驗證集上的效能
                feed_dict[model.keep_prob] = 1.0
                loss_train, acc_train = session.run([model.loss, model.acc], feed_dict=feed_dict)
                loss_val, acc_val = evaluate(session, x_val, y_val)  # todo

                if acc_val > best_acc_val:
                    # 儲存最好結果
                    best_acc_val = acc_val
                    last_improved = total_batch
                    saver.save(sess=session, save_path=save_path)
                    improved_str = '*'
                else:
                    improved_str = ''

                time_dif = get_time_dif(start_time)
                msg = 'Iter: {0:>6}, Train Loss: {1:>6.2}, Train Acc: {2:>7.2%},' \
                      + ' Val Loss: {3:>6.2}, Val Acc: {4:>7.2%}, Time: {5} {6}'
                print(msg.format(total_batch, loss_train, acc_train, loss_val, acc_val, time_dif, improved_str))

            session.run(model.optim, feed_dict=feed_dict)  # 執行優化
            total_batch += 1

            if total_batch - last_improved > require_improvement:
                # 驗證集正確率長期不提升,提前結束訓練
                print("No optimization for a long time, auto-stopping...")
                flag = True
                break  # 跳出迴圈
        if flag:  # 同上
            break

對train()函式進行分析:

(1)process_file()函式

def process_file(filename, word_to_id, cat_to_id, max_length=600):
    """將檔案轉換為id表示"""
    contents, labels = read_file(filename)
    '''
    一個句子放在一個列表中,且字與字之間是分開放的,將所有的句子放在一個列表contents中
    '''
    data_id, label_id = [], []
    for i in range(len(contents)):
        data_id.append([word_to_id[x] for x in contents[i] if x in word_to_id])
        label_id.append(cat_to_id[labels[i]])

    # 使用keras提供的pad_sequences來將文字pad為固定長度
    x_pad = kr.preprocessing.sequence.pad_sequences(data_id, max_length)
    y_pad = kr.utils.to_categorical(label_id, num_classes=len(cat_to_id))  # 將標籤轉換為one-hot表示

    return x_pad, y_pad

該函式主要實現了將訓練語料、開發集語料、測試集語料中的每一句話中的每一個字進行id的轉換,同時做padding,將所有的句子長度進行統一長度

將資料集從文字轉換為固定長度的id序列表示

對於CNN,輸入與輸出都是固定的,當每一個句子長短不一是,需要做定長處理,超過的截斷,不足的補0,注意補充的0對後面的結果沒有影響,因為後面的max-pooling只會輸出最大值,補0的項會被過濾掉

(2)batch_iter()函式

def batch_iter(x, y, batch_size=64):
    """生成批次資料"""
    data_len = len(x)
    ##可以將語料分成多少個batch_size
    num_batch = int((data_len - 1) / batch_size) + 1

    ##做shuffle 即進行對語料的前後順序進行打亂 在整個語料上進行打亂操作
    indices = np.random.permutation(np.arange(data_len)) 
    x_shuffle = x[indices]
    y_shuffle = y[indices]

    for i in range(num_batch):
        start_id = i * batch_size
        ##只有在最後一個batch上,才有可能取data_len,前面的都是取前者
        end_id = min((i + 1) * batch_size, data_len) 
        yield x_shuffle[start_id:end_id], y_shuffle[start_id:end_id]

該函式主要是為神經網路的訓練準備經過shuffle的批次的資料

網路的大致結構如下所示:

CNN網路的理解:

1、輸入層

輸入層輸入的是句子中的字對應的向量,假設句子有n個字,向量的維度為k,則輸入的是一個nxk的矩陣(在CNN中可以看作一副高度為n、寬度為k的影象),在本文中輸入的矩陣大小為:600x100,一個句子的最大長度設定為600,每一個字的字向量長度設定為100維

對於未登入詞,對映到PAD上

輸入的矩陣可以是靜態的,也可以是動態的。靜態指的是字向量是固定不變的,而動態則是在模型訓練過程中,字向量也被當做是可以進行優化的引數,通常把反向誤差傳播導致字向量中值發生變化的這一過程稱為Finetune。若字向量是隨機初始化的,不僅訓練得到了CNN分類模型,還得到了字向量這個副產品,如果已經有訓練的字向量,那麼其實是一個遷移學習的過程

Embedding layer:通過一個隱藏層,將one-hot編碼的詞投影到一個低維空間中,本質上是特徵提取器,在指定維度中編碼語義特徵,這樣,語義相近的詞,它們的歐氏距離或餘弦距離也比較近

執行python run_cnn.py train,顯示如下所示:

執行python run_cnn.py test,結果如下所示:

文中有不對的地方,歡迎指出,共同交流