Source code for xrspatial.sky_view_factor

"""Sky-view factor (SVF) for digital elevation models.

SVF measures the fraction of the sky hemisphere visible from each cell,
ranging from 0 (fully obstructed) to 1 (flat open terrain).  For each
cell, rays are cast at *n_directions* evenly spaced azimuths out to
*max_radius* cells.  Along each ray the maximum elevation angle to the
horizon is recorded and SVF is computed as:

    SVF(i,j) = 1 - mean(sin(max_horizon_angle(d)) for d in directions)

References
----------
Zakek, K., Ostir, K., Kokalj, Z. (2011).  Sky-View Factor as a Relief
Visualization Technique.  Remote Sensing, 3(2), 398-415.

Yokoyama, R., Sidle, R.C., Noguchi, S. (2002).  Application of
Topographic Shading to Terrain Visualization.  International Journal
of Geographical Information Science, 16(5), 489-502.
"""
from __future__ import annotations

from functools import partial
from math import atan2 as _atan2, cos as _cos, pi as _pi, sin as _sin, sqrt as _sqrt
from typing import Optional

import numpy as np
import xarray as xr
from numba import cuda

try:
    import cupy
except ImportError:
    class cupy:
        ndarray = False

try:
    import dask.array as da
except ImportError:
    da = None

from xrspatial.dataset_support import supports_dataset
from xrspatial.utils import (
    ArrayTypeFunctionMapping,
    _boundary_to_dask,
    _validate_raster,
    _validate_scalar,
    cuda_args,
    get_dataarray_resolution,
    ngjit,
)


# ---------------------------------------------------------------------------
# Memory guards
# ---------------------------------------------------------------------------
# Peak working set per pixel:
#   float64 input cast    8 bytes
#   float64 output array  8 bytes
# Total ~16 bytes/pixel.  Same budget pattern as resample / kde / sieve.
_BYTES_PER_PIXEL = 16


def _available_memory_bytes():
    """Best-effort estimate of available host memory in bytes."""
    try:
        with open("/proc/meminfo", "r") as f:
            for line in f:
                if line.startswith("MemAvailable:"):
                    return int(line.split()[1]) * 1024
    except (OSError, ValueError, IndexError):
        pass
    try:
        import psutil

        return psutil.virtual_memory().available
    except (ImportError, AttributeError):
        pass
    return 2 * 1024**3


def _available_gpu_memory_bytes():
    """Best-effort estimate of free GPU memory in bytes.

    Returns 0 when CuPy / CUDA is unavailable or the query fails.
    Callers treat that as "no GPU info, skip the guard".
    """
    try:
        import cupy as _cp

        free, _total = _cp.cuda.runtime.memGetInfo()
        return int(free)
    except Exception:
        return 0


def _check_memory(rows, cols):
    """Raise MemoryError if the eager numpy pass would exceed 50% of RAM."""
    required = int(rows) * int(cols) * _BYTES_PER_PIXEL
    available = _available_memory_bytes()
    if required > 0.5 * available:
        raise MemoryError(
            f"sky_view_factor() on a {rows}x{cols} raster needs "
            f"~{required / 1e9:.1f} GB of working memory but only "
            f"~{available / 1e9:.1f} GB is available.  Use a dask-backed "
            f"DataArray for out-of-core processing or downsample the input."
        )


def _check_gpu_memory(rows, cols):
    """Raise MemoryError when the cupy allocation would not fit.

    Checks host memory first because the input may already be staged on
    the host before transfer.  Skips silently when the GPU query fails.
    """
    _check_memory(rows, cols)
    available = _available_gpu_memory_bytes()
    if available <= 0:
        return
    required = int(rows) * int(cols) * _BYTES_PER_PIXEL
    if required > 0.5 * available:
        raise MemoryError(
            f"sky_view_factor() on a {rows}x{cols} cupy raster needs "
            f"~{required / 1e9:.1f} GB of GPU memory but only "
            f"~{available / 1e9:.1f} GB is free on the active device.  "
            f"Use a dask+cupy DataArray for out-of-core processing or "
            f"downsample the input."
        )


# ---------------------------------------------------------------------------
# CPU kernel
# ---------------------------------------------------------------------------

@ngjit
def _svf_cpu(data, max_radius, n_directions, cellsize_x, cellsize_y):
    """Compute SVF over an entire 2-D array on the CPU.

    *cellsize_x* and *cellsize_y* are the ground distance per cell along
    the x and y axes, in the same units as the elevation values.  They
    are used to convert the integer ray step *r* into a true ground
    distance for the horizon angle calculation.
    """
    rows, cols = data.shape
    out = np.empty((rows, cols), dtype=np.float64)
    out[:] = np.nan

    for y in range(rows):
        for x in range(cols):
            center = data[y, x]
            if center != center:  # NaN check
                continue

            svf_sum = 0.0
            for d in range(n_directions):
                angle = 2.0 * _pi * d / n_directions
                dx = _cos(angle)
                dy = _sin(angle)

                max_elev_angle = 0.0
                for r in range(1, max_radius + 1):
                    sx = x + int(round(r * dx))
                    sy = y + int(round(r * dy))
                    if sx < 0 or sx >= cols or sy < 0 or sy >= rows:
                        break
                    elev = data[sy, sx]
                    if elev != elev:  # NaN
                        break
                    dz = elev - center
                    gx = (sx - x) * cellsize_x
                    gy = (sy - y) * cellsize_y
                    dist = _sqrt(gx * gx + gy * gy)
                    elev_angle = _atan2(dz, dist)
                    if elev_angle > max_elev_angle:
                        max_elev_angle = elev_angle

                svf_sum += _sin(max_elev_angle)

            out[y, x] = 1.0 - svf_sum / n_directions
    return out


# ---------------------------------------------------------------------------
# GPU kernels
# ---------------------------------------------------------------------------

@cuda.jit
def _svf_gpu(data, out, max_radius, n_directions, cellsize_x, cellsize_y):
    """CUDA global kernel: one thread per cell."""
    y, x = cuda.grid(2)
    rows, cols = data.shape

    if y >= rows or x >= cols:
        return

    center = data[y, x]
    if center != center:  # NaN
        return

    svf_sum = 0.0
    pi2 = 2.0 * 3.141592653589793

    for d in range(n_directions):
        angle = pi2 * d / n_directions
        dx = _cos(angle)
        dy = _sin(angle)

        max_elev_angle = 0.0
        for r in range(1, max_radius + 1):
            sx = x + int(round(r * dx))
            sy = y + int(round(r * dy))
            if sx < 0 or sx >= cols or sy < 0 or sy >= rows:
                break
            elev = data[sy, sx]
            if elev != elev:  # NaN
                break
            dz = elev - center
            gx = (sx - x) * cellsize_x
            gy = (sy - y) * cellsize_y
            dist = _sqrt(gx * gx + gy * gy)
            elev_angle = _atan2(dz, dist)
            if elev_angle > max_elev_angle:
                max_elev_angle = elev_angle

        svf_sum += _sin(max_elev_angle)

    out[y, x] = 1.0 - svf_sum / n_directions


# ---------------------------------------------------------------------------
# Backend wrappers
# ---------------------------------------------------------------------------

def _run_numpy(data, max_radius, n_directions, cellsize_x, cellsize_y):
    _check_memory(data.shape[0], data.shape[1])
    data = data.astype(np.float64)
    return _svf_cpu(data, max_radius, n_directions, cellsize_x, cellsize_y)


def _run_cupy(data, max_radius, n_directions, cellsize_x, cellsize_y):
    _check_gpu_memory(data.shape[0], data.shape[1])
    data = data.astype(cupy.float64)
    out = cupy.full(data.shape, cupy.nan, dtype=cupy.float64)
    griddim, blockdim = cuda_args(data.shape)
    _svf_gpu[griddim, blockdim](
        data, out, max_radius, n_directions, cellsize_x, cellsize_y
    )
    return out


def _run_dask_numpy(data, max_radius, n_directions, cellsize_x, cellsize_y):
    data = data.astype(np.float64)
    _func = partial(
        _svf_cpu,
        max_radius=max_radius,
        n_directions=n_directions,
        cellsize_x=cellsize_x,
        cellsize_y=cellsize_y,
    )
    out = data.map_overlap(
        _func,
        depth=(max_radius, max_radius),
        boundary=np.nan,
        meta=np.array(()),
    )
    return out


def _run_dask_cupy(data, max_radius, n_directions, cellsize_x, cellsize_y):
    data = data.astype(cupy.float64)
    _func = partial(
        _run_cupy,
        max_radius=max_radius,
        n_directions=n_directions,
        cellsize_x=cellsize_x,
        cellsize_y=cellsize_y,
    )
    out = data.map_overlap(
        _func,
        depth=(max_radius, max_radius),
        boundary=cupy.nan,
        meta=cupy.array(()),
    )
    return out


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

[docs] @supports_dataset def sky_view_factor( agg: xr.DataArray, max_radius: int = 10, n_directions: int = 16, cellsize_x: Optional[float] = None, cellsize_y: Optional[float] = None, name: Optional[str] = 'sky_view_factor', ) -> xr.DataArray: """Compute the sky-view factor for each cell of a DEM. SVF measures the fraction of the sky hemisphere visible from each cell on a scale from 0 (fully obstructed) to 1 (flat open terrain). Rays are cast at *n_directions* evenly spaced azimuths out to *max_radius* cells, and the maximum elevation angle along each ray determines the horizon obstruction. The horizon angle along each ray uses true ground distance (``cell_step * cellsize``), so cell size must be in the same unit as the elevation values (e.g. meters for both). Parameters ---------- agg : xarray.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask, or CuPy-backed Dask xarray DataArray of elevation values. If a Dataset is passed, the operation is applied to each data variable independently. max_radius : int, default=10 Maximum search distance in cells along each ray direction. n_directions : int, default=16 Number of azimuth directions to sample, evenly spaced around 360 degrees. cellsize_x : float, optional Ground distance per cell along the x axis, in the same unit as the elevation values. If not provided, it is read from ``agg.attrs['res']`` or computed from the x coordinate. cellsize_y : float, optional Ground distance per cell along the y axis, in the same unit as the elevation values. If not provided, it is read from ``agg.attrs['res']`` or computed from the y coordinate. name : str, default='sky_view_factor' Name of the output DataArray. Returns ------- xarray.DataArray or xr.Dataset 2D array of SVF values in [0, 1] with the same shape, coords, dims, and attrs as the input. Edge cells use truncated rays that stop at the raster boundary. References ---------- Zakek, K., Ostir, K., Kokalj, Z. (2011). Sky-View Factor as a Relief Visualization Technique. Remote Sensing, 3(2), 398-415. Examples -------- >>> from xrspatial import sky_view_factor >>> svf = sky_view_factor(dem, max_radius=100, n_directions=16) """ _validate_raster(agg, func_name='sky_view_factor', name='agg') _validate_scalar(max_radius, func_name='sky_view_factor', name='max_radius', dtype=int, min_val=1) _validate_scalar(n_directions, func_name='sky_view_factor', name='n_directions', dtype=int, min_val=1) if cellsize_x is None or cellsize_y is None: try: res_x, res_y = get_dataarray_resolution(agg) except Exception: res_x, res_y = 1.0, 1.0 if cellsize_x is None: cellsize_x = float(abs(res_x)) if res_x else 1.0 if cellsize_y is None: cellsize_y = float(abs(res_y)) if res_y else 1.0 else: cellsize_x = float(abs(cellsize_x)) cellsize_y = float(abs(cellsize_y)) if cellsize_x <= 0: cellsize_x = 1.0 if cellsize_y <= 0: cellsize_y = 1.0 mapper = ArrayTypeFunctionMapping( numpy_func=_run_numpy, cupy_func=_run_cupy, dask_func=_run_dask_numpy, dask_cupy_func=_run_dask_cupy, ) out = mapper(agg)( agg.data, max_radius, n_directions, cellsize_x, cellsize_y ) return xr.DataArray( out, name=name, coords=agg.coords, dims=agg.dims, attrs=agg.attrs, )