Source code for xrspatial.bump

from typing import Optional

import numpy as np
import xarray as xr
from xarray import DataArray

try:
    import cupy
except ImportError:
    class cupy(object):
        ndarray = False

try:
    import dask.array as da
except ImportError:
    da = None

from xrspatial.utils import (
    ArrayTypeFunctionMapping,
    _validate_scalar,
    has_cuda_and_cupy,
    is_cupy_array,
    ngjit,
)

# Upper bound on bump count to prevent accidental OOM from the default
# w*h//10 heuristic.  16 bytes per bump (int32 loc pair + float64 height),
# so 10M bumps ~ 160 MB.
_MAX_DEFAULT_COUNT = 10_000_000


def _available_memory_bytes():
    """Best-effort estimate of available memory in bytes."""
    try:
        with open('/proc/meminfo', 'r') as f:
            for line in f:
                if line.startswith('MemAvailable:'):
                    return int(line.split()[1]) * 1024
    except (OSError, ValueError, IndexError):
        pass
    try:
        import psutil
        return psutil.virtual_memory().available
    except (ImportError, AttributeError):
        pass
    return 2 * 1024 ** 3


@ngjit
def _finish_bump(width, height, locs, heights, spread):
    out = np.zeros((height, width))
    rows, cols = out.shape
    s = spread ** 2  # removed sqrt for perf.
    for i in range(len(heights)):
        x = locs[i][0]
        y = locs[i][1]
        z = heights[i]
        out[y, x] = out[y, x] + z
        if s > 0:
            for nx in range(max(x - spread, 0), min(x + spread + 1, width)):
                for ny in range(max(y - spread, 0), min(y + spread + 1, height)):
                    d2 = (nx - x) * (nx - x) + (ny - y) * (ny - y)
                    if d2 <= s:
                        out[ny, nx] = out[ny, nx] + (out[y, x] * ((s - d2) / s))
    return out


def _bump_numpy(data, width, height, locs, heights, spread):
    return _finish_bump(width, height, locs, heights, spread)


def _bump_cupy(data, width, height, locs, heights, spread):
    return cupy.asarray(_finish_bump(width, height, locs, heights, spread))


def _partition_bumps(data, locs, heights, spread):
    """Split bumps into per-chunk subsets so closures stay small.

    Returns a dict mapping ``(yi, xi)`` chunk indices to
    ``(local_locs, local_heights)`` pairs (or *None* when a chunk has
    no bumps).  Coordinates in *local_locs* are relative to the chunk
    origin.  Bumps whose spread overlaps a chunk boundary are assigned
    to the chunk that contains their centre pixel.
    """
    y_offsets = np.concatenate([[0], np.cumsum(data.chunks[0])])
    x_offsets = np.concatenate([[0], np.cumsum(data.chunks[1])])
    ny = len(data.chunks[0])
    nx = len(data.chunks[1])

    partitions = {}
    for yi in range(ny):
        y0, y1 = int(y_offsets[yi]), int(y_offsets[yi + 1])
        for xi in range(nx):
            x0, x1 = int(x_offsets[xi]), int(x_offsets[xi + 1])
            mask = ((locs[:, 0] >= x0) & (locs[:, 0] < x1) &
                    (locs[:, 1] >= y0) & (locs[:, 1] < y1))
            if np.any(mask):
                local_locs = locs[mask].copy()
                local_locs[:, 0] -= x0
                local_locs[:, 1] -= y0
                partitions[(yi, xi)] = (local_locs, heights[mask].copy())
            else:
                partitions[(yi, xi)] = None
    return partitions


def _bump_dask_numpy(data, width, height, locs, heights, spread):
    import dask

    partitions = _partition_bumps(data, locs, heights, spread)
    rows = []
    for yi, ch in enumerate(data.chunks[0]):
        row = []
        for xi, cw in enumerate(data.chunks[1]):
            ch_int, cw_int = int(ch), int(cw)
            part = partitions[(yi, xi)]
            if part is None:
                row.append(da.zeros((ch_int, cw_int),
                                    chunks=(ch_int, cw_int),
                                    dtype=np.float64))
            else:
                p_locs, p_heights = part
                delayed = dask.delayed(_finish_bump)(
                    cw_int, ch_int, p_locs, p_heights, spread)
                row.append(da.from_delayed(delayed,
                                           shape=(ch_int, cw_int),
                                           dtype=np.float64))
        rows.append(row)
    return da.block(rows)


def _bump_dask_cupy(data, width, height, locs, heights, spread):
    import dask

    def _finish_bump_cupy(cw, ch, p_locs, p_heights, spread):
        return cupy.asarray(_finish_bump(cw, ch, p_locs, p_heights, spread))

    partitions = _partition_bumps(data, locs, heights, spread)
    rows = []
    for yi, ch in enumerate(data.chunks[0]):
        row = []
        for xi, cw in enumerate(data.chunks[1]):
            ch_int, cw_int = int(ch), int(cw)
            part = partitions[(yi, xi)]
            if part is None:
                row.append(da.zeros((ch_int, cw_int),
                                    chunks=(ch_int, cw_int),
                                    dtype=np.float64))
            else:
                p_locs, p_heights = part
                delayed = dask.delayed(_finish_bump_cupy)(
                    cw_int, ch_int, p_locs, p_heights, spread)
                row.append(da.from_delayed(delayed,
                                           shape=(ch_int, cw_int),
                                           dtype=np.float64))
        rows.append(row)
    return da.block(rows)


[docs] def bump(width: int = None, height: int = None, count: Optional[int] = None, height_func=None, spread: int = 1, *, agg: xr.DataArray = None) -> xr.DataArray: """ Generate a simple bump map to simulate the appearance of land features. Using a user-defined height function, determines at what elevation a specific bump height is acceptable. Bumps of number `count` are applied over the area `width` x `height`. Parameters ---------- width : int, optional Total width, in pixels, of the image. Not required when ``agg`` is provided. height : int, optional Total height, in pixels, of the image. Not required when ``agg`` is provided. count : int Number of bumps to generate. height_func : function which takes x, y and returns a height value Function used to apply varying bump heights to different elevations. spread : int, default=1 Number of pixels to spread on all sides. agg : xarray.DataArray, optional Template raster whose shape, chunks, and backend (NumPy, CuPy, Dask, Dask+CuPy) determine the output type. When provided, ``width`` and ``height`` are inferred from ``agg.shape``. Returns ------- bump_agg : xarray.DataArray 2D aggregate array of calculated bump heights. References ---------- - ICA: http://www.mountaincartography.org/mt_hood/pdfs/nighbert_bump1.pdf # noqa Examples -------- .. plot:: :include-source: from functools import partial import matplotlib.pyplot as plt import numpy as np import xarray as xr from xrspatial import generate_terrain, bump # Generate Example Terrain W = 500 H = 300 template_terrain = xr.DataArray(np.zeros((H, W))) x_range=(-20e6, 20e6) y_range=(-20e6, 20e6) terrain_agg = generate_terrain( template_terrain, x_range=x_range, y_range=y_range ) # Edit Attributes terrain_agg = terrain_agg.assign_attrs( { 'Description': 'Example Terrain', 'units': 'km', 'Max Elevation': '4000', } ) terrain_agg = terrain_agg.rename({'x': 'lon', 'y': 'lat'}) terrain_agg = terrain_agg.rename('Elevation') # Create Height Function def heights(locations, src, src_range, height = 20): num_bumps = locations.shape[0] out = np.zeros(num_bumps, dtype = np.uint16) for r in range(0, num_bumps): loc = locations[r] x = loc[0] y = loc[1] val = src[y, x] if val >= src_range[0] and val < src_range[1]: out[r] = height return out # Create Bump Map Aggregate Array bump_count = 10000 src = terrain_agg.data # Short Bumps from z = 1000 to z = 1300 bump_agg = bump(width = W, height = H, count = bump_count, height_func = partial(heights, src = src, src_range = (1000, 1300), height = 5)) # Tall Bumps from z = 1300 to z = 1700 bump_agg += bump(width = W, height = H, count = bump_count // 2, height_func = partial(heights, src = src, src_range = (1300, 1700), height=20)) # Short Bumps from z = 1700 to z = 2000 bump_agg += bump(width = W, height = H, count = bump_count // 3, height_func = partial(heights, src = src, src_range = (1700, 2000), height=5)) # Edit Attributes bump_agg = bump_agg.assign_attrs({'Description': 'Example Bump Map', 'units': 'km'}) bump_agg = bump_agg.rename('Bump Height') # Rename Coordinates bump_agg = bump_agg.assign_coords({'x': terrain_agg.coords['lon'].data, 'y': terrain_agg.coords['lat'].data}) # Remove zeros bump_agg.data[bump_agg.data == 0] = np.nan # Plot Terrain terrain_agg.plot(cmap = 'terrain', aspect = 2, size = 4) plt.title("Terrain") plt.ylabel("latitude") plt.xlabel("longitude") # Plot Bump Map bump_agg.plot(cmap = 'summer', aspect = 2, size = 4) plt.title("Bump Map") plt.ylabel("latitude") plt.xlabel("longitude") .. sourcecode:: python >>> print(terrain_agg[200:203, 200:202]) <xarray.DataArray 'Elevation' (lat: 3, lon: 2)> array([[1264.02296597, 1261.947921 ], [1285.37105519, 1282.48079719], [1306.02339636, 1303.4069579 ]]) Coordinates: * lon (lon) float64 -3.96e+06 -3.88e+06 * lat (lat) float64 6.733e+06 6.867e+06 7e+06 Attributes: res: (80000.0, 133333.3333333333) Description: Example Terrain units: km Max Elevation: 4000 .. sourcecode:: python >>> print(bump_agg[200:205, 200:206]) <xarray.DataArray 'Bump Height' (y: 5, x: 6)> array([[nan, nan, nan, nan, 5., 5.], [nan, nan, nan, nan, nan, 5.], [nan, nan, nan, nan, nan, nan], [nan, nan, nan, nan, nan, nan], [nan, nan, nan, nan, nan, nan]]) Coordinates: * x (x) float64 -3.96e+06 -3.88e+06 -3.8e+06 ... -3.64e+06 -3.56e+06 * y (y) float64 6.733e+06 6.867e+06 7e+06 7.133e+06 7.267e+06 Attributes: res: 1 Description: Example Bump Map units: km """ if agg is not None: h, w = agg.shape else: _validate_scalar(width, func_name='bump', name='width', dtype=int, min_val=1) _validate_scalar(height, func_name='bump', name='height', dtype=int, min_val=1) w, h = width, height linx = range(w) liny = range(h) if count is None: count = min(w * h // 10, _MAX_DEFAULT_COUNT) # The dask backends build the output lazily per-chunk, so the full # raster never lives in memory at once. Only guard against raster # size when we will actually materialize it. materializes_raster = ( agg is None or isinstance(agg.data, np.ndarray) or (has_cuda_and_cupy() and is_cupy_array(agg.data)) ) # Budget: 16 bytes per bump (2 x int32 coord + float64 height) plus # the full output raster at 8 bytes/cell (float64) when the backend # will materialize it. Guard both so a huge w*h cannot allocate # multi-TB arrays silently. bump_bytes = count * 16 raster_bytes = w * h * 8 if materializes_raster else 0 required_bytes = bump_bytes + raster_bytes available = _available_memory_bytes() if required_bytes > 0.5 * available: if raster_bytes > 0: detail = ( f"({raster_bytes / 1e9:.1f} GB for the output raster plus " f"{bump_bytes / 1e9:.1f} GB for location/height arrays)" ) hint = "Use a smaller raster or pass a dask-backed agg." else: detail = "for location/height arrays" hint = "Pass a smaller count explicitly." raise MemoryError( f"bump() with width={w:,}, height={h:,}, count={count:,} " f"requires ~{required_bytes / 1e9:.1f} GB {detail}, " f"but only {available / 1e9:.1f} GB is available. " f"{hint}" ) if height_func is None: height_func = lambda bumps: np.ones(len(bumps)) # noqa # create 2d array of random x, y for bump locations locs = np.empty((count, 2), dtype=np.int32) locs[:, 0] = np.random.choice(linx, count) locs[:, 1] = np.random.choice(liny, count) heights = height_func(locs) if agg is not None: mapper = ArrayTypeFunctionMapping( numpy_func=_bump_numpy, cupy_func=_bump_cupy, dask_func=_bump_dask_numpy, dask_cupy_func=_bump_dask_cupy, ) out = mapper(agg)(agg.data, w, h, locs, heights, spread) return DataArray(out, coords=agg.coords, dims=agg.dims, attrs=dict(res=1)) else: bumps = _finish_bump(w, h, locs, heights, spread) return DataArray(bumps, dims=['y', 'x'], attrs=dict(res=1))