
Learning notes: FSRCNN based on PyTorch

This post contains my notes from studying a PyTorch implementation of FSRCNN; it is kept mainly as a personal learning record.

First, data_aug.m is used to augment the data.

Then generate_train.m packs the data into an .h5 file.

For the test set, only the five images in Set5 are used here, so generate_test.m is not needed for now.
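As a quick sanity check on the generated file, the short sketch below (my own, not part of the repo) just prints the shapes; it assumes the .h5 file holds the 'data' (LR patches) and 'label' (HR patches) datasets that generate_train.m writes, which is also what dataset.py reads later.

# quick sanity check of the generated HDF5 file (my own sketch, not part of the repo)
# assumes the file holds the 'data' (LR patches) and 'label' (HR patches) datasets
import h5py
import numpy as np

with h5py.File('./dataset/train.h5', 'r') as hf:   # same path the training script uses
    data = np.array(hf.get('data'))     # low-resolution input patches
    label = np.array(hf.get('label'))   # high-resolution target patches
    print('data :', data.shape)         # e.g. (N, 1, h, w)
    print('label:', label.shape)        # e.g. (N, 1, H, W)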

gwptrain.py

#!/usr/bin/python
# -*- coding: utf-8 -*-

# import the required libraries

from __future__ import print_function
import os
import argparse  # argument parsing for the training settings
from torch.utils.data import DataLoader
from FSRCNN.solver import FSRCNNTrainer
from dataset.data import get_h5_set,get_img_set


###################
# Training-related settings
###################
parser = argparse.ArgumentParser(description='PyTorch for FSRCNN example')  # create the ArgumentParser object; description briefly describes the program
# hyper-parameters
parser.add_argument('--batchSize', type=int, default=128, help='training batch size')
parser.add_argument('--nEpochs', type=int, default=300, help='number of epochs to train for')
parser.add_argument('--lr', type=float, default=0.001, help='Learning Rate. Adam default=0.001')
parser.add_argument('--seed', type=int, default=123, help='random seed to use. Default=123')

# upscale factor (scale configuration)
parser.add_argument('--upscale_factor', '-uf',  type=int, default=3, help="super resolution upscale factor. default=3")

args = parser.parse_args()  # parse and collect all of the arguments defined above
#https://www.cnblogs.com/zknublx/p/6106343.html

# main function
def main():
    # ===========================================================
    # Set train dataset & test dataset
    # ===========================================================
    #os.environ["CUDU_VISIBLE_DEVICES"]="2,3,4"
    print('===> Loading datasets')
    #PyTorch在做一般的深度學習影象處理任務時,先使用dataset類和dataloader類讀入圖片
    #用資料集100作為訓練資料
    train_set = get_h5_set('train.h5')
    #train_set = get_h5_set('91.h5')
    #pytorch中提供DataLoader來實現資料的操作
    training_data_loader = DataLoader(dataset=train_set, batch_size=args.batchSize, shuffle=True)
    #用set5測試資料
    #Set5_img_loader = 'set5.mat'#get_img_set('Test/Set5')
    
    
    set5_img = get_img_set('Test/Set5')
    Set5_img_loader = DataLoader(dataset=set5_img, batch_size=1, shuffle=False)
    #set5_h5 = get_h5_set('test.h5')
    #Set5_h5_loader = DataLoader(dataset=set5_h5, batch_size=1, shuffle=False)
    
    # build and train the model
    model = FSRCNNTrainer(args, training_data_loader, Set5_img_loader)  # Set5_h5_loader
    model.run()

if __name__ == '__main__':
    main()
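
Since every hyper-parameter above is an argparse option, the defaults can be overridden from the command line, for example (flag names as defined above): python gwptrain.py --batchSize 64 --nEpochs 100 --lr 0.001 -uf 3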

data.py

#!/usr/bin/python
# -*- coding: utf-8 -*-

from os.path import join
from .dataset import LoadH5, LoadImg

def get_h5_set(train_set):  # the argument is the .h5 filename, e.g. 'train.h5' or '91.h5'
    '''
    Load H5 dataset.
    :param train_set: the filename of the dataset
    :return: the loaded data
    '''
    train_dir = join("./dataset", train_set)
    # the join here is os.path.join, which concatenates the dataset directory and the filename into one path

    return LoadH5(train_dir)  # LoadH5 is defined in dataset.py
    # this step returns the patch data that becomes train_set


def get_img_set(test_set):  # called with the image folder, e.g. 'Test/Set5'
    '''
    Load image file data.
    :param test_set: the folder containing the test images
    :return: the loaded data
    '''
    test_dir = join("./dataset", test_set)

    return LoadImg(test_dir)

dataset.py

#!/usr/bin/python
# -*- coding: utf-8 -*-

import torch.utils.data as data
from PIL import Image
from torchvision.transforms import ToTensor, CenterCrop, Resize
import h5py
import numpy as np
from torch import from_numpy
from math import floor
from os import listdir
from os.path import join

def is_image_file(filename):
    '''
    Check whether the file is an image file.
    :param filename: name of the file
    :return: bool indicating whether the file is an image
    '''
    return any(filename.endswith(extension) for extension in [".png", ".jpg", ".jpeg", ".bmp"])


def load_img(filepath):
    '''
    Load the image and get the luminance data.
    :param filepath: path of the image.
    :return: luminance data
    '''
    img = Image.open(filepath).convert('YCbCr')
    y, _, _ = img.split()
    return y


class LoadH5(data.Dataset):  # image_h5 is the path of the training .h5 file (image_h5 = train_dir), e.g. 'Train\91.h5'
    def __init__(self, image_h5):
        super(LoadH5, self).__init__()

        self.to_tensor = ToTensor()
        # PyTorch usually applies transforms when reading images; ToTensor() converts an image into a tensor
        # for an image img, calling ToTensor turns it into tensor form
        # https://blog.csdn.net/qq_37385726/article/details/81811466

        # containers for the input and label patches
        self.input_patch = []
        self.target_patch = []

        with h5py.File(image_h5, 'r') as hf:  # standard way to read an .h5 file (the file already packs both 'data' and 'label': 'data' holds the LR patches and 'label' the HR patches)
            self.input_patch = np.array(hf.get('data'))
            self.target_patch = np.array(hf.get('label'))

    def __getitem__(self, index):
        input_image = self.input_patch[index]
        target_image = self.target_patch[index]
        return from_numpy(input_image), from_numpy(target_image)

    def __len__(self):
        return len(self.input_patch)

# similar to the class above, but it loads plain image files instead of an .h5 file
class LoadImg(data.Dataset):
    def __init__(self, image_dir):
        super(LoadImg, self).__init__()
        self.image_filenames = [join(image_dir, x) for x in listdir(image_dir) if is_image_file(x)]
        self.to_tensor = ToTensor()


    def __getitem__(self, index):
        input_image = load_img(self.image_filenames[index])
        # LR width: roughly the HR width divided by 3, plus a margin for the border the network consumes
        x_re = floor((input_image.size[0] - 1) / 3 + 5)
        # width of the network output for an input of width x_re; the HR target is cropped to this size
        x = (x_re - 5) * 3 + 1
        if x != input_image.size[0]:
            x = floor(x)
        # same computation for the height
        y_re = floor((input_image.size[1] - 1) / 3 + 5)
        y = (y_re - 5) * 3 + 1
        if y != input_image.size[1]:
            y = floor(y)

        self.crop = CenterCrop((x,y))
        input_image = self.crop(input_image)
        target = input_image.copy()
        target = self.to_tensor(target)

        self.resize = Resize((x_re, y_re))
        input_image = self.resize(input_image)
        input_image = self.to_tensor(input_image)

        return input_image, target

    def __len__(self):
        return len(self.image_filenames)
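
The crop/resize arithmetic in LoadImg.__getitem__ mirrors the network geometry for an upscale factor of 3: an LR input of size x_re leaves the network at 3*(x_re - 4) - 2 = (x_re - 5)*3 + 1 (a 5x5 convolution with no padding followed by a stride-3 deconvolution, see model.py below), which is exactly the size the HR image is centre-cropped to. A small verification sketch of my own:

from math import floor

def crop_and_lr_sizes(hr_size):
    # same arithmetic as LoadImg.__getitem__ for upscale factor 3
    lr = floor((hr_size - 1) / 3 + 5)    # size the LR input is resized to
    crop = (lr - 5) * 3 + 1              # size the HR target is cropped to
    return crop, lr

def net_output_size(lr_size):
    # conv1 (5x5, no padding): lr_size - 4; deconv (kernel 9, stride 3, padding 4): 3*n - 2
    return 3 * (lr_size - 4) - 2

for hr in (128, 255, 512):
    crop, lr = crop_and_lr_sizes(hr)
    assert net_output_size(lr) == crop   # the network output matches the cropped HR target
    print(hr, '-> LR', lr, ', target', crop)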

model.py

#!/usr/bin/python
# -*- coding: utf-8 -*-

# The network structure follows the FSRCNN paper; for the underlying idea see the post:
#https://blog.csdn.net/gwplovekimi/article/details/83041627#FSRCNN%EF%BC%88Fast%20Super-Resolution%20Convolutional%20Neural%20Networks%EF%BC%89
import torch
import torch.nn as nn

class Net(torch.nn.Module):
    def __init__(self, num_channels, upscale_factor, d=56, s=12):
        super(Net, self).__init__()
        # Feature extraction
        self.conv1 = nn.Conv2d(in_channels=num_channels, out_channels=d, kernel_size=5, stride=1, padding=0)
        self.prelu1 = nn.PReLU()


        # Shrinking
        self.conv2 = nn.Conv2d(in_channels=d, out_channels=s, kernel_size=1, stride=1, padding=0)
        self.prelu2 = nn.PReLU()
        
        # Non-linear Mapping
        self.conv3 = nn.Conv2d(in_channels=s, out_channels=s, kernel_size=3, stride=1, padding=1)
        self.prelu3 = nn.PReLU()
        self.conv4 = nn.Conv2d(in_channels=s, out_channels=s, kernel_size=3, stride=1, padding=1)
        self.prelu4 = nn.PReLU()
        self.conv5 = nn.Conv2d(in_channels=s, out_channels=s, kernel_size=3, stride=1, padding=1)
        self.prelu5 = nn.PReLU()
        self.conv6 = nn.Conv2d(in_channels=s, out_channels=s, kernel_size=3, stride=1, padding=1)
        self.prelu6 = nn.PReLU()
        # Expanding
        self.conv7 = nn.Conv2d(in_channels=s, out_channels=d, kernel_size=1, stride=1, padding=0)
        self.prelu7 = nn.PReLU()
        # Deconvolution
        self.last_part = nn.ConvTranspose2d(in_channels=d, out_channels=num_channels, kernel_size=9, stride=3, padding=4, output_padding=0)

    # forward pass
    def forward(self, x):  # x is the input data
        out = self.prelu1(self.conv1(x))
        out = self.prelu2(self.conv2(out))
        out = self.prelu3(self.conv3(out))
        out = self.prelu4(self.conv4(out))
        out = self.prelu5(self.conv5(out))
        out = self.prelu6(self.conv6(out))
        out = self.prelu7(self.conv7(out))
        out = self.last_part(out)

        return out

    # weight initialisation
    def weights_init(self):
        #torch.nn.init.normal(tensor, mean=0, std=1)
        nn.init.normal_(self.conv1.weight, 0, 0.0378)#MSRA initialization
        
        nn.init.normal_(self.conv2.weight, 0, 0.3536)
        
        nn.init.normal_(self.conv3.weight, 0, 0.1179)
        nn.init.normal_(self.conv4.weight, 0, 0.1179)
        nn.init.normal_(self.conv5.weight, 0, 0.1179)
        nn.init.normal_(self.conv6.weight, 0, 0.1179)

        nn.init.normal_(self.conv7.weight, 0, 0.189)
        nn.init.normal_(self.last_part.weight, 0, 0.001)
        
        
        # initialise all biases to zero
        nn.init.constant_(self.conv1.bias, 0)
        nn.init.constant_(self.conv2.bias, 0)
        nn.init.constant_(self.conv3.bias, 0)
        nn.init.constant_(self.conv4.bias, 0)
        nn.init.constant_(self.conv5.bias, 0)
        nn.init.constant_(self.conv6.bias, 0)
        nn.init.constant_(self.conv7.bias, 0)
        nn.init.constant_(self.last_part.bias, 0)
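
One thing worth noting: upscale_factor is passed into Net but never used; the x3 factor is hard-coded in the stride of the ConvTranspose2d layer. As a quick shape check (my own sketch, not part of the repo), a dummy forward pass confirms the 3*(H-4)-2 output size that LoadImg relies on:

# quick shape check (my own sketch, not part of the repo)
import torch
from FSRCNN.model import Net

net = Net(num_channels=1, upscale_factor=3)
x = torch.randn(1, 1, 90, 90)   # one 90x90 single-channel LR patch
with torch.no_grad():
    y = net(x)
print(y.shape)                  # torch.Size([1, 1, 256, 256]), i.e. 3*(90-4)-2 on each side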
        

solver.py

#!/usr/bin/python
# -*- coding: utf-8 -*-

from __future__ import print_function
from math import log10
import torch
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
from FSRCNN.model import Net

from scipy.misc import imread, imresize, imsave  # imread etc. (removed from newer SciPy; only needed by the commented-out imsave calls below)


from PIL import Image
import numpy as np
import scipy.io as sio
from torchvision.transforms import ToTensor
#from visual_loss import Visualizer
from torchnet import meter
from torch.autograd import Variable
import cv2
from tensorboardX import SummaryWriter#tensorboard
writer = SummaryWriter(log_dir='logs')

from misc import progress_bar

class FSRCNNTrainer(object):  # constructed with args, training_data_loader, Set5_img_loader
    def __init__(self, config, training_loader, Set5_img_loader):
        super(FSRCNNTrainer, self).__init__()
        self.GPU_IN_USE = torch.cuda.is_available()  # check whether a GPU is available
        self.device = torch.device('cuda' if self.GPU_IN_USE else 'cpu')  # use the GPU if available, otherwise the CPU
        self.model = None
        self.lr = config.lr  # learning rate, default 0.001
        self.nEpochs = config.nEpochs
        self.criterion = None  # loss function
        self.optimizer = None  # optimizer object
        self.scheduler = None
        self.GPU = torch.cuda.is_available()
        self.seed = config.seed  # random seed used for initialisation
        self.upscale_factor = config.upscale_factor  # upscale factor
        self.initial_para = []
        self.training_loader = training_loader  # training set
        #self.Set5_img_loader = Set5_img_loader  # test set
        self.set5_img_loader = Set5_img_loader  # test set
        #self.set5_h5_loader = set5_h5_loader
        self.info = {'loss':0, 'PSNR for Set5':0}

    def get_parameter(self, model, bias=False):  # fetch weights or biases from the model
        '''
        return weights or bias by setting bias False or True
        '''
        modules_skipped = (nn.PReLU)  # FSRCNN uses PReLU activations; their parameters are skipped below
        for m in model.modules():
            if isinstance(m, nn.Conv2d):  # parameters of a convolution layer
            # isinstance(object, classinfo) returns True if object is an instance of classinfo
            # (or of one of its subclasses), and False otherwise
                if bias:
                    yield m.bias
                else:
                    yield m.weight
            
            elif isinstance(m, nn.ConvTranspose2d):  # parameters of the deconvolution layer
                if bias:
                    yield m.weight
                    yield m.bias
            elif isinstance(m, modules_skipped):  # activation-function parameters
                continue
#            else:
#                raise ValueError('Unexpected module: %s' % str(m))
            
    def build_model(self):
        '''
        optim: Adam, loss: MSE , lr : stepLR
        '''
        # .to(self.device) lets the same code run on either the GPU or the CPU
        # https://ptorch.com/news/189.html
        self.model = Net(num_channels=1, upscale_factor=self.upscale_factor).to(self.device)  # Net is the model defined in model.py
        print(self.model)  # print the model structure
        self.model.weights_init()  # initialise the weights first
        
#        premodel = torch.load('FSRCNN_model_path_Adam.pth') ###finetune
#        premodel_dict = premodel.state_dict()
#        self.model.load_state_dict(premodel_dict)

        self.criterion = nn.MSELoss()  # MSE loss
        torch.manual_seed(self.seed)  # seed the CPU RNG so the results are reproducible
        # if a GPU is available, seed it as well
        if self.GPU_IN_USE:
            torch.cuda.manual_seed(self.seed)  # set random seed for current GPU
            cudnn.benchmark = True  # usually gives a small training speed-up at no extra cost, so it is almost always enabled
            self.criterion.cuda()  # move the loss computation to the GPU
        
        # define the optimizer
        # https://blog.csdn.net/kgzhang/article/details/77479737
        # torch.optim.Adam(params, lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0); only the parameters and the learning rate are passed here
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)
        ###self.optimizer = optim.Adam([
                ###{'params': self.get_parameter(self.model, bias=False)},
                ###{'params': self.get_parameter(self.model, bias=True),'lr': self.lr * 0.1}], lr=self.lr)
#       set different learning rates for weights and biases

        # learning-rate schedule
#       self.scheduler = optim.lr_scheduler.StepLR(self.optimizer, step_size=50, gamma=0.5)  # lr decay
        self.scheduler = optim.lr_scheduler.MultiStepLR(self.optimizer, milestones=[50, 75, 100], gamma=0.5)  # lr decay

                
                
    def save(self):  # save the model (the whole model object, including the weights)
        model_out_path = "FSRCNN_model_path_Adam1.pth"
        torch.save(self.model, model_out_path)
        print("Checkpoint saved to {}".format(model_out_path))

    # training loop
    def train(self):
        """
        Run one epoch over the training data loader.
        """
        self.model.train()
        train_loss = 0
#        count = 0
        for batch_num, (data, target) in enumerate(self.training_loader):  # iterate over the training set
        # enumerate() pairs each batch with its index
            data, target = data.to(self.device), target.to(self.device)  # move the batch to the chosen device
            self.optimizer.zero_grad()  # reset the gradients to zero
            # https://blog.csdn.net/qq_34690929/article/details/79934843

            loss = self.criterion(self.model(data), target)  # compute the loss

            #loss_meter.reset()#reset value
            #loss_meter.add(loss.item())
            #vis.plot_many_stack({'train_loss': loss_meter.value()[0]})#

            train_loss += loss.item()  # accumulate the scalar loss value
            loss.backward()
            self.optimizer.step()
            progress_bar(batch_num, len(self.training_loader), 'Loss: %.4f' % (train_loss / (batch_num + 1)))
        print("    Average Loss: {:.6f}".format(train_loss / len(self.training_loader)))

#######################################################################################
    # compute PSNR
    def calpsnr(self, im1, im2):
        #print(im1.shape)
        #print(im2.shape)
        #exit()
        diff = np.abs(im1 - im2)
        rmse = np.sqrt(np.mean(np.square(diff)))
        psnr = 20*np.log10(255/rmse)
        return psnr

#################################################################################################
    def test_set5_img(self):
        '''
        Compute the average PSNR over the Set5 test images.
        :return:
        '''
        self.model.eval()
        avg_psnr = 0
        for batch_num, (data, target) in enumerate(self.set5_img_loader):
            target = target.numpy()
            
            
            #target = target[:, :, 6:target.shape[2] - 6, 6:target.shape[3] - 6]  (this line caused me no end of trouble!)
            # target = Variable(torch.from_numpy(target))
            #if self.GPU:
                #data, target = Variable(data).cuda(), Variable(torch.from_numpy(target)).cuda()
            #else:
                #data, target = Variable(data), Variable(torch.from_numpy(target))

            data = data.to(self.device)
            prediction = self.model(data)
            prediction = prediction.data.cpu().numpy()
            prediction = np.array(prediction)
            prediction = prediction * 255    # scale the [0, 1] output to the 0-255 range
            target = np.array(target) * 255  # scale the target the same way so calpsnr's peak value of 255 is consistent
            # prediction = prediction[:, :, 6:prediction.shape[2] - 6, 6:prediction.shape[3] - 6]
            #if self.GPU:
                #prediction = Variable(torch.from_numpy(prediction)).cuda()
            #else:
                #prediction = Variable(torch.from_numpy(prediction))
            #print("prediction",prediction.shape)
            #print("target",target.shape)
            #exit()
            #mse = self.criterion(prediction, target) 
            psnr = self.calpsnr(prediction, target)
            #imsave('/home/guanwp/FSRCNN/myproject/result/result.jpg', prediction) 
            #imsave('/home/guanwp/FSRCNN/myproject/result/target.jpg', target) 
          
            #print("MSE=",mse.data[0])
            #psnr = 10 * log10(255 / mse.data[0])  # PSNR computed from the MSE loss (unused)
            #psnr1 = torch.from_numpy(psnr)
            #writer.add_graph(self.model, input_to_model=(psnr1,))
            print("PSNR=",psnr)
            avg_psnr += psnr
            progress_bar(batch_num, len(self.set5_img_loader), 'PSNR: %.4fdB' % (avg_psnr / (batch_num + 1)))

        self.info['PSNR for Set5'] = avg_psnr / len(self.set5_img_loader)
        a = len(self.set5_img_loader)  # debug: number of Set5 test images
        print("a=", a)
        print("    Average PSNR: {:.4f} dB".format(avg_psnr / len(self.set5_img_loader)))
############################################################################################

    def get_para(self):
        '''
        Return the parameters of the model.
        :return:
        '''
        para = []
        for parameter in self.model.parameters():
            para.append(parameter.data.cpu().numpy())
        return para



    def run(self):
        self.build_model()
        
        self.initial_para = self.get_para()
        #global vis
        #vis = Visualizer(env='FSRCNN_example')  # added for visualisation
        global loss_meter, testimage_psnr
        loss_meter = meter.AverageValueMeter()  # added for visualisation
        testimage_psnr = meter.AverageValueMeter()  # added for visualisation

        for epoch in range(1, self.nEpochs + 1):
            print("\n===> Epoch {} starts:".format(epoch))
            self.scheduler.step(epoch)
            self.train()
            print('Testing Set5:')
            self.test_set5_img()#OK
            #self.test_set5_patch()
            #self.testset5image()  # plot the test curve in real time
            #self.testset5image111111()
            #self.test()
            

            if epoch == self.nEpochs:
                self.save()
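
Since save() stores the whole model object with torch.save, loading it back for inference only needs torch.load (run it from the project root so FSRCNN.model is importable, because the pickled object references that module). A minimal inference sketch of my own follows; the checkpoint name matches solver.save(), while the image path is only a hypothetical example, so use any Set5 image.

# minimal inference sketch (my own; not part of the repo)
import torch
from math import floor
from PIL import Image
from torchvision.transforms import ToTensor

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = torch.load('FSRCNN_model_path_Adam1.pth', map_location=device)  # whole-model checkpoint
model.eval()

img = Image.open('./dataset/Test/Set5/baby_GT.bmp').convert('YCbCr')  # hypothetical example path
y, cb, cr = img.split()                 # the network only processes the Y (luminance) channel
w, h = y.size
lr = y.resize((floor((w - 1) / 3 + 5), floor((h - 1) / 3 + 5)), Image.BICUBIC)  # simulate the LR input
lr = ToTensor()(lr).unsqueeze(0).to(device)   # shape [1, 1, h, w], values in [0, 1]

with torch.no_grad():
    sr = model(lr).clamp(0, 1).squeeze(0).cpu()
print(sr.shape)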
            


Before running, start visdom first:

python -m visdom.server (it errored after starting for me....)

Then run python gwptrain.py

(CUDA_VISIBLE_DEVICES=5 python gwptrain.py to select a specific GPU)

(check the GPU status with nvidia-smi)

Run results:

This was my first time running SR code. I worked through the program with only a partial understanding, trained it for 300 epochs, and that was as far as it went.... Next I will keep digging deeper into this demo.