Source code for xrspatial.bilateral

"""Bilateral filter for feature-preserving smoothing.

Smooths a raster while preserving edges by weighting neighbors based on
both spatial distance and value similarity.

Reference
---------
Tomasi & Manduchi, "Bilateral Filtering for Gray and Color Images," ICCV 1998.
"""
from __future__ import annotations

from functools import partial
from math import ceil, exp, isnan

import numpy as np
import xarray as xr
from numba import cuda
from xarray import DataArray

try:
    import dask.array as da
except ImportError:
    da = None

try:
    import cupy
except ImportError:
    class cupy(object):
        ndarray = False

from xrspatial.utils import (
    ArrayTypeFunctionMapping,
    _boundary_to_dask,
    _pad_array,
    _validate_boundary,
    _validate_raster,
    _validate_scalar,
    cuda_args,
    ngjit,
)
from xrspatial.dataset_support import supports_dataset


# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------

def _kernel_radius(sigma_spatial):
    """Derive kernel radius from sigma_spatial: ceil(2 * sigma)."""
    return int(ceil(2.0 * sigma_spatial))


# ---------------------------------------------------------------------------
# NumPy backend
# ---------------------------------------------------------------------------

@ngjit
def _bilateral_numpy(data, radius, sigma_spatial, sigma_range):
    rows, cols = data.shape
    out = np.empty_like(data)
    inv_2_ss = 1.0 / (2.0 * sigma_spatial * sigma_spatial)
    inv_2_sr = 1.0 / (2.0 * sigma_range * sigma_range)

    for y in range(rows):
        for x in range(cols):
            center = data[y, x]
            if isnan(center):
                out[y, x] = np.nan
                continue

            w_sum = 0.0
            v_sum = 0.0

            y0 = max(y - radius, 0)
            y1 = min(y + radius + 1, rows)
            x0 = max(x - radius, 0)
            x1 = min(x + radius + 1, cols)

            for ny in range(y0, y1):
                for nx in range(x0, x1):
                    val = data[ny, nx]
                    if isnan(val):
                        continue
                    dy = ny - y
                    dx = nx - x
                    dist2 = dy * dy + dx * dx
                    diff = val - center
                    range2 = diff * diff

                    w = exp(-dist2 * inv_2_ss - range2 * inv_2_sr)
                    w_sum += w
                    v_sum += w * val

            if w_sum > 0.0:
                out[y, x] = v_sum / w_sum
            else:
                out[y, x] = np.nan

    return out


def _bilateral_numpy_boundary(data, radius, sigma_spatial, sigma_range,
                              boundary='nan'):
    if boundary == 'nan':
        return _bilateral_numpy(data, radius, sigma_spatial, sigma_range)
    padded = _pad_array(data, radius, boundary)
    result = _bilateral_numpy(padded, radius, sigma_spatial, sigma_range)
    return result[radius:-radius, radius:-radius]


# ---------------------------------------------------------------------------
# Dask + NumPy backend
# ---------------------------------------------------------------------------

def _bilateral_dask_numpy(data, radius, sigma_spatial, sigma_range,
                          boundary='nan'):
    _func = partial(
        _bilateral_numpy, radius=radius,
        sigma_spatial=sigma_spatial, sigma_range=sigma_range,
    )
    out = data.map_overlap(
        _func,
        depth=(radius, radius),
        boundary=_boundary_to_dask(boundary),
        meta=np.array(()),
    )
    return out


# ---------------------------------------------------------------------------
# CuPy (GPU) backend
# ---------------------------------------------------------------------------

@cuda.jit
def _bilateral_gpu(data, radius, inv_2_ss, inv_2_sr, out):
    x, y = cuda.grid(2)
    rows, cols = data.shape

    if 0 <= x < cols and 0 <= y < rows:
        center = data[y, x]
        if isnan(center):
            out[y, x] = center
            return

        w_sum = 0.0
        v_sum = 0.0

        y0 = max(y - radius, 0)
        y1 = min(y + radius + 1, rows)
        x0 = max(x - radius, 0)
        x1 = min(x + radius + 1, cols)

        for ny in range(y0, y1):
            for nx in range(x0, x1):
                val = data[ny, nx]
                if not isnan(val):
                    dy = ny - y
                    dx = nx - x
                    dist2 = dy * dy + dx * dx
                    diff = val - center
                    range2 = diff * diff
                    w = exp(-dist2 * inv_2_ss - range2 * inv_2_sr)
                    w_sum += w
                    v_sum += w * val

        if w_sum > 0.0:
            out[y, x] = v_sum / w_sum
        else:
            out[y, x] = center


def _bilateral_cupy(data, radius, sigma_spatial, sigma_range):
    data_cu = cupy.asarray(data, dtype=cupy.float64)
    inv_2_ss = 1.0 / (2.0 * sigma_spatial * sigma_spatial)
    inv_2_sr = 1.0 / (2.0 * sigma_range * sigma_range)

    griddim, blockdim = cuda_args(data_cu.shape)
    out = cupy.empty_like(data_cu)

    _bilateral_gpu[griddim, blockdim](
        data_cu, radius, inv_2_ss, inv_2_sr, out,
    )
    return out


# ---------------------------------------------------------------------------
# Dask + CuPy backend
# ---------------------------------------------------------------------------

def _bilateral_dask_cupy(data, radius, sigma_spatial, sigma_range,
                         boundary='nan'):
    _func = partial(
        _bilateral_cupy, radius=radius,
        sigma_spatial=sigma_spatial, sigma_range=sigma_range,
    )
    out = data.map_overlap(
        _func,
        depth=(radius, radius),
        boundary=_boundary_to_dask(boundary, is_cupy=True),
        meta=cupy.array(()),
    )
    return out


# ---------------------------------------------------------------------------
# dispatcher
# ---------------------------------------------------------------------------

def _bilateral(data, radius, sigma_spatial, sigma_range, boundary='nan'):
    agg = xr.DataArray(data)
    mapper = ArrayTypeFunctionMapping(
        numpy_func=partial(
            _bilateral_numpy_boundary,
            boundary=boundary,
        ),
        cupy_func=_bilateral_cupy,
        dask_func=partial(
            _bilateral_dask_numpy,
            boundary=boundary,
        ),
        dask_cupy_func=partial(
            _bilateral_dask_cupy,
            boundary=boundary,
        ),
    )
    out = mapper(agg)(
        agg.data,
        radius=radius,
        sigma_spatial=sigma_spatial,
        sigma_range=sigma_range,
    )
    return out


# ---------------------------------------------------------------------------
# public API
# ---------------------------------------------------------------------------

[docs] @supports_dataset def bilateral(agg, sigma_spatial=1.0, sigma_range=10.0, name='bilateral', boundary='nan'): """Apply a bilateral filter for feature-preserving smoothing. Smooths a raster while preserving edges. Each pixel is replaced by a weighted average of its neighbours, where the weight depends on both the spatial distance and the value difference between the neighbour and the center pixel. Neighbours that are far away *or* very different in value contribute little, so edges stay sharp. Parameters ---------- agg : xarray.DataArray or xr.Dataset 2D (or 3D multi-band) array of input values. Supports NumPy, CuPy, Dask+NumPy, and Dask+CuPy backends. sigma_spatial : float, default 1.0 Standard deviation of the spatial Gaussian. Controls the size of the neighbourhood: kernel radius = ceil(2 * sigma_spatial). Must be at least ``sqrt(np.finfo(float64).tiny)`` (~1.49e-154) so that ``2 * sigma_spatial**2`` does not underflow to a denormal and turn the Gaussian weight into NaN. sigma_range : float, default 10.0 Standard deviation of the range (value-similarity) Gaussian. Larger values allow more smoothing across value differences; smaller values preserve more edges. Same lower bound as ``sigma_spatial``. name : str, default 'bilateral' Name for the output DataArray. boundary : str, default 'nan' How to handle edges where the kernel extends beyond the raster. ``'nan'`` -- fill missing neighbours with NaN (default). ``'nearest'`` -- repeat edge values. ``'reflect'`` -- mirror at boundary. ``'wrap'`` -- periodic / toroidal. Returns ------- out : xarray.DataArray or xr.Dataset Filtered array of the same shape, dtype, dims, and coords as the input. Examples -------- .. sourcecode:: python >>> import numpy as np >>> import xarray as xr >>> from xrspatial import bilateral >>> data = np.array([ ... [0., 0., 0., 100., 100.], ... [0., 0., 0., 100., 100.], ... [0., 0., 0., 100., 100.], ... [0., 0., 0., 100., 100.], ... [0., 0., 0., 100., 100.]]) >>> raster = xr.DataArray(data) >>> smoothed = bilateral(raster, sigma_spatial=1.0, sigma_range=5.0) References ---------- Tomasi & Manduchi, "Bilateral Filtering for Gray and Color Images," ICCV 1998. """ _validate_raster(agg, func_name='bilateral', name='agg', ndim=(2, 3)) # Lower bound on each sigma so 2*sigma**2 stays a normal float64. Below # sqrt(tiny) the squared term denormalises to zero and 1/(2*sigma**2) # becomes +inf (or raises ZeroDivisionError under numba), and exp(-d * # inf) -> 0*inf -> NaN propagates through the whole output. _sigma_min = float(np.sqrt(np.finfo(np.float64).tiny)) _validate_scalar(sigma_spatial, func_name='bilateral', name='sigma_spatial', dtype=(int, float), min_val=_sigma_min) _validate_scalar(sigma_range, func_name='bilateral', name='sigma_range', dtype=(int, float), min_val=_sigma_min) _validate_boundary(boundary) sigma_spatial = float(sigma_spatial) sigma_range = float(sigma_range) radius = _kernel_radius(sigma_spatial) # Clamp the kernel radius to the raster extent. A radius larger than # max(rows, cols) already covers the whole raster, so the filter # output is unchanged by letting it grow further. Without this clamp # an oversized sigma_spatial drives a quadratic allocation in # _pad_array (when boundary != 'nan') and a quadratic overlap depth # in the dask paths, so one float from the caller can DoS the host # (e.g. sigma_spatial=1e9 -> radius=2e9 -> exabyte-scale padding). # The numba/CUDA inner loops are already clamped to rows/cols. rows, cols = agg.shape[-2], agg.shape[-1] max_radius = max(rows, cols) if radius > max_radius: radius = max_radius if agg.ndim == 3: from xrspatial.focal import _apply_per_band return _apply_per_band( bilateral, agg, sigma_spatial=sigma_spatial, sigma_range=sigma_range, name=name, boundary=boundary, ) out = _bilateral( agg.data.astype(float), radius, sigma_spatial, sigma_range, boundary, ) return DataArray( out, name=name, dims=agg.dims, coords=agg.coords, attrs=agg.attrs, )