Source code for xrspatial.diffusion

"""Scalar diffusion solver for raster fields.

Solves the 2D diffusion (heat) equation using an explicit forward-Euler
finite-difference scheme with a 5-point Laplacian stencil::

    du/dt = alpha * laplacian(u)

Supports uniform or spatially varying diffusivity and all four backends
(numpy, cupy, dask+numpy, dask+cupy).
"""

from __future__ import annotations

from functools import partial

import numpy as np
import xarray as xr
from xarray import DataArray

from numba import cuda, prange

try:
    import cupy
except ImportError:
    class cupy:
        ndarray = False

try:
    import dask.array as da
except ImportError:
    da = None

from xrspatial.utils import (
    ArrayTypeFunctionMapping,
    _boundary_to_dask,
    _pad_array,
    _validate_boundary,
    _validate_raster,
    _validate_scalar,
    calc_cuda_dims,
    has_cuda_and_cupy,
    is_cupy_array,
    ngjit,
    not_implemented_func,
)


# ---------------------------------------------------------------------------
# Memory guard
# ---------------------------------------------------------------------------
#
# Per-step working set on the eager numpy / cupy paths:
#   * u           : rows*cols  float64 (8 B)
#   * out         : rows*cols  float64 (8 B)
#   * alpha_arr   : rows*cols  float64 (8 B)   (only when materialised)
#   * padded copy : rows*cols  float64 (8 B)   (rough upper bound for the
#                                              padded buffer, ignoring the
#                                              1-cell border)
# Total ~32 bytes per pixel.  For dask paths the equivalent footprint is
# per-chunk and is bounded by chunk size, so the public-API guard targets
# the eager paths only.
_BYTES_PER_PIXEL = 32

# Hard cap on the number of explicit Euler iterations a single diffuse()
# call will run.  100k steps on a modest 1024x1024 raster is already minutes
# of CPU work; anything beyond this is almost certainly a misuse.
_MAX_STEPS = 100_000


def _available_memory_bytes():
    """Best-effort estimate of available memory in bytes."""
    try:
        with open('/proc/meminfo', 'r') as f:
            for line in f:
                if line.startswith('MemAvailable:'):
                    return int(line.split()[1]) * 1024  # kB -> bytes
    except (OSError, ValueError, IndexError):
        pass
    try:
        import psutil
        return psutil.virtual_memory().available
    except (ImportError, AttributeError):
        pass
    return 2 * 1024 ** 3  # 2 GB fallback


def _available_gpu_memory_bytes():
    """Best-effort estimate of free GPU memory in bytes.

    Returns 0 when the query fails -- callers treat that as a sentinel
    meaning "no GPU info, skip the guard".
    """
    try:
        import cupy as _cp
        free, _total = _cp.cuda.runtime.memGetInfo()
        return int(free)
    except Exception:
        return 0


def _check_memory(rows, cols):
    """Raise MemoryError if the eager numpy diffuse buffers exceed 50% of RAM."""
    required = int(rows) * int(cols) * _BYTES_PER_PIXEL
    available = _available_memory_bytes()
    if required > 0.5 * available:
        raise MemoryError(
            f"diffuse() on a {rows}x{cols} raster needs "
            f"~{required / 1e9:.1f} GB of working memory, but only "
            f"~{available / 1e9:.1f} GB is available. "
            f"Use a dask-backed DataArray for out-of-core processing."
        )


def _check_gpu_memory(rows, cols):
    """Raise MemoryError if the CuPy diffuse buffers exceed 50% of free GPU RAM.

    Skipped (returns silently) when the free-memory query fails -- the
    kernel will then fail at the cupy.asarray boundary anyway.
    """
    available = _available_gpu_memory_bytes()
    if available <= 0:
        return
    required = int(rows) * int(cols) * _BYTES_PER_PIXEL
    if required > 0.5 * available:
        raise MemoryError(
            f"diffuse() on a {rows}x{cols} raster needs "
            f"~{required / 1e9:.1f} GB of GPU working memory, but only "
            f"~{available / 1e9:.1f} GB is free on the active device. "
            f"Use a dask+cupy DataArray for out-of-core processing."
        )


# ---- numpy backend ----

@ngjit
def _diffuse_step_numpy(u, alpha_arr, dt_over_dx2, rows, cols):
    """One forward-Euler step on a padded array.

    *u* has shape ``(rows+2, cols+2)`` (1-cell pad on each side).
    Writes the updated interior back into *u* in-place.
    """
    out = np.empty((rows, cols), dtype=u.dtype)
    for i in prange(rows):
        ip = i + 1  # padded index
        for j in range(cols):
            jp = j + 1
            val = u[ip, jp]
            if np.isnan(val):
                out[i, j] = np.nan
                continue
            n = u[ip - 1, jp]
            s = u[ip + 1, jp]
            w = u[ip, jp - 1]
            e = u[ip, jp + 1]
            lap = n + s + w + e - 4.0 * val
            out[i, j] = val + dt_over_dx2 * alpha_arr[i, j] * lap
    return out


def _diffuse_numpy(data, alpha_arr, steps, dt_over_dx2, boundary):
    rows, cols = data.shape
    _check_memory(rows, cols)
    u = data.astype(np.float64)
    for _ in range(steps):
        if boundary == 'nan':
            padded = np.pad(u, 1, mode='constant', constant_values=np.nan)
        else:
            padded = _pad_array(u, 1, boundary)
        u = _diffuse_step_numpy(padded, alpha_arr, dt_over_dx2, rows, cols)
    return u


# ---- cupy backend ----

@cuda.jit
def _diffuse_step_gpu(u, alpha_arr, dt_over_dx2, out):
    """One forward-Euler step.  *u* is padded (rows+2, cols+2)."""
    i, j = cuda.grid(2)
    rows = out.shape[0]
    cols = out.shape[1]
    if i < rows and j < cols:
        ip = i + 1
        jp = j + 1
        val = u[ip, jp]
        if val != val:  # NaN check in device code
            out[i, j] = val
            return
        n = u[ip - 1, jp]
        s = u[ip + 1, jp]
        w = u[ip, jp - 1]
        e = u[ip, jp + 1]
        lap = n + s + w + e - 4.0 * val
        out[i, j] = val + dt_over_dx2 * alpha_arr[i, j] * lap


def _diffuse_cupy(data, alpha_arr, steps, dt_over_dx2, boundary):
    import cupy as cp

    rows, cols = data.shape
    _check_gpu_memory(rows, cols)
    u = cp.asarray(data, dtype=cp.float64)
    alpha_dev = cp.asarray(alpha_arr, dtype=cp.float64)
    bpg, tpb = calc_cuda_dims((rows, cols))

    for _ in range(steps):
        if boundary == 'nan':
            padded = cp.pad(u, 1, mode='constant', constant_values=cp.nan)
        else:
            padded = _pad_array(u, 1, boundary)
        out = cp.empty_like(u)
        _diffuse_step_gpu[bpg, tpb](padded, alpha_dev, dt_over_dx2, out)
        u = out
    return u


# ---- dask + numpy backend ----

def _diffuse_chunk_numpy(chunk, alpha_chunk, steps, dt_over_dx2, block_info=None):
    """Process a single dask chunk (already overlapped by 1 cell).

    ``alpha_chunk`` may be a scalar (uniform diffusivity) or a 2-D array
    matching the overlapped chunk shape.
    """
    rows = chunk.shape[0] - 2
    cols = chunk.shape[1] - 2
    if np.ndim(alpha_chunk) == 0:
        # Scalar diffusivity — broadcast to interior shape
        interior_alpha = np.broadcast_to(float(alpha_chunk),
                                         (rows, cols))
    else:
        interior_alpha = alpha_chunk[1:-1, 1:-1]

    u = chunk.copy()
    for _ in range(steps):
        interior = _diffuse_step_numpy(u, interior_alpha, dt_over_dx2, rows, cols)
        u[1:-1, 1:-1] = interior
    return u


def _diffuse_dask_numpy(data, alpha, steps, dt_over_dx2, boundary):
    """Dask+numpy backend.

    ``alpha`` is either a Python float (scalar diffusivity) or a dask
    array matching data's shape (spatially varying diffusivity).
    """
    if isinstance(alpha, (int, float, np.floating)):
        # Scalar: pass directly — no full-raster allocation, tiny closure.
        _func = partial(
            _diffuse_chunk_numpy,
            alpha_chunk=float(alpha),
            steps=1,
            dt_over_dx2=dt_over_dx2,
        )
    else:
        # Spatially varying: alpha is a dask array.  map_overlap will
        # feed matching chunks automatically.
        _func = partial(
            _diffuse_chunk_numpy,
            steps=1,
            dt_over_dx2=dt_over_dx2,
        )
    u = data.astype(np.float64)
    for _ in range(steps):
        if isinstance(alpha, (int, float, np.floating)):
            u = u.map_overlap(
                _func,
                depth=(1, 1),
                boundary=_boundary_to_dask(boundary),
                meta=np.array(()),
            )
        else:
            # Pass alpha as a second dask argument to map_overlap
            u = da.map_overlap(
                _diffuse_chunk_numpy,
                u, alpha,
                depth=(1, 1),
                boundary=_boundary_to_dask(boundary),
                meta=np.array(()),
                steps=1,
                dt_over_dx2=dt_over_dx2,
            )
    return u


# ---- dask + cupy backend ----

def _diffuse_chunk_cupy(chunk, alpha_chunk, dt_over_dx2, block_info=None):
    import cupy as cp

    rows = chunk.shape[0] - 2
    cols = chunk.shape[1] - 2
    if np.ndim(alpha_chunk) == 0:
        alpha_dev = cp.full((rows, cols), float(alpha_chunk), dtype=cp.float64)
    else:
        alpha_dev = cp.asarray(alpha_chunk[1:-1, 1:-1], dtype=cp.float64)
    bpg, tpb = calc_cuda_dims((rows, cols))
    out = cp.empty((rows, cols), dtype=cp.float64)
    _diffuse_step_gpu[bpg, tpb](chunk, alpha_dev, dt_over_dx2, out)
    # rebuild padded shape expected by map_overlap
    result = chunk.copy()
    result[1:-1, 1:-1] = out
    return result


def _diffuse_dask_cupy(data, alpha, steps, dt_over_dx2, boundary):
    import cupy as cp

    u = data.astype(cp.float64)
    for _ in range(steps):
        if isinstance(alpha, (int, float, np.floating)):
            _func = partial(
                _diffuse_chunk_cupy,
                alpha_chunk=float(alpha),
                dt_over_dx2=dt_over_dx2,
            )
            u = u.map_overlap(
                _func,
                depth=(1, 1),
                boundary=_boundary_to_dask(boundary, is_cupy=True),
                meta=cp.array(()),
            )
        else:
            u = da.map_overlap(
                _diffuse_chunk_cupy,
                u, alpha,
                depth=(1, 1),
                boundary=_boundary_to_dask(boundary, is_cupy=True),
                meta=cp.array(()),
                dt_over_dx2=dt_over_dx2,
            )
    return u


# ---- public API ----

[docs] def diffuse( agg, diffusivity=1.0, steps=1, dt=None, boundary='nan', name='diffuse', ): """Run forward-Euler diffusion on a 2D scalar field. Solves ``du/dt = alpha * laplacian(u)`` using an explicit 5-point stencil for *steps* time steps. Parameters ---------- agg : xarray.DataArray 2D raster representing the initial scalar field (temperature, concentration, etc.). diffusivity : float or xarray.DataArray Thermal/mass diffusivity. A scalar applies uniformly; a DataArray of the same shape allows spatially varying diffusivity. steps : int, default 1 Number of time steps to run. Capped at 100,000 to prevent a single call from pinning a CPU indefinitely. dt : float or None Time step size. When ``None`` the largest stable step is chosen automatically: ``dt = 0.25 * dx**2 / max(alpha)``. boundary : str, default ``'nan'`` Edge handling: ``'nan'``, ``'nearest'``, ``'reflect'``, or ``'wrap'``. name : str, default ``'diffuse'`` Name for the output DataArray. Returns ------- xarray.DataArray Scalar field after *steps* diffusion steps. """ _validate_raster(agg, func_name='diffuse', name='agg', ndim=2) _validate_scalar( steps, func_name='diffuse', name='steps', dtype=int, min_val=1, max_val=_MAX_STEPS, ) _validate_boundary(boundary) # resolve diffusivity # - scalar: keep as float; only materialised to a full raster if the # dispatched backend is eager (numpy/cupy) and actually needs it # - DataArray: keep as .data (numpy/cupy/dask) for backend dispatch if isinstance(diffusivity, xr.DataArray): _validate_raster(diffusivity, func_name='diffuse', name='diffusivity', ndim=2) if diffusivity.shape != agg.shape: raise ValueError( f"diffuse(): diffusivity shape {diffusivity.shape} " f"does not match agg shape {agg.shape}" ) alpha_scalar = None alpha_data = diffusivity.data # may be numpy, cupy, or dask # For numpy/cupy eager paths, materialize to numpy if da is not None and isinstance(alpha_data, da.Array): alpha_arr_eager = None # deferred — only built if needed else: if hasattr(alpha_data, 'get'): alpha_arr_eager = alpha_data.get().astype(np.float64) else: alpha_arr_eager = np.asarray(alpha_data, dtype=np.float64) elif isinstance(diffusivity, (int, float)): if diffusivity <= 0: raise ValueError( f"diffuse(): diffusivity must be > 0, got {diffusivity}" ) alpha_scalar = float(diffusivity) alpha_data = None # Defer the np.full allocation: dask paths can use the scalar # directly, and the eager paths build their own broadcast view # below after passing the memory guard. alpha_arr_eager = None else: raise TypeError( f"diffuse(): diffusivity must be a float or xr.DataArray, " f"got {type(diffusivity).__name__}" ) # resolve cell size — assume square cells, use x dimension res = agg.attrs.get('res') if isinstance(res, (tuple, list, np.ndarray)) and len(res) >= 1: dx = float(res[0]) if isinstance(res[0], (int, float)) else 1.0 elif isinstance(res, (int, float)): dx = float(res) else: dx = 1.0 if alpha_scalar is not None: alpha_max = alpha_scalar elif alpha_arr_eager is not None: alpha_max = float(np.nanmax(alpha_arr_eager)) else: # dask DataArray diffusivity — compute max lazily alpha_max = float(da.nanmax(alpha_data).compute()) if alpha_max <= 0: raise ValueError("diffuse(): all diffusivity values must be > 0") # auto dt from CFL stability condition cfl_max = 0.25 * dx * dx / alpha_max if dt is None: dt = cfl_max else: _validate_scalar(dt, func_name='diffuse', name='dt', dtype=(int, float), min_val=0, min_exclusive=True) # Explicit forward-Euler is unconditionally unstable above the CFL # bound; without this check, oversized dt silently produces Inf/NaN # that compound across iterations. if dt > cfl_max: raise ValueError( f"diffuse(): dt={dt} exceeds the CFL stability bound " f"{cfl_max} (= 0.25 * dx**2 / max(alpha)). " f"Pass dt=None to use the largest stable step automatically, " f"or reduce dt below {cfl_max}." ) dt_over_dx2 = float(dt) / (dx * dx) # Decide which backend will run before materialising any full-raster # alpha buffer. This matters for the dask + scalar case: we want to # leave alpha as a scalar all the way through to the per-chunk worker # rather than allocating an N*M float64 raster up front. is_eager_numpy = isinstance(agg.data, np.ndarray) is_eager_cupy = ( has_cuda_and_cupy() and is_cupy_array(agg.data) ) needs_eager_alpha = is_eager_numpy or is_eager_cupy if needs_eager_alpha and alpha_arr_eager is None: if alpha_scalar is not None: # Run the memory guard *before* allocating the full raster. # The guard already accounts for alpha_arr in its budget. rows_, cols_ = agg.shape if is_eager_cupy: _check_gpu_memory(rows_, cols_) else: _check_memory(rows_, cols_) alpha_arr_eager = np.full(agg.shape, alpha_scalar, dtype=np.float64) elif alpha_data is not None: # Dask DataArray diffusivity but the input array itself is eager. # This is unusual but supported: materialise alpha now. alpha_arr_eager = alpha_data.compute() if hasattr(alpha_arr_eager, 'get'): alpha_arr_eager = alpha_arr_eager.get() alpha_arr_eager = np.asarray(alpha_arr_eager, dtype=np.float64) dask_alpha = alpha_scalar if alpha_scalar is not None else alpha_data # dispatch to backend mapper = ArrayTypeFunctionMapping( numpy_func=partial(_diffuse_numpy, alpha_arr=alpha_arr_eager, steps=steps, dt_over_dx2=dt_over_dx2, boundary=boundary), cupy_func=partial(_diffuse_cupy, alpha_arr=alpha_arr_eager, steps=steps, dt_over_dx2=dt_over_dx2, boundary=boundary), dask_func=partial(_diffuse_dask_numpy, alpha=dask_alpha, steps=steps, dt_over_dx2=dt_over_dx2, boundary=boundary), dask_cupy_func=partial(_diffuse_dask_cupy, alpha=dask_alpha, steps=steps, dt_over_dx2=dt_over_dx2, boundary=boundary), ) out = mapper(agg)(agg.data) return DataArray( out, name=name, dims=agg.dims, coords=agg.coords, attrs=agg.attrs, )