Source code for model.pytorch_model

# Copyright or © or Copr. IETR/INSA Rennes (2019)
# 
# Contributors :
#     Eduardo Fernandes-Montesuma eduardo.fernandes-montesuma@insa-rennes.fr (2019)
#     Florian Lemarchand florian.lemarchand@insa-rennes.fr (2019)
# 
# 
# OpenDenoising is a computer program whose purpose is to benchmark image
# restoration algorithms.
# 
# This software is governed by the CeCILL-C license under French law and
# abiding by the rules of distribution of free software. You can  use,
# modify and/ or redistribute the software under the terms of the CeCILL-C
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
# 
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author, the holder of the
# economic rights, and the successive licensors have only  limited
# liability.
# 
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
# 
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL-C license and that you accept its terms.


import os
import time
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim

from tqdm import tqdm
from datetime import timedelta
from OpenDenoising.model import module_logger
from OpenDenoising.model import AbstractDeepLearningModel


def change_pgbar_desc(pgbar, log, train=True):
    if train:
        string = "[Train {}/{}, Best: {}] ".format(log["epoch"], log["n_epochs"], log["best_epoch"])
        string += "Loss: {0:.4f}, Metrics: ".format(log["loss_val"])
        for metric in log["metrics_val"]:
            string += "{0:.4f} ".format(metric)
        string += " Elapsed Time: {}".format(timedelta(seconds=log["finish_time"] - log["start_time"]))
    else:
        string = "[Valid {}/{}] ".format(log["epoch"], log["n_epochs"])
        string += "Loss: {0:.4f}, Metrics: ".format(log["loss_val"])
        for metric in log["metrics_val"]:
            string += "{0:.4f} ".format(metric)
        string += " Elapsed Time: {}".format(timedelta(seconds=log["finish_time"] - log["start_time"]))
    pgbar.set_description(string)


[docs]class PytorchModel(AbstractDeepLearningModel): """Pytorch wrapper class. See Also -------- :class:`model.AbstractDenoiser` : for the basic functionalities of Image Denoisers. :class:`model.AbstractDeepLearningModel` : for the basic functionalities of Deep Learning based Denoisers. """
[docs] def __init__(self, model_name="DeepLearningModel", logdir="./logs/Pytorch", return_diff=False): super().__init__(model_name, logdir, framework="Pytorch", return_diff=return_diff)
[docs] def charge_model(self, model_function=None, model_path=None, **kwargs): """Pytorch model charging function. You can charge a model either by specifying a class that implements the network architecture (passing it through model_function) or by specifying the path to a .pt or .pth file. If you class constructor accepts optional arguments, you can specify these by using Keyword arguments. Parameters ---------- model_function : :class:`torch.nn.Module` Pytorch network Class implementing the network architecture. model_path : str String containing the path to a .pt or .pth file. Examples -------- Loading Pytorch DnCNN from class. Notice that in our implementation, depth is an optional argument. >>> from OpenDenoising import model >>> mymodel = model.PytorchModel(model_name="mymodel") >>> mymodel.charge_model(model_function=model.architectures.pytorch.DnCNN, depth=17) Loading Pytorch DnCNN from a file. >>> from OpenDenoising import model >>> mymodel = model.PytorchModel(model_name="mymodel") >>> mymodel.charge_model(model_path=PATH) """ assert (model_function is not None or model_path is not None), "You should provide at least a model_function\ or a model_path to build your neural network\ model" if model_path is not None: filename, extension = os.path.splitext(model_path) if extension == '.pt' or extension == '.pth': module_logger.info("Loading Pytorch model from {}".format(model_path)) self.model = torch.load(model_path) else: raise ValueError("Invalid file extension. Expected .pt or .pth, but got {}".format(extension)) elif model_function is not None: self.model = model_function(**kwargs) module_logger.warning("You have loaded your model from a python function, which does not hold any " "information about weight values. Be sure to train the network before running" " your tests.") if torch.cuda.is_available(): self.model = self.model.cuda()
[docs] def train(self, train_generator, valid_generator=None, n_epochs=250, n_stages=500, learning_rate=1e-3, metrics=None, optimizer_name=None, kcallbacks=None, loss=None, valid_steps=10,): """Trains a Pytorch model. Parameters ---------- train_generator : data.AbstractDataset dataset object inheriting from AbstractDataset class. It is a generator object that yields data from train dataset folder. valid_generator : data.AbstractDataset dataset object inheriting from AbstractDataset class. It is a generator object that yields data from valid dataset folder. n_epochs : int number of training epochs. n_stages : int number of batches seen per epoch. learning_rate : float constant multiplication constant for optimizer or initial value for training with dynamic learning rate (see callbacks) metrics : list List of metric functions. These functions should have two inputs, two instances of :class:`numpy.ndarray`. It outputs a float corresponding to the metric computed on those two arrays. For more information, take a look on the Benchmarking module. optimizer_name : str Name of optimizer to use. Check Pytorch documentation for a complete list. kcallbacks : list List of custom_callbacks. loss : :class:`torch.nn.modules.loss` Pytorch loss function. valid_steps : int If valid_generator was specified, valid_steps determines the number of valid batches that will be seen per validation run. """ do_valid = bool(valid_generator) """ Optimizer """ if optimizer_name is None: # Checks if optimizer was given optimizer_name = "Adam" optimizer = getattr(optim, optimizer_name)(self.model.parameters(), lr=learning_rate) """ Callbacks """ if kcallbacks is None: kcallbacks = [] """ Loss """ if loss is None: loss = nn.MSELoss(reduction="sum") min_val_loss = np.inf best_epoch = None start = time.time() log_dict = {"n_epochs": n_epochs} for i in range(int(n_epochs)): pgbar = tqdm(range(n_stages), ncols=150, ascii=True) epoch_start = time.time() log_dict["start_time"] = epoch_start log_dict["epoch"] = i log_dict["best_epoch"] = best_epoch callback_logs = dict() callback_logs["LearningRate"] = learning_rate for _ in pgbar: x_numpy, y_numpy = next(train_generator) if x_numpy.shape[1] not in [1, 3]: x_numpy = np.transpose(x_numpy, [0, 3, 1, 2]) if y_numpy.shape[1] not in [1, 3]: y_numpy = np.transpose(y_numpy, [0, 3, 1, 2]) # Numpy array to Tensor x_tensor = torch.from_numpy(x_numpy).float() y_tensor = torch.from_numpy(y_numpy).float() if torch.cuda.is_available(): # Pass Tensors to GPU x_tensor = x_tensor.cuda() y_tensor = y_tensor.cuda() # Makes prediction based on inputs y_pred_tensor = self.model(x_tensor) # Compute loss loss_tensor = loss(y_pred_tensor, y_tensor) # Tensor to scalar log_dict["loss_val"] = loss_tensor.item() # Computes metrics if torch.cuda.is_available(): # Pass prediction to CPU y_pred_tensor = y_pred_tensor.cpu() y_pred_numpy = y_pred_tensor.detach().numpy() metrics_v = [] for metric in metrics: metrics_v.append(metric(y_numpy, y_pred_numpy)) log_dict["metrics_val"] = metrics_v log_dict["finish_time"] = time.time() change_pgbar_desc(pgbar, log_dict) # Reset gradients optimizer.zero_grad() # Computes gradients loss_tensor.backward() # Performs optimization optimizer.step() callback_logs['loss'] = loss_tensor.item() for metric_v, metric in zip(metrics_v, metrics): callback_logs[metric.__name__] = metric_v if do_valid: loss_m = 0 metrics_m = np.zeros((len(metrics))) pgbar = tqdm(range(valid_steps), ncols=150, ascii=True) eval_start = time.time() log_dict["start_time"] = eval_start for _ in pgbar: x_numpy, y_numpy = next(train_generator) if x_numpy.shape[1] not in [1, 3]: x_numpy = np.transpose(x_numpy, [0, 3, 1, 2]) if y_numpy.shape[1] not in [1, 3]: y_numpy = np.transpose(y_numpy, [0, 3, 1, 2]) # Numpy array to Tensor x_tensor = torch.from_numpy(x_numpy).float() y_tensor = torch.from_numpy(y_numpy).float() if torch.cuda.is_available(): # Pass prediction to CPU x_tensor = x_tensor.cuda() y_tensor = y_tensor.cuda() # Makes prediction based on inputs y_pred_tensor = self.model(x_tensor) # Compute loss loss_tensor = loss(y_pred_tensor, y_tensor) # Tensor to scalar log_dict["loss_val"] = loss_tensor.item() # Computes metrics if torch.cuda.is_available(): # Pass prediction to CPU y_pred_tensor = y_pred_tensor.cpu() y_pred_numpy = y_pred_tensor.detach().numpy() metrics_v = [] for metric in metrics: metrics_v.append(metric(y_numpy, y_pred_numpy)) log_dict["metrics_val"] = metrics_v log_dict["finish_time"] = time.time() change_pgbar_desc(pgbar, log_dict, train=False) loss_m = loss_m + loss_tensor.item() / valid_steps metrics_m = metrics_m + np.array(metrics_v) / valid_steps callback_logs['val_loss'] = loss_m for metric_m, metric in zip(metrics_m, metrics): callback_logs["val_" + metric.__name__] = metric_m else: loss_m = loss_tensor.item() if loss_m < min_val_loss: min_val_loss = loss_m best_epoch = i """ Calls on_epoch_end on callbacks and datasets. """ train_generator.on_epoch_end() if valid_generator is not None: valid_generator.on_epoch_end() for callback in kcallbacks: if "schedule" in callback.__class__.__name__.lower(): # Calling learning rate scheduler. learning_rate = callback(i) for param_group in optimizer.param_groups: param_group['lr'] = learning_rate else: # Calling other callbacks (such as Tensorboard). callback.on_epoch_end(i, logs=callback_logs)
[docs] def __call__(self, image): """Denoises a batch of images. Parameters ---------- image: :class:`numpy.ndarray` 4D batch of noised images. It has shape: (batch_size, height, width, channels) Returns ------- :class:`numpy.ndarray`: Restored batch of images, with same shape as the input. """ channels_first = True if image.shape[1] not in [1, 3]: # Pytorch only accepts NCHW input arrays. # If dim1 is not 1 or 3 (channel), transposes the array. channels_first = False image = np.transpose(image, [0, 3, 1, 2]) image_tensor = torch.from_numpy(image).float() if torch.cuda.is_available(): image_tensor = image_tensor.cuda() denoised_tensor = self.model(image_tensor) if torch.cuda.is_available(): denoised_tensor = denoised_tensor.cpu() denoised_numpy = denoised_tensor.detach().numpy() if not channels_first: # Transforms to NHWC. # Note: output dimension should agree with input dimension. denoised_numpy = np.transpose(denoised_numpy, [0, 2, 3, 1]) if self.return_diff: return image - denoised_numpy else: return denoised_numpy
[docs] def __len__(self): """Counts the number of parameters in the network. Returns ------- nparams : int Number of parameters in the network. """ n_params = 0 for param in self.model.parameters(): var_params = 1 for dim in list(param.shape): var_params *= dim n_params += var_params return n_params