1. 程式人生 > >統計學習方法_AdaBoost演算法實現

統計學習方法_AdaBoost演算法實現

這裡用的是二值化的MNIST,同時將特徵的值也二值化了。書上舉的例子特徵只有一維,但大多數情況下特徵不會只有一維,所以這裡每一輪迭代都會遍歷所有特徵和所有切分點,從中選出最優切分特徵和最優切分點。弱分類器選擇最簡單的閾值分類器,每個弱分類器都有與其對應的切分特徵和切分點,在預測的時候只需取出預測資料中對應的那個特徵值即可。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import math
import logging

import numpy as np
import pandas as pd
import pandas as pd

from sklearn.cross_validation import train_test_split

class Sign(object):
	'''
		Threshold classifier (decision stump) over a single feature.

		Two orientations are supported:
		1) x < v  ->  y = +1, otherwise y = -1
		2) x > v  ->  y = +1, otherwise y = -1
		Only the thresholds v in {0, 1, 2} are tried, because the MNIST
		features used with this class have been binarized to 0/1.

		After train() the chosen orientation is stored in self.is_less and
		the chosen threshold in self.index.
	'''
	def __init__(self, features, labels, w):
		'''
			features: values of the single feature, one per training sample
			labels:   labels of the samples; the error test (val * y < 0)
			          expects them to be -1 / +1
			w:        weight of each training sample
		'''
		self.X = features
		self.Y = labels
		self.N = len(labels)

		self.w = w  # weight distribution over the training set

		self.indexes = [0, 1, 2]  # candidate thresholds v

	@staticmethod
	def _decide(feature, threshold, is_less):
		'''
			Single decision rule shared by training and prediction, so the
			stump that is scored is exactly the stump that is later applied.
		'''
		if is_less:
			hit = feature < threshold
		else:
			hit = feature > threshold
		return 1.0 if hit else -1.0

	def _best_threshold(self, is_less):
		'''
			Try every candidate threshold for one orientation and return
			(threshold, weighted_error) of the first one with the smallest
			weighted error.
		'''
		best_index = -1
		best_error = 1000000

		for v in self.indexes:
			error = 0
			for x, y, weight in zip(self.X, self.Y, self.w):
				# A sample is misclassified when prediction and label have
				# opposite signs; its weight is added to the error.
				if self._decide(x, v, is_less) * y < 0:
					error += weight
			if error < best_error:
				best_index = v
				best_error = error
		return best_index, best_error

	def train_less_than(self):
		'''
			Find the best threshold v for the rule (x < v -> y = +1).
			Returns (v, weighted_error).
		'''
		return self._best_threshold(True)

	def train_more_than(self):
		'''
			Find the best threshold v for the rule (x > v -> y = +1).
			Returns (v, weighted_error).
		'''
		return self._best_threshold(False)

	def train(self):
		'''
			Pick the better of the two orientations, remember it in
			self.is_less / self.index and return its weighted error.
			On a tie the (x > v) orientation is kept.
		'''
		less_index, less_score = self.train_less_than()
		more_index, more_score = self.train_more_than()

		if less_score < more_score:
			self.is_less = True
			self.index = less_index
			return less_score

		self.is_less = False
		self.index = more_index
		return more_score

	def predict(self, feature):
		'''
			Classify one feature value with the trained stump.
			Returns 1.0 or -1.0. train() must have been called first.
		'''
		return self._decide(feature, self.index, self.is_less)

class AdaBoost(object):
	'''
		AdaBoost built from single-feature threshold classifiers (Sign).

		Each boosting round scans every feature, keeps the weak classifier
		with the smallest weighted error, computes its weight alpha and
		re-weights the training samples.
	'''
	def __init__(self, M=10):
		'''
			M: number of boosting rounds (weak classifiers); defaults to 10.
		'''
		self.M = M

	def init_parameters(self, features, labels):
		'''
			Reset all training state.

			features: 2-D array (needs .shape), one row per sample
			labels:   one label per sample; the update formula multiplies
			          label and prediction, so -1 / +1 labels are expected
		'''
		self.X = features
		self.Y = labels

		self.n = features.shape[1]  # number of features
		self.N = features.shape[0]  # number of training samples

		self.w = [1.0 / self.N] * self.N  # sample weights, uniform at start
		self.alpha = []  # weight of each weak classifier
		self.classifier = []  # (feature index, weak classifier) pairs

	def _w(self, index, classifier, i):
		'''
			Formula (8.4) without the normalization factor Z: the
			unnormalized new weight of sample i.
			index is the feature the weak classifier was trained on; it
			selects the value passed to classifier.predict. The most
			recently appended alpha is used.
		'''
		return self.w[i] * math.exp(-self.alpha[-1] * self.Y[i] * classifier.predict(self.X[i][index]))

	def Z(self, index, classifier):
		'''
			Formula (8.5): normalization factor, the sum of the
			unnormalized weights of all samples.
		'''
		return sum(self._w(index, classifier, i) for i in range(self.N))

	def train(self, features, labels):
		'''
			Run self.M boosting rounds on the given training set.
		'''
		self.init_parameters(features, labels)

		for iteration in range(self.M):  # one weak classifier per round
			logging.debug('iteration %d', iteration)

			# (weighted error, feature index, classifier)
			best_classifier = (100000, None, None)
			for i in range(self.n):  # search for the best feature
				column = [row[i] for row in self.X]  # values of feature i
				classifier = Sign(column, self.Y, self.w)
				error_score = classifier.train()

				if error_score < best_classifier[0]:
					best_classifier = (error_score, i, classifier)

			# Everything below must run once per round: it stores the
			# chosen classifier and re-weights the samples for the next one.
			em = best_classifier[0]  # smallest weighted error of the round
			if em == 0:
				# log((1 - em) / em) is undefined for em == 0; use a large
				# fixed weight instead.
				self.alpha.append(100)
			else:
				self.alpha.append(0.5 * math.log((1 - em) / em))

			self.classifier.append(best_classifier[1:])

			# New weight distribution, formulas (8.4) and (8.5). The
			# unnormalized weights are computed once and reused for Z.
			index, chosen = best_classifier[1], best_classifier[2]
			unnormalized = [self._w(index, chosen, i) for i in range(self.N)]
			Z = sum(unnormalized)
			self.w = [value / Z for value in unnormalized]

	def _predict(self, feature):
		'''
			Classify one sample: sign of the alpha-weighted vote of all
			stored weak classifiers. Returns 1 or -1 (a vote of exactly 0
			gives -1).
		'''
		result = 0.0
		# Iterate over the classifiers that actually exist rather than
		# over range(self.M).
		for alpha, (index, classifier) in zip(self.alpha, self.classifier):
			result += alpha * classifier.predict(feature[index])

		return 1 if result > 0 else -1

	def predict(self, features):
		'''
			Classify every sample in features; returns a list of 1 / -1.
		'''
		return [self._predict(feature) for feature in features]

# Binarize one image
def binaryzation(image):
    '''
        Map every value of a flat image to 1 when it is greater than 0
        and to 0 otherwise; the result is returned as a numpy array.
    '''
    bits = [1 if pixel > 0 else 0 for pixel in image]
    return np.array(bits)

def binaryzation_features(train_set):
	'''
		Binarize every image in train_set with binaryzation() and return
		the results stacked as an array reshaped to (-1, 784).
	'''
	binarized = np.array([binaryzation(img) for img in train_set])
	return binarized.reshape(-1, 784)

if __name__ == '__main__':
	# Script entry point: load the data, train AdaBoost on one half and
	# report the accuracy on the other half, timing each stage.
	logger = logging.getLogger()
	logger.setLevel(logging.DEBUG)

	print('Start reading data:')
	time1 = time.time()

	raw_data = pd.read_csv('data/train_binary.csv', header=0)
	data = raw_data.values

	# Column 0 holds the label, the remaining columns the pixel values.
	imgs = data[:, 1:]
	labels = data[:, 0]

	# Binarize the images first
	features = binaryzation_features(imgs)
	# Half of the data for training, half for testing
	train_features, test_features, train_labels, test_labels = train_test_split(features, labels, test_size=0.5, random_state=0)

	print(train_features.shape)

	time2 = time.time()
	print('read data cost %f seconds' % (time2 - time1))

	print('Start training:')
	# Convert the labels to -1 / +1 via 2 * x - 1
	train_labels = [2 * x - 1 for x in train_labels]
	ada = AdaBoost()
	ada.train(train_features, train_labels)
	time3 = time.time()
	print('training cost %f seconds' % (time3 - time2))

	print('Start predicting:')
	test_predict = ada.predict(test_features)
	time4 = time.time()
	print('predicting cost %f seconds' % (time4 - time3))

	# The test labels need the same conversion. They must be a real list:
	# a Python 3 map object supports neither len() nor indexing.
	test_labels = [2 * x - 1 for x in test_labels]
	correct = sum(expected == predicted for expected, predicted in zip(test_labels, test_predict))
	accuracy = correct / len(test_labels)
	print("The accuracy is %f!" % accuracy)

'''
output:
Start reading data:
(21000, 784)
read data cost 17.040476 seconds
Start training:
DEBUG:root:iteration 0
DEBUG:root:iteration 1
...(執行時間過長,最終正確率可達98%以上)
'''