1. 程式人生 > >(資料探勘-入門-8)基於樸素貝葉斯的文字分類器

(資料探勘-入門-8)基於樸素貝葉斯的文字分類器

主要內容:

1、動機

2、基於樸素貝葉斯的文字分類器

3、python實現

一、動機

之前介紹的樸素貝葉斯分類器所使用的都是結構化的資料集,即每行代表一個樣本,每列代表一個特徵屬性。

但在實際中,尤其是網頁中,爬蟲所採集到的資料都是非結構化的,如新聞、微博、帖子等,如果要對對這一類資料進行分類,應該怎麼辦呢?例如,新聞分類,微博情感分析等。

本文就介紹一種基於樸素貝葉斯的文字分類器。

二、基於樸素貝葉斯的文字分類器

目標:對非結構化的文字進行分類

首先,回顧一下樸素貝葉斯公式:

特徵、特徵處理:

對於結構化資料,公式中的D代表的是樣本或一系列整理或抽象出來的特徵或屬性,

而在非結構化的資料中,只有文件和單詞,文件對應樣本,單詞對應特徵。

如果將單詞作為特徵,未免特徵太多了,一篇文章有那麼多單詞,而且有些單詞並不起什麼作用,因此需要對特徵即單詞進行處理。

停用詞:我們將一些常見的詞,如the, a, I, is, that等,稱為“停用詞”,因為它們在很多文章都會出現,不具稱為特徵的代表性。

模型引數計算:

別忘了,樸素貝葉斯的假設前提:在已知類別下,所有特徵是獨立的。(在文字中,即不考慮單詞之間的次序和相關性)

如何計算模型的引數,即已知類別時某文字的條件概率呢?(這裡只是進行簡單的統計而已,複雜一點的可以考慮TF-IDF作為單詞特徵,下面的公式已經做了平滑處理

Wk:表示某個單詞

hi: 表示某個類別

nk: 表示單詞wk在類別hi中出現的次數

n:表示類別中的單詞總數

vocabulary:表示類別中的單詞數

分類:

來一篇新文章,如何判斷它是屬於哪一類呢?

如下公式,分別計算屬於每一類的概率,然後取概率最大的作為其類別。

應用:

新聞分類、垃圾郵件分類、微博情感分析等等

三、python實現

資料集:

程式碼:

1、新聞分類

複製程式碼
from __future__ import print_function
import os, codecs, math

class BayesText:

    def __init__(self, trainingdir, stopwordlist):
        """This class implements a naive Bayes approach to text
        classification
        trainingdir is the training data. Each subdirectory of
        trainingdir is titled with the name of the classification
        category -- those subdirectories in turn contain the text
        files for that category.
        The stopwordlist is a list of words (one per line) will be
        removed before any counting takes place.
        
""" self.vocabulary = {} self.prob = {} self.totals = {} self.stopwords = {} f = open(stopwordlist) for line in f: self.stopwords[line.strip()] = 1 f.close() categories = os.listdir(trainingdir) #filter out files that are not directories self.categories = [filename for filename in categories if os.path.isdir(trainingdir + filename)] print("Counting ...") for category in self.categories: print(' ' + category) (self.prob[category], self.totals[category]) = self.train(trainingdir, category) # I am going to eliminate any word in the vocabulary # that doesn't occur at least 3 times toDelete = [] for word in self.vocabulary: if self.vocabulary[word] < 3: # mark word for deletion # can't delete now because you can't delete # from a list you are currently iterating over toDelete.append(word) # now delete for word in toDelete: del self.vocabulary[word] # now compute probabilities vocabLength = len(self.vocabulary) print("Computing probabilities:") for category in self.categories: print(' ' + category) denominator = self.totals[category] + vocabLength for word in self.vocabulary: if word in self.prob[category]: count = self.prob[category][word] else: count = 1 self.prob[category][word] = (float(count + 1) / denominator) print ("DONE TRAINING\n\n") def train(self, trainingdir, category): """counts word occurrences for a particular category""" currentdir = trainingdir + category files = os.listdir(currentdir) counts = {} total = 0 for file in files: #print(currentdir + '/' + file) f = codecs.open(currentdir + '/' + file, 'r', 'iso8859-1') for line in f: tokens = line.split() for token in tokens: # get rid of punctuation and lowercase token token = token.strip('\'".,?:-') token = token.lower() if token != '' and not token in self.stopwords: self.vocabulary.setdefault(token, 0) self.vocabulary[token] += 1 counts.setdefault(token, 0) counts[token] += 1 total += 1 f.close() return(counts, total) def classify(self, filename): results = {} for category in self.categories: results[category] = 0 f = codecs.open(filename, 'r', 'iso8859-1') for line in f: tokens = line.split() for token in tokens: #print(token) token = token.strip('\'".,?:-').lower() if token in self.vocabulary: for category in self.categories: if self.prob[category][token] == 0: print("%s %s" % (category, token)) results[category] += math.log( self.prob[category][token]) f.close() results = list(results.items()) results.sort(key=lambda tuple: tuple[1], reverse = True) # for debugging I can change this to give me the entire list return results[0][0] def testCategory(self, directory, category): files = os.listdir(directory) total = 0 correct = 0 for file in files: total += 1 result = self.classify(directory + file) if result == category: correct += 1 return (correct, total) def test(self, testdir): """Test all files in the test directory--that directory is organized into subdirectories--each subdir is a classification category""" categories = os.listdir(testdir) #filter out files that are not directories categories = [filename for filename in categories if os.path.isdir(testdir + filename)] correct = 0 total = 0 for category in categories: print(".", end="") (catCorrect, catTotal) = self.testCategory( testdir + category + '/', category) correct += catCorrect total += catTotal print("\n\nAccuracy is %f%% (%i test instances)" % ((float(correct) / total) * 100, total)) # change these to match your directory structure baseDirectory = "20news-bydate/" trainingDir = baseDirectory + "20news-bydate-train/" testDir = baseDirectory + "20news-bydate-test/" stoplistfile = "20news-bydate/stopwords0.txt" print("Reg stoplist 0 ") bT = BayesText(trainingDir, baseDirectory + "stopwords0.txt") print("Running Test ...") bT.test(testDir) print("\n\nReg stoplist 25 ") bT = BayesText(trainingDir, baseDirectory + "stopwords25.txt") print("Running Test ...") bT.test(testDir) print("\n\nReg stoplist 174 ") bT = BayesText(trainingDir, baseDirectory + "stopwords174.txt") print("Running Test ...") bT.test(testDir)
複製程式碼

2、情感分析

複製程式碼
from __future__ import print_function
import os, codecs, math

class BayesText:

    def __init__(self, trainingdir, stopwordlist, ignoreBucket):
        """This class implements a naive Bayes approach to text
        classification
        trainingdir is the training data. Each subdirectory of
        trainingdir is titled with the name of the classification
        category -- those subdirectories in turn contain the text
        files for that category.
        The stopwordlist is a list of words (one per line) will be
        removed before any counting takes place.
        """
        self.vocabulary = {}
        self.prob = {}
        self.totals = {}
        self.stopwords = {}
        f = open(stopwordlist)
        for line in f:
            self.stopwords[line.strip()] = 1
        f.close()
        categories = os.listdir(trainingdir)
        #filter out files that are not directories
        self.categories = [filename for filename in categories
                           if os.path.isdir(trainingdir + filename)]
        print("Counting ...")
        for category in self.categories:
            #print('    ' + category)
            (self.prob[category],
             self.totals[category]) = self.train(trainingdir, category,
                                                 ignoreBucket)
        # I am going to eliminate any word in the vocabulary
        # that doesn't occur at least 3 times
        toDelete = []
        for word in self.vocabulary:
            if self.vocabulary[word] < 3:
                # mark word for deletion
                # can't delete now because you can't delete
                # from a list you are currently iterating over
                toDelete.append(word)
        # now delete
        for word in toDelete:
            del self.vocabulary[word]
        # now compute probabilities
        vocabLength = len(self.vocabulary)
        #print("Computing probabilities:")
        for category in self.categories:
            #print('    ' + category)
            denominator = self.totals[category] + vocabLength
            for word in self.vocabulary:
                if word in self.prob[category]:
                    count = self.prob[category][word]
                else:
                    count = 1
                self.prob[category][word] = (float(count + 1)
                                             / denominator)
        #print ("DONE TRAINING\n\n")
                    

    def train(self, trainingdir, category, bucketNumberToIgnore):
        """counts word occurrences for a particular category"""
        ignore = "%i" % bucketNumberToIgnore
        currentdir = trainingdir + category
        directories = os.listdir(currentdir)
        counts = {}
        total = 0
        for directory in directories:
            if directory != ignore:
                currentBucket = trainingdir + category + "/" + directory
                files = os.listdir(currentBucket)
                #print("   " + currentBucket)
                for file in files:
                    f = codecs.open(currentBucket + '/' + file, 'r', 'iso8859-1')
                    for line in f:
                        tokens = line.split()
                        for token in tokens:
                            # get rid of punctuation and lowercase token
                            token = token.strip('\'".,?:-')
                            token = token.lower()
                            if token != '' and not token in self.stopwords:
                                self.vocabulary.setdefault(token, 0)
                                self.vocabulary[token] += 1
                                counts.setdefault(token, 0)
                                counts[token] += 1
                                total += 1
                    f.close()
        return(counts, total)
                    
                    
    def classify(self, filename):
        results = {}
        for category in self.categories:
            results[category] = 0
        f = codecs.open(filename, 'r', 'iso8859-1')
        for line in f:
            tokens = line.split()
            for token in tokens:
                #print(token)
                token = token.strip('\'".,?:-').lower()
                if token in self.vocabulary:
                    for category in self.categories:
                        if self.prob[category][token] == 0:
                            print("%s %s" % (category, token))
                        results[category] += math.log(
                            self.prob[category][token])
        f.close()
        results = list(results.items())
        results.sort(key=lambda tuple: tuple[1], reverse = True)
        # for debugging I can change this to give me the entire list
        return results[0][0]

    def testCategory(self, direc, category, bucketNumber):
        results = {}
        directory = direc + ("%i/" % bucketNumber)
        #print("Testing " + directory)
        files = os.listdir(directory)
        total = 0
        correct = 0
        for file in files:
            total += 1
            result = self.classify(directory + file)
            results.setdefault(result, 0)
            results[result] += 1
            #if result == category:
            #               correct += 1
        return results

    def test(self, testdir, bucketNumber):
        """Test all files in the test directory--that directory is
        organized into subdirectories--each subdir is a classification
        category"""
        results = {}
        categories = os.listdir(testdir)
        #filter out files that are not directories
        categories = [filename for filename in categories if
                      os.path.isdir(testdir + filename)]
        correct = 0
        total = 0
        for category in categories:
            #print(".", end="")
            results[category] = self.testCategory(
                testdir + category + '/', category, bucketNumber)
        return results

def tenfold(dataPrefix, stoplist):
    results = {}
    for i in range(0,10):
        bT = BayesText(dataPrefix, stoplist, i)
        r = bT.test(theDir, i)
        for (key, value) in r.items():
            results.setdefault(key, {})
            for (ckey, cvalue) in value.items():
                results[key].setdefault(ckey, 0)
                results[key][ckey] += cvalue
                categories = list(results.keys())
    categories.sort()
    print(   "\n       Classified as: ")
    header =    "          "
    subheader = "        +"
    for category in categories:
        header += "% 2s   " % category
        subheader += "-----+"
    print (header)
    print (subheader)
    total = 0.0
    correct = 0.0
    for category in categories:
        row = " %s    |" % category 
        for c2 in categories:
            if c2 in results[category]:
                count = results[category][c2]
            else:
                count = 0
            row += " %3i |" % count
            total += count
            if c2 == category:
                correct += count
        print(row)
    print(subheader)
    print("\n%5.3f percent correct" %((correct * 100) / total))
    print("total of %i instances" % total)

# change these to match your directory structure
prefixPath = "reviewPolarityBuckets/review_polarity_buckets/"
theDir = prefixPath + "/txt_sentoken/"
stoplistfile = prefixPath + "stopwords25.txt"
tenfold(theDir, stoplistfile)
複製程式碼