1. 程式人生 > >機器學習實戰-隨機森林二分類問題

機器學習實戰-隨機森林二分類問題

lena elf 線性 評估 形式 www. 分類器 and 數據

隨機森林

概論

前提

Random Forest:可以理解為Bagging with CARTS.

Bagging是bootstrap aggregating(引導聚集算法)的縮寫。

CART(classification and regression Tree)分類和回歸樹,二分類樹。

這裏涉及到集成式學習的概念,集成學習可以分為Bagging和Boosting.

Bagging:自放回式采樣,一種弱分類器,采用少數服從多數的機制,並行式運算。

Boosting:自適應的集成學習,順序叠代,串行式運算。代表算法AdaBoost(Adaptive Boosting)

CART采用分而治之的策略。

回歸樹:采用分治策略,對於無法用唯一的全局線性回歸來優化的目標進行分而治之,進而取得比較準確的結果。但分段後取均值並不是一個明智的選擇,可以考慮將葉節點設置成一個線性函數,即分段線性模型樹。

算法介紹鏈接:https://www.tuicool.com/articles/iiUfeim

數據集出處:https://archive.ics.uci.edu/ml/datasets/Connectionist+Bench+(Sonar,+Mines+vs.+Rocks)

python三維向量轉二維向量

運行:print(sum([[[1,2,3],[4,5,5]],[[1,2,3],[4,5,5]]],[]))

輸出:[[1, 2, 3], [4, 5, 5], [1, 2, 3], [4, 5, 5]]

python中多個實參,放到一個元組裏面,以*開頭,可以傳多個參數

*args:(表示的就是將實參中按照位置傳值,多出來的值都給args,且以元祖的方式呈現)

分類回歸效果的判斷指標:

Information Entropy(信息熵)、Gini Index(基尼指數)、Gini Split(基尼分割)、Misclassification Error(錯誤分類)

以上判斷數值越小,模型的效果越好

Information Gain(信息增益),數值越大,效果越好

實戰

數據集說明

【sonar-all-data.csv】

60 個輸入變量表示聲納從不同角度返回的強度。這是一個二元分類問題(binary classification problem),要求模型能夠區分出巖石和金屬柱體的不同材質和形狀,總共有 208 個觀測樣本。

附代碼

 #coding = utf-8
 

from random import seed
from random import randrange
from csv import reader
from math import sqrt
from math import log
class randomForest:
    def __init__(self):
        print(randomforest==start==)
        print(seed(1))


    #導入數據
    def load_csv(self,filename):
        dataset = list()
        with open(filename, r) as file:
            csv_reader = reader(file)
            for row in csv_reader:
                if not row:
                    continue
                dataset.append(row)
        return dataset

    #Convert string column to integer
    def str_column_to_float(self,dataset,column):
        for row in dataset:
            row[column] = float(row[column].strip())

    #Convert String column to integer
    def str_column_to_int(self,dataset,column):
        class_values = [row[column] for row in dataset]
        unique = set(class_values)
        lookup = dict()
        #enumerate()用於將一個可遍歷的數據對象組合成一個索引序列
        for i, value in enumerate(unique):
            lookup[value] = i
        for row in dataset:
            row[column] = lookup[row[column]]
        return lookup

    #Create a random subsample from the dataset with replacement
    #創建隨機子樣本
    def subsample(self,dataset,ratio):
        sample = list()
        #round()方法返回浮點數x的四舍五入值
        n_sample = round(len(dataset)* ratio)
        # print(n_sample)
        while len(sample)< n_sample:
            #有放回的隨機采樣,有一些樣本被重復采樣,從而在訓練集中多次出現,有的則從未在訓練集中出現。
            #此方法即為自助采樣法,從而保證每顆決策樹訓練集的差異性
            index = randrange(len(dataset))
            sample.append(dataset[index])
        return sample

    #Split a dataset based on an attribute and an attribute value
    #根據特征和特征值分割數據集
    def test_split(self,index,value,dataset):
        left, right = list(), list()
        for row in dataset:
            if row[index] < value:
                left.append(row)
            else:
                right.append(row)
        return left,right

    #計算基尼指數
    def gini_index(self, groups,class_values):
        gini = 0.0
        # print(groups)
        # print(len(class_values))輸出:166
        for class_value in class_values:
            for group in groups:
                size = len(group)
                if size == 0:
                    continue
                # print(class_value)輸出:{‘M‘}
                # print(group)
                # exit()
                # print(size)輸出:109
                #list count()統計某個元素出現在列表中的次數
                proportion = [row[-1] for row in group].count(class_value)/float(size)

                # print(proportion)
                gini += (proportion * (1.0 - proportion))
        # print(gini)
        return gini


    #Select the best split point for a dataset
    #找出分割數據集的最優特征,得到最優的特征index,特征值row[index],以及分割完的數據groups(left,right)
    def get_split(self,dataset,n_features):
        #class_values =[0,1]
        class_values = list(set(row[-1]) for row in dataset)

        b_index, b_value, b_score,b_group = 999,999,999,None
        features = list()

        #n_features特征值
        while len(features) < n_features:

            #往features添加n_features個特征(n_features等於特征數的根號),特征索引從dataset中隨機取
            index = randrange(len(dataset[0]) - 1)
            if index not in features:
                features.append(index)

        #在n_features個特征中選出最優的特征索引,並沒有遍歷所有特征,從而保證每個決策樹的差異
        for index in features:
            for row in dataset:
                #groups = (left, right);row[index]遍歷每一行index索引下的特征值作為分類值values,
                #找出最優的分類特征和特征值
                groups = self.test_split(index,row[index],dataset)
                # print(groups)輸出格式:[[]],[[]]
                gini = self.gini_index(groups, class_values)
                # print(gini)
                if gini < b_score:
                    #最後得到最優的分類特征b_index,分類特征值b_value,分類結果b_groups。 b_value為分錯的代價成本
                    b_index,b_value,b_score,b_groups = index, row[index], gini, groups
        return {index:b_index, value:b_value, groups:b_groups}

    #創建一個終端節點
    #輸出group中出現次數最多的標簽
    def to_terminal(self,group):
        outcomes = [row[-1] for row in group]
        #max()函數中,當key函數不為空時,就以key的函數對象為判斷的標準
        # print(outcomes)
        return max(set(outcomes), key = outcomes.count)



    #創建子分割器,遞歸分類,直到分類結束
    def split(self, node, max_depth, min_size, n_features, depth):
        #max_depth = 10 ,min_size = 1, n_features = int(sqrt(len(dataset[0])) - 1)
        left,right = node[groups]
        # print(‘node[groups]====‘)
        del(node[groups])
        
        #檢查左右分支
        if not left or not right:
            node[left] = node[right] = self.to_terminal(left+right)
            return

        #檢查叠代次數,表示遞歸十次後,若分類還沒結束,則選取數據中分類標簽較多的作為結果,使分類提前結束,防止過擬合。
        if depth >= max_depth:
            node[left], node[right] = self.to_terminal(left), self.to_terminal(right)

        #加工左子樹
        if len(left)<= min_size:
            node[left] = self.to_terminal(left)
        else:
            # print(‘左子樹遞歸‘)
            # node[‘left‘]是一個字典,形式為{‘index‘:b_index,‘value‘:b_value,‘groups‘:b_groups},所以node是一個多層字典
            node[left] = self.get_split(left, n_features)
            #遞歸,depth+1計算遞歸層數
            self.split(node[left],max_depth,min_size,n_features,depth+1)

        #加工右子樹
        if len(right) <= min_size:
            node[right] = self.to_terminal(right)
        else:
            # print(‘右子樹遞歸‘)
            node[right] = self.get_split(right,n_features)
            self.split(node[right],max_depth,min_size,n_features,depth+1)


    #build a decision tree,建立一個決策樹
    def build_tree(self,train,max_depth,min_size,n_features):

        #找出最優的分割點
        root = self.get_split(train,n_features)
        # print(root)
        #創建子分類器,遞歸分類,直到分類結束。
        self.split(root, max_depth,min_size,n_features, 1)
        return root
        # exit()

    #用決策樹進行預測,預測模型的分類結果
    def predict(self,node,row):
        if row[node[index]] < node[value]:
            if isinstance(node[left], dict):
                return self.predict(node[left], row)
            else:
                return node[left]
        else:
            if isinstance(node[right],dict):
                return self.predict(node[right],row)
            else:
                return node[right]


    #用一系列的套袋樹進行預測
    def bagging_predict(self, trees,row):
        #使用多個決策樹trees對測試集test的第row行進行預測,再使用簡單投票法判斷出該行所屬的分類
        predictions = [self.predict(tree, row) for tree in trees]
        return max(set(predictions), key = predictions.count)

    #Random Forest Algorithm,隨機森林算法
    def random_forest(self,train, test, max_depth, min_size,sample_size,n_trees,n_features):
        trees = list()
        #n_trees表示決策樹的數量
        for i in range(n_trees):
            #隨機采樣,保證每顆決策樹訓練集的差異性
            #sample_size采樣速率
            print(訓練集長度=,len(train))
            #創建隨機子樣本
            sample = self.subsample(train, sample_size)
            #建立一個決策樹
            tree = self.build_tree(sample,max_depth,min_size,n_features)
            # print(tree)
            trees.append(tree)
        ##用一系列的套袋樹進行預測
        predictions = [self.bagging_predict(trees, row) for row in test]
        # print(predictions)
        return(predictions)
        # exit()

    #Split a dataset into k folds
    ‘‘‘
    將數量集dataset分成n_flods份,每份包含len(dataset)/ n_folds個值,每個值由dataset數據集的內容隨機產生,每個值被調用一次
    ‘‘‘
    def cross_validation_split(self,dataset,n_folds):
        dataset_split = list()
        #復制一份dataset,防止dataset的內容改變
        dataset_copy = list(dataset)
        #每份的數據量
        fold_size = len(dataset)/n_folds
        # print(dataset_copy)
        print(每份的長度,fold_size )
        print(dataset_copy長度=,len(dataset_copy))

        for i in range(n_folds):
            #每次循環fold清零,防止重復導入dataset_split
            fold = list()
            #隨機抽取數據,不斷地往fold中添加數據
            while len(fold) < fold_size:
                #指定遞增基數集合中的一個隨機數,基數缺省值為1
                # print(‘dataset長度=‘,len(dataset_copy))
                if(len(dataset_copy)==0):
                    break
                index = randrange(len(dataset_copy))
                #將對應索引index的內容從dataset_copy中導出,並將該內容從dataset_copy中刪除。
                #pop()函數用於移除列表中的一個元素,並返回該元素的值。
                fold.append(dataset_copy.pop(index))
                # print(len(fold))
            dataset_split.append(fold)
            print(i===,i)
        #dataset分割出的n_flods個數據構成的列表,為了用於交叉驗證
        return dataset_split

    #計算精度百分比,導入實際值和預測值,計算精確度
    def accuracy_metric(self, actual,predicted):
        correct = 0 
        for i in range(len(actual)):
            if actual[i] == predicted[i]:
                correct +=1
        return correct/float(len(actual)) * 100.0


    def evaluate_algorithm(self, dataset, algorithm, n_folds, *args):


        folds = self.cross_validation_split(dataset, n_folds)
        scores = list()
        #每次循環從folds取出一個fold作為測試集,其余作為訓練集,遍歷整個folds,實現交叉驗證
        for fold in folds:
            train_set = list(folds)
            train_set.remove(fold)
            #sum三維向量轉二維數組,將多個fold組合成一個train_set列表
            train_set = sum(train_set,[])

            test_set = list()
            #fold表示從原始數據集dataset提取出來的測試集
            for row in fold:
                row_copy = list(row)
                test_set.append(row_copy)
                row_copy[-1] = None

            predicted = algorithm(train_set,test_set,*args)
            print(交叉驗證RF的預測值=,predicted)
            actual = [row[-1] for row in fold]
            accuracy = self.accuracy_metric(actual, predicted)
            scores.append(accuracy)
            

        return scores

if __name__ == __main__:
    rf = randomForest()

    #load data
    filename = sonar-all-data.csv
    dataset = rf.load_csv(filename)
    # print(dataset)

    #整個矩陣,按列從左到右轉化
    for i in range(0,len(dataset[0]) - 1):
        #將str類型轉變為float
        rf.str_column_to_float(dataset, i)
    # print(dataset)
    #將最後一列表示表示標簽的值轉化為Int類型0,1
    # str_column_to_int(dataset, len(dataset[0]) - 1 )
    #evaluate algorithm算法評估
    #分成5份,進行交叉驗證
    n_folds = 5

    #叠代次數
    max_depth = 10
    min_size = 1
    sample_size = 1.0
    #調參,TODO,準確性與多樣性之間的權衡
    n_features = 15
    # n_features = int (sqrt(len(dataset[0]) - 1))
    #隨機森林的樹的選擇,理論上越多越好
    for n_trees in [1,10,20]:

        #python中將函數作為另一個函數的參數傳入
        scores = rf.evaluate_algorithm(dataset, rf.random_forest, n_folds,max_depth, min_size,sample_size,n_trees,n_features)
        print(Trees:%d % n_trees)
        print(Scores:%s % scores)
        print(Mean Accuracy: %.3f%% % (sum(scores)/float(len(scores))))
        exit()

機器學習實戰-隨機森林二分類問題