Source code for data.utils.preprocessing

# Copyright or © or Copr. IETR/INSA Rennes (2019)
# Contributors :
#     Eduardo Fernandes-Montesuma (2019)
#     Florian Lemarchand (2019)
# OpenDenoising is a computer program whose purpose is to benchmark image
# restoration algorithms.
# This software is governed by the CeCILL-C license under French law and
# abiding by the rules of distribution of free software. You can  use,
# modify and/ or redistribute the software under the terms of the CeCILL-C
# license as circulated by CEA, CNRS and INRIA at the following URL
# "".
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author, the holder of the
# economic rights, and the successive licensors have only  limited
# liability.
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL-C license and that you accept its terms.

import numpy as np
from functools import partial
from import module_logger

def clip_batch(inp, ref=None, minrange=0, maxrange=1):
    """Clips a batch of images, so that all pixels with values less than minrange are set to minrange, and all above
    maxrange at set to maxrange.

    inp : :class:`numpy.ndarray`
        4D Batch of images with shape (batch_size, height, width, channels)
    ref : :class:`numpy.ndarray`
        4D Batch of images with shape (batch_size, height, width, channels)
    minrange : float
        Lower bound of pixel interval.
    maxrange : float
        Upper bound of pixel interval.
    if ref is not None:
        return np.clip(inp, minrange, maxrange), np.clip(ref, minrange, maxrange)
        return np.clip(inp, minrange, maxrange)

def normalize_batch(inp, ref=None, minrange=0, maxrange=1):
    """Normalizes each pixel of a 4D batch of images in the range [minrange, maxrange].

    inp : :class:`numpy.ndarray`
        4D Batch of images with shape (batch_size, height, width, channels)
    minrange : float
        Lower bound of pixel interval.
    maxrange : float
        Upper bound of pixel interval.
    n = len(inp)
    _inp = np.zeros(inp.shape)
    _ref = np.zeros(inp.shape) if ref is not None else None

    for i in range(n):
        _inp[i] = (inp[i] - np.amin(inp[i])) / (np.amax(inp[i]) - np.amin(inp[i]))
        if ref is not None:
            _ref[i] = (ref[i] - np.amin(ref[i])) / (np.amax(ref[i]) - np.amin(ref[i]))

    if ref is not None:
        return (maxrange - minrange) * _inp + minrange, (maxrange - minrange) * _ref + minrange
        return (maxrange - minrange) * _inp + minrange

[docs]def dncnn_augmentation(inp, ref=None, aug_times=1, channels_first=False): """Data augmentation policy employed on DnCNN Parameters ---------- inp : :class:`numpy.ndarray` Noised image. ref : :class:`numpy.ndarray` Ground-truth images. aug_times : int Number of times augmentation if applied. Returns ------- inp : :class:`numpy.ndarray` Augmented noised images ref : :class:`numpy.ndarray` Augmented ground-truth images """ _inp = inp.copy() _ref = ref.copy() if ref is not None else None inp_aug = None ref_aug = None if inp.ndim == 4: axes = (1, 2) elif inp.ndim == 3: axes = (0, 1) else: raise ValueError("Expected 3D or 4D array, but got {}".format(inp.ndim)) for _ in range(aug_times): mode = np.random.randint(0, 7) if mode == 0: # No augmentation inp_aug = inp.copy() if ref is not None: ref_aug = ref.copy() elif mode == 1: # Flip image over the x axis (up-down) inp_aug = np.flipud(inp) if ref is not None: ref_aug = np.flipud(ref) elif mode == 2: # Rotate the image by 90 degrees inp_aug = np.rot90(inp, 1, axes=axes) if ref is not None: ref_aug = np.rot90(ref, 1, axes=axes) elif mode == 3: # Flip the image over the x axis, then rotate it by 90 degrees inp_aug = np.flipud(np.rot90(inp, 1, axes=axes)) if ref is not None: ref_aug = np.flipud(np.rot90(ref, 1, axes=axes)) elif mode == 4: # Rotate the image by 180 degrees inp_aug = np.flipud(np.rot90(inp, 1, axes=axes)) if ref is not None: ref_aug = np.flipud(np.rot90(ref, 1, axes=axes)) elif mode == 5: # Flip the image over the x axis, then rotate it by 180 degrees inp_aug = np.flipud(np.rot90(inp, 2, axes=axes)) if ref is not None: ref_aug = np.flipud(np.rot90(ref, 2, axes=axes)) elif mode == 6: # Rotate the image by 270 degrees inp_aug = np.rot90(inp, 3, axes=axes) if ref is not None: ref_aug = np.rot90(ref, 3, axes=axes) elif mode == 7: # Flip the image over the x axis, then rotate it by 270 degrees inp_aug = np.flipud(np.rot90(inp, 3, axes=axes)) if ref is not None: ref_aug = np.flipud(np.rot90(ref, 3, axes=axes)) if inp_aug is not None: # Augmentation was performed inp = np.concatenate([inp, inp_aug], axis=0) if ref is not None and ref_aug is not None: # Reference was passed ref = np.concatenate([ref, ref_aug], axis=0) if ref is not None: # Passed input and reference images assert inp.shape[1:] == _inp.shape[1:], "Data Augmentation changed input shape: " \ "before {}, after {}".format(_inp.shape[1:], inp.shape[1:]) assert ref.shape[1:] == _ref.shape[1:], "Data Augmentation changed reference shape: " \ "before {}, after {}".format(_inp.shape[1:], inp.shape[1:]) return inp, ref else: # Passed only input image assert inp.shape[1:] == _inp.shape[1:], "Data Augmentation changed input shape: " \ "before {}, after {}".format(_inp.shape[1:], inp.shape[1:]) return inp
[docs]def gen_patches(inp, ref, patch_size, channels_first=False, mode="sequential", n_patches=-1): """Patch generation function. Parameters ---------- inp : :class:`numpy.ndarray` Noised image which patches will be extracted. ref : :class:`numpy.ndarray` Reference image which patches will be extracted. patch_size : int Size of patch window (number of pixels in each axis). channels_first : bool Whether data is formatted as NCHW (True) or NHWC (False). mode : str One between {'sequential', 'random'}. If mode = 'sequential', extracts patches sequentially on each axis. If mode = 'random', extracts patches randomly. n_patches : int Number of patches to be extracted from the image. Should be specified only if mode = 'random'. If not specified, then, .. math:: n\_patches = \dfrac{h \\times w}{patch_{size}^{2}} Returns ------- input_patches : :class:`numpy.ndarray` Extracted input patches. reference_patches : :class:`numpy.ndarray` Extracted reference patches. """ assert mode in ["random", "sequential"], "Expected mode to be 'random' or 'sequential' but got {}".format(mode) _inp = inp.copy() _ref = ref.copy() if channels_first: _, h, w = _inp.shape else: h, w = _inp.shape[:2] if mode == "random" and n_patches == -1: n_patches = h * w / (patch_size ** 2) inp_patches = [] ref_patches = [] if mode == "sequential": for i in range(0, h - patch_size + 1, patch_size): for j in range(0, w - patch_size + 1, patch_size): if channels_first: inp_patches.append(_inp[:, i: i + patch_size, j: j + patch_size]) ref_patches.append(_ref[:, i: i + patch_size, j: j + patch_size]) else: inp_patches.append(_inp[i: i + patch_size, j: j + patch_size, :]) ref_patches.append(_ref[i: i + patch_size, j: j + patch_size, :]) if mode == "random": for _ in range(n_patches): i = np.random.randint(0, h - patch_size + 1) j = np.random.randint(0, h - patch_size + 1) if channels_first: inp_patches.append(_inp[:, i: i + patch_size, j: j + patch_size]) ref_patches.append(_ref[:, i: i + patch_size, j: j + patch_size]) else: inp_patches.append(_inp[i: i + patch_size, j: j + patch_size, :]) ref_patches.append(_ref[i: i + patch_size, j: j + patch_size, :]) input_patches = np.array(inp_patches) reference_patches = np.array(ref_patches) return input_patches, reference_patches
def smooth_patches(img, d=64, h=32, sg=32, sl=16, mu=0.1, gamma=0.25): """Extract smooth patches for GCBD algorithm. Parameters ---------- img : :class:`numpy.ndarray` Noised image. d : int Global patch size. h : int Local patch size. sg : int Global stride. sl : int Local stride. mu : float mean-smoothing hyper-parameter. gamma : float Variance-smoothing hyper-parameter. Returns ------- patches : :class:`numpy.ndarray` Extracted patches from img. """ height, width = img.shape patches = [] for i in range(0, height - d, sg): for j in range(0, width - d, sg): # Run for global h x h patches p wg = img[i: i + d, j: j + d] # Patch mean and variance mean_g = np.mean(wg) var_g = np.var(wg) # Initializes smooth verifier smooth = True for k in range(i, i + d - h, sl): for l in range(j, j + d - h, sl): # Run for local h x h patches q wl = img[k: k + h, l: l + h] module_logger.debug("image shape {}, {}:{}, {}:{}, local patch shape: {}".format(img.shape, k, k + h, l, l + h, wl.shape)) # Local mean and variance mean_l = np.mean(wl) var_l = np.var(wl) # Difference between local and global means/variances mean_diff = np.abs(mean_g - mean_l) var_diff = np.abs(var_g - var_l) if mean_diff > mu * mean_l or var_diff > gamma * var_l: module_logger.debug("{}\tmean_g: {}, mean_l: {}, mean_diff: {}, bound: {}".format([i, j, k, l], mean_g, mean_l, mean_diff, mu * mean_l)) module_logger.debug("{}\tvar_g: {}, var_l: {}, var_diff: {}, bound: {}\n".format([i, j, k, l], var_g, var_l, var_diff, gamma * var_l)) # If constraints not met for a local patch, then global patch # is not smooth => smooth verifier becomes false smooth = False if smooth: # If smooth enough, then extracts the noise patch through # noise = patch - mean(patch) patches.append(wg - np.mean(wg)) return patches def rand_tuple_2D(box_size): while True: yield (np.random.rand() * box_size, np.random.rand() * box_size) def get_stratified_coords(coord_gen, box_size, shape): coords = [] box_count_y = int(np.ceil(shape[0] / box_size)) box_count_x = int(np.ceil(shape[1] / box_size)) for i in range(box_count_y): for j in range(box_count_x): y, x = next(coord_gen) y = int(i * box_size + y) x = int(j * box_size + x) if (y < shape[0] and x < shape[1]): coords.append(y, x) return coords def n2v_generate_targets(inp, num_pix=1): _inp = inp.copy() b, h, w, c = inp.shape boxsize = np.round(np.sqrt(h * w / num_pix)).astype('int') coordinate_generator = rand_tuple_2D(boxsize) ref = np.zeros([b, h, w, 2 * c]) # [ref | mask] inp = np.zeros([b, h, w, c]) for channel in range(c): coords = get_stratified_coords(coordinate_generator, box_size=boxsize, shape=(h, w))