Source code for xrspatial.terrain

from __future__ import annotations

# std lib
from functools import partial
from typing import Dict, List, Optional, Tuple, Union

# 3rd-party
import numpy as np
import xarray as xr

try:
    import cupy
except ImportError:
    class cupy(object):
        ndarray = False

try:
    import dask
    import dask.array as da
except ImportError:
    dask = None
    da = None

# local modules
from xrspatial.utils import (ArrayTypeFunctionMapping, _validate_raster, cuda_args,
                             get_dataarray_resolution, not_implemented_func)

# Scratch-buffer budget: 5-7 same-shape float32 arrays plus headroom.
# 24 bytes/pixel covers (data, x, y, warp_x, warp_y, weight) at 4 B each.
_SCRATCH_BYTES_PER_PIXEL = 24
_GPU_SCRATCH_BYTES_PER_PIXEL = 24


def _available_memory_bytes():
    """Best-effort estimate of available host memory in bytes."""
    try:
        with open('/proc/meminfo', 'r') as f:
            for line in f:
                if line.startswith('MemAvailable:'):
                    return int(line.split()[1]) * 1024
    except (OSError, ValueError, IndexError):
        pass
    try:
        import psutil
        return psutil.virtual_memory().available
    except (ImportError, AttributeError):
        pass
    return 2 * 1024 ** 3


def _available_gpu_memory_bytes():
    """Free GPU memory in bytes, or 0 when CUDA is unavailable."""
    try:
        import cupy as _cp
        free, _total = _cp.cuda.runtime.memGetInfo()
        return int(free)
    except Exception:
        return 0


def _check_memory(height, width):
    """Raise MemoryError if scratch buffers would exceed 50% of RAM."""
    required = int(height) * int(width) * _SCRATCH_BYTES_PER_PIXEL
    available = _available_memory_bytes()
    if required > 0.5 * available:
        raise MemoryError(
            f"generate_terrain on a {height}x{width} grid requires "
            f"~{required / 1e9:.1f} GB of scratch memory but only "
            f"~{available / 1e9:.1f} GB is available.  Use a "
            f"dask-backed DataArray for out-of-core processing."
        )


def _check_gpu_memory(height, width):
    """Raise MemoryError if scratch buffers would exceed 50% of free VRAM."""
    available = _available_gpu_memory_bytes()
    if available <= 0:
        return
    required = int(height) * int(width) * _GPU_SCRATCH_BYTES_PER_PIXEL
    if required > 0.5 * available:
        raise MemoryError(
            f"generate_terrain on a {height}x{width} grid requires "
            f"~{required / 1e9:.1f} GB of GPU scratch memory but only "
            f"~{available / 1e9:.1f} GB is free on the active device.  "
            f"Use a dask+cupy DataArray for out-of-core processing."
        )

from .perlin import _make_perm_table, _perlin, _perlin_gpu, _perlin_gpu_xy
from .worley import _worley_cpu, _worley_numpy_xy, _worley_gpu, _worley_gpu_xy


def _scale(value, old_range, new_range):
    d = (value - old_range[0]) / (old_range[1] - old_range[0])
    return d * (new_range[1] - new_range[0]) + new_range[0]


# ---------------------------------------------------------------------------
# numpy backend
# ---------------------------------------------------------------------------

def _gen_terrain(height_map, seed, x_range=(0, 1), y_range=(0, 1),
                 octaves=16, lacunarity=2.0, persistence=0.5,
                 noise_mode='fbm', warp_strength=0.0, warp_octaves=4,
                 worley_blend=0.0, worley_seed=None,
                 worley_norm_range=None):
    height, width = height_map.shape

    linx = np.linspace(
        x_range[0], x_range[1], width, endpoint=False, dtype=np.float32
    )
    liny = np.linspace(
        y_range[0], y_range[1], height, endpoint=False, dtype=np.float32
    )
    x, y = np.meshgrid(linx, liny)

    # --- domain warping ---
    if warp_strength > 0:
        warp_x = np.zeros_like(x)
        warp_y = np.zeros_like(y)
        for wi in range(warp_octaves):
            w_amp = persistence ** wi
            w_freq = lacunarity ** wi
            p_wx = _make_perm_table(seed + 100 + wi)
            p_wy = _make_perm_table(seed + 200 + wi)
            warp_x += _perlin(p_wx, x * w_freq, y * w_freq) * w_amp
            warp_y += _perlin(p_wy, x * w_freq, y * w_freq) * w_amp
        warp_norm = sum(persistence ** i for i in range(warp_octaves))
        warp_x /= warp_norm
        warp_y /= warp_norm
        x = x + warp_x * warp_strength
        y = y + warp_y * warp_strength

    # --- octave noise loop ---
    norm = sum(persistence ** i for i in range(octaves))

    if noise_mode == 'ridged':
        weight = np.ones((height, width), dtype=np.float32)
        for i in range(octaves):
            amp = persistence ** i
            freq = lacunarity ** i
            p = _make_perm_table(seed + i)
            noise = _perlin(p, x * freq, y * freq)
            noise = 1.0 - np.abs(noise)
            noise = noise * noise
            noise *= weight
            weight = np.clip(noise, 0, 1)
            height_map += noise * amp
    else:  # fbm
        for i in range(octaves):
            amp = persistence ** i
            freq = lacunarity ** i
            p = _make_perm_table(seed + i)
            noise = _perlin(p, x * freq, y * freq) * amp
            height_map += noise

    height_map /= norm

    # --- worley blending ---
    if worley_blend > 0:
        if worley_seed is None:
            worley_seed = seed + 1000
        w_p = _make_perm_table(worley_seed)
        w_noise = _worley_numpy_xy(w_p, x, y)
        if worley_norm_range is not None:
            w_min, w_max = worley_norm_range
            w_ptp = w_max - w_min
        else:
            w_min = w_noise.min()
            w_ptp = w_noise.max() - w_min
        if w_ptp > 0:
            w_noise = (w_noise - w_min) / w_ptp
        height_map = height_map * (1 - worley_blend) + w_noise * worley_blend

    height_map = height_map ** 3
    return height_map


def _terrain_numpy(data, seed, x_range_scaled, y_range_scaled, zfactor,
                   octaves, lacunarity, persistence, noise_mode,
                   warp_strength, warp_octaves,
                   worley_blend, worley_seed):
    _check_memory(data.shape[0], data.shape[1])
    data = data * 0
    data[:] = _gen_terrain(
        data, seed, x_range=x_range_scaled, y_range=y_range_scaled,
        octaves=octaves, lacunarity=lacunarity, persistence=persistence,
        noise_mode=noise_mode, warp_strength=warp_strength,
        warp_octaves=warp_octaves, worley_blend=worley_blend,
        worley_seed=worley_seed,
    )

    data = np.clip(data, -1, 1)
    data = (data + 1) / 2
    data[data < 0.3] = 0  # create water
    data *= zfactor

    return data


# ---------------------------------------------------------------------------
# dask + numpy backend
# ---------------------------------------------------------------------------

def _terrain_dask_numpy(data, seed, x_range_scaled, y_range_scaled, zfactor,
                        octaves, lacunarity, persistence, noise_mode,
                        warp_strength, warp_octaves,
                        worley_blend, worley_seed):
    """Inline the entire terrain computation into a single map_blocks call.

    Each chunk computes its own warp + octave + worley pipeline independently
    using _gen_terrain.  This keeps the dask graph shallow (one task per chunk)
    instead of building per-octave intermediate arrays that blow up memory.
    """
    height, width = data.shape

    # --- worley pre-pass: get global min/max to avoid per-chunk seams ---
    w_norm_range = None
    if worley_blend > 0:
        _w_seed = worley_seed if worley_seed is not None else seed + 1000

        def _chunk_worley_raw(block, block_info=None):
            info = block_info[0]
            y_start, y_end = info['array-location'][0]
            x_start, x_end = info['array-location'][1]
            x0 = x_range_scaled[0] + (x_range_scaled[1] - x_range_scaled[0]) * x_start / width
            x1 = x_range_scaled[0] + (x_range_scaled[1] - x_range_scaled[0]) * x_end / width
            y0 = y_range_scaled[0] + (y_range_scaled[1] - y_range_scaled[0]) * y_start / height
            y1 = y_range_scaled[0] + (y_range_scaled[1] - y_range_scaled[0]) * y_end / height

            h, w = block.shape
            linx = np.linspace(x0, x1, w, endpoint=False, dtype=np.float32)
            liny = np.linspace(y0, y1, h, endpoint=False, dtype=np.float32)
            x_arr, y_arr = np.meshgrid(linx, liny)

            if warp_strength > 0:
                warp_x = np.zeros((h, w), dtype=np.float32)
                warp_y = np.zeros((h, w), dtype=np.float32)
                for wi in range(warp_octaves):
                    w_amp = persistence ** wi
                    w_freq = lacunarity ** wi
                    p_wx = _make_perm_table(seed + 100 + wi)
                    p_wy = _make_perm_table(seed + 200 + wi)
                    warp_x += _perlin(p_wx, x_arr * w_freq, y_arr * w_freq) * w_amp
                    warp_y += _perlin(p_wy, x_arr * w_freq, y_arr * w_freq) * w_amp
                warp_norm = sum(persistence ** i for i in range(warp_octaves))
                warp_x /= warp_norm
                warp_y /= warp_norm
                x_arr = x_arr + warp_x * warp_strength
                y_arr = y_arr + warp_y * warp_strength

            w_p = _make_perm_table(_w_seed)
            return _worley_numpy_xy(w_p, x_arr, y_arr)

        raw_worley = da.map_blocks(
            _chunk_worley_raw, data, dtype=np.float32,
            meta=np.array((), dtype=np.float32),
        )
        (raw_worley,) = dask.persist(raw_worley)
        w_min, w_max = dask.compute(da.min(raw_worley), da.max(raw_worley))
        w_norm_range = (float(w_min), float(w_max))
        del raw_worley

    # _chunk_terrain ignores the input block values — it creates its own
    # np.zeros internally.  We pass `data` only so map_blocks inherits its
    # chunk structure and block_info coordinates.
    def _chunk_terrain(block, block_info=None):
        info = block_info[0]
        y_start, y_end = info['array-location'][0]
        x_start, x_end = info['array-location'][1]
        x0 = x_range_scaled[0] + (x_range_scaled[1] - x_range_scaled[0]) * x_start / width
        x1 = x_range_scaled[0] + (x_range_scaled[1] - x_range_scaled[0]) * x_end / width
        y0 = y_range_scaled[0] + (y_range_scaled[1] - y_range_scaled[0]) * y_start / height
        y1 = y_range_scaled[0] + (y_range_scaled[1] - y_range_scaled[0]) * y_end / height

        out = np.zeros(block.shape, dtype=np.float32)
        out[:] = _gen_terrain(
            out, seed, x_range=(x0, x1), y_range=(y0, y1),
            octaves=octaves, lacunarity=lacunarity,
            persistence=persistence, noise_mode=noise_mode,
            warp_strength=warp_strength, warp_octaves=warp_octaves,
            worley_blend=worley_blend, worley_seed=worley_seed,
            worley_norm_range=w_norm_range,
        )
        np.clip(out, -1, 1, out=out)
        out = (out + 1) / 2
        out[out < 0.3] = 0
        out *= zfactor
        return out

    return da.map_blocks(_chunk_terrain, data, dtype=np.float32,
                         meta=np.array((), dtype=np.float32))


# ---------------------------------------------------------------------------
# cupy (GPU) backend
# ---------------------------------------------------------------------------

def _terrain_gpu(height_map, seed, x_range=(0, 1), y_range=(0, 1),
                 octaves=16, lacunarity=2.0, persistence=0.5,
                 noise_mode='fbm', warp_strength=0.0, warp_octaves=4,
                 worley_blend=0.0, worley_seed=None,
                 worley_norm_range=None):

    h, w = height_map.shape
    griddim, blockdim = cuda_args(height_map.shape)
    noise = cupy.empty_like(height_map, dtype=cupy.float32)

    # coordinate arrays (needed if warping or worley with xy kernels)
    use_xy_kernel = (warp_strength > 0)
    x_arr = None
    y_arr = None

    if use_xy_kernel or worley_blend > 0:
        linx = cupy.linspace(x_range[0], x_range[1], w,
                             endpoint=False, dtype=cupy.float32)
        liny = cupy.linspace(y_range[0], y_range[1], h,
                             endpoint=False, dtype=cupy.float32)
        y_arr, x_arr = cupy.meshgrid(liny, linx, indexing='ij')

    # --- domain warping ---
    # pre-allocate reusable buffers for scaled coordinates (GPU)
    if use_xy_kernel:
        scaled_x = cupy.empty_like(x_arr)
        scaled_y = cupy.empty_like(y_arr)

    if warp_strength > 0:
        warp_x = cupy.zeros((h, w), dtype=cupy.float32)
        warp_y = cupy.zeros((h, w), dtype=cupy.float32)
        tmp = cupy.empty_like(noise)

        for wi in range(warp_octaves):
            w_amp = persistence ** wi
            w_freq = lacunarity ** wi
            p_wx = cupy.asarray(_make_perm_table(seed + 100 + wi))
            p_wy = cupy.asarray(_make_perm_table(seed + 200 + wi))

            cupy.multiply(x_arr, w_freq, out=scaled_x)
            cupy.multiply(y_arr, w_freq, out=scaled_y)

            _perlin_gpu_xy[griddim, blockdim](
                p_wx, scaled_x, scaled_y, 1.0, tmp
            )
            warp_x += tmp * w_amp

            _perlin_gpu_xy[griddim, blockdim](
                p_wy, scaled_x, scaled_y, 1.0, tmp
            )
            warp_y += tmp * w_amp

        warp_norm = sum(persistence ** i for i in range(warp_octaves))
        warp_x /= warp_norm
        warp_y /= warp_norm
        x_arr = x_arr + warp_x * warp_strength
        y_arr = y_arr + warp_y * warp_strength

    # --- octave loop ---
    norm = sum(persistence ** i for i in range(octaves))

    if noise_mode == 'ridged':
        weight = cupy.ones((h, w), dtype=cupy.float32)
        for i in range(octaves):
            amp = persistence ** i
            freq = lacunarity ** i
            p = cupy.asarray(_make_perm_table(seed + i))

            if use_xy_kernel:
                cupy.multiply(x_arr, freq, out=scaled_x)
                cupy.multiply(y_arr, freq, out=scaled_y)
                _perlin_gpu_xy[griddim, blockdim](
                    p, scaled_x, scaled_y, 1.0, noise
                )
            else:
                _perlin_gpu[griddim, blockdim](
                    p, x_range[0] * freq, x_range[1] * freq,
                    y_range[0] * freq, y_range[1] * freq, 1.0, noise
                )

            noise_val = 1.0 - cupy.abs(noise)
            noise_val = noise_val * noise_val
            noise_val *= weight
            weight = cupy.clip(noise_val, 0, 1)
            height_map += noise_val * amp
    else:  # fbm
        for i in range(octaves):
            amp = persistence ** i
            freq = lacunarity ** i
            p = cupy.asarray(_make_perm_table(seed + i))

            if use_xy_kernel:
                cupy.multiply(x_arr, freq, out=scaled_x)
                cupy.multiply(y_arr, freq, out=scaled_y)
                _perlin_gpu_xy[griddim, blockdim](
                    p, scaled_x, scaled_y, amp, noise
                )
            else:
                _perlin_gpu[griddim, blockdim](
                    p, x_range[0] * freq, x_range[1] * freq,
                    y_range[0] * freq, y_range[1] * freq, amp, noise
                )

            height_map += noise

    height_map /= norm

    # --- worley blending ---
    if worley_blend > 0:
        if worley_seed is None:
            worley_seed = seed + 1000
        w_p = cupy.asarray(_make_perm_table(worley_seed))
        w_noise = cupy.empty_like(height_map)

        if x_arr is not None:
            _worley_gpu_xy[griddim, blockdim](w_p, x_arr, y_arr, w_noise)
        else:
            _worley_gpu[griddim, blockdim](
                w_p, x_range[0], x_range[1], y_range[0], y_range[1], w_noise
            )

        if worley_norm_range is not None:
            w_min, w_max = worley_norm_range
            w_ptp = w_max - w_min
        else:
            w_min = cupy.amin(w_noise)
            w_ptp = cupy.amax(w_noise) - w_min
        if float(w_ptp) > 0:
            w_noise = (w_noise - w_min) / w_ptp
        height_map = height_map * (1 - worley_blend) + w_noise * worley_blend

    height_map = height_map ** 3
    return height_map


def _terrain_cupy(data, seed, x_range_scaled, y_range_scaled, zfactor,
                  octaves, lacunarity, persistence, noise_mode,
                  warp_strength, warp_octaves,
                  worley_blend, worley_seed):
    _check_gpu_memory(data.shape[0], data.shape[1])
    data = data * 0
    data[:] = _terrain_gpu(
        data, seed, x_range=x_range_scaled, y_range=y_range_scaled,
        octaves=octaves, lacunarity=lacunarity, persistence=persistence,
        noise_mode=noise_mode, warp_strength=warp_strength,
        warp_octaves=warp_octaves, worley_blend=worley_blend,
        worley_seed=worley_seed,
    )
    data = cupy.clip(data, -1, 1)
    data[:] = (data + 1) / 2
    data[data < 0.3] = 0  # create water
    data *= zfactor
    return data


# ---------------------------------------------------------------------------
# dask + cupy backend
# ---------------------------------------------------------------------------

def _terrain_dask_cupy(data, seed, x_range_scaled, y_range_scaled, zfactor,
                       octaves, lacunarity, persistence, noise_mode,
                       warp_strength, warp_octaves,
                       worley_blend, worley_seed):
    """Inline the entire terrain computation into a single map_blocks call.

    Each chunk computes its own warp + octave + worley pipeline independently
    using the GPU kernels.  When worley blending is enabled, a pre-pass
    computes global worley noise range so normalization is consistent across
    chunk boundaries.
    """
    data = data * 0
    height, width = data.shape

    # --- worley pre-pass: get global min/max to avoid per-chunk seams ---
    w_norm_range = None
    if worley_blend > 0:
        _w_seed = worley_seed if worley_seed is not None else seed + 1000

        def _chunk_worley_raw(block, block_info=None):
            """Compute raw worley noise for this chunk (with warp)."""
            info = block_info[0]
            y_start, y_end = info['array-location'][0]
            x_start, x_end = info['array-location'][1]
            x0 = x_range_scaled[0] + (x_range_scaled[1] - x_range_scaled[0]) * x_start / width
            x1 = x_range_scaled[0] + (x_range_scaled[1] - x_range_scaled[0]) * x_end / width
            y0 = y_range_scaled[0] + (y_range_scaled[1] - y_range_scaled[0]) * y_start / height
            y1 = y_range_scaled[0] + (y_range_scaled[1] - y_range_scaled[0]) * y_end / height

            h, w = block.shape
            griddim, blockdim = cuda_args(block.shape)

            linx = cupy.linspace(x0, x1, w, endpoint=False, dtype=cupy.float32)
            liny = cupy.linspace(y0, y1, h, endpoint=False, dtype=cupy.float32)
            y_arr, x_arr = cupy.meshgrid(liny, linx, indexing='ij')

            # apply the same warp as _terrain_gpu
            if warp_strength > 0:
                warp_x = cupy.zeros((h, w), dtype=cupy.float32)
                warp_y = cupy.zeros((h, w), dtype=cupy.float32)
                tmp = cupy.empty((h, w), dtype=cupy.float32)
                scaled_x = cupy.empty_like(x_arr)
                scaled_y = cupy.empty_like(y_arr)

                for wi in range(warp_octaves):
                    w_amp = persistence ** wi
                    w_freq = lacunarity ** wi
                    p_wx = cupy.asarray(_make_perm_table(seed + 100 + wi))
                    p_wy = cupy.asarray(_make_perm_table(seed + 200 + wi))
                    cupy.multiply(x_arr, w_freq, out=scaled_x)
                    cupy.multiply(y_arr, w_freq, out=scaled_y)
                    _perlin_gpu_xy[griddim, blockdim](p_wx, scaled_x, scaled_y, 1.0, tmp)
                    warp_x += tmp * w_amp
                    _perlin_gpu_xy[griddim, blockdim](p_wy, scaled_x, scaled_y, 1.0, tmp)
                    warp_y += tmp * w_amp

                warp_norm = sum(persistence ** i for i in range(warp_octaves))
                warp_x /= warp_norm
                warp_y /= warp_norm
                x_arr = x_arr + warp_x * warp_strength
                y_arr = y_arr + warp_y * warp_strength

            w_p = cupy.asarray(_make_perm_table(_w_seed))
            w_noise = cupy.empty((h, w), dtype=cupy.float32)
            _worley_gpu_xy[griddim, blockdim](w_p, x_arr, y_arr, w_noise)
            return w_noise

        raw_worley = da.map_blocks(
            _chunk_worley_raw, data, dtype=cupy.float32,
            meta=cupy.array((), dtype=cupy.float32),
        )
        (raw_worley,) = dask.persist(raw_worley)
        w_min, w_max = dask.compute(da.min(raw_worley), da.max(raw_worley))
        w_norm_range = (float(w_min), float(w_max))
        del raw_worley

    def _chunk_terrain(block, block_info=None):
        info = block_info[0]
        y_start, y_end = info['array-location'][0]
        x_start, x_end = info['array-location'][1]
        x0 = x_range_scaled[0] + (x_range_scaled[1] - x_range_scaled[0]) * x_start / width
        x1 = x_range_scaled[0] + (x_range_scaled[1] - x_range_scaled[0]) * x_end / width
        y0 = y_range_scaled[0] + (y_range_scaled[1] - y_range_scaled[0]) * y_start / height
        y1 = y_range_scaled[0] + (y_range_scaled[1] - y_range_scaled[0]) * y_end / height

        out = cupy.zeros(block.shape, dtype=cupy.float32)
        out[:] = _terrain_gpu(
            out, seed, x_range=(x0, x1), y_range=(y0, y1),
            octaves=octaves, lacunarity=lacunarity,
            persistence=persistence, noise_mode=noise_mode,
            warp_strength=warp_strength, warp_octaves=warp_octaves,
            worley_blend=worley_blend, worley_seed=worley_seed,
            worley_norm_range=w_norm_range,
        )
        return out

    data = da.map_blocks(_chunk_terrain, data, dtype=cupy.float32,
                         meta=cupy.array((), dtype=cupy.float32))

    data = da.clip(data, -1, 1)
    data = (data + 1) / 2
    data = da.where(data < 0.3, 0, data)
    data *= zfactor
    return data


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

[docs] def generate_terrain(agg: xr.DataArray, x_range: tuple = (0, 500), y_range: tuple = (0, 500), seed: int = 10, zfactor: int = 4000, full_extent: Optional[Union[Tuple, List]] = None, name: str = 'terrain', # enhanced terrain parameters octaves: Optional[int] = 16, lacunarity: float = 2.0, persistence: float = 0.5, noise_mode: str = 'fbm', warp_strength: float = 0.0, warp_octaves: int = 4, worley_blend: float = 0.0, worley_seed: Optional[int] = None, erode: bool = False, erosion_iterations: int = 50000, erosion_params: Optional[Dict] = None, ) -> xr.DataArray: """ Generate pseudo-random terrain for testing raster functions. Parameters ---------- agg : xr.DataArray 2D template array (determines height, width, and backend). x_range : tuple, default=(0, 500) Range of x values. y_range : tuple, default=(0, 500) Range of y values. seed : int, default=10 Seed for random number generator. zfactor : int, default=4000 Multiplier for z values. full_extent : tuple or list, optional bbox (xmin, ymin, xmax, ymax). Full extent of coordinate system. name : str, default='terrain' Name for the output DataArray. octaves : int or None, default=16 Number of noise octaves. None = adaptive based on raster size. lacunarity : float, default=2.0 Frequency multiplier per octave. persistence : float, default=0.5 Amplitude multiplier per octave. noise_mode : str, default='fbm' Noise algorithm: 'fbm' (fractal Brownian motion) or 'ridged' (ridged multifractal for sharp mountain ridges). warp_strength : float, default=0.0 Domain warping intensity. 0 disables warping; ~0.5 produces organic flowing features. warp_octaves : int, default=4 Octaves used for the warp displacement fields. worley_blend : float, default=0.0 Blend factor for Worley (cellular) noise. 0 = none; 0.1-0.3 adds rocky micro-texture. worley_seed : int or None, default=None Seed for Worley noise. None = seed + 1000. erode : bool, default=False Apply hydraulic erosion post-pass. erosion_iterations : int, default=50000 Number of water droplets for erosion. erosion_params : dict, optional Override default erosion constants. Returns ------- terrain : xr.DataArray 2D array of generated terrain values. References ---------- - Michael McHugh: https://www.youtube.com/watch?v=O33YV4ooHSo - Red Blob Games: https://www.redblobgames.com/maps/terrain-from-noise/ """ _validate_raster(agg, func_name='generate_terrain', name='agg') height, width = agg.shape # --- validate noise_mode --- if noise_mode not in ('fbm', 'ridged'): raise ValueError( f"noise_mode must be 'fbm' or 'ridged', got {noise_mode!r}" ) # --- adaptive octaves --- if octaves is None: octaves = max(1, int(np.ceil(np.log2(min(height, width))))) if octaves < 1: raise ValueError(f"octaves must be >= 1, got {octaves}") if not (np.isfinite(lacunarity) and lacunarity > 0): raise ValueError( f"lacunarity must be a positive finite number, got {lacunarity!r}" ) if not (np.isfinite(persistence) and persistence > 0): raise ValueError( f"persistence must be a positive finite number, got {persistence!r}" ) if full_extent is None: full_extent = (x_range[0], y_range[0], x_range[1], y_range[1]) elif not isinstance(full_extent, (list, tuple)) and len(full_extent) != 4: raise TypeError('full_extent must be tuple(4)') full_xrange = (full_extent[0], full_extent[2]) full_yrange = (full_extent[1], full_extent[3]) x_range_scaled = (_scale(x_range[0], full_xrange, (0.0, 1.0)), _scale(x_range[1], full_xrange, (0.0, 1.0))) y_range_scaled = (_scale(y_range[0], full_yrange, (0.0, 1.0)), _scale(y_range[1], full_yrange, (0.0, 1.0))) mapper = ArrayTypeFunctionMapping( numpy_func=_terrain_numpy, cupy_func=_terrain_cupy, dask_func=_terrain_dask_numpy, dask_cupy_func=_terrain_dask_cupy ) out = mapper(agg)(agg.data, seed, x_range_scaled, y_range_scaled, zfactor, octaves, lacunarity, persistence, noise_mode, warp_strength, warp_octaves, worley_blend, worley_seed) # Build pixel-center coordinates (matches datashader Canvas convention) dx = (x_range[1] - x_range[0]) / width dy = (y_range[1] - y_range[0]) / height xs = np.linspace(x_range[0] + dx / 2, x_range[1] - dx / 2, width) ys = np.linspace(y_range[0] + dy / 2, y_range[1] - dy / 2, height) result = xr.DataArray(out, name=name, coords={'y': ys, 'x': xs}, dims=['y', 'x'], attrs={'res': (dx, dy)}) # --- hydraulic erosion --- if erode: from xrspatial.erosion import erode as _erode result = _erode(result, iterations=erosion_iterations, seed=seed, params=erosion_params) result.name = name return result