# Copyright or © or Copr. IETR/INSA Rennes (2019)
#
# Contributors :
# Eduardo Fernandes-Montesuma eduardo.fernandes-montesuma@insa-rennes.fr (2019)
# Florian Lemarchand florian.lemarchand@insa-rennes.fr (2019)
#
#
# OpenDenoising is a computer program whose purpose is to benchmark image
# restoration algorithms.
#
# This software is governed by the CeCILL-C license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL-C
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL-C license and that you accept its terms.
import io
import os
import time
import keras
import torch
import shutil
import numpy as np
import tensorflow as tf
import PIL.Image as Image
from keras import backend
from datetime import datetime
from datetime import timedelta
from skimage.util import img_as_ubyte
from OpenDenoising.evaluation import module_logger
from OpenDenoising.data import normalize_batch, clip_batch
def timeit(method):
"""Timing decorator. Use this to get track of time spent on a callback.
Notes
-----
The timing messages have logging.DEBUG level.
Parameters
----------
method : function
Method or function to be timed.
Returns
-------
float
Time spent on function execution
"""
def timed(*args, **kwargs):
start = time.time()
result = method(*args, **kwargs)
finish = time.time()
module_logger.debug("Callback {} took {} seconds".format(method, timedelta(seconds=finish-start)))
return result
return timed
[docs]class LrSchedulerCallback(keras.callbacks.Callback):
"""Custom Learning Rate scheduler based on Keras. This class is mainly used for compatibility between TfModel,
PytorchModel and KerasModel. Please note that this class should not be used as a LearningRateScheduler. To specify
one, you need to use a class that inherits from LrSchedulerCallback.
"""
[docs] def __init__(self):
super(LrSchedulerCallback, self).__init__()
@timeit
def on_epoch_begin(self, epoch, logs=None):
# Set new learning rate on Keras model
backend.set_value(self.model.optimizer.lr, self(epoch))
[docs] def on_epoch_end(self, epoch, logs=None):
logs = logs or {}
logs['LearningRate'] = backend.get_value(self.model.optimizer.lr)
[docs]class DnCNNSchedule(LrSchedulerCallback):
"""DnCNN learning rate decay scheduler as specified in the original paper.
After epoch 30, drops the initial learning rate by a factor of 10.
After epoch 60, drops the initial learning rate by a factor of 20.
Attributes
----------
initial_lr : float
Initial learning rate value.
"""
[docs] def __init__(self, initial_lr=1e-3):
self.initial_lr = initial_lr
@timeit
def __call__(self, epoch, logs=None):
"""Calculates the current Learning Rate.
Parameters
----------
epoch : int
Current epoch.
Returns
-------
lr : float
Learning rate value.
"""
if epoch <= 30:
lr = self.initial_lr
elif epoch <= 60:
lr = self.initial_lr / 10
elif epoch <= 80:
lr = self.initial_lr / 20
else:
lr = self.initial_lr / 20
return np.asarray(lr, dtype='float64')
[docs]class StepSchedule(LrSchedulerCallback):
"""Drops the learning rate at each 'dropEvery' iterations by a factor of 'factor'.
Attributes
----------
initial_lr : float
Initial Learning Rate.
factor : float
Decay factor.
dropEvery : int
The learning rate will be decayed periodically, where the period is defined by dropEvery.
"""
[docs] def __init__(self, initial_lr=1e-3, factor=0.5, dropEvery=10):
self.initial_lr = initial_lr
self.factor = factor
self.dropEvery = dropEvery
@timeit
def __call__(self, epoch, logs=None):
"""Calculates the current Learning Rate.
Parameters
----------
epoch : int
Current epoch.
Returns
-------
lr : float
Learning rate value.
"""
exp = (epoch + 1) // self.dropEvery
return np.asarray(self.initial_lr * (self.factor ** exp), dtype="float64")
[docs]class PolynomialSchedule(LrSchedulerCallback):
"""Drops the learning rate following a polynomial schedule:
.. math::
\\alpha = \\alpha_{0}\\biggr(1 - \dfrac{epoch}{maxEpochs}\\biggr)^{power}
Attributes
----------
initial_lr : float
Initial Learning Rate.
maxEpochs : int
At the end of maxEpochs, the learning_rate will be zero.
power : int
Polynomial power.
"""
[docs] def __init__(self, initial_lr=1e-3, maxEpochs=100, power=1.0):
self.initial_lr = initial_lr
self.maxEpochs = maxEpochs
self.power = power
@timeit
def __call__(self, epoch):
"""Calculates the current Learning Rate.
Parameters
----------
epoch : int
Current epoch.
Returns
-------
lr : float
Learning rate value.
"""
decay = (1 - (epoch / float(self.maxEpochs))) ** self.power
lr = self.initial_lr * decay
return np.asarray(lr, dtype="float64")
[docs]class ExponentialSchedule(LrSchedulerCallback):
"""Drops the learning rate following a exponential schedule:
.. math::
\\alpha = \\alpha_{0}\\times\\gamma^{epoch}
Attributes
----------
initial_lr : float
Initial Learning Rate.
factor : float
Rate at which the learning rate is decayed at each epoch.
"""
[docs] def __init__(self, initial_lr=1e-3, gamma=0.5):
self.initial_lr = initial_lr
self.gamma = gamma
@timeit
def __call__(self, epoch):
"""Calculates the current Learning Rate.
Parameters
----------
epoch : int
Current epoch.
Returns
-------
lr : float
Learning rate value.
"""
lr = self.initial_lr * (self.gamma ** epoch)
lr = np.asarray(lr, dtype="float64")
return lr
def make_image(image_tensor):
"""Convert image to Tensorboard image summary.
Parameters
----------
image_tensor : :class:`numpy.ndarray`
Image to be shown in Tensorboard.
Returns
-------
:tf.Tensor:
Tensor of type string, containing the serialized image Summary protocol buffer.
"""
h, w, c = image_tensor.shape
image = Image.fromarray(image_tensor.reshape(h, w))
output = io.BytesIO()
image.save(output, format='PNG')
image_string = output.getvalue()
output.close()
return tf.Summary.Image(height=h, width=w, colorspace=c,
encoded_image_string=image_string)
def stack_images(list_of_images):
"""Stacks noisy + clean + denoised images in the horizontal, then stacks the entire batch in the vertical
Parameters
----------
list_of_images : list
List of :class:`numpy.ndarray` objects.
Returns
-------
stacked : :class:`numpy.ndarray`
Numpy array corresponding to the stacked images.
"""
to_stack = []
# Stacks batch on the vertical
for image in list_of_images:
to_stack.append(np.vstack([image[i] for i in range(len(image))]))
# Stacks images on the horizontal
stacked = np.hstack(to_stack)
return stacked
[docs]class TensorboardImage(keras.callbacks.Callback):
"""Tensorboard Keras callback. At each epoch end, it plots on Tensorboard metrics/loss information on validation
data, as well as a denoising summary.
Attributes
----------
valid_generator : :class:`data.AbstractDatasetGenerator`
Image dataset generator. Provide validation data for model evaluation.
denoiser : :class:`model.AbstractDeepLearningModel`
Image denoising object.
folder_string : str
String containing the path to logging directory. Corresponds to denoiser's name.
"""
[docs] def __init__(self, valid_generator, denoiser, preprocess="clip"):
super().__init__()
# Folder string for tensorboard files
self.folder_string = str(denoiser)
# Checks if folder already exists
if not os.path.isdir(os.path.join(denoiser.logdir, self.folder_string)):
os.makedirs(os.path.join(denoiser.logdir, self.folder_string))
# Tensorboard objects
self.writer = tf.summary.FileWriter(
os.path.join(denoiser.logdir, self.folder_string)
)
self.writer.add_graph(keras.backend.get_session().graph)
# Auxiliary objects
self.seen = 0 # Number of seen epochs
self.denoiser = denoiser # Denoiser model
self.valid_generator = valid_generator
self.preprocess = preprocess
@timeit
def on_epoch_end(self, epoch, logs=None):
"""Method called at each epoch end. Evaluates the denoiser at validation data, then plots metrics/loss into
tensorboard. Shows denoising visual results on Tensorboard.
Parameters
----------
epoch : int
Current evaluation epoch.
logs : dict
Dictionary where the keys are metrics/loss names, and the values are the functions to be evaluated.
"""
self.seen += 1 # Updates intern epoch counter
if logs is not None:
# Tensorboard summaries:
for key in logs:
summary = tf.Summary(
value=[tf.Summary.Value(tag=key, simple_value=logs[key])]
)
self.writer.add_summary(summary, epoch)
i = np.random.randint(0, len(self.valid_generator))
img_noise, img_ref = self.valid_generator[i]
img_denoised = self.denoiser(img_noise)
# Converts 8 first samples to uint8
if self.preprocess == "normalize":
noised = img_as_ubyte(normalize_batch(img_noise))
original = img_as_ubyte(normalize_batch(img_ref))
denoised = img_as_ubyte(normalize_batch(img_denoised))
elif self.preprocess == "clip":
noised = img_as_ubyte(clip_batch(img_noise))
original = img_as_ubyte(clip_batch(img_ref))
denoised = img_as_ubyte(clip_batch(img_denoised))
else:
noised = img_as_ubyte(img_noise)
original = img_as_ubyte(img_ref)
denoised = img_as_ubyte(img_denoised)
# Print to tensorboard
img = make_image(stack_images([noised, original, denoised]))
summary_original = tf.Summary(
value=[tf.Summary.Value(tag="Denoising Result", image=img)]
)
self.writer.add_summary(summary_original, epoch)
def tf_saved_model(directory, session, input, is_training, output, epoch):
if len(os.listdir(directory)) > 0:
for filename in os.listdir(directory):
filepath = os.path.join(directory, filename)
if os.path.isfile(filepath):
os.unlink(filepath)
elif os.path.isdir(filepath):
shutil.rmtree(filepath)
tf.saved_model.simple_save(session=session, export_dir=directory,
inputs={input.name.replace(":0", ""): input,
is_training.name.replace(":0", ""): is_training},
outputs={output.name.replace(":0", ""): output})
[docs]class CheckpointCallback(keras.callbacks.Callback):
"""Creates training checkpoints for Deep Learning models.
Attributes
----------
denoiser : :class:`model.AbstractDenoiser`
Denoiser object to be saved.
monitor : str
Name of the metric being tracked. If the name is not present on logs, tracks the loss value.
mode : str
String having one of these two values: {'max', 'min'}. If it is 'max', saves the model with the greater metric
value. If it is 'min', saves the model with the smaller metric value.
period : int
Saves models at uniform intervals specified by period.
logdir : str
String containing the path to the logs directory.
"""
[docs] def __init__(self, denoiser, monitor="loss", mode='max', period=1):
logpath = os.path.join(denoiser.logdir, str(denoiser))
try:
os.makedirs(logpath)
except FileExistsError:
module_logger.warning("Directory {} already exists. Nothing was done.".format(logpath))
try:
os.makedirs(os.path.join(logpath, "ModelFiles"))
except FileExistsError:
module_logger.warning("Directory {} already exists. Nothing was done.".format(os.path.join(logpath,
"ModelFiles")))
self.denoiser = denoiser
self.monitor = monitor
self.mode = mode
self.period = period
self.logpath = logpath
self.best_value = np.inf if mode == 'min' else -np.inf
@timeit
def on_epoch_end(self, epoch, logs):
"""Read logs, determine if the model should be saved based on period, best_value and mode."""
if epoch % self.period == 0:
try:
metric_val = logs[self.monitor]
except KeyError:
module_logger.warning("Monitor {} not present in dictionary logs with keys {}." \
" Falling into loss tracking.".format(self.monitor, list(logs.keys())))
self.mode = 'min'
self.best_value = np.inf
self.monitor = 'loss'
metric_val = logs[self.monitor]
if metric_val > self.best_value and self.mode == "max":
self.best_value = metric_val
self.__save_model(epoch)
elif metric_val < self.best_value and self.mode == "min":
self.best_value = metric_val
self.__save_model(epoch)
def __save_model(self, epoch):
"""Saves the denoiser model based on its class. """
model_class = self.denoiser.__class__.__name__
if model_class == "TfModel":
tf_saved_model(os.path.join(self.logpath, "ModelFiles"),
self.denoiser.tf_session,
self.denoiser.model_input,
self.denoiser.is_training,
self.denoiser.model_output,
epoch)
elif model_class == "KerasModel":
model_json = self.denoiser.model.to_json()
# Save model architecture in a JSON file
with open(os.path.join(self.logpath, "ModelFiles", str(self.denoiser) + ".json"), "w") as f:
f.write(model_json)
# Save model weights on .HDF5 file
self.denoiser.model.save_weights(
os.path.join(self.logpath, "ModelFiles", str(self.denoiser) + "_weights.hdf5")
)
elif model_class == "PytorchModel":
torch.save(self.denoiser.model, os.path.join(self.logpath, "ModelFiles", str(self.denoiser) + ".pth"))
else:
raise ValueError("Expected model to be either PytorchModel, TfModel or KerasModel, but got"\
"{}".format(model_class))