Source code for xrspatial.hydro.twi_d8

"""Topographic Wetness Index (TWI).

Computes ``ln(a / tan(β))`` where *a* is the specific catchment area
(flow accumulation × cell size) and *β* is slope in radians.

Flat areas (slope ≈ 0) are clamped to ``tan(0.001°)`` to avoid
division by zero and infinite TWI values.
"""

from __future__ import annotations

import numpy as np
import xarray as xr

try:
    import dask.array as da
except ImportError:
    da = None

from xrspatial.utils import (_validate_raster, get_dataarray_resolution, has_cuda_and_cupy,
                             is_cupy_array, is_dask_cupy)

# Minimum tan(slope) clamp: tan(0.001°)
_TAN_MIN = np.tan(np.radians(0.001))


[docs] def twi_d8(flow_accum: xr.DataArray, slope_agg: xr.DataArray, name: str = 'twi') -> xr.DataArray: """Compute the Topographic Wetness Index. TWI = ln(a / tan(β)), where a = flow_accum × cellsize (specific catchment area) and β = slope in radians. Slope input is expected in degrees (matching the output of ``slope()``). Parameters ---------- flow_accum : xarray.DataArray 2D flow accumulation grid (cell counts). slope_agg : xarray.DataArray 2D slope grid in degrees. name : str, default 'twi' Name of output DataArray. Returns ------- xarray.DataArray 2D float64 TWI grid. NaN where either input is NaN. """ _validate_raster(flow_accum, func_name='twi', name='flow_accum') _validate_raster(slope_agg, func_name='twi', name='slope_agg') cellsize_x, cellsize_y = get_dataarray_resolution(flow_accum) if not (np.isfinite(cellsize_x) and cellsize_x != 0 and np.isfinite(cellsize_y) and cellsize_y != 0): raise ValueError( f"twi(): cellsize must be finite and non-zero " f"(got cellsize_x={cellsize_x}, cellsize_y={cellsize_y}). " f"Ensure flow_accum has at least 2 cells per spatial dimension " f"with finite coords." ) cellsize = np.sqrt(abs(cellsize_x * cellsize_y)) fa_data = flow_accum.data sl_data = slope_agg.data if has_cuda_and_cupy() and is_dask_cupy(flow_accum): out = _twi_dask_cupy(fa_data, sl_data, cellsize) elif has_cuda_and_cupy() and is_cupy_array(fa_data): out = _twi_cupy(fa_data, sl_data, cellsize) elif da is not None and isinstance(fa_data, da.Array): out = _twi_dask(fa_data, sl_data, cellsize) elif isinstance(fa_data, np.ndarray): out = _twi_numpy(fa_data, sl_data, cellsize) else: raise TypeError(f"Unsupported array type: {type(fa_data)}") return xr.DataArray(out, name=name, coords=flow_accum.coords, dims=flow_accum.dims, attrs=flow_accum.attrs)
def _twi_numpy(fa, sl, cellsize): fa = np.asarray(fa, dtype=np.float64) sl = np.asarray(sl, dtype=np.float64) sca = fa * cellsize tan_slope = np.tan(np.radians(sl)) tan_slope = np.where(tan_slope < _TAN_MIN, _TAN_MIN, tan_slope) return np.log(sca / tan_slope) def _twi_cupy(fa, sl, cellsize): import cupy as cp fa = cp.asarray(fa, dtype=cp.float64) sl = cp.asarray(sl, dtype=cp.float64) sca = fa * cellsize tan_slope = cp.tan(cp.radians(sl)) tan_slope = cp.where(tan_slope < _TAN_MIN, _TAN_MIN, tan_slope) return cp.log(sca / tan_slope) def _twi_dask(fa, sl, cellsize): import dask.array as _da sca = fa * cellsize tan_slope = _da.tan(_da.radians(sl)) tan_slope = _da.where(tan_slope < _TAN_MIN, _TAN_MIN, tan_slope) return _da.log(sca / tan_slope) def _twi_dask_cupy(fa, sl, cellsize): import cupy as cp fa_np = fa.map_blocks( lambda b: b.get(), dtype=fa.dtype, meta=np.array((), dtype=fa.dtype), ) sl_np = sl.map_blocks( lambda b: b.get(), dtype=sl.dtype, meta=np.array((), dtype=sl.dtype), ) result = _twi_dask(fa_np, sl_np, cellsize) return result.map_blocks( cp.asarray, dtype=result.dtype, meta=cp.array((), dtype=result.dtype), )