學習筆記之——基於pytorch的FSRCNN
阿新 • 發佈:2019-01-14
本博文為本人學習基於pytorch的FSRCNN的學習筆記,僅供本人學習記錄用
先採用data_aug.m來增廣資料
再採用generate_train.m將資料生成.h5文件。
至於測試集,此處只採用set5中的五張圖片,所以先不用generate_test.m
gwptrain.py
#!/usr/bin/python # coding=gbk #匯入一系列的庫 from __future__ import print_function import os import argparse#匯入argparse模組 from torch.utils.data import DataLoader from FSRCNN.solver import FSRCNNTrainer from dataset.data import get_h5_set,get_img_set ################### #與訓練相關的設定 ################### parser = argparse.ArgumentParser(description='PyTorch for FSRCNN example')#建立解析器物件ArgumentParser,新增引數description:描述程式。 #設定一系列超參hyper-parameters parser.add_argument('--batchSize', type=int, default=128, help='training batch size') parser.add_argument('--nEpochs', type=int, default=300, help='number of epochs to train for') parser.add_argument('--lr', type=float, default=0.001, help='Learning Rate. Adam default=0.001') parser.add_argument('--seed', type=int, default=123, help='random seed to use. Default=123') #放大的倍數scale configuration parser.add_argument('--upscale_factor', '-uf', type=int, default=3, help="super resolution upscale factor. default=3") args = parser.parse_args()#應該是起到一個彙總前面的argument的作用 #https://www.cnblogs.com/zknublx/p/6106343.html #主函式 def main(): # =========================================================== # Set train dataset & test dataset # =========================================================== #os.environ["CUDU_VISIBLE_DEVICES"]="2,3,4" print('===> Loading datasets') #PyTorch在做一般的深度學習影象處理任務時,先使用dataset類和dataloader類讀入圖片 #用資料集100作為訓練資料 train_set = get_h5_set('train.h5') #train_set = get_h5_set('91.h5') #pytorch中提供DataLoader來實現資料的操作 training_data_loader = DataLoader(dataset=train_set, batch_size=args.batchSize, shuffle=True) #用set5測試資料 #Set5_img_loader = 'set5.mat'#get_img_set('Test/Set5') set5_img = get_img_set('Test/Set5') Set5_img_loader = DataLoader(dataset=set5_img, batch_size=1, shuffle=False) #set5_h5 = get_h5_set('test.h5') #Set5_h5_loader = DataLoader(dataset=set5_h5, batch_size=1, shuffle=False) #訓練模型 model = FSRCNNTrainer(args,training_data_loader,Set5_img_loader)#Set5_h5_loader model.run() if __name__ == '__main__': main()
data.py
#!/usr/bin/python
# -- coding: utf-8 --
# Thin dataset-loading helpers; the actual Dataset classes live in dataset.py.
from os.path import join
from .dataset import LoadH5, LoadImg


def get_h5_set(train_set):
    '''
    Load an HDF5 training dataset located under ./dataset.
    :param train_set: filename of the .h5 dataset (e.g. 'train.h5')
    :return: a LoadH5 Dataset wrapping the file's data/label pairs
    '''
    # os.path.join: prefix the filename with the dataset directory.
    train_dir = join("./dataset", train_set)
    return LoadH5(train_dir)


def get_img_set(test_set):
    '''
    Load an image-folder test dataset located under ./dataset.
    :param test_set: folder name containing the test images (e.g. 'Test/Set5')
    :return: a LoadImg Dataset over the image files in that folder
    '''
    # Fixed docstring: the original documented a non-existent `train_set` param.
    test_dir = join("./dataset", test_set)
    return LoadImg(test_dir)
dataset.py
#!/usr/bin/python
# -- coding: utf-8 --
# Dataset classes for FSRCNN: LoadH5 reads pre-generated (LR, HR) patch pairs
# from an HDF5 file; LoadImg builds (LR, HR) pairs on the fly from image files.
import torch.utils.data as data
from PIL import Image
from torchvision.transforms import ToTensor, CenterCrop, Resize
import h5py
import numpy as np
from torch import from_numpy
from math import floor
from os import listdir
from os.path import join


def is_image_file(filename):
    '''
    Check whether the file is an image file, by extension.
    :param filename: name of the file
    :return: True if the extension is png/jpg/jpeg/bmp (case-insensitive)
    '''
    # lower() so that e.g. "photo.JPG" is recognised too (the original
    # check was case-sensitive); endswith accepts a tuple of suffixes.
    return filename.lower().endswith((".png", ".jpg", ".jpeg", ".bmp"))


def load_img(filepath):
    '''
    Load an image and return only its luminance channel.
    :param filepath: path of the image
    :return: PIL image containing the Y channel of the YCbCr conversion
    '''
    img = Image.open(filepath).convert('YCbCr')
    y, _, _ = img.split()
    return y


class LoadH5(data.Dataset):
    '''Dataset backed by an HDF5 file holding 'data' (LR) and 'label' (HR) arrays.'''

    def __init__(self, image_h5):
        super(LoadH5, self).__init__()
        self.to_tensor = ToTensor()
        # The .h5 file packs the LR patches under 'data' and the HR patches
        # under 'label' (produced by the MATLAB generate_train script).
        with h5py.File(image_h5, 'r') as hf:
            self.input_patch = np.array(hf.get('data'))
            self.target_patch = np.array(hf.get('label'))

    def __getitem__(self, index):
        input_image = self.input_patch[index]
        target_image = self.target_patch[index]
        return from_numpy(input_image), from_numpy(target_image)

    def __len__(self):
        return len(self.input_patch)


class LoadImg(data.Dataset):
    '''Dataset over raw image files: yields (LR input, HR target) tensor pairs.'''

    def __init__(self, image_dir):
        super(LoadImg, self).__init__()
        self.image_filenames = [join(image_dir, x) for x in listdir(image_dir) if is_image_file(x)]
        self.to_tensor = ToTensor()

    def __getitem__(self, index):
        input_image = load_img(self.image_filenames[index])
        # Choose crop sizes (x, y) so that the downscaled sizes (x_re, y_re)
        # map back exactly under the network geometry: x = (x_re - 5) * 3 + 1.
        # (The original's "if x != size: x = floor(x)" branch was a no-op —
        # x is already an integer — and has been removed.)
        x_re = floor((input_image.size[0] - 1) / 3 + 5)
        x = (x_re - 5) * 3 + 1
        y_re = floor((input_image.size[1] - 1) / 3 + 5)
        y = (y_re - 5) * 3 + 1
        # NOTE(review): torchvision's CenterCrop/Resize take (h, w) while
        # PIL's .size is (w, h); kept exactly as the original — confirm the
        # orientation for non-square images.
        crop = CenterCrop((x, y))
        input_image = crop(input_image)
        # HR target: the cropped image itself, as a [0, 1] tensor.
        target = self.to_tensor(input_image.copy())
        # LR input: the cropped image downscaled to (x_re, y_re).
        resize = Resize((x_re, y_re))
        input_image = self.to_tensor(resize(input_image))
        return input_image, target

    def __len__(self):
        return len(self.image_filenames)
model.py
#!/usr/bin/python
# -- coding: utf-8 --
#模型的結構,跟FSRCNN中論文一致,原理可以參考博文:
#https://blog.csdn.net/gwplovekimi/article/details/83041627#FSRCNN%EF%BC%88Fast%20Super-Resolution%20Convolutional%20Neural%20Networks%EF%BC%89
import torch
import torch.nn as nn
class Net(torch.nn.Module):
    """FSRCNN network: feature extraction -> shrinking -> 4 mapping layers ->
    expanding -> deconvolution, with PReLU activations after every conv.

    :param num_channels: number of input/output image channels (1 = Y only)
    :param upscale_factor: super-resolution scale; used as the deconv stride
    :param d: width of the feature-extraction / expanding layers
    :param s: width of the shrinking / mapping layers
    """

    def __init__(self, num_channels, upscale_factor, d=56, s=12):
        super(Net, self).__init__()
        # Feature extraction
        self.conv1 = nn.Conv2d(in_channels=num_channels, out_channels=d, kernel_size=5, stride=1, padding=0)
        self.prelu1 = nn.PReLU()
        # Shrinking
        self.conv2 = nn.Conv2d(in_channels=d, out_channels=s, kernel_size=1, stride=1, padding=0)
        self.prelu2 = nn.PReLU()
        # Non-linear mapping (four 3x3 layers)
        self.conv3 = nn.Conv2d(in_channels=s, out_channels=s, kernel_size=3, stride=1, padding=1)
        self.prelu3 = nn.PReLU()
        self.conv4 = nn.Conv2d(in_channels=s, out_channels=s, kernel_size=3, stride=1, padding=1)
        self.prelu4 = nn.PReLU()
        self.conv5 = nn.Conv2d(in_channels=s, out_channels=s, kernel_size=3, stride=1, padding=1)
        self.prelu5 = nn.PReLU()
        self.conv6 = nn.Conv2d(in_channels=s, out_channels=s, kernel_size=3, stride=1, padding=1)
        self.prelu6 = nn.PReLU()
        # Expanding
        self.conv7 = nn.Conv2d(in_channels=s, out_channels=d, kernel_size=1, stride=1, padding=0)
        self.prelu7 = nn.PReLU()
        # Deconvolution: its stride is the upscale factor.  The original
        # hard-coded stride=3 and ignored `upscale_factor`; using the
        # parameter is identical for the default uf=3 and generalises the net.
        self.last_part = nn.ConvTranspose2d(in_channels=d, out_channels=num_channels, kernel_size=9,
                                            stride=upscale_factor, padding=4, output_padding=0)

    def forward(self, x):
        """Forward pass; x is an (N, C, H, W) batch of low-resolution inputs."""
        out = self.prelu1(self.conv1(x))
        out = self.prelu2(self.conv2(out))
        out = self.prelu3(self.conv3(out))
        out = self.prelu4(self.conv4(out))
        out = self.prelu5(self.conv5(out))
        out = self.prelu6(self.conv6(out))
        out = self.prelu7(self.conv7(out))
        out = self.last_part(out)
        return out

    def weights_init(self):
        """Initialise weights with zero-mean Gaussians (std values taken from
        the original author's MSRA-style initialisation, tuned for the default
        d=56, s=12) and set every bias to zero."""
        nn.init.normal_(self.conv1.weight, 0, 0.0378)
        nn.init.normal_(self.conv2.weight, 0, 0.3536)
        nn.init.normal_(self.conv3.weight, 0, 0.1179)
        nn.init.normal_(self.conv4.weight, 0, 0.1179)
        nn.init.normal_(self.conv5.weight, 0, 0.1179)
        nn.init.normal_(self.conv6.weight, 0, 0.1179)
        nn.init.normal_(self.conv7.weight, 0, 0.189)
        nn.init.normal_(self.last_part.weight, 0, 0.001)
        # All biases start at zero.
        for layer in (self.conv1, self.conv2, self.conv3, self.conv4,
                      self.conv5, self.conv6, self.conv7, self.last_part):
            nn.init.constant_(layer.bias, 0)
solver.py
#!/usr/bin/python
# -- coding: utf-8 --
from __future__ import print_function
from math import log10
import torch
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
from FSRCNN.model import Net
from scipy.misc import imread, imresize, imsave#對應imread
from PIL import Image
import numpy as np
import scipy.io as sio
from torchvision.transforms import ToTensor
#from visual_loss import Visualizer
from torchnet import meter
from torch.autograd import Variable
import cv2
from tensorboardX import SummaryWriter#tensorboard
writer = SummaryWriter(log_dir='logs')
from misc import progress_bar
class FSRCNNTrainer(object):
    """Training/evaluation driver for FSRCNN.

    :param config: parsed command-line args (lr, nEpochs, seed, upscale_factor)
    :param training_loader: DataLoader over (LR, HR) training patch pairs
    :param Set5_img_loader: DataLoader over the Set5 test images (batch size 1)
    """

    def __init__(self, config, training_loader, Set5_img_loader):
        super(FSRCNNTrainer, self).__init__()
        self.GPU_IN_USE = torch.cuda.is_available()      # is a GPU available?
        self.device = torch.device('cuda' if self.GPU_IN_USE else 'cpu')
        self.model = None
        self.lr = config.lr                  # learning rate (default 0.001)
        self.nEpochs = config.nEpochs        # (was assigned twice; once is enough)
        self.criterion = None                # loss function, set in build_model
        self.optimizer = None
        self.scheduler = None
        self.GPU = torch.cuda.is_available()
        self.seed = config.seed              # RNG seed for reproducible init
        self.upscale_factor = config.upscale_factor
        self.initial_para = []               # snapshot of parameters after init
        self.training_loader = training_loader
        self.set5_img_loader = Set5_img_loader
        self.info = {'loss': 0, 'PSNR for Set5': 0}

    def get_parameter(self, model, bias=False):
        '''
        Yield the conv/deconv weights (bias=False) or biases (bias=True) of
        `model`; PReLU modules are skipped.
        '''
        modules_skipped = (nn.PReLU)
        for m in model.modules():
            if isinstance(m, nn.Conv2d):
                if bias:
                    yield m.bias
                else:
                    yield m.weight
            elif isinstance(m, nn.ConvTranspose2d):
                # Fixed: the original yielded both weight and bias here when
                # bias=True (and nothing at all when bias=False), unlike the
                # Conv2d branch.  Deconv layers now use the same split.
                if bias:
                    yield m.bias
                else:
                    yield m.weight
            elif isinstance(m, modules_skipped):
                continue

    def build_model(self):
        '''
        Build the network and set up MSE loss, Adam optimizer and LR schedule.
        '''
        # .to(self.device) lets the same code run on either CPU or GPU.
        self.model = Net(num_channels=1, upscale_factor=self.upscale_factor).to(self.device)
        print(self.model)
        self.model.weights_init()            # Gaussian init per the paper
        self.criterion = nn.MSELoss()
        torch.manual_seed(self.seed)         # deterministic CPU randomness
        if self.GPU_IN_USE:
            torch.cuda.manual_seed(self.seed)   # seed the current GPU too
            cudnn.benchmark = True           # autotune convs for fixed sizes
            self.criterion.cuda()
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)
        # Halve the learning rate at the given epoch milestones.
        self.scheduler = optim.lr_scheduler.MultiStepLR(self.optimizer, milestones=[50, 75, 100], gamma=0.5)

    def save(self):
        '''Save the whole model (architecture + weights) to disk.'''
        model_out_path = "FSRCNN_model_path_Adam1.pth"
        torch.save(self.model, model_out_path)
        print("Checkpoint saved to {}".format(model_out_path))

    def train(self):
        """Run one epoch over the training loader; prints the running loss."""
        self.model.train()
        train_loss = 0
        for batch_num, (data, target) in enumerate(self.training_loader):
            data, target = data.to(self.device), target.to(self.device)
            self.optimizer.zero_grad()       # clear gradients from last step
            loss = self.criterion(self.model(data), target)
            train_loss += loss.item()
            loss.backward()
            self.optimizer.step()
            progress_bar(batch_num, len(self.training_loader), 'Loss: %.4f' % (train_loss / (batch_num + 1)))
        print(" Average Loss: {:.6f}".format(train_loss / len(self.training_loader)))

    def calpsnr(self, im1, im2):
        '''
        Compute the PSNR (dB) between two arrays, assuming a 255 peak value.
        :param im1: first image array
        :param im2: second image array
        :return: PSNR in dB (inf when the arrays are identical)
        '''
        diff = np.abs(im1 - im2)
        rmse = np.sqrt(np.mean(np.square(diff)))
        psnr = 20 * np.log10(255 / rmse)
        return psnr

    def test_set5_img(self):
        '''
        Compute the average PSNR of the current model over the Set5 images
        and record it in self.info.
        '''
        self.model.eval()
        avg_psnr = 0
        for batch_num, (data, target) in enumerate(self.set5_img_loader):
            target = target.numpy()
            data = data.to(self.device)
            prediction = self.model(data)
            prediction = prediction.data.cpu().numpy()
            # NOTE(review): the prediction is rescaled to 0-255 here while
            # `target` comes from ToTensor ([0, 1]); kept as in the original —
            # confirm the intended value ranges against the dataset code.
            prediction = np.array(prediction)
            prediction = prediction * 255
            target = np.array(target)
            psnr = self.calpsnr(prediction, target)
            print("PSNR=", psnr)
            avg_psnr += psnr
            progress_bar(batch_num, len(self.set5_img_loader), 'PSNR: %.4fdB' % (avg_psnr / (batch_num + 1)))
        self.info['PSNR for Set5'] = avg_psnr / len(self.set5_img_loader)
        print(" Average PSNR: {:.4f} dB".format(avg_psnr / len(self.set5_img_loader)))

    def get_para(self):
        '''
        Return the model's parameters as a list of numpy arrays.
        '''
        para = []
        for parameter in self.model.parameters():
            para.append(parameter.data.cpu().numpy())
        return para

    def run(self):
        """Build the model, then train and evaluate for nEpochs epochs,
        saving a checkpoint after the final epoch."""
        self.build_model()
        self.initial_para = self.get_para()
        global loss_meter, testimage_psnr
        loss_meter = meter.AverageValueMeter()       # kept for visualisation hooks
        testimage_psnr = meter.AverageValueMeter()
        for epoch in range(1, self.nEpochs + 1):
            print("\n===> Epoch {} starts:".format(epoch))
            self.scheduler.step(epoch)
            self.train()
            print('Testing Set5:')
            self.test_set5_img()
            if epoch == self.nEpochs:
                self.save()
執行前先開啟visdom
python -m visdom.server(開啟後出錯。。。。)
然後python gwptrain.py
(CUDA_VISIBLE_DEVICES=5 python gwptrain.py。選定GPU)
(檢視GPU狀態nvidia-smi)
執行結果:
第一次跑SR程式碼,似懂非懂地把程式啃下來,跑了300代,然後就沒有然後了。。。。接下來會繼續深入啃這個demo