from __future__ import annotations
import numpy as np
import xarray as xr
from numba import jit
try:
import cupy
from numba import cuda
except ImportError:
class cupy(object):
ndarray = False
cuda = None
try:
import dask.array as da
except ImportError:
da = None
from xrspatial.utils import (
ArrayTypeFunctionMapping,
_validate_scalar,
has_cuda_and_cupy,
is_cupy_array,
is_dask_cupy,
)
# Default erosion parameters
_DEFAULT_PARAMS = dict(
inertia=0.05,
capacity=4.0,
deposition=0.3,
erosion=0.3,
evaporation=0.01,
gravity=4.0,
min_slope=0.01,
radius=3,
max_lifetime=30,
)
# Upper bounds on user-controlled iteration / kernel-size parameters.
# These cap the worst-case CPU work and the worst-case host/device
# allocation regardless of raster shape.
#
# - _MAX_ITERATIONS sizes the (iterations, 2) random_pos buffer
# (16 bytes/particle on the host, copied to GPU). 10**8 is ~1.6 GB,
# well above any realistic erosion run (default is 50_000).
# - _MAX_RADIUS bounds the brush size: (2r+1)**2 cells, three arrays.
# At r=1024 the brush arrays total ~50 MB.
# - _MAX_LIFETIME bounds the inner per-particle loop in the JIT kernel.
# At 100_000 with iterations=10**8 the total JIT step count is 10**13,
# which is still excessive; the iteration cap is the binding limit.
_MAX_ITERATIONS = 10**8
_MAX_RADIUS = 1024
_MAX_LIFETIME = 100_000
def _build_brush(radius):
"""Precompute brush offsets and weights for the erosion kernel."""
offsets_y = []
offsets_x = []
weights = []
for dy in range(-radius, radius + 1):
for dx in range(-radius, radius + 1):
dist2 = dx * dx + dy * dy
if dist2 <= radius * radius:
w = max(0.0, radius - dist2 ** 0.5)
offsets_y.append(dy)
offsets_x.append(dx)
weights.append(w)
weight_sum = sum(weights)
boy = np.array(offsets_y, dtype=np.int32)
box = np.array(offsets_x, dtype=np.int32)
bw = np.array([w / weight_sum for w in weights], dtype=np.float64)
return boy, box, bw
@jit(nopython=True, nogil=True)
def _erode_cpu(heightmap, random_pos, boy, box, bw,
inertia, capacity, deposition, erosion_rate,
evaporation, gravity, min_slope, max_lifetime):
"""Particle-based hydraulic erosion on a 2D heightmap (CPU).
random_pos : float64 array of shape (n_iterations, 2) with pre-generated
random starting positions in [0, 1).
"""
height, width = heightmap.shape
n_iterations = random_pos.shape[0]
n_brush = bw.shape[0]
for iteration in range(n_iterations):
pos_x = random_pos[iteration, 0] * (width - 3) + 1
pos_y = random_pos[iteration, 1] * (height - 3) + 1
dir_x = 0.0
dir_y = 0.0
speed = 1.0
water = 1.0
sediment = 0.0
for step in range(max_lifetime):
node_x = int(pos_x)
node_y = int(pos_y)
if node_x < 1 or node_x >= width - 2 or node_y < 1 or node_y >= height - 2:
break
fx = pos_x - node_x
fy = pos_y - node_y
h00 = heightmap[node_y, node_x]
h10 = heightmap[node_y, node_x + 1]
h01 = heightmap[node_y + 1, node_x]
h11 = heightmap[node_y + 1, node_x + 1]
grad_x = (h10 - h00) * (1 - fy) + (h11 - h01) * fy
grad_y = (h01 - h00) * (1 - fx) + (h11 - h10) * fx
dir_x = dir_x * inertia - grad_x * (1 - inertia)
dir_y = dir_y * inertia - grad_y * (1 - inertia)
dir_len = (dir_x * dir_x + dir_y * dir_y) ** 0.5
if dir_len < 1e-10:
break
dir_x /= dir_len
dir_y /= dir_len
new_x = pos_x + dir_x
new_y = pos_y + dir_y
if new_x < 1 or new_x >= width - 2 or new_y < 1 or new_y >= height - 2:
break
h_old = h00 * (1 - fx) * (1 - fy) + h10 * fx * (1 - fy) + \
h01 * (1 - fx) * fy + h11 * fx * fy
new_node_x = int(new_x)
new_node_y = int(new_y)
new_fx = new_x - new_node_x
new_fy = new_y - new_node_y
h_new = (heightmap[new_node_y, new_node_x] * (1 - new_fx) * (1 - new_fy) +
heightmap[new_node_y, new_node_x + 1] * new_fx * (1 - new_fy) +
heightmap[new_node_y + 1, new_node_x] * (1 - new_fx) * new_fy +
heightmap[new_node_y + 1, new_node_x + 1] * new_fx * new_fy)
h_diff = h_new - h_old
sed_capacity = max(-h_diff, min_slope) * speed * water * capacity
if sediment > sed_capacity or h_diff > 0:
if h_diff > 0:
amount = min(h_diff, sediment)
else:
amount = (sediment - sed_capacity) * deposition
sediment -= amount
heightmap[node_y, node_x] += amount * (1 - fx) * (1 - fy)
heightmap[node_y, node_x + 1] += amount * fx * (1 - fy)
heightmap[node_y + 1, node_x] += amount * (1 - fx) * fy
heightmap[node_y + 1, node_x + 1] += amount * fx * fy
else:
amount = min((sed_capacity - sediment) * erosion_rate, -h_diff)
for k in range(n_brush):
ey = node_y + boy[k]
ex = node_x + box[k]
if 0 <= ey < height and 0 <= ex < width:
heightmap[ey, ex] -= amount * bw[k]
sediment += amount
speed_sq = speed * speed + h_diff * gravity
speed = speed_sq ** 0.5 if speed_sq > 0 else 0.0
water *= (1 - evaporation)
pos_x = new_x
pos_y = new_y
return heightmap
# ---------------------------------------------------------------------------
# GPU (CuPy / CUDA) implementation
# ---------------------------------------------------------------------------
if cuda is not None:
@cuda.jit
def _erode_gpu_kernel(
heightmap, random_pos, boy, box, bw,
inertia, capacity, deposition, erosion_rate,
evaporation, gravity, min_slope, max_lifetime,
):
"""One CUDA thread per particle.
Particles within a single launch are independent. Conflicts at
shared heightmap cells are resolved via ``cuda.atomic.add``.
"""
tid = cuda.grid(1)
n_iterations = random_pos.shape[0]
if tid >= n_iterations:
return
height = heightmap.shape[0]
width = heightmap.shape[1]
n_brush = bw.shape[0]
pos_x = random_pos[tid, 0] * (width - 3) + 1
pos_y = random_pos[tid, 1] * (height - 3) + 1
dir_x = 0.0
dir_y = 0.0
speed = 1.0
water = 1.0
sediment = 0.0
for step in range(max_lifetime):
node_x = int(pos_x)
node_y = int(pos_y)
if node_x < 1 or node_x >= width - 2:
return
if node_y < 1 or node_y >= height - 2:
return
fx = pos_x - node_x
fy = pos_y - node_y
h00 = heightmap[node_y, node_x]
h10 = heightmap[node_y, node_x + 1]
h01 = heightmap[node_y + 1, node_x]
h11 = heightmap[node_y + 1, node_x + 1]
grad_x = (h10 - h00) * (1 - fy) + (h11 - h01) * fy
grad_y = (h01 - h00) * (1 - fx) + (h11 - h10) * fx
dir_x = dir_x * inertia - grad_x * (1 - inertia)
dir_y = dir_y * inertia - grad_y * (1 - inertia)
dir_len = (dir_x * dir_x + dir_y * dir_y) ** 0.5
if dir_len < 1e-10:
return
dir_x /= dir_len
dir_y /= dir_len
new_x = pos_x + dir_x
new_y = pos_y + dir_y
if new_x < 1 or new_x >= width - 2:
return
if new_y < 1 or new_y >= height - 2:
return
h_old = (h00 * (1 - fx) * (1 - fy) + h10 * fx * (1 - fy) +
h01 * (1 - fx) * fy + h11 * fx * fy)
new_node_x = int(new_x)
new_node_y = int(new_y)
new_fx = new_x - new_node_x
new_fy = new_y - new_node_y
h_new = (heightmap[new_node_y, new_node_x] * (1 - new_fx) * (1 - new_fy) +
heightmap[new_node_y, new_node_x + 1] * new_fx * (1 - new_fy) +
heightmap[new_node_y + 1, new_node_x] * (1 - new_fx) * new_fy +
heightmap[new_node_y + 1, new_node_x + 1] * new_fx * new_fy)
h_diff = h_new - h_old
neg_h_diff = -h_diff
if neg_h_diff < min_slope:
neg_h_diff = min_slope
sed_capacity = neg_h_diff * speed * water * capacity
if sediment > sed_capacity or h_diff > 0:
if h_diff > 0:
amount = h_diff if h_diff < sediment else sediment
else:
amount = (sediment - sed_capacity) * deposition
sediment -= amount
cuda.atomic.add(heightmap, (node_y, node_x),
amount * (1 - fx) * (1 - fy))
cuda.atomic.add(heightmap, (node_y, node_x + 1),
amount * fx * (1 - fy))
cuda.atomic.add(heightmap, (node_y + 1, node_x),
amount * (1 - fx) * fy)
cuda.atomic.add(heightmap, (node_y + 1, node_x + 1),
amount * fx * fy)
else:
neg_h = -h_diff
sc_minus_sed = (sed_capacity - sediment) * erosion_rate
amount = sc_minus_sed if sc_minus_sed < neg_h else neg_h
for k in range(n_brush):
ey = node_y + boy[k]
ex = node_x + box[k]
if 0 <= ey < height and 0 <= ex < width:
cuda.atomic.add(heightmap, (ey, ex),
-(amount * bw[k]))
sediment += amount
speed_sq = speed * speed + h_diff * gravity
if speed_sq > 0:
speed = speed_sq ** 0.5
else:
speed = 0.0
water *= (1 - evaporation)
pos_x = new_x
pos_y = new_y
def _available_memory_bytes():
"""Best-effort estimate of available host memory in bytes."""
try:
with open('/proc/meminfo', 'r') as f:
for line in f:
if line.startswith('MemAvailable:'):
return int(line.split()[1]) * 1024 # kB -> bytes
except (OSError, ValueError, IndexError):
pass
try:
import psutil
return psutil.virtual_memory().available
except (ImportError, AttributeError):
pass
return 2 * 1024 ** 3
def _check_erosion_memory(shape, dtype, iterations, n_brush):
"""Raise MemoryError if the projected working set would exceed RAM.
Covers four allocations:
- input copy as float64 (numpy path) or float32 output
- random_pos buffer of shape (iterations, 2), float64
- brush arrays of length n_brush (two int32, one float64)
- output cast back to float32
The check fires before any of these are allocated, so callers see
a clean MemoryError instead of an opaque OOM from numpy/cupy.
"""
raster_bytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
# float64 working copy + float32 output ~ 3x raster_bytes for float32
# input (8/4 = 2 for the copy and another 1 for the output cast).
raster_working = raster_bytes * 3
rng_bytes = int(iterations) * 2 * 8 # float64 (iterations, 2)
brush_bytes = int(n_brush) * (4 + 4 + 8) # two int32 + one float64
required = raster_working + rng_bytes + brush_bytes
avail = _available_memory_bytes()
if required > 0.8 * avail:
raise MemoryError(
f"erode() needs ~{required / 1e9:.1f} GB of working memory "
f"(raster ~{raster_working / 1e9:.2f} GB, "
f"random_pos ~{rng_bytes / 1e9:.2f} GB, "
f"brush ~{brush_bytes / 1e9:.2f} GB) but only "
f"~{avail / 1e9:.1f} GB is available. "
f"Reduce `iterations`, `params['radius']`, or downsample "
f"the input."
)
def _erode_cupy(data, random_pos_np, boy, box, bw, params):
"""Run erosion on a CuPy array using the CUDA kernel."""
_check_erosion_memory(data.shape, data.dtype,
random_pos_np.shape[0], bw.shape[0])
hm = data.astype(cupy.float64)
# Transfer brush and random positions to device
random_pos_d = cupy.asarray(random_pos_np)
boy_d = cupy.asarray(boy)
box_d = cupy.asarray(box)
bw_d = cupy.asarray(bw)
n_particles = random_pos_np.shape[0]
threads_per_block = 256
blocks = (n_particles + threads_per_block - 1) // threads_per_block
_erode_gpu_kernel[blocks, threads_per_block](
hm, random_pos_d, boy_d, box_d, bw_d,
params['inertia'], params['capacity'], params['deposition'],
params['erosion'], params['evaporation'], params['gravity'],
params['min_slope'], int(params['max_lifetime']),
)
cuda.synchronize()
return hm.astype(cupy.float32)
def _erode_numpy(data, random_pos, boy, box, bw, params):
"""Run erosion on a NumPy array using the CPU kernel."""
_check_erosion_memory(data.shape, data.dtype,
random_pos.shape[0], bw.shape[0])
hm = data.astype(np.float64).copy()
hm = _erode_cpu(
hm, random_pos, boy, box, bw,
params['inertia'], params['capacity'], params['deposition'],
params['erosion'], params['evaporation'], params['gravity'],
params['min_slope'], int(params['max_lifetime']),
)
return hm.astype(np.float32)
def _erode_dask_numpy(data, random_pos, boy, box, bw, params):
"""Run erosion on a dask+numpy array.
Erosion is a global operation (particles traverse the full grid),
so we materialize to numpy, run the CPU kernel, then re-wrap.
"""
np_data = data.compute()
result = _erode_numpy(np_data, random_pos, boy, box, bw, params)
return da.from_array(result, chunks=data.chunksize)
def _erode_dask_cupy(data, random_pos, boy, box, bw, params):
"""Run erosion on a dask+cupy array.
Materializes to a single CuPy array, runs the GPU kernel, then
re-wraps as dask.
"""
cp_data = data.compute() # CuPy ndarray
result = _erode_cupy(cp_data, random_pos, boy, box, bw, params)
return da.from_array(result, chunks=data.chunksize,
meta=cupy.array((), dtype=cupy.float32))
[docs]
def erode(agg, iterations=50000, seed=42, params=None):
"""Apply particle-based hydraulic erosion to a terrain DataArray.
Erosion is a global operation that cannot be chunked, so dask arrays
are materialized before processing and re-wrapped afterwards.
Parameters
----------
agg : xr.DataArray
2D terrain heightmap.
iterations : int
Number of water droplets to simulate. Capped at
``_MAX_ITERATIONS`` (1e8) to keep host and device allocations
bounded.
seed : int
Random seed for droplet placement.
params : dict, optional
Override default erosion constants. Keys:
inertia, capacity, deposition, erosion, evaporation,
gravity, min_slope, radius, max_lifetime.
``radius`` is capped at ``_MAX_RADIUS`` (1024) and
``max_lifetime`` at ``_MAX_LIFETIME`` (1e5).
Returns
-------
xr.DataArray
Eroded terrain.
Raises
------
ValueError
If `iterations`, `params['radius']`, or `params['max_lifetime']`
is outside the allowed range.
MemoryError
If the projected working set exceeds available memory.
"""
_validate_scalar(
iterations, func_name='erode', name='iterations',
dtype=int, min_val=1, max_val=_MAX_ITERATIONS,
)
p = dict(_DEFAULT_PARAMS)
if params is not None:
p.update(params)
_validate_scalar(
p['radius'], func_name='erode', name="params['radius']",
dtype=int, min_val=1, max_val=_MAX_RADIUS,
)
_validate_scalar(
p['max_lifetime'], func_name='erode', name="params['max_lifetime']",
dtype=int, min_val=1, max_val=_MAX_LIFETIME,
)
# Precompute brush and random positions on the host
boy, box, bw = _build_brush(int(p['radius']))
rng = np.random.RandomState(seed)
random_pos = rng.random((iterations, 2))
mapper = ArrayTypeFunctionMapping(
numpy_func=lambda d: _erode_numpy(d, random_pos, boy, box, bw, p),
cupy_func=lambda d: _erode_cupy(d, random_pos, boy, box, bw, p),
dask_func=lambda d: _erode_dask_numpy(d, random_pos, boy, box, bw, p),
dask_cupy_func=lambda d: _erode_dask_cupy(d, random_pos, boy, box, bw, p),
)
result_data = mapper(agg)(agg.data)
return xr.DataArray(result_data, dims=agg.dims, coords=agg.coords,
attrs=agg.attrs, name=agg.name)