Source code for xrspatial.sky_view_factor

"""Sky-view factor (SVF) for digital elevation models.

SVF measures the fraction of the sky hemisphere visible from each cell,
ranging from 0 (fully obstructed) to 1 (flat open terrain).  For each
cell, rays are cast at *n_directions* evenly spaced azimuths out to
*max_radius* cells.  Along each ray the maximum elevation angle to the
horizon is recorded and SVF is computed as:

    SVF(i,j) = 1 - mean(sin(max_horizon_angle(d)) for d in directions)

References
----------
Zakek, K., Ostir, K., Kokalj, Z. (2011).  Sky-View Factor as a Relief
Visualization Technique.  Remote Sensing, 3(2), 398-415.

Yokoyama, R., Sidle, R.C., Noguchi, S. (2002).  Application of
Topographic Shading to Terrain Visualization.  International Journal
of Geographical Information Science, 16(5), 489-502.
"""
from __future__ import annotations

from functools import partial
from math import atan2 as _atan2, cos as _cos, pi as _pi, sin as _sin, sqrt as _sqrt
from typing import Optional

import numpy as np
import xarray as xr
from numba import cuda

try:
    import cupy
except ImportError:
    class cupy:
        ndarray = False

try:
    import dask.array as da
except ImportError:
    da = None

from xrspatial.dataset_support import supports_dataset
from xrspatial.utils import (
    ArrayTypeFunctionMapping,
    _boundary_to_dask,
    _validate_raster,
    _validate_scalar,
    cuda_args,
    ngjit,
)


# ---------------------------------------------------------------------------
# CPU kernel
# ---------------------------------------------------------------------------

@ngjit
def _svf_cpu(data, max_radius, n_directions):
    """Compute SVF over an entire 2-D array on the CPU."""
    rows, cols = data.shape
    out = np.empty((rows, cols), dtype=np.float64)
    out[:] = np.nan

    for y in range(rows):
        for x in range(cols):
            center = data[y, x]
            if center != center:  # NaN check
                continue

            svf_sum = 0.0
            for d in range(n_directions):
                angle = 2.0 * _pi * d / n_directions
                dx = _cos(angle)
                dy = _sin(angle)

                max_elev_angle = 0.0
                for r in range(1, max_radius + 1):
                    sx = x + int(round(r * dx))
                    sy = y + int(round(r * dy))
                    if sx < 0 or sx >= cols or sy < 0 or sy >= rows:
                        break
                    elev = data[sy, sx]
                    if elev != elev:  # NaN
                        break
                    dz = elev - center
                    dist = float(r)
                    elev_angle = _atan2(dz, dist)
                    if elev_angle > max_elev_angle:
                        max_elev_angle = elev_angle

                svf_sum += _sin(max_elev_angle)

            out[y, x] = 1.0 - svf_sum / n_directions
    return out


# ---------------------------------------------------------------------------
# GPU kernels
# ---------------------------------------------------------------------------

@cuda.jit
def _svf_gpu(data, out, max_radius, n_directions):
    """CUDA global kernel: one thread per cell."""
    y, x = cuda.grid(2)
    rows, cols = data.shape

    if y >= rows or x >= cols:
        return

    center = data[y, x]
    if center != center:  # NaN
        return

    svf_sum = 0.0
    pi2 = 2.0 * 3.141592653589793

    for d in range(n_directions):
        angle = pi2 * d / n_directions
        dx = _cos(angle)
        dy = _sin(angle)

        max_elev_angle = 0.0
        for r in range(1, max_radius + 1):
            sx = x + int(round(r * dx))
            sy = y + int(round(r * dy))
            if sx < 0 or sx >= cols or sy < 0 or sy >= rows:
                break
            elev = data[sy, sx]
            if elev != elev:  # NaN
                break
            dz = elev - center
            dist = float(r)
            elev_angle = _atan2(dz, dist)
            if elev_angle > max_elev_angle:
                max_elev_angle = elev_angle

        svf_sum += _sin(max_elev_angle)

    out[y, x] = 1.0 - svf_sum / n_directions


# ---------------------------------------------------------------------------
# Backend wrappers
# ---------------------------------------------------------------------------

def _run_numpy(data, max_radius, n_directions):
    data = data.astype(np.float64)
    return _svf_cpu(data, max_radius, n_directions)


def _run_cupy(data, max_radius, n_directions):
    data = data.astype(cupy.float64)
    out = cupy.full(data.shape, cupy.nan, dtype=cupy.float64)
    griddim, blockdim = cuda_args(data.shape)
    _svf_gpu[griddim, blockdim](data, out, max_radius, n_directions)
    return out


def _run_dask_numpy(data, max_radius, n_directions):
    data = data.astype(np.float64)
    _func = partial(_svf_cpu, max_radius=max_radius, n_directions=n_directions)
    out = data.map_overlap(
        _func,
        depth=(max_radius, max_radius),
        boundary=np.nan,
        meta=np.array(()),
    )
    return out


def _run_dask_cupy(data, max_radius, n_directions):
    data = data.astype(cupy.float64)
    _func = partial(_run_cupy, max_radius=max_radius, n_directions=n_directions)
    out = data.map_overlap(
        _func,
        depth=(max_radius, max_radius),
        boundary=cupy.nan,
        meta=cupy.array(()),
    )
    return out


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

[docs] @supports_dataset def sky_view_factor( agg: xr.DataArray, max_radius: int = 10, n_directions: int = 16, name: Optional[str] = 'sky_view_factor', ) -> xr.DataArray: """Compute the sky-view factor for each cell of a DEM. SVF measures the fraction of the sky hemisphere visible from each cell on a scale from 0 (fully obstructed) to 1 (flat open terrain). Rays are cast at *n_directions* evenly spaced azimuths out to *max_radius* cells, and the maximum elevation angle along each ray determines the horizon obstruction. Parameters ---------- agg : xarray.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask, or CuPy-backed Dask xarray DataArray of elevation values. If a Dataset is passed, the operation is applied to each data variable independently. max_radius : int, default=10 Maximum search distance in cells along each ray direction. Cells within *max_radius* of the raster edge will be NaN. n_directions : int, default=16 Number of azimuth directions to sample, evenly spaced around 360 degrees. name : str, default='sky_view_factor' Name of the output DataArray. Returns ------- xarray.DataArray or xr.Dataset 2D array of SVF values in [0, 1] with the same shape, coords, dims, and attrs as the input. Edge cells use truncated rays that stop at the raster boundary. References ---------- Zakek, K., Ostir, K., Kokalj, Z. (2011). Sky-View Factor as a Relief Visualization Technique. Remote Sensing, 3(2), 398-415. Examples -------- >>> from xrspatial import sky_view_factor >>> svf = sky_view_factor(dem, max_radius=100, n_directions=16) """ _validate_raster(agg, func_name='sky_view_factor', name='agg') _validate_scalar(max_radius, func_name='sky_view_factor', name='max_radius', dtype=int, min_val=1) _validate_scalar(n_directions, func_name='sky_view_factor', name='n_directions', dtype=int, min_val=1) mapper = ArrayTypeFunctionMapping( numpy_func=_run_numpy, cupy_func=_run_cupy, dask_func=_run_dask_numpy, dask_cupy_func=_run_dask_cupy, ) out = mapper(agg)(agg.data, max_radius, n_directions) return xr.DataArray( out, name=name, coords=agg.coords, dims=agg.dims, attrs=agg.attrs, )