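End-to-End Classification on a Traffic Light Dataset
Not finished yet; the code still needs some revision.
Catching the tail end of November, I'm writing up a deep learning assignment I did yesterday. The assignment is simple: image classification of traffic lights. It is nevertheless a good example of building a deep learning system from scratch, because many existing datasets come pre-packaged and can simply be called, whereas here the data pipeline has to be written by hand. This article builds the classification system step by step with the PyTorch framework.
Required packages: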
pytorch: 0.4.0
torchsummary: pip install torchsummary
cv2: pip install opencv-python
matplotlib
numpy
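All code is hosted on GitHub at https://github.com/FangYang970206/TL_Dataset_Classification; download it to your machine.
1. About the dataset
The dataset has 10 classes: circle, left arrow, right arrow, up arrow, and negative examples for red lights, and the same five for green lights, as shown in the figure below:
The dataset can be downloaded from any of the following links: onedrive, baiduyun, google.
After downloading, extract the dataset into the TL_Dataset_Classification-master folder. This creates a new folder, TL_Dataset, which contains the following directories:
2. The code
The code was written in vscode and conforms to flake8. There are nine files in total, introduced one by one below. When reading the code, start from main.py; the overall flow becomes clear from there.
2.1 model.py
For a deep learning system, the model is the starting idea: what kind of model do we want to fit to the dataset? So we write the model first. The code is as follows: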
import torch.nn as nn
from torchsummary import summary


class A2NN(nn.Module):
    def __init__(self):
        super(A2NN, self).__init__()
        # One stride-1 convolution followed by three stride-2 convolutions:
        # a 64x64 input is reduced to an 8x8 feature map with 64 channels.
        self.main = nn.Sequential(
            nn.Conv2d(3, 16, 3, 1, 1),
            nn.LeakyReLU(negative_slope=0.1),
            nn.Conv2d(16, 32, 3, 2, 1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(negative_slope=0.1),
            nn.Conv2d(32, 32, 3, 2, 1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(negative_slope=0.1),
            nn.Conv2d(32, 64, 3, 2, 1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(negative_slope=0.1),
        )
        # 8 * 8 * 64 flattened features -> 10 class scores.
        self.linear = nn.Linear(8*8*64, 10)

    def forward(self, inp):
        x = self.main(inp)
        x = x.view(x.shape[0], -1)  # flatten to (batch, 8*8*64)
        x = self.linear(x)
        return x


if __name__ == "__main__":
    # Named `model` rather than `nn` so the torch.nn module is not shadowed.
    model = A2NN()
    summary(model, (3, 64, 64))
The model code is simple, so I won't go through it in detail; readers missing the basics should review them on their own.
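One number in the model that may not be obvious is the 8*8*64 input size of the linear layer. The sketch below checks it with the standard convolution output-size formula, floor((size + 2*padding - kernel) / stride) + 1. The function names are illustrative and not part of the repository; the (kernel, stride, padding) values are read off the Conv2d calls in A2NN.

```python
def conv_out(size, kernel, stride, padding):
    """Output size along one spatial dimension of a 2D convolution."""
    return (size + 2 * padding - kernel) // stride + 1


def feature_map_sizes(size=64):
    # (kernel, stride, padding) of the four Conv2d layers in A2NN, in order.
    layers = [(3, 1, 1), (3, 2, 1), (3, 2, 1), (3, 2, 1)]
    sizes = [size]
    for kernel, stride, padding in layers:
        size = conv_out(size, kernel, stride, padding)
        sizes.append(size)
    return sizes


sizes = feature_map_sizes(64)
print(sizes)                # [64, 64, 32, 16, 8]
print(sizes[-1] ** 2 * 64)  # 4096 features fed to the linear layer
```

If the input resolution is changed from 64x64, the linear layer's input size has to change accordingly.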
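2.2 dataset.py
The second step is to build the dataset class. PyTorch provides the torch.utils.data.Dataset class; by overriding its __len__ and __getitem__ methods we get our own data pipeline. __len__ returns the length of the dataset, and __getitem__ supports integer indexing from 0 up to, but not including, len(self), returning the data and label for the given index. The code is as follows: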
import torch
import cv2
import torch.utils.data as data

# Maps the class name (which is also the folder name) to an integer label.
class_light = {
    'Red Circle': 0,
    'Green Circle': 1,
    'Red Left': 2,
    'Green Left': 3,
    'Red Up': 4,
    'Green Up': 5,
    'Red Right': 6,
    'Green Right': 7,
    'Red Negative': 8,
    'Green Negative': 9
}


class Traffic_Light(data.Dataset):
    def __init__(self, dataset_names, img_resize_shape):
        super(Traffic_Light, self).__init__()
        self.dataset_names = dataset_names
        self.img_resize_shape = img_resize_shape

    def __getitem__(self, ind):
        img = cv2.imread(self.dataset_names[ind])
        img = cv2.resize(img, self.img_resize_shape)
        # HWC -> CHW, then scale pixels from [0, 255] to [-1, 1].
        # The parentheses matter: without them only 127.5/127.5 = 1 is
        # subtracted.
        img = (img.transpose(2, 0, 1) - 127.5) / 127.5
        # The label is inferred from the class folder name in the file path.
        for key in class_light.keys():
            if key in self.dataset_names[ind]:
                label = class_light[key]
        # pylint: disable=E1101,E1102
        return torch.from_numpy(img), torch.tensor(label)

    def __len__(self):
        return len(self.dataset_names)


if __name__ == '__main__':
    from torch.utils.data import DataLoader
    from glob import glob
    import os

    path = 'TL_Dataset/Green Up/'
    names = glob(os.path.join(path, '*.png'))
    dataset = Traffic_Light(names, (64, 64))
    dataload = DataLoader(dataset, batch_size=1)
    for ind, (inp, label) in enumerate(dataload):
        print("{}-inp_size:{}-label_size:{}".format(ind, inp.numpy().shape,
                                                    label.numpy().shape))
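A detail worth checking in the normalization step is operator precedence. Division binds tighter than subtraction, so an expression of the form x - 127.5/127.5 subtracts 1 rather than mapping pixels into [-1, 1]. The following sketch compares the two forms on plain numbers; the function names are illustrative only.

```python
def normalize_as_parsed(pixel):
    # Without parentheses this is pixel - (127.5 / 127.5), i.e. pixel - 1.
    return pixel - 127.5 / 127.5


def normalize_scaled(pixel):
    # Parenthesized form: maps [0, 255] onto [-1, 1].
    return (pixel - 127.5) / 127.5


for pixel in (0, 127.5, 255):
    print(pixel, normalize_as_parsed(pixel), normalize_scaled(pixel))
```

Whichever form is used, training and inference must apply the same preprocessing, otherwise the model sees inputs on a different scale at test time.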
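2.3 utils.py
In dataset.py above, the class is initialized with dataset_names. The utils.py file therefore provides the get_train_val_names function, which returns the file names for the training set and the validation set. A second function checks whether a folder exists and creates it if it does not. The code is as follows: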
import os
from glob import glob


def get_train_val_names(dataset_path, remove_names, radio=0.3):
    # `radio` is the fraction of each class that goes to the validation set.
    train_names = []
    val_names = []
    dataset_paths = os.listdir(dataset_path)
    # Drop entries that are not class folders (README files, the test set).
    for n in remove_names:
        dataset_paths.remove(n)
    # Split every class folder separately so each class appears in both sets.
    for path in dataset_paths:
        sub_dataset_path = os.path.join(dataset_path, path)
        sub_dataset_names = glob(os.path.join(sub_dataset_path, '*.png'))
        sub_dataset_len = len(sub_dataset_names)
        val_names.extend(sub_dataset_names[:int(radio*sub_dataset_len)])
        train_names.extend(sub_dataset_names[int(radio*sub_dataset_len):])
    return {'train': train_names, 'val': val_names}


def check_folder(path):
    if not os.path.exists(path):
        os.mkdir(path)
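The split above takes the first part of each class's file list as validation data, in whatever order glob returns the files. If a shuffled but reproducible split is preferred, one possible variant is sketched below. The function name, the seed parameter, and the sample file names are assumptions for illustration, not part of the repository.

```python
import random


def split_names(names, val_fraction=0.3, seed=0):
    """Shuffle a copy of `names` and split it into (train, val) lists."""
    names = sorted(names)               # fix the order before shuffling
    random.Random(seed).shuffle(names)  # seeded, so the split is repeatable
    n_val = int(val_fraction * len(names))
    return names[n_val:], names[:n_val]


# Hypothetical file names standing in for one class folder.
files = ['img_{}.png'.format(i) for i in range(10)]
train, val = split_names(files)
print(len(train), len(val))  # 7 3
```

Applying this per class folder, as get_train_val_names does, keeps the class proportions the same in both sets.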
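2.4 trainer.py
With the model built and the dataset ready, the next step is the training procedure, which is the job of trainer.py. It defines a Trainer class that takes the training parameters; calling Trainer.train runs one epoch of training and returns the loss. The code is as follows: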
import torch.nn as nn
from torch.optim import Adam


class Trainer:
    def __init__(self, model, dataload, epoch, lr, device):
        self.model = model
        self.dataload = dataload
        self.epoch = epoch
        self.lr = lr
        self.device = device
        self.optimizer = Adam(self.model.parameters(), lr=self.lr)
        self.criterion = nn.CrossEntropyLoss().to(self.device)

    def __epoch(self, epoch):
        self.model.train()
        loss_sum = 0
        for ind, (inp, label) in enumerate(self.dataload):
            inp = inp.float().to(self.device)
            label = label.long().to(self.device)
            self.optimizer.zero_grad()
            # Call the module itself rather than .forward() so hooks run.
            out = self.model(inp)
            loss = self.criterion(out, label)
            loss.backward()
            loss_sum += loss.item()
            self.optimizer.step()
            print('epoch{}_step{}_train_loss_: {}'.format(epoch,
                                                          ind,
                                                          loss.item()))
        # Mean of the per-batch losses over this epoch.
        return loss_sum/(ind+1)

    def train(self):
        train_loss = self.__epoch(self.epoch)
        return train_loss
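The epoch loss returned above is the mean of the per-batch mean losses, so every batch counts equally. When the last batch is smaller than the others, this differs slightly from the mean over all samples. The sketch below illustrates the difference on made-up numbers; the function names and the (mean loss, batch size) pairs are hypothetical.

```python
def mean_of_batch_means(batch_losses):
    """What loss_sum / (ind + 1) computes: every batch counts equally."""
    return sum(loss for loss, _ in batch_losses) / len(batch_losses)


def sample_weighted_mean(batch_losses):
    """Weight each batch's mean loss by the number of samples in it."""
    total = sum(loss * n for loss, n in batch_losses)
    count = sum(n for _, n in batch_losses)
    return total / count


# Hypothetical (mean loss, batch size) pairs; the last batch is smaller.
batches = [(1.0, 32), (1.0, 32), (4.0, 8)]
print(mean_of_batch_means(batches))   # 2.0
print(sample_weighted_mean(batches))  # 96 / 72, about 1.33
```

For a learning curve the simpler per-batch average is usually good enough; the distinction matters mainly when comparing losses across different batch sizes.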
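2.5 validator.py
trainer.py handles training on the training set. During training we also need a validation set to judge how well the model is learning, hence validator.py, which wraps a Validator class. It resembles Trainer, except that nothing is trained and no parameters are updated: the model is put in eval mode, so the code differs from Trainer in a few places. Calling Validator.eval returns the loss. The code is as follows: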
import torch
import torch.nn as nn


class Validator:
    def __init__(self, model, dataload, epoch, device, batch_size):
        self.model = model
        self.dataload = dataload
        self.epoch = epoch
        self.device = device
        self.batch_size = batch_size
        self.criterion = nn.CrossEntropyLoss().to(self.device)

    def __epoch(self, epoch):
        self.model.eval()
        loss_sum = 0
        # No parameters are updated here, so gradient tracking is not needed.
        with torch.no_grad():
            for ind, (inp, label) in enumerate(self.dataload):
                inp = inp.float().to(self.device)
                label = label.long().to(self.device)
                out = self.model(inp)
                loss = self.criterion(out, label)
                loss_sum += loss.item()
        return {'val_loss': loss_sum/(ind+1)}

    def eval(self):
        val_loss = self.__epoch(self.epoch)
        return val_loss
2.6 logger.py
To follow the whole learning process, we can look at the learning curves. The logger.py file collects the training and validation losses and plots them. The code is as follows:
import matplotlib.pyplot as plt
import os


class Logger:
    def __init__(self, save_path):
        self.save_path = save_path

    def update(self, Kwarg):
        self.__plot(Kwarg)

    def __plot(self, Kwarg):
        # Redraw both curves from scratch and overwrite the saved image.
        save_img_path = os.path.join(self.save_path, 'learning_curve.png')
        plt.clf()
        plt.plot(Kwarg['train_losses'], label='Train', color='g')
        plt.plot(Kwarg['val_losses'], label='Val', color='b')
        plt.xlabel('epoch')
        plt.ylabel('loss')
        plt.legend()
        plt.title('learning_curve')
        plt.savefig(save_img_path)
2.7 main.py
The main.py file ties everything above together. The code is as follows:
import os
import torch
import argparse
from model import A2NN
from dataset import Traffic_Light
from utils import get_train_val_names, check_folder
from trainer import Trainer
from validator import Validator
from logger import Logger
from torch.utils.data import DataLoader


def str2bool(value):
    # argparse passes the raw string to `type`, and bool('False') is True,
    # so the text is interpreted explicitly instead of using type=bool.
    return value.lower() in ('1', 'true', 'yes')


def main():
    parse = argparse.ArgumentParser()
    parse.add_argument('--dataset_path', type=str, default='TL_Dataset/')
    # nargs='+' collects one or more names into a list; type=list would
    # split a single string into characters.
    parse.add_argument('--remove_names', type=str, nargs='+',
                       default=['README.txt', 'README.png', 'Testset'])
    # Two integers, e.g. --img_resize_shape 64 64.
    parse.add_argument('--img_resize_shape', type=int, nargs=2,
                       default=(64, 64))
    parse.add_argument('--batch_size', type=int, default=32)
    parse.add_argument('--lr', type=float, default=0.001)
    parse.add_argument('--num_workers', type=int, default=4)
    parse.add_argument('--epochs', type=int, default=100)
    parse.add_argument('--val_size', type=float, default=0.3)
    parse.add_argument('--save_model', type=str2bool, default=True)
    parse.add_argument('--save_path', type=str, default='logs/')
    args = vars(parse.parse_args())
    # Pass the target size on as a tuple, as in the original default.
    img_resize_shape = tuple(args['img_resize_shape'])
    check_folder(args['save_path'])
    # pylint: disable=E1101
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = A2NN().to(device)
    names = get_train_val_names(args['dataset_path'], args['remove_names'],
                                radio=args['val_size'])
    train_dataset = Traffic_Light(names['train'], img_resize_shape)
    val_dataset = Traffic_Light(names['val'], img_resize_shape)
    train_dataload = DataLoader(train_dataset,
                                batch_size=args['batch_size'],
                                shuffle=True,
                                num_workers=args['num_workers'])
    # Validation does not need shuffling.
    val_dataload = DataLoader(val_dataset,
                              batch_size=args['batch_size'],
                              shuffle=False,
                              num_workers=args['num_workers'])
    loss_logger = Logger(args['save_path'])
    logger_dict = {'train_losses': [],
                   'val_losses': []}
    for epoch in range(args['epochs']):
        print('<Main> epoch{}'.format(epoch))
        # Note: a new Trainer, and with it a new Adam optimizer, is created
        # every epoch, so the optimizer state restarts each epoch.
        trainer = Trainer(model, train_dataload, epoch, args['lr'], device)
        train_loss = trainer.train()
        if args['save_model']:
            state = model.state_dict()
            torch.save(state, os.path.join(args['save_path'], 'nn_state.t7'))
        validator = Validator(model, val_dataload, epoch,
                              device, args['batch_size'])
        val_loss = validator.eval()
        logger_dict['train_losses'].append(train_loss)
        logger_dict['val_losses'].append(val_loss['val_loss'])
        loss_logger.update(logger_dict)


if __name__ == '__main__':
    main()
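2.8 compute_prec.py and submit.py
The seven files above are really the whole system. The two remaining files are extras: one computes the accuracy on the validation set (the script calls it precision), and the other writes the predictions for submission. Have a look if you are interested.
The code of compute_prec.py is as follows: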
import os
import torch
import numpy as np
import argparse
from model import A2NN
from dataset import Traffic_Light
from torch.utils.data import DataLoader
from utils import get_train_val_names, check_folder


def main():
    parse = argparse.ArgumentParser()
    parse.add_argument('--dataset_path', type=str, default='TL_Dataset/')
    # nargs collects the values into a list; type=list / type=tuple would
    # split a single string into characters.
    parse.add_argument('--remove_names', type=str, nargs='+',
                       default=['README.txt', 'README.png', 'Testset'])
    parse.add_argument('--img_resize_shape', type=int, nargs=2,
                       default=(64, 64))
    parse.add_argument('--num_workers', type=int, default=4)
    parse.add_argument('--val_size', type=float, default=0.3)
    parse.add_argument('--save_path', type=str, default='logs/')
    args = vars(parse.parse_args())
    img_resize_shape = tuple(args['img_resize_shape'])
    check_folder(args['save_path'])
    # pylint: disable=E1101
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = A2NN().to(device)
    model.load_state_dict(
        torch.load(os.path.join(args['save_path'], 'nn_state.t7')))
    model.eval()
    # Use the same split as in training so only validation images are scored.
    names = get_train_val_names(args['dataset_path'], args['remove_names'],
                                radio=args['val_size'])
    val_dataset = Traffic_Light(names['val'], img_resize_shape)
    val_dataload = DataLoader(val_dataset,
                              batch_size=1,
                              num_workers=args['num_workers'])
    count = 0
    with torch.no_grad():
        for ind, (inp, label) in enumerate(val_dataload):
            inp = inp.float().to(device)
            label = label.long().to(device)
            output = model(inp)
            # Predicted class = index of the largest score.
            output = np.argmax(output.to('cpu').numpy(), axis=1)
            label = label.to('cpu').numpy()
            count += 1 if output == label else 0
    # Fraction of correctly classified validation images.
    print('precision: {}'.format(count/(ind+1)))


if __name__ == "__main__":
    main()
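The value printed by compute_prec.py is the number of correct predictions divided by the number of samples, which is usually called accuracy. Per-class precision is a different quantity. The sketch below shows both on a tiny made-up example; the function names and the sample predictions are hypothetical and not part of the repository.

```python
def accuracy(preds, labels):
    """Fraction of samples whose predicted class equals the label."""
    correct = sum(1 for p, y in zip(preds, labels) if p == y)
    return correct / len(labels)


def precision_for_class(preds, labels, cls):
    """Of the samples predicted as `cls`, the fraction that truly are `cls`."""
    predicted = [y for p, y in zip(preds, labels) if p == cls]
    if not predicted:
        return 0.0
    return sum(1 for y in predicted if y == cls) / len(predicted)


# Hypothetical predictions and labels for four samples.
preds = [0, 0, 1, 1]
labels = [0, 1, 1, 1]
print(accuracy(preds, labels))                # 0.75
print(precision_for_class(preds, labels, 0))  # 0.5
```

With ten classes, looking at per-class precision alongside accuracy can reveal whether errors concentrate in particular classes, such as the negative examples.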
The code of submit.py is as follows:
import torch
import numpy as np
import argparse
import os
import cv2
from model import A2NN
from utils import check_folder


def main():
    parse = argparse.ArgumentParser()
    parse.add_argument('--dataset_path', type=str,
                       default='TL_Dataset/Testset/')
    # Two integers, e.g. --img_resize_shape 64 64 (type=tuple would split a
    # single string into characters).
    parse.add_argument('--img_resize_shape', type=int, nargs=2,
                       default=(64, 64))
    parse.add_argument('--num_workers', type=int, default=4)
    parse.add_argument('--save_path', type=str, default='logs/')
    args = vars(parse.parse_args())
    img_resize_shape = tuple(args['img_resize_shape'])
    check_folder(args['save_path'])
    # pylint: disable=E1101
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = A2NN().to(device)
    model.load_state_dict(
        torch.load(os.path.join(args['save_path'], 'nn_state.t7')))
    model.eval()
    txt_path = os.path.join(args['save_path'], 'result.txt')
    with open(txt_path, 'w') as f, torch.no_grad():
        # The test images are read as 0.png ... 19999.png.
        for i in range(20000):
            name = os.path.join(args['dataset_path'], '{}.png'.format(i))
            img = cv2.imread(name)
            img = cv2.resize(img, img_resize_shape)
            # Scale pixels to [-1, 1]; keep this identical to the
            # preprocessing used for training.
            img = (img.transpose(2, 0, 1) - 127.5) / 127.5
            img = torch.unsqueeze(torch.from_numpy(img).float(), dim=0)
            img = img.to(device)
            output = model(img).to('cpu').numpy()
            img_class = np.argmax(output, axis=1)
            # basename gives the file name regardless of the path separator.
            f.write(os.path.basename(name) + ' ' + str(img_class[0]))
            f.write('\n')


if __name__ == "__main__":
    main()
3. Running the code
Download the dataset into the TL_Dataset_Classification-master folder and extract it. Then open a terminal in the TL_Dataset_Classification-master folder and run:
$ python main.py
To also compute the accuracy, run the following after training has finished:
$ python compute_prec.py
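If you run into problems, open an issue on GitHub or email me at [email protected].
4. Summary
That wraps up the tail end of November. I hope this helps you learn deep learning and PyTorch. December is almost here: wish me luck on my math exam, and I wish you all a happy time!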