Source code for xrspatial.erosion

from __future__ import annotations

import numpy as np
import xarray as xr
from numba import jit

try:
    import cupy
    from numba import cuda
except ImportError:
    class cupy(object):
        ndarray = False
    cuda = None

try:
    import dask.array as da
except ImportError:
    da = None

from xrspatial.utils import (
    ArrayTypeFunctionMapping,
    has_cuda_and_cupy,
    is_cupy_array,
    is_dask_cupy,
)

# Default erosion parameters
_DEFAULT_PARAMS = dict(
    inertia=0.05,
    capacity=4.0,
    deposition=0.3,
    erosion=0.3,
    evaporation=0.01,
    gravity=4.0,
    min_slope=0.01,
    radius=3,
    max_lifetime=30,
)


def _build_brush(radius):
    """Precompute brush offsets and weights for the erosion kernel."""
    offsets_y = []
    offsets_x = []
    weights = []
    for dy in range(-radius, radius + 1):
        for dx in range(-radius, radius + 1):
            dist2 = dx * dx + dy * dy
            if dist2 <= radius * radius:
                w = max(0.0, radius - dist2 ** 0.5)
                offsets_y.append(dy)
                offsets_x.append(dx)
                weights.append(w)
    weight_sum = sum(weights)
    boy = np.array(offsets_y, dtype=np.int32)
    box = np.array(offsets_x, dtype=np.int32)
    bw = np.array([w / weight_sum for w in weights], dtype=np.float64)
    return boy, box, bw


@jit(nopython=True, nogil=True)
def _erode_cpu(heightmap, random_pos, boy, box, bw,
               inertia, capacity, deposition, erosion_rate,
               evaporation, gravity, min_slope, max_lifetime):
    """Particle-based hydraulic erosion on a 2D heightmap (CPU).

    random_pos : float64 array of shape (n_iterations, 2) with pre-generated
                 random starting positions in [0, 1).
    """
    height, width = heightmap.shape
    n_iterations = random_pos.shape[0]
    n_brush = bw.shape[0]

    for iteration in range(n_iterations):
        pos_x = random_pos[iteration, 0] * (width - 3) + 1
        pos_y = random_pos[iteration, 1] * (height - 3) + 1
        dir_x = 0.0
        dir_y = 0.0
        speed = 1.0
        water = 1.0
        sediment = 0.0

        for step in range(max_lifetime):
            node_x = int(pos_x)
            node_y = int(pos_y)

            if node_x < 1 or node_x >= width - 2 or node_y < 1 or node_y >= height - 2:
                break

            fx = pos_x - node_x
            fy = pos_y - node_y

            h00 = heightmap[node_y, node_x]
            h10 = heightmap[node_y, node_x + 1]
            h01 = heightmap[node_y + 1, node_x]
            h11 = heightmap[node_y + 1, node_x + 1]

            grad_x = (h10 - h00) * (1 - fy) + (h11 - h01) * fy
            grad_y = (h01 - h00) * (1 - fx) + (h11 - h10) * fx

            dir_x = dir_x * inertia - grad_x * (1 - inertia)
            dir_y = dir_y * inertia - grad_y * (1 - inertia)

            dir_len = (dir_x * dir_x + dir_y * dir_y) ** 0.5
            if dir_len < 1e-10:
                break
            dir_x /= dir_len
            dir_y /= dir_len

            new_x = pos_x + dir_x
            new_y = pos_y + dir_y

            if new_x < 1 or new_x >= width - 2 or new_y < 1 or new_y >= height - 2:
                break

            h_old = h00 * (1 - fx) * (1 - fy) + h10 * fx * (1 - fy) + \
                    h01 * (1 - fx) * fy + h11 * fx * fy

            new_node_x = int(new_x)
            new_node_y = int(new_y)
            new_fx = new_x - new_node_x
            new_fy = new_y - new_node_y
            h_new = (heightmap[new_node_y, new_node_x] * (1 - new_fx) * (1 - new_fy) +
                     heightmap[new_node_y, new_node_x + 1] * new_fx * (1 - new_fy) +
                     heightmap[new_node_y + 1, new_node_x] * (1 - new_fx) * new_fy +
                     heightmap[new_node_y + 1, new_node_x + 1] * new_fx * new_fy)

            h_diff = h_new - h_old

            sed_capacity = max(-h_diff, min_slope) * speed * water * capacity

            if sediment > sed_capacity or h_diff > 0:
                if h_diff > 0:
                    amount = min(h_diff, sediment)
                else:
                    amount = (sediment - sed_capacity) * deposition

                sediment -= amount

                heightmap[node_y, node_x] += amount * (1 - fx) * (1 - fy)
                heightmap[node_y, node_x + 1] += amount * fx * (1 - fy)
                heightmap[node_y + 1, node_x] += amount * (1 - fx) * fy
                heightmap[node_y + 1, node_x + 1] += amount * fx * fy
            else:
                amount = min((sed_capacity - sediment) * erosion_rate, -h_diff)

                for k in range(n_brush):
                    ey = node_y + boy[k]
                    ex = node_x + box[k]
                    if 0 <= ey < height and 0 <= ex < width:
                        heightmap[ey, ex] -= amount * bw[k]

                sediment += amount

            speed_sq = speed * speed + h_diff * gravity
            speed = speed_sq ** 0.5 if speed_sq > 0 else 0.0
            water *= (1 - evaporation)

            pos_x = new_x
            pos_y = new_y

    return heightmap


# ---------------------------------------------------------------------------
# GPU (CuPy / CUDA) implementation
# ---------------------------------------------------------------------------

if cuda is not None:

    @cuda.jit
    def _erode_gpu_kernel(
        heightmap, random_pos, boy, box, bw,
        inertia, capacity, deposition, erosion_rate,
        evaporation, gravity, min_slope, max_lifetime,
    ):
        """One CUDA thread per particle.

        Particles within a single launch are independent.  Conflicts at
        shared heightmap cells are resolved via ``cuda.atomic.add``.
        """
        tid = cuda.grid(1)
        n_iterations = random_pos.shape[0]
        if tid >= n_iterations:
            return

        height = heightmap.shape[0]
        width = heightmap.shape[1]
        n_brush = bw.shape[0]

        pos_x = random_pos[tid, 0] * (width - 3) + 1
        pos_y = random_pos[tid, 1] * (height - 3) + 1
        dir_x = 0.0
        dir_y = 0.0
        speed = 1.0
        water = 1.0
        sediment = 0.0

        for step in range(max_lifetime):
            node_x = int(pos_x)
            node_y = int(pos_y)

            if node_x < 1 or node_x >= width - 2:
                return
            if node_y < 1 or node_y >= height - 2:
                return

            fx = pos_x - node_x
            fy = pos_y - node_y

            h00 = heightmap[node_y, node_x]
            h10 = heightmap[node_y, node_x + 1]
            h01 = heightmap[node_y + 1, node_x]
            h11 = heightmap[node_y + 1, node_x + 1]

            grad_x = (h10 - h00) * (1 - fy) + (h11 - h01) * fy
            grad_y = (h01 - h00) * (1 - fx) + (h11 - h10) * fx

            dir_x = dir_x * inertia - grad_x * (1 - inertia)
            dir_y = dir_y * inertia - grad_y * (1 - inertia)

            dir_len = (dir_x * dir_x + dir_y * dir_y) ** 0.5
            if dir_len < 1e-10:
                return
            dir_x /= dir_len
            dir_y /= dir_len

            new_x = pos_x + dir_x
            new_y = pos_y + dir_y

            if new_x < 1 or new_x >= width - 2:
                return
            if new_y < 1 or new_y >= height - 2:
                return

            h_old = (h00 * (1 - fx) * (1 - fy) + h10 * fx * (1 - fy) +
                     h01 * (1 - fx) * fy + h11 * fx * fy)

            new_node_x = int(new_x)
            new_node_y = int(new_y)
            new_fx = new_x - new_node_x
            new_fy = new_y - new_node_y
            h_new = (heightmap[new_node_y, new_node_x] * (1 - new_fx) * (1 - new_fy) +
                     heightmap[new_node_y, new_node_x + 1] * new_fx * (1 - new_fy) +
                     heightmap[new_node_y + 1, new_node_x] * (1 - new_fx) * new_fy +
                     heightmap[new_node_y + 1, new_node_x + 1] * new_fx * new_fy)

            h_diff = h_new - h_old

            neg_h_diff = -h_diff
            if neg_h_diff < min_slope:
                neg_h_diff = min_slope
            sed_capacity = neg_h_diff * speed * water * capacity

            if sediment > sed_capacity or h_diff > 0:
                if h_diff > 0:
                    amount = h_diff if h_diff < sediment else sediment
                else:
                    amount = (sediment - sed_capacity) * deposition

                sediment -= amount

                cuda.atomic.add(heightmap, (node_y, node_x),
                                amount * (1 - fx) * (1 - fy))
                cuda.atomic.add(heightmap, (node_y, node_x + 1),
                                amount * fx * (1 - fy))
                cuda.atomic.add(heightmap, (node_y + 1, node_x),
                                amount * (1 - fx) * fy)
                cuda.atomic.add(heightmap, (node_y + 1, node_x + 1),
                                amount * fx * fy)
            else:
                neg_h = -h_diff
                sc_minus_sed = (sed_capacity - sediment) * erosion_rate
                amount = sc_minus_sed if sc_minus_sed < neg_h else neg_h

                for k in range(n_brush):
                    ey = node_y + boy[k]
                    ex = node_x + box[k]
                    if 0 <= ey < height and 0 <= ex < width:
                        cuda.atomic.add(heightmap, (ey, ex),
                                        -(amount * bw[k]))

                sediment += amount

            speed_sq = speed * speed + h_diff * gravity
            if speed_sq > 0:
                speed = speed_sq ** 0.5
            else:
                speed = 0.0
            water *= (1 - evaporation)

            pos_x = new_x
            pos_y = new_y


def _erode_cupy(data, random_pos_np, boy, box, bw, params):
    """Run erosion on a CuPy array using the CUDA kernel."""
    hm = data.astype(cupy.float64)

    # Transfer brush and random positions to device
    random_pos_d = cupy.asarray(random_pos_np)
    boy_d = cupy.asarray(boy)
    box_d = cupy.asarray(box)
    bw_d = cupy.asarray(bw)

    n_particles = random_pos_np.shape[0]
    threads_per_block = 256
    blocks = (n_particles + threads_per_block - 1) // threads_per_block

    _erode_gpu_kernel[blocks, threads_per_block](
        hm, random_pos_d, boy_d, box_d, bw_d,
        params['inertia'], params['capacity'], params['deposition'],
        params['erosion'], params['evaporation'], params['gravity'],
        params['min_slope'], int(params['max_lifetime']),
    )
    cuda.synchronize()

    return hm.astype(cupy.float32)


def _erode_numpy(data, random_pos, boy, box, bw, params):
    """Run erosion on a NumPy array using the CPU kernel."""
    hm = data.astype(np.float64).copy()

    hm = _erode_cpu(
        hm, random_pos, boy, box, bw,
        params['inertia'], params['capacity'], params['deposition'],
        params['erosion'], params['evaporation'], params['gravity'],
        params['min_slope'], int(params['max_lifetime']),
    )

    return hm.astype(np.float32)


def _check_erosion_memory(data):
    """Raise MemoryError if the array is too large to materialize."""
    estimated = np.prod(data.shape) * data.dtype.itemsize
    # The erosion kernel needs ~3x: input copy + brush scratch + output
    working = estimated * 3
    try:
        from xrspatial.zonal import _available_memory_bytes
        avail = _available_memory_bytes()
    except ImportError:
        avail = 2 * 1024**3
    if working > 0.8 * avail:
        raise MemoryError(
            f"erode() needs ~{working / 1e9:.1f} GB to materialize and "
            f"process the full grid but only ~{avail / 1e9:.1f} GB "
            f"available.  Particle erosion is a global operation that "
            f"cannot be chunked.  Downsample the input or use a machine "
            f"with more RAM."
        )


def _erode_dask_numpy(data, random_pos, boy, box, bw, params):
    """Run erosion on a dask+numpy array.

    Erosion is a global operation (particles traverse the full grid),
    so we materialize to numpy, run the CPU kernel, then re-wrap.
    """
    _check_erosion_memory(data)
    np_data = data.compute()
    result = _erode_numpy(np_data, random_pos, boy, box, bw, params)
    return da.from_array(result, chunks=data.chunksize)


def _erode_dask_cupy(data, random_pos, boy, box, bw, params):
    """Run erosion on a dask+cupy array.

    Materializes to a single CuPy array, runs the GPU kernel, then
    re-wraps as dask.
    """
    _check_erosion_memory(data)
    cp_data = data.compute()  # CuPy ndarray
    result = _erode_cupy(cp_data, random_pos, boy, box, bw, params)
    return da.from_array(result, chunks=data.chunksize,
                         meta=cupy.array((), dtype=cupy.float32))


[docs] def erode(agg, iterations=50000, seed=42, params=None): """Apply particle-based hydraulic erosion to a terrain DataArray. Erosion is a global operation that cannot be chunked, so dask arrays are materialized before processing and re-wrapped afterwards. Parameters ---------- agg : xr.DataArray 2D terrain heightmap. iterations : int Number of water droplets to simulate. seed : int Random seed for droplet placement. params : dict, optional Override default erosion constants. Keys: inertia, capacity, deposition, erosion, evaporation, gravity, min_slope, radius, max_lifetime. Returns ------- xr.DataArray Eroded terrain. """ p = dict(_DEFAULT_PARAMS) if params is not None: p.update(params) # Precompute brush and random positions on the host boy, box, bw = _build_brush(int(p['radius'])) rng = np.random.RandomState(seed) random_pos = rng.random((iterations, 2)) mapper = ArrayTypeFunctionMapping( numpy_func=lambda d: _erode_numpy(d, random_pos, boy, box, bw, p), cupy_func=lambda d: _erode_cupy(d, random_pos, boy, box, bw, p), dask_func=lambda d: _erode_dask_numpy(d, random_pos, boy, box, bw, p), dask_cupy_func=lambda d: _erode_dask_cupy(d, random_pos, boy, box, bw, p), ) result_data = mapper(agg)(agg.data) return xr.DataArray(result_data, dims=agg.dims, coords=agg.coords, attrs=agg.attrs, name=agg.name)