1. 程式人生 > >深度學習專案實戰--對於評論的情感分析

深度學習專案實戰--對於評論的情感分析

標籤: 機器學習


該專案通過分析影評進行判斷該評價的情感方向

專案準備:

1. 關於影評的資料集
2. 關於影評的情感標籤
3. python的各種運算庫

關於影評的資料集與情感標籤點選此處下載
運算庫請自行下載

【插入:電影】

(好吧,圖文無關)

實現思想

對影評的每一個單詞進行提取,通過神經網路找到單詞之間與情緒的聯絡,進而進行預測

實現效果

準確率達85%以上,運算速率可達7000條影評/秒
這裡寫圖片描述

現在開始我們的專案程式碼

1.首先我們要讀入影評與情感標籤


g = open('reviews.txt'
,'r') # What we know! reviews = list(map(lambda x:x[:-1],g.readlines())) g.close() g = open('labels.txt','r') # What we WANT to know! labels = list(map(lambda x:x[:-1].upper(),g.readlines())) g.close()

2.引入需要使用的庫

from collections import Counter
import numpy as np
import time
import sys
import numpy as
np

3.實現神經網路

class SentimentNetwork:
    def __init__(self, reviews,labels,min_count = 10,polarity_cutoff = 0.1,hidden_nodes = 10, learning_rate = 0.1):

        np.random.seed(1)

                    ################神經網路的資料預處理#################
        self.pre_process_data(reviews, labels, polarity_cutoff, min_count)

                    ##########神經網路的資料初始化###########
        self.init_network(len(self.review_vocab),hidden_nodes, 1, learning_rate)


                ###################################################
                 #############神經網路的資料預處理函式實現#############
                 ###################################################

    def pre_process_data(self, reviews, labels, polarity_cutoff, min_count):

    ###建立三個計數器分別對正面,負面,所有,進行計數
        positive_counts = Counter()
        negative_counts = Counter()
        total_counts = Counter()

        #對正面評論的單詞進行計數
        for i in range(len(reviews)):
            if(labels[i] == 'POSITIVE'):
                for word in reviews[i].split(" "):
                    positive_counts[word] += 1
                    total_counts[word] += 1
        #對負面評價的單詞進行計數
            else:
                for word in reviews[i].split(" "):
                    negative_counts[word] += 1
                    total_counts[word] += 1

        ###建立一個比率計數器
        pos_neg_ratios = Counter()

        ###對正面與反面的評論單詞的比率進行計數
        ###正面比率大則為正數,反面比率大則為負數
        for term,cnt in list(total_counts.most_common()):
            if(cnt >= 50):
                pos_neg_ratio = positive_counts[term] / float(negative_counts[term]+1)
                pos_neg_ratios[term] = pos_neg_ratio

        for word,ratio in pos_neg_ratios.most_common():
            if(ratio > 1):
                pos_neg_ratios[word] = np.log(ratio)
            else:
                pos_neg_ratios[word] = -np.log((1 / (ratio + 0.01)))

        ###只對出現總次數大於min_count以及比率介於±polarity_cutoff之間的單詞進行統計
        review_vocab = set()
        for review in reviews:
            for word in review.split(" "):
                if(total_counts[word] > min_count):
                    if(word in pos_neg_ratios.keys()):
                        if((pos_neg_ratios[word] >= polarity_cutoff) or (pos_neg_ratios[word] <= -polarity_cutoff)):
                            review_vocab.add(word)
                    else:
                        review_vocab.add(word)

        #將詞彙錶轉換為一個列表,這樣我們就可以通過索引訪問單詞。
        self.review_vocab = list(review_vocab)

        # 對單詞所對應的標籤進行填充
        label_vocab = set()
        for label in labels:
            label_vocab.add(label)

        # 將標籤詞彙錶轉換為一個列表,這樣我們就可以通過索引訪問標籤。
        self.label_vocab = list(label_vocab)

        #儲存影評和標籤詞彙陣列的大小。
        self.review_vocab_size = len(self.review_vocab)
        self.label_vocab_size = len(self.label_vocab)

        #對索引的影評與標籤重新編寫字典
        self.word2index = {}
        for i, word in enumerate(self.review_vocab):
            self.word2index[word] = i

        self.label2index = {}
        for i, label in enumerate(self.label_vocab):
            self.label2index[label] = i

            ###################################################
            ##########神經網路的資料初始化的函式實現###############
            ###################################################

    def init_network(self, input_nodes, hidden_nodes, output_nodes, learning_rate):
        #輸入層、隱藏層、輸出層的節點數量
        self.input_nodes = input_nodes
        self.hidden_nodes = hidden_nodes
        self.output_nodes = output_nodes

        #學習速率
        self.learning_rate = learning_rate

        #權重初始化
        self.weights_0_1 = np.zeros((self.input_nodes,self.hidden_nodes))
        self.weights_1_2 = np.random.normal(0.0, self.output_nodes**-0.5, 
                                                (self.hidden_nodes, self.output_nodes))

        #初始化隱藏層資料
        self.layer_1 = np.zeros((1,hidden_nodes))


    #############標籤數字化##############
    def get_target_for_label(self,label):
        if(label == 'POSITIVE'):
            return 1
        else:
            return 0

    ###############啟用函式:sigmoid函式################
    def sigmoid(self,x):
        return 1 / (1 + np.exp(-x))

     ###############sigmoid函式的倒數################
    def sigmoid_output_2_derivative(self,output):
        return output * (1 - output)


    ###########################################################
    ###################訓練函式程式碼實現########################
    ###########################################################

    def train(self, training_reviews_raw, training_labels):

    #標記條影評中每個出現的單詞,對應在字典中記錄下來作為輸入層
        training_reviews = list()
        for review in training_reviews_raw:
            indices = set()
            for word in review.split(" "):
                if(word in self.word2index.keys()):
                    indices.add(self.word2index[word])
            training_reviews.append(list(indices))

        # 確保每個影評都有且僅有一個標籤與其對應
        assert(len(training_reviews) == len(training_labels))

       #記錄預測正確的數量
        correct_so_far = 0

        # 記錄時間
        start = time.time()

        #對每條影評學習的迴圈
        for i in range(len(training_reviews)):

            review = training_reviews[i]
            label = training_labels[i]

            #### 實現前向傳播 ####

            # 隱藏層的計算
            self.layer_1 *= 0
            for index in review:
                self.layer_1 += self.weights_0_1[index]

            # 輸出層的計算
            layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))            

            ### 反向傳播的實現 ###

            # 輸出誤差計算
            layer_2_error = layer_2 - self.get_target_for_label(label) 
            layer_2_delta = layer_2_error * self.sigmoid_output_2_derivative(layer_2)

            # 反向傳播誤差計算
            layer_1_error = layer_2_delta.dot(self.weights_1_2.T) 
            layer_1_delta = layer_1_error

            # 更新權重
            self.weights_1_2 -= self.layer_1.T.dot(layer_2_delta) * self.learning_rate 

            for index in review:
                self.weights_0_1[index] -= layer_1_delta[0] * self.learning_rate # update input-to-hidden weights with gradient descent step

            # 對預測情況進行判斷
            if(layer_2 >= 0.5 and label == 'POSITIVE'):
                correct_so_far += 1
            elif(layer_2 < 0.5 and label == 'NEGATIVE'):
                correct_so_far += 1

            # 對預測以及學習情況進行即時輸出
            elapsed_time = float(time.time() - start)
            reviews_per_second = i / elapsed_time if elapsed_time > 0 else 0

            sys.stdout.write("\rProgress:" + str(100 * i/float(len(training_reviews)))[:4] \
                             + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5] \
                             + " #Correct:" + str(correct_so_far) + " #Trained:" + str(i+1) \
                             + " Training Accuracy:" + str(correct_so_far * 100 / float(i+1))[:4] + "%")
            if(i % 2500 == 0):
                print("")
    ###################################################
    ###################測試函式的實現##################
    ###################################################
    def test(self, testing_reviews, testing_labels):

    #用於直接測試的函式,沒有train函式的權重更新
        correct = 0
        start = time.time()
        for i in range(len(testing_reviews)):
            pred = self.run(testing_reviews[i])
            if(pred == testing_labels[i]):
                correct += 1

            elapsed_time = float(time.time() - start)
            reviews_per_second = i / elapsed_time if elapsed_time > 0 else 0

            sys.stdout.write("\rProgress:" + str(100 * i/float(len(testing_reviews)))[:4] \
                             + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5] \
                             + " #Correct:" + str(correct) + " #Tested:" + str(i+1) \
                             + " Testing Accuracy:" + str(correct * 100 / float(i+1))[:4] + "%")
    #################################################
    ####################run函式的實現################
    #################################################
    def run(self, review):

    #該函式通過資料的前向傳播直接輸出預測結果

        self.layer_1 *= 0
        unique_indices = set()
        for word in review.lower().split(" "):
            if word in self.word2index.keys():
                unique_indices.add(self.word2index[word])
        for index in unique_indices:
            self.layer_1 += self.weights_0_1[index]
        layer_2 = self.sigmoid(self.layer_1.dot(self.weights_1_2))
        if(layer_2[0] >= 0.5):
            return "POSITIVE"
        else:
            return "NEGATIVE"

最後開始對資料進行學習

mlp = SentimentNetwork(reviews[:-1000],labels[:-1000],min_count=20,polarity_cutoff=0.8,learning_rate=0.01)
mlp.train(reviews[:-1000],labels[:-1000])

通過測試對學習效果進行評定

mlp.test(reviews[-1000:],labels[-1000:])

將以上程式碼複製貼上即可執行,可以看到學習效果,準確率高達85%

並且該程式碼對資料進行了清洗與篩選,對神經網路的結構做了結構優化

將100條每秒的資料處理提高到了7000條每秒

這裡寫圖片描述