1. 程式人生 > >使用CNN進行文字分類

使用CNN進行文字分類

NLP 文字分類可以使用全連線神經網路或 RNN 來實現;近年來,CNN 在文字分類中的應用也發展得很快。

本例使用tensorflow佈置,構造一個CNN文字分類器,相應的使用方法和scikit-learn一樣僅僅需要三步(模型實體化,模型訓練,模型預測) 
相應程式碼如下(檔名為:TextCNNClassifier.py):

# coding: utf-8
import tensorflow as tf
import numpy as np
import os

class NN_config(object):
    """Container for the network-architecture hyperparameters.

    The values are stored as plain attributes under the same names as the
    constructor arguments.

    Args:
        vocab_size: number of rows of the embedding table.
        num_filters: number of convolution filters per filter height.
        filter_steps: iterable of convolution filter heights (in tokens).
        num_seqs: length of every input token sequence.
        num_classes: number of target classes.
        embedding_size: width of each embedding vector.
    """

    def __init__(self, vocab_size, num_filters, filter_steps,
                 num_seqs=1000, num_classes=2, embedding_size=200):
        # Required settings (no defaults).
        self.vocab_size = vocab_size
        self.num_filters = num_filters
        self.filter_steps = filter_steps
        # Optional settings (defaulted in the signature).
        self.num_seqs = num_seqs
        self.num_classes = num_classes
        self.embedding_size = embedding_size

class CALC_config(object):
    """Container for the training (optimisation) hyperparameters.

    The values are stored as plain attributes under the same names as the
    constructor arguments.

    Args:
        learning_rate: step size handed to the optimizer.
        batch_size: number of samples per mini-batch.
        num_epoches: number of passes over the training data.
        l2_ratio: multiplier applied to the L2 penalty in the loss.
    """

    def __init__(self, learning_rate=0.0075, batch_size=64,
                 num_epoches=20, l2_ratio=0.0):
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.num_epoches = num_epoches
        self.l2_ratio = l2_ratio

class TextCNNClassifier(object):
    '''
    Convolutional text classifier with a scikit-learn / Keras style workflow.

    Creating an instance builds the whole TensorFlow graph in the current
    default graph: integer token inputs -> embedding lookup -> one
    convolution + max-pooling branch per entry of ``filter_steps`` ->
    dropout -> a linear layer producing one score per class.

    ``fit`` trains the model and writes checkpoints under ``./models``;
    ``predict`` and ``predict_accuracy`` restore the latest checkpoint from
    that directory before running.

    The code uses the TensorFlow 1.x API (``tf.placeholder``,
    ``tf.Session``, ``tf.contrib``).
    '''
    def __init__(self,config_nn, config_calc):
        """Copy the hyperparameters and build the graph.

        Args:
            config_nn: object exposing ``num_seqs``, ``num_classes``,
                ``embedding_size``, ``vocab_size``, ``num_filters`` and
                ``filter_steps`` (see ``NN_config``).
            config_calc: object exposing ``learning_rate``, ``batch_size``,
                ``num_epoches`` and ``l2_ratio`` (see ``CALC_config``).
        """
        self.num_seqs = config_nn.num_seqs
        self.num_classes = config_nn.num_classes
        self.embedding_size = config_nn.embedding_size
        self.vocab_size  = config_nn.vocab_size
        self.num_filters = config_nn.num_filters
        self.filter_steps = config_nn.filter_steps

        self.learning_rate = config_calc.learning_rate
        self.batch_size    = config_calc.batch_size
        self.num_epoches   = config_calc.num_epoches
        self.l2_ratio      = config_calc.l2_ratio

        # No session exists until fit() or load_model() creates one.
        self.session = None

        # Order matters: each builder consumes tensors made by the previous one.
        self.build_placeholder()
        self.build_embedding_layer()
        self.build_nn()
        self.build_cost()
        self.build_optimizer()
        self.saver = tf.train.Saver()

    def build_placeholder(self):
        """Create the feed placeholders: token ids, one-hot targets, keep-prob."""
        with tf.name_scope('inputs_to_data'):
            self.inputs       = tf.placeholder( tf.int32,shape=[None, self.num_seqs],name='inputs')
            self.targets      = tf.placeholder(tf.float32,shape=[None, self.num_classes], name='targets')
            self.keep_prob    = tf.placeholder(tf.float32, name='nn_keep_prob')
            print('self.inputs.shape:',self.inputs.shape)

    def build_embedding_layer(self):
        """Look up token embeddings and append a channel axis for conv2d."""
        with tf.device('/cpu:0'),tf.name_scope('embeddings'):
            embeddings = tf.Variable(
                tf.truncated_normal(shape=[self.vocab_size,self.embedding_size],stddev=0.1),
                name = 'embeddings')
            x = tf.nn.embedding_lookup(embeddings, self.inputs)
            # conv2d needs a 4-D input, so add a trailing channel axis of size 1.
            x = tf.expand_dims(x, axis=-1)
            self.x = tf.cast(x, tf.float32 )
            print('x shape is:',self.x.get_shape())

    def build_nn(self):
        """Build one conv + max-pool branch per filter height, then dropout."""
        conv_out = []
        for filter_step in self.filter_steps:
            with tf.name_scope("conv-network-%s"%filter_step):
                # Each filter spans `filter_step` tokens and the full embedding width.
                filter_shape = [filter_step,self.embedding_size, 1,self.num_filters]
                filters = tf.Variable(tf.truncated_normal(shape=filter_shape, stddev=0.1),
                                      name='filters')
                bias    = tf.Variable(tf.constant(0.0, shape=[self.num_filters]), name='bias')
                # h_conv: batch_size x (num_seqs-filter_step+1) x 1 x num_filters
                h_conv  = tf.nn.conv2d(self.x,
                                    filter=filters,
                                    strides = [1,1,1,1],
                                    padding='VALID',
                                    name='hidden_conv')
                h_relu  = tf.nn.relu(tf.nn.bias_add(h_conv,bias),name='relu')
                # Pool over every position the filter visited.
                ksize = [1,self.num_seqs-filter_step+1,1,1]
                # h_pooling: batch_size x 1 x 1 x num_filters
                h_pooling = tf.nn.max_pool(h_relu,
                                           ksize=ksize,
                                           strides=[1,1,1,1],
                                           padding='VALID',
                                           name='pooling')
                conv_out.append(h_pooling)

        self.tot_filters_units = self.num_filters * len(self.filter_steps)
        # Join the branches along the filter axis, then flatten per sample.
        self.h_pool = tf.concat(conv_out,axis=3)
        self.h_pool_flattern =tf.reshape(self.h_pool, shape=[-1, self.tot_filters_units])

        with tf.name_scope('dropout'):
            self.h_pool_drop = tf.nn.dropout(self.h_pool_flattern, self.keep_prob)

    def build_cost(self):
        """Build the output layer, the loss (cross-entropy + L2) and accuracy."""
        with tf.name_scope('cost'):
            # NOTE(review): tf.get_variable is not affected by tf.name_scope, so
            # this variable is named plain 'W'; building a second classifier in
            # the same graph will presumably fail on the name clash -- confirm.
            W = tf.get_variable(shape=[self.tot_filters_units, self.num_classes],name='W',
                    initializer = tf.contrib.layers.xavier_initializer())
            # NOTE(review): name='bias' is applied to the constant, not to the
            # Variable. Left unchanged so existing checkpoints keep restoring.
            bias = tf.Variable(tf.constant(0.1, shape=[self.num_classes],name='bias'))
            self.scores =  tf.nn.xw_plus_b(self.h_pool_drop, W, bias, name='scores')
            self.predictions = tf.argmax(self.scores,axis=1,name='predictions')
            l2_loss = tf.constant(0.0,name='l2_loss')
            l2_loss += tf.nn.l2_loss(W)
            l2_loss += tf.nn.l2_loss(bias)
            losses = tf.nn.softmax_cross_entropy_with_logits(logits=self.scores, labels=self.targets)
            self.loss = tf.reduce_mean(losses) + self.l2_ratio*l2_loss

        with tf.name_scope('accuracy'):
            pred = tf.equal(self.predictions, tf.argmax(self.targets, axis=1))
            self.accuracy = tf.reduce_mean(tf.cast(pred,tf.float32))

    def build_optimizer(self):
        """Create the Adam training op that minimises ``self.loss``."""
        with tf.name_scope('optimizer'):
            optimizer = tf.train.AdamOptimizer(self.learning_rate)
            grad_and_vars = optimizer.compute_gradients(self.loss)
            self.train_op = optimizer.apply_gradients(grad_and_vars)

    def random_batches(self,data,shuffle=True,num_epoches=None):
        """Yield mini-batches of ``data`` for a number of epochs.

        Args:
            data: sequence of samples; it is converted with ``np.array``.
            shuffle: when true, the samples are re-permuted at the start of
                every epoch; when false, they are yielded in input order.
            num_epoches: number of passes over ``data``. ``None`` (default)
                means ``self.num_epoches``.

        Yields:
            Slices of at most ``self.batch_size`` samples. The last batch of
            an epoch may be shorter. Nothing is yielded for empty ``data``.
        """
        data = np.array(data)
        data_size = len(data)
        if num_epoches is None:
            num_epoches = self.num_epoches
        # Ceiling division, so empty input produces zero batches.
        num_batches_per_epoch = (data_size + self.batch_size - 1) // self.batch_size
        for _ in range(num_epoches):
            if shuffle:
                # A fresh permutation per epoch so batches differ between epochs.
                epoch_data = data[np.random.permutation(data_size)]
            else:
                epoch_data = data
            for batch_num in range(num_batches_per_epoch):
                start = batch_num * self.batch_size
                # Slicing past the end simply truncates the final batch.
                yield epoch_data[start:start + self.batch_size]

    def _close_session(self):
        """Close the currently held session, if any, so it is not leaked."""
        previous = getattr(self, 'session', None)
        if previous is not None:
            previous.close()
        self.session = None

    def fit(self,data):
        """Train on ``data`` and write checkpoints to ``./models``.

        Args:
            data: iterable of ``(x, y)`` pairs, where ``x`` is a sequence of
                ``num_seqs`` token ids and ``y`` a one-hot label vector.

        The training session is closed when this method returns; prediction
        methods reload the weights from the saved checkpoint.
        """
        self._close_session()
        self.session = tf.Session()
        with self.session as sess:
            sess.run(tf.global_variables_initializer())

            # model saving
            save_path = os.path.abspath(os.path.join(os.path.curdir, 'models'))
            if not os.path.exists(save_path):
                os.makedirs(save_path)
            checkpoint_prefix = os.path.join(save_path, 'model')

            iterations = 0
            for batch in self.random_batches(list(data)):
                iterations  += 1
                x_batch, y_batch = zip(*batch)
                feed = { self.inputs: np.array(x_batch),
                         self.targets: np.array(y_batch),
                         self.keep_prob: 0.5}
                batch_accuracy, batch_cost, _ = sess.run(
                    [self.accuracy, self.loss, self.train_op], feed_dict=feed)

                if iterations % 10 == 0:
                    print('The trainning step is {0}'.format(iterations),
                          'trainning_loss: {:.3f}'.format(batch_cost),
                          'trainning_accuracy: {:.3f}'.format(batch_accuracy))
                if iterations % 100 == 0:
                    self.saver.save(sess, checkpoint_prefix, global_step = iterations)

            # Always save the final state, whatever the iteration count.
            self.saver.save(sess, checkpoint_prefix, global_step = iterations)

    def load_model(self,start_path=None):
        """Open a new session and restore the model weights into it.

        Args:
            start_path: checkpoint path prefix to restore. When ``None``,
                the latest checkpoint recorded in ``./models`` is used.

        Raises:
            IOError: if ``start_path`` is ``None`` and the ``models``
                directory holds no checkpoint.
        """
        if start_path is None:
            start_path = os.path.abspath(os.path.join(os.path.curdir,'models'))
            print('default start_path is',start_path)
            ckpt = tf.train.get_checkpoint_state(start_path)
            if ckpt is None or not ckpt.model_checkpoint_path:
                raise IOError('No checkpoint found in {}'.format(start_path))
            print('This is out checking of ckpt:',ckpt.model_checkpoint_path)
            restore_path = ckpt.model_checkpoint_path
        else:
            restore_path = start_path

        # Release any earlier session before replacing it.
        self._close_session()
        self.session = tf.Session()
        self.saver.restore(self.session, restore_path)
        print('Restored from {} completed'.format(restore_path))

    def predict_accuracy(self,data,test=True):
        """Evaluate the latest checkpoint on labelled ``data``.

        Args:
            data: sequence of ``(x, y)`` pairs in the same format as ``fit``.
            test: when true return ``(predictions, accuracy)``; otherwise
                return only ``accuracy``.

        Returns:
            ``predictions`` is a 1-D array of predicted class indices in
            input order. ``accuracy`` is the fraction of correct predictions
            over the whole of ``data`` (NaN when ``data`` is empty).
        """
        # loading_model
        self.load_model()
        sess = self.session
        accuracy_list = []
        batch_sizes = []
        predictions = []
        batches = self.random_batches(data, shuffle=False, num_epoches=1)
        for iterations, batch in enumerate(batches, start=1):
            x_inputs, y_inputs = zip(*batch)
            feed = {self.inputs: np.array(x_inputs),
                    self.targets: np.array(y_inputs),
                    self.keep_prob: 1.0}
            batch_pred, batch_accuracy = sess.run(
                [self.predictions, self.accuracy], feed_dict=feed)
            accuracy_list.append(batch_accuracy)
            batch_sizes.append(len(x_inputs))
            predictions.append(batch_pred)
            print('The trainning step is {0}'.format(iterations),
                  'trainning_accuracy: {:.3f}'.format(accuracy_list[-1]))

        if batch_sizes:
            # Weight by batch size so a short final batch does not skew the mean.
            accuracy = np.average(accuracy_list, weights=batch_sizes)
            predictions = np.concatenate(predictions)
        else:
            accuracy = float('nan')
            predictions = np.array([], dtype=np.int64)
        if test :
            return predictions, accuracy
        else:
            return accuracy

    def predict(self, data):
        """Predict class indices for unlabelled ``data`` with the latest checkpoint.

        Args:
            data: sequence of token-id rows, each of length ``num_seqs``.

        Returns:
            Array of shape ``(len(data), 1)`` holding the predicted class
            index of each row, in the same order as ``data``.
        """
        # load_model
        self.load_model()
        sess = self.session
        predictions_list = []
        # shuffle=False keeps the output aligned with the input rows.
        for batch in self.random_batches(data, shuffle=False, num_epoches=1):
            feed = {self.inputs : batch,
                    self.keep_prob:1.0}
            predictions_list.append(sess.run(self.predictions, feed_dict=feed))
        if predictions_list:
            predictions = np.concatenate(predictions_list).reshape(-1,1)
        else:
            predictions = np.empty((0, 1), dtype=np.int64)
        print(predictions)
        return predictions

3、程式碼實現過程中需要用到原始專案中的資料處理程式碼,這些程式碼儲存在 data_helpers.py 檔案中(此處重點推薦其中的正則表示式處理方法,感覺寫得太好,不忍改動)。相應程式碼如下:

import numpy as np
import re
import itertools
from collections import Counter
import os

def clean_str(string):
    """
    Tokenization/string cleaning for all datasets except for SST.
    Original taken from https://github.com/yoonkim/CNN_sentence/blob/master/process_data.py

    Steps, in order:
      1. every character other than letters, digits and ``( ) , ! ? ' ` ``
         is replaced by a space;
      2. the clitics ``'s 've n't 're 'd 'll`` are split off the preceding
         word with a space;
      3. ``, ! ( ) ?`` are surrounded by spaces so they become tokens;
      4. runs of whitespace collapse to one space.

    Returns the stripped, lower-cased result.

    Unlike the original, the punctuation replacements contain no backslash:
    ``" \\( "`` in a non-raw string is an invalid escape sequence and emitted
    tokens such as ``\\(`` instead of ``(``.
    """
    string = re.sub(r"[^A-Za-z0-9(),!?\'\`]", " ", string)
    # Literal substitutions only, so plain str.replace is enough.
    for clitic in ("'s", "'ve", "n't", "'re", "'d", "'ll"):
        string = string.replace(clitic, " " + clitic)
    for mark in (",", "!", "(", ")", "?"):
        string = string.replace(mark, " " + mark + " ")
    string = re.sub(r"\s{2,}", " ", string)
    return string.strip().lower()


def load_data_and_labels(positive_data_file, negative_data_file):
    """
    Loads MR polarity data from files, cleans each line and generates labels.

    Each line of either file is one example. Lines are read as UTF-8,
    stripped, and passed through ``clean_str``.

    Args:
        positive_data_file: path of the file holding positive examples.
        negative_data_file: path of the file holding negative examples.

    Returns:
        ``[x_text, y]`` where ``x_text`` is the list of cleaned sentences
        (positives first, then negatives) and ``y`` is an integer array of
        shape ``(len(x_text), 2)`` with ``[0, 1]`` for positive and
        ``[1, 0]`` for negative examples.
    """
    # Load data from files; `with` guarantees the handles are closed.
    with open(positive_data_file, "r", encoding='utf8') as pos_file:
        positive_examples = [line.strip() for line in pos_file]
    with open(negative_data_file, "r", encoding='utf8') as neg_file:
        negative_examples = [line.strip() for line in neg_file]
    # Clean every sentence
    x_text = [clean_str(sent) for sent in positive_examples + negative_examples]
    # Generate labels
    positive_labels = [[0, 1] for _ in positive_examples]
    negative_labels = [[1, 0] for _ in negative_examples]
    # Building one array and reshaping also works when a file is empty,
    # where concatenating a (0,) array with an (n, 2) array would fail.
    y = np.array(positive_labels + negative_labels, dtype=int).reshape(-1, 2)
    return [x_text, y]

4、模型訓練檔案,儲存檔名如:data_trainning.py,相應程式碼如下:

import tensorflow as tf
import numpy as np
import os
from tensorflow.contrib import learn
import data_helpers

from TextCNNClassifier import NN_config, CALC_config, TextCNNClassifier
# Data Preparation
# ==================================================
# Training script: loads the polarity data, builds the vocabulary, splits
# off a dev set, trains a TextCNNClassifier and evaluates it on the dev set.
positive_data_file = "./data/rt-polaritydata/rt-polarity.pos"
negative_data_file = "./data/rt-polaritydata/rt-polarity.neg"
dev_sample_percentage = 0.1
# Load data
print("Loading data...")
x_text, y = data_helpers.load_data_and_labels(positive_data_file, negative_data_file)

# Build vocabulary; every document is mapped to max_document_length word ids
max_document_length = max(len(sentence.split(" ")) for sentence in x_text)
vocab_processor = learn.preprocessing.VocabularyProcessor(max_document_length)
x = np.array(list(vocab_processor.fit_transform(x_text)))

print('vocabulary length is:',len(vocab_processor.vocabulary_))
# Randomly shuffle data (fixed seed so the split is reproducible)
np.random.seed(10)
shuffle_indices = np.random.permutation(np.arange(len(y)))
x_shuffled = x[shuffle_indices]
y_shuffled = y[shuffle_indices]

# Split train/test set
# TODO: This is very crude, should use cross-validation
# A positive split index is used on purpose: with a negative index, a dev
# size of 0 would turn x[:-0] into an EMPTY training set.
num_dev_samples = int(dev_sample_percentage * float(len(y)))
dev_sample_index = len(y) - num_dev_samples
x_train, x_dev = x_shuffled[:dev_sample_index], x_shuffled[dev_sample_index:]
y_train, y_dev = y_shuffled[:dev_sample_index], y_shuffled[dev_sample_index:]
print('The leangth of X_train is {}'.format(len(x_train)))
print('The length of x_dev is {}'.format(len(x_dev)))


#------------------------------------------------------------------------------
# ----------------  model processing ------------------------------------------
#------------------------------------------------------------------------------
num_seqs = max_document_length
num_classes = 2
num_filters = 128
filter_steps = [5,6,7]
embedding_size = 200
vocab_size = len(vocab_processor.vocabulary_)

learning_rate = 0.001
batch_size    = 128
num_epoches   = 20
l2_ratio      = 0.0

# The classifier consumes (x, y) pairs.
trains = list(zip(x_train, y_train))
devs   = list(zip(x_dev,y_dev))

config_nn = NN_config(num_seqs      = num_seqs,
                      num_classes   = num_classes,
                      num_filters   = num_filters,
                      filter_steps  = filter_steps,
                      embedding_size= embedding_size,
                      vocab_size    = vocab_size)
config_calc = CALC_config(learning_rate = learning_rate,
                          batch_size    = batch_size,
                          num_epoches   = num_epoches,
                          l2_ratio      = l2_ratio)

# Echo the effective configuration before the (long) training run.
print('this is checking list:\\\\\n',
        'num_seqs:{}\n'.format(num_seqs),
        'num_classes:{} \n'.format(num_classes),
        'embedding_size:{}\n'.format(embedding_size),
        'num_filters:{}\n'.format(num_filters),
        'vocab_size:{}\n'.format(vocab_size),
        'filter_steps:',filter_steps)
print('this is check calc list:\\\\\n',
        'learning_rate :{}\n'.format(learning_rate),
        'num_epoches: {} \n'.format(num_epoches),
        'batch_size: {} \n'.format(batch_size),
        'l2_ratio : {} \n'.format(l2_ratio))


text_model = TextCNNClassifier(config_nn,config_calc)
text_model.fit(trains)
if devs:
    accuracy = text_model.predict_accuracy(devs,test=False)
    print('the dev accuracy is :',accuracy)

    predictions = text_model.predict(x_dev)
else:
    # Happens only when the dataset is too small to hold out any sample.
    print('the dev set is empty, skipping evaluation')
#print(predictions)                   

5、訓練結果: 
訓練資料上的 accuracy 可達 1.0;在測試資料上,多次訓練得到的結果略有偏差,最好的為 74.03%,最差的為 72.503%。

資源來源於網路