Source code for xrspatial.morphology

"""Morphological raster operators.

Applies grayscale morphological operations using a structuring element
(kernel) over a 2D raster.  Erosion computes the local minimum and
dilation the local maximum within the kernel footprint.  Opening
(erosion then dilation) removes small bright features; closing
(dilation then erosion) fills small dark gaps.  Gradient, white
top-hat, and black top-hat are derived by subtracting pairs of the
above.

Supports all four backends: numpy, cupy, dask+numpy, dask+cupy.
"""

from __future__ import annotations

from functools import partial

import numpy as np
import xarray as xr
from xarray import DataArray

from numba import cuda, prange

try:
    import cupy
except ImportError:
    class cupy:
        ndarray = False

try:
    import dask.array as da
except ImportError:
    da = None

from xrspatial.utils import (
    ArrayTypeFunctionMapping,
    _boundary_to_dask,
    _pad_array,
    _validate_boundary,
    _validate_raster,
    calc_cuda_dims,
    has_cuda_and_cupy,
    ngjit,
    not_implemented_func,
)
from xrspatial.dataset_support import supports_dataset


# ---------------------------------------------------------------------------
# Memory guard
# ---------------------------------------------------------------------------

def _available_memory_bytes():
    """Best-effort estimate of available memory in bytes."""
    # Try /proc/meminfo (Linux)
    try:
        with open('/proc/meminfo', 'r') as f:
            for line in f:
                if line.startswith('MemAvailable:'):
                    return int(line.split()[1]) * 1024
    except (OSError, ValueError, IndexError):
        pass
    # Try psutil
    try:
        import psutil
        return psutil.virtual_memory().available
    except (ImportError, AttributeError):
        pass
    # Fallback: 2 GB
    return 2 * 1024 ** 3


def _check_kernel_memory(rows, cols, ky, kx, func_name):
    """Raise MemoryError if the padded float64 array would exceed 50% of RAM.

    Each morphology backend allocates a padded copy of the input of shape
    ``(rows + ky - 1, cols + kx - 1)`` as float64, plus an output of shape
    ``(rows, cols)``.  Budget ~16 bytes per padded cell to cover both
    allocations plus small temporaries.
    """
    padded_rows = rows + ky - 1
    padded_cols = cols + kx - 1
    required = padded_rows * padded_cols * 16
    available = _available_memory_bytes()
    if required > 0.5 * available:
        raise MemoryError(
            f"{func_name}(): kernel shape ({ky}, {kx}) on a "
            f"({rows}, {cols}) raster needs ~{required / 1e9:.1f} GB "
            f"of padded float64 memory, but only "
            f"{available / 1e9:.1f} GB is available. "
            f"Use a smaller kernel."
        )


# ---------------------------------------------------------------------------
# Kernel construction
# ---------------------------------------------------------------------------

[docs] def _circle_kernel(radius): """Return a boolean 2D array for a circular structuring element. The kernel has shape ``(2*radius+1, 2*radius+1)`` with ``True`` for cells whose centre is within *radius* cells of the centre. """ size = 2 * radius + 1 y, x = np.ogrid[-radius:radius + 1, -radius:radius + 1] return (x * x + y * y <= radius * radius).astype(np.uint8)
def _validate_kernel(kernel, func_name): """Ensure *kernel* is a 2D uint8 numpy array with odd dimensions.""" if not isinstance(kernel, np.ndarray): raise TypeError( f"{func_name}(): `kernel` must be a numpy ndarray, " f"got {type(kernel).__name__}" ) if kernel.ndim != 2: raise ValueError( f"{func_name}(): `kernel` must be 2D, got {kernel.ndim}D" ) if kernel.shape[0] % 2 == 0 or kernel.shape[1] % 2 == 0: raise ValueError( f"{func_name}(): `kernel` dimensions must be odd, " f"got {kernel.shape}" ) return kernel.astype(np.uint8) # --------------------------------------------------------------------------- # numpy backend # --------------------------------------------------------------------------- @ngjit def _erode_kernel_numpy(data, kernel, rows, cols, ky, kx): """Erosion (local minimum) on a padded array. Only kernel cells with non-zero entries contribute to the output. The centre cell is included only when ``kernel[hy, hx]`` is non-zero. NaN neighbours included by the kernel propagate to NaN. Cells where the kernel covers no non-zero entries return NaN. """ out = np.empty((rows, cols), dtype=data.dtype) for i in prange(rows): for j in range(cols): mn = np.nan seen = False for dy in range(ky): for dx in range(kx): if kernel[dy, dx] == 0: continue v = data[i + dy, j + dx] if v != v: # NaN neighbour propagates mn = v seen = True break if not seen or v < mn: mn = v seen = True if seen and mn != mn: break out[i, j] = mn return out @ngjit def _dilate_kernel_numpy(data, kernel, rows, cols, ky, kx): """Dilation (local maximum) on a padded array. Only kernel cells with non-zero entries contribute to the output. The centre cell is included only when ``kernel[hy, hx]`` is non-zero. NaN neighbours included by the kernel propagate to NaN. Cells where the kernel covers no non-zero entries return NaN. """ out = np.empty((rows, cols), dtype=data.dtype) for i in prange(rows): for j in range(cols): mx = np.nan seen = False for dy in range(ky): for dx in range(kx): if kernel[dy, dx] == 0: continue v = data[i + dy, j + dx] if v != v: # NaN neighbour propagates mx = v seen = True break if not seen or v > mx: mx = v seen = True if seen and mx != mx: break out[i, j] = mx return out def _morph_numpy(data, kernel, op, boundary): """Run erosion or dilation on a numpy array.""" ky, kx = kernel.shape hy, hx = ky // 2, kx // 2 rows, cols = data.shape arr = data.astype(np.float64) if boundary == 'nan': padded = np.pad(arr, ((hy, hy), (hx, hx)), mode='constant', constant_values=np.nan) else: padded = _pad_array(arr, (hy, hx), boundary) if op == 'erode': return _erode_kernel_numpy(padded, kernel, rows, cols, ky, kx) else: return _dilate_kernel_numpy(padded, kernel, rows, cols, ky, kx) def _erode_numpy(data, kernel, boundary): return _morph_numpy(data, kernel, 'erode', boundary) def _dilate_numpy(data, kernel, boundary): return _morph_numpy(data, kernel, 'dilate', boundary) def _opening_numpy(data, kernel, boundary): tmp = _morph_numpy(data, kernel, 'erode', boundary) return _morph_numpy(tmp, kernel, 'dilate', boundary) def _closing_numpy(data, kernel, boundary): tmp = _morph_numpy(data, kernel, 'dilate', boundary) return _morph_numpy(tmp, kernel, 'erode', boundary) # --------------------------------------------------------------------------- # cupy backend # --------------------------------------------------------------------------- @cuda.jit def _erode_gpu(data, kernel, out, hy, hx, ky, kx): i, j = cuda.grid(2) rows = out.shape[0] cols = out.shape[1] if i < rows and j < cols: mn = np.nan seen = False for dy in range(ky): for dx in range(kx): if kernel[dy, dx] == 0: continue v = data[i + dy, j + dx] if v != v: mn = v seen = True break if not seen or v < mn: mn = v seen = True if seen and mn != mn: break out[i, j] = mn @cuda.jit def _dilate_gpu(data, kernel, out, hy, hx, ky, kx): i, j = cuda.grid(2) rows = out.shape[0] cols = out.shape[1] if i < rows and j < cols: mx = np.nan seen = False for dy in range(ky): for dx in range(kx): if kernel[dy, dx] == 0: continue v = data[i + dy, j + dx] if v != v: mx = v seen = True break if not seen or v > mx: mx = v seen = True if seen and mx != mx: break out[i, j] = mx def _morph_cupy(data, kernel, op, boundary): import cupy as cp ky, kx = kernel.shape hy, hx = ky // 2, kx // 2 rows, cols = data.shape arr = cp.asarray(data, dtype=cp.float64) kern_dev = cp.asarray(kernel) if boundary == 'nan': padded = cp.pad(arr, ((hy, hy), (hx, hx)), mode='constant', constant_values=cp.nan) else: padded = _pad_array(arr, (hy, hx), boundary) out = cp.empty((rows, cols), dtype=cp.float64) bpg, tpb = calc_cuda_dims((rows, cols)) if op == 'erode': _erode_gpu[bpg, tpb](padded, kern_dev, out, hy, hx, ky, kx) else: _dilate_gpu[bpg, tpb](padded, kern_dev, out, hy, hx, ky, kx) return out def _erode_cupy(data, kernel, boundary): return _morph_cupy(data, kernel, 'erode', boundary) def _dilate_cupy(data, kernel, boundary): return _morph_cupy(data, kernel, 'dilate', boundary) def _opening_cupy(data, kernel, boundary): tmp = _morph_cupy(data, kernel, 'erode', boundary) return _morph_cupy(tmp, kernel, 'dilate', boundary) def _closing_cupy(data, kernel, boundary): tmp = _morph_cupy(data, kernel, 'dilate', boundary) return _morph_cupy(tmp, kernel, 'erode', boundary) # --------------------------------------------------------------------------- # dask + numpy backend # --------------------------------------------------------------------------- def _morph_chunk_numpy(chunk, kernel, op, block_info=None): """Process a single overlapped dask chunk.""" ky, kx = kernel.shape hy, hx = ky // 2, kx // 2 rows = chunk.shape[0] - 2 * hy cols = chunk.shape[1] - 2 * hx if op == 'erode': interior = _erode_kernel_numpy(chunk, kernel, rows, cols, ky, kx) else: interior = _dilate_kernel_numpy(chunk, kernel, rows, cols, ky, kx) result = chunk.copy() result[hy:hy + rows, hx:hx + cols] = interior return result def _morph_dask_numpy(data, kernel, op, boundary): ky, kx = kernel.shape hy, hx = ky // 2, kx // 2 _func = partial(_morph_chunk_numpy, kernel=kernel, op=op) return data.astype(np.float64).map_overlap( _func, depth=(hy, hx), boundary=_boundary_to_dask(boundary), meta=np.array(()), ) def _erode_dask_numpy(data, kernel, boundary): return _morph_dask_numpy(data, kernel, 'erode', boundary) def _dilate_dask_numpy(data, kernel, boundary): return _morph_dask_numpy(data, kernel, 'dilate', boundary) def _opening_dask_numpy(data, kernel, boundary): tmp = _morph_dask_numpy(data, kernel, 'erode', boundary) return _morph_dask_numpy(tmp, kernel, 'dilate', boundary) def _closing_dask_numpy(data, kernel, boundary): tmp = _morph_dask_numpy(data, kernel, 'dilate', boundary) return _morph_dask_numpy(tmp, kernel, 'erode', boundary) # --------------------------------------------------------------------------- # dask + cupy backend # --------------------------------------------------------------------------- def _morph_chunk_cupy(chunk, kernel, op, block_info=None): """Process a single overlapped dask+cupy chunk.""" import cupy as cp ky, kx = kernel.shape hy, hx = ky // 2, kx // 2 rows = chunk.shape[0] - 2 * hy cols = chunk.shape[1] - 2 * hx kern_dev = cp.asarray(kernel) out = cp.empty((rows, cols), dtype=cp.float64) bpg, tpb = calc_cuda_dims((rows, cols)) if op == 'erode': _erode_gpu[bpg, tpb](chunk, kern_dev, out, hy, hx, ky, kx) else: _dilate_gpu[bpg, tpb](chunk, kern_dev, out, hy, hx, ky, kx) result = chunk.copy() result[hy:hy + rows, hx:hx + cols] = out return result def _morph_dask_cupy(data, kernel, op, boundary): import cupy as cp ky, kx = kernel.shape hy, hx = ky // 2, kx // 2 _func = partial(_morph_chunk_cupy, kernel=kernel, op=op) return data.astype(cp.float64).map_overlap( _func, depth=(hy, hx), boundary=_boundary_to_dask(boundary, is_cupy=True), meta=cp.array(()), ) def _erode_dask_cupy(data, kernel, boundary): return _morph_dask_cupy(data, kernel, 'erode', boundary) def _dilate_dask_cupy(data, kernel, boundary): return _morph_dask_cupy(data, kernel, 'dilate', boundary) def _opening_dask_cupy(data, kernel, boundary): tmp = _morph_dask_cupy(data, kernel, 'erode', boundary) return _morph_dask_cupy(tmp, kernel, 'dilate', boundary) def _closing_dask_cupy(data, kernel, boundary): tmp = _morph_dask_cupy(data, kernel, 'dilate', boundary) return _morph_dask_cupy(tmp, kernel, 'erode', boundary) # --------------------------------------------------------------------------- # Dispatcher helpers # --------------------------------------------------------------------------- def _dispatch(agg, kernel, boundary, name, numpy_fn, cupy_fn, dask_fn, dask_cupy_fn): """Common dispatch logic for all four morphological functions.""" _validate_raster(agg, func_name=name, name='agg', ndim=2) kernel = _validate_kernel(kernel, name) _validate_boundary(boundary) rows, cols = agg.shape ky, kx = kernel.shape _check_kernel_memory(rows, cols, ky, kx, name) mapper = ArrayTypeFunctionMapping( numpy_func=partial(numpy_fn, kernel=kernel, boundary=boundary), cupy_func=partial(cupy_fn, kernel=kernel, boundary=boundary), dask_func=partial(dask_fn, kernel=kernel, boundary=boundary), dask_cupy_func=partial(dask_cupy_fn, kernel=kernel, boundary=boundary), ) out = mapper(agg)(agg.data) return DataArray( out, name=name, dims=agg.dims, coords=agg.coords, attrs=agg.attrs, ) # --------------------------------------------------------------------------- # Public API # ---------------------------------------------------------------------------
[docs] @supports_dataset def morph_erode(agg, kernel=None, boundary='nan', name='erode'): """Apply morphological erosion (local minimum) to a 2D raster. Each output cell receives the minimum value found within the structuring element footprint centred on that cell. Parameters ---------- agg : xarray.DataArray 2D raster of numeric values. kernel : numpy.ndarray or None 2D structuring element with odd dimensions. Non-zero entries mark the footprint. Defaults to a 3x3 square (``np.ones((3, 3), dtype=np.uint8)``). boundary : str, default ``'nan'`` Edge handling: ``'nan'``, ``'nearest'``, ``'reflect'``, or ``'wrap'``. name : str, default ``'erode'`` Name for the output DataArray. Returns ------- xarray.DataArray Eroded raster with the same shape, coords, dims, and attrs as the input. Examples -------- >>> from xrspatial.morphology import morph_erode >>> eroded = morph_erode(raster, kernel=np.ones((5, 5), dtype=np.uint8)) """ if kernel is None: kernel = np.ones((3, 3), dtype=np.uint8) return _dispatch( agg, kernel, boundary, name, _erode_numpy, _erode_cupy, _erode_dask_numpy, _erode_dask_cupy, )
[docs] @supports_dataset def morph_dilate(agg, kernel=None, boundary='nan', name='dilate'): """Apply morphological dilation (local maximum) to a 2D raster. Each output cell receives the maximum value found within the structuring element footprint centred on that cell. Parameters ---------- agg : xarray.DataArray 2D raster of numeric values. kernel : numpy.ndarray or None 2D structuring element with odd dimensions. Non-zero entries mark the footprint. Defaults to a 3x3 square. boundary : str, default ``'nan'`` Edge handling: ``'nan'``, ``'nearest'``, ``'reflect'``, or ``'wrap'``. name : str, default ``'dilate'`` Name for the output DataArray. Returns ------- xarray.DataArray Dilated raster. Examples -------- >>> from xrspatial.morphology import morph_dilate >>> dilated = morph_dilate(raster, kernel=np.ones((5, 5), dtype=np.uint8)) """ if kernel is None: kernel = np.ones((3, 3), dtype=np.uint8) return _dispatch( agg, kernel, boundary, name, _dilate_numpy, _dilate_cupy, _dilate_dask_numpy, _dilate_dask_cupy, )
[docs] @supports_dataset def morph_opening(agg, kernel=None, boundary='nan', name='opening'): """Apply morphological opening (erosion then dilation) to a 2D raster. Opening removes small bright features and salt noise while preserving the overall shape of larger structures. Parameters ---------- agg : xarray.DataArray 2D raster of numeric values. kernel : numpy.ndarray or None 2D structuring element with odd dimensions. Defaults to a 3x3 square. boundary : str, default ``'nan'`` Edge handling: ``'nan'``, ``'nearest'``, ``'reflect'``, or ``'wrap'``. name : str, default ``'opening'`` Name for the output DataArray. Returns ------- xarray.DataArray Opened raster. Examples -------- >>> from xrspatial.morphology import morph_opening >>> cleaned = morph_opening(binary_mask) """ if kernel is None: kernel = np.ones((3, 3), dtype=np.uint8) return _dispatch( agg, kernel, boundary, name, _opening_numpy, _opening_cupy, _opening_dask_numpy, _opening_dask_cupy, )
[docs] @supports_dataset def morph_closing(agg, kernel=None, boundary='nan', name='closing'): """Apply morphological closing (dilation then erosion) to a 2D raster. Closing fills small dark gaps and pepper noise while preserving the overall shape of larger structures. Parameters ---------- agg : xarray.DataArray 2D raster of numeric values. kernel : numpy.ndarray or None 2D structuring element with odd dimensions. Defaults to a 3x3 square. boundary : str, default ``'nan'`` Edge handling: ``'nan'``, ``'nearest'``, ``'reflect'``, or ``'wrap'``. name : str, default ``'closing'`` Name for the output DataArray. Returns ------- xarray.DataArray Closed raster. Examples -------- >>> from xrspatial.morphology import morph_closing >>> filled = morph_closing(binary_mask) """ if kernel is None: kernel = np.ones((3, 3), dtype=np.uint8) return _dispatch( agg, kernel, boundary, name, _closing_numpy, _closing_cupy, _closing_dask_numpy, _closing_dask_cupy, )
[docs] @supports_dataset def morph_gradient(agg, kernel=None, boundary='nan', name='gradient'): """Morphological gradient: dilation minus erosion. Highlights edges and transitions in a 2D raster. The result is always non-negative for non-NaN cells. Parameters ---------- agg : xarray.DataArray 2D raster of numeric values. kernel : numpy.ndarray or None 2D structuring element with odd dimensions. Defaults to a 3x3 square. boundary : str, default ``'nan'`` Edge handling: ``'nan'``, ``'nearest'``, ``'reflect'``, or ``'wrap'``. name : str, default ``'gradient'`` Name for the output DataArray. Returns ------- xarray.DataArray Gradient raster (dilate - erode). Examples -------- >>> from xrspatial.morphology import morph_gradient >>> edges = morph_gradient(elevation) """ dilated = morph_dilate(agg, kernel=kernel, boundary=boundary) eroded = morph_erode(agg, kernel=kernel, boundary=boundary) result = dilated - eroded result.name = name return result
[docs] @supports_dataset def morph_white_tophat(agg, kernel=None, boundary='nan', name='white_tophat'): """White top-hat: original minus opening. Isolates bright features that are smaller than the structuring element. Parameters ---------- agg : xarray.DataArray 2D raster of numeric values. kernel : numpy.ndarray or None 2D structuring element with odd dimensions. Defaults to a 3x3 square. boundary : str, default ``'nan'`` Edge handling: ``'nan'``, ``'nearest'``, ``'reflect'``, or ``'wrap'``. name : str, default ``'white_tophat'`` Name for the output DataArray. Returns ------- xarray.DataArray White top-hat raster (original - opening). Examples -------- >>> from xrspatial.morphology import morph_white_tophat >>> bright = morph_white_tophat(elevation, kernel=circle_kernel) """ opened = morph_opening(agg, kernel=kernel, boundary=boundary) result = agg - opened result.name = name return result
[docs] @supports_dataset def morph_black_tophat(agg, kernel=None, boundary='nan', name='black_tophat'): """Black top-hat: closing minus original. Isolates dark features that are smaller than the structuring element. Parameters ---------- agg : xarray.DataArray 2D raster of numeric values. kernel : numpy.ndarray or None 2D structuring element with odd dimensions. Defaults to a 3x3 square. boundary : str, default ``'nan'`` Edge handling: ``'nan'``, ``'nearest'``, ``'reflect'``, or ``'wrap'``. name : str, default ``'black_tophat'`` Name for the output DataArray. Returns ------- xarray.DataArray Black top-hat raster (closing - original). Examples -------- >>> from xrspatial.morphology import morph_black_tophat >>> dark = morph_black_tophat(elevation, kernel=circle_kernel) """ closed = morph_closing(agg, kernel=kernel, boundary=boundary) result = closed - agg result.name = name return result