Source code for xrspatial.corridor

"""Least-cost corridor analysis.

Identifies zones of low cumulative traversal cost between two (or more)
source locations on a friction surface.  Unlike a single-cell path, a
corridor shows all cells within a cost threshold of the optimal route.

Algorithm
---------
1. Compute ``cost_distance(friction, source_a)`` and
   ``cost_distance(friction, source_b)``.
2. Sum the two surfaces: ``corridor = cd_a + cd_b``.
3. Normalize: ``corridor - corridor.min()``.
4. Optionally threshold to produce a binary corridor mask.

Because the function is pure arithmetic on ``cost_distance`` outputs,
all four backends (NumPy, CuPy, Dask+NumPy, Dask+CuPy) are supported
natively with no extra kernel code.
"""

from __future__ import annotations

import itertools

import numpy as np
import xarray as xr

from xrspatial.cost_distance import cost_distance
from xrspatial.utils import _validate_raster


def _scalar_to_float(da_scalar):
    """Extract a Python float from a scalar DataArray (numpy/cupy/dask)."""
    data = da_scalar.data
    if hasattr(data, 'compute'):
        data = data.compute()
    if hasattr(data, 'get'):
        data = data.get()
    return float(data)


def _compute_corridor(
    cd_a: xr.DataArray,
    cd_b: xr.DataArray,
    threshold: float | None,
    relative: bool,
) -> xr.DataArray:
    """Sum two cost-distance surfaces, normalize, and optionally threshold."""
    corridor = cd_a + cd_b
    corridor_min = _scalar_to_float(corridor.min())

    if not np.isfinite(corridor_min):
        # Sources are mutually unreachable -- return all-NaN.
        return corridor * np.nan

    normalized = corridor - corridor_min

    if threshold is not None:
        if relative:
            cutoff = threshold * corridor_min
        else:
            cutoff = threshold
        data = normalized.data
        try:
            import dask.array as _da
            if isinstance(data, _da.Array):
                data = _da.where(data <= cutoff, data, np.nan)
            else:
                raise ImportError
        except ImportError:
            if hasattr(data, 'get'):  # cupy
                import cupy as cp
                data = cp.where(data <= cutoff, data, cp.nan)
            else:
                data = np.where(data <= cutoff, data, np.nan)
        normalized = normalized.copy(data=data)

    return normalized


[docs] def least_cost_corridor( friction: xr.DataArray, source_a: xr.DataArray | None = None, source_b: xr.DataArray | None = None, *, sources: list[xr.DataArray] | None = None, pairwise: bool = False, threshold: float | None = None, relative: bool = False, precomputed: bool = False, x: str = "x", y: str = "y", connectivity: int = 8, max_cost: float = np.inf, ) -> xr.DataArray | xr.Dataset: """Compute a least-cost corridor between two source regions. A corridor surface is the sum of two cost-distance fields (one from each source), normalized by the minimum corridor cost. Cells where the normalized value falls within *threshold* form the corridor. Parameters ---------- friction : xr.DataArray 2-D friction (cost) surface. Positive finite values are passable; NaN or ``<= 0`` marks barriers. Ignored when *precomputed* is True. source_a, source_b : xr.DataArray, optional Source rasters identifying start and end regions. Non-zero finite values mark source cells (same convention as ``cost_distance``). Required unless *sources* is given. When *precomputed* is True these are treated as pre-computed cost-distance surfaces. sources : list of xr.DataArray, optional Multiple source rasters for pairwise corridor computation. Mutually exclusive with *source_a* / *source_b*. pairwise : bool, default False When True and *sources* has more than two entries, compute corridors for every unique pair and return an ``xr.Dataset`` with one variable per pair (named ``corridor_i_j``). threshold : float, optional Cost threshold for masking the corridor. Cells whose normalized corridor cost exceeds this value are set to NaN. relative : bool, default False If True, *threshold* is a fraction of the minimum corridor cost (e.g. 0.10 means within 10 % of optimal). If False, *threshold* is in absolute cost units. precomputed : bool, default False If True, *source_a* / *source_b* (or entries in *sources*) are already cost-distance surfaces. Skips the internal ``cost_distance`` calls. x, y : str Coordinate names forwarded to ``cost_distance``. connectivity : int, default 8 Pixel connectivity (4 or 8), forwarded to ``cost_distance``. max_cost : float, default np.inf Maximum cost budget, forwarded to ``cost_distance``. Returns ------- xr.DataArray or xr.Dataset Normalized corridor surface (float). Values start at 0 along the optimal corridor and increase with deviation from it. When *threshold* is set, cells beyond the cutoff are NaN. With *pairwise=True* and multiple sources, returns a Dataset keyed by pair indices (``corridor_0_1``, ``corridor_0_2``, ...). """ # ------------------------------------------------------------------ # Input validation # ------------------------------------------------------------------ if sources is not None and (source_a is not None or source_b is not None): raise ValueError( "Provide either (source_a, source_b) or sources, not both" ) if sources is not None: if len(sources) < 2: raise ValueError("sources must contain at least 2 entries") for i, s in enumerate(sources): _validate_raster( s, func_name="least_cost_corridor", name=f"sources[{i}]" ) else: if source_a is None or source_b is None: raise ValueError( "source_a and source_b are required when sources is not given" ) _validate_raster( source_a, func_name="least_cost_corridor", name="source_a" ) _validate_raster( source_b, func_name="least_cost_corridor", name="source_b" ) sources = [source_a, source_b] if not precomputed: _validate_raster( friction, func_name="least_cost_corridor", name="friction" ) if threshold is not None and threshold < 0: raise ValueError("threshold must be non-negative") # ------------------------------------------------------------------ # Compute cost-distance surfaces (or use precomputed ones) # ------------------------------------------------------------------ if precomputed: cd_surfaces = list(sources) else: cd_surfaces = [ cost_distance( src, friction, x=x, y=y, connectivity=connectivity, max_cost=max_cost, ) for src in sources ] # ------------------------------------------------------------------ # Two-source case: return a single DataArray # ------------------------------------------------------------------ if len(cd_surfaces) == 2 and not pairwise: return _compute_corridor( cd_surfaces[0], cd_surfaces[1], threshold, relative ) # ------------------------------------------------------------------ # Multi-source pairwise: return a Dataset # ------------------------------------------------------------------ corridors = {} for i, j in itertools.combinations(range(len(cd_surfaces)), 2): name = f"corridor_{i}_{j}" corridors[name] = _compute_corridor( cd_surfaces[i], cd_surfaces[j], threshold, relative ) return xr.Dataset(corridors)