Source code for xrspatial.flood

"""Flood prediction and management tools.

Provides per-cell flood analysis functions that build on the existing
hydrology module (HAND, flow accumulation, flow length, etc.):

- ``flood_depth``: water depth above terrain given a HAND raster and
  water level
- ``inundation``: binary flood/no-flood mask
- ``curve_number_runoff``: SCS/NRCS curve number runoff estimation
- ``travel_time``: overland flow travel time via simplified Manning's
  equation
- ``vegetation_roughness``: Manning's n from land cover or NDVI
- ``vegetation_curve_number``: SCS curve number from land cover and
  hydrologic soil group
- ``flood_depth_vegetation``: vegetation-adjusted flood depth via
  Manning's normal depth equation
"""

from __future__ import annotations

from typing import Union

import numpy as np
import xarray as xr

try:
    import dask.array as da
except ImportError:
    da = None

from xrspatial.utils import (
    _validate_raster,
    has_cuda_and_cupy,
    is_cupy_array,
    is_dask_cupy,
)

# Minimum tan(slope) clamp: tan(0.001 deg), same as TWI
_TAN_MIN = np.tan(np.radians(0.001))


def _validate_mannings_n_dataarray(mannings_n):
    """Enforce finite, strictly positive values on a mannings_n DataArray.

    Mirrors the scalar-path check (`if mannings_n <= 0: raise`).  Without
    this guard, a roughness raster containing 0 silently produces inf
    velocity / 0 travel time, and negatives produce negative travel time.
    """
    _validate_raster(mannings_n, func_name='flood',
                     name='mannings_n', ndim=2)
    arr = np.asarray(mannings_n.values)
    if arr.size and (
        not np.isfinite(arr).all() or not (arr > 0).all()
    ):
        raise ValueError(
            "mannings_n DataArray must contain finite, strictly positive "
            "values (no zeros, negatives, NaN, or Inf)."
        )

# ---------------------------------------------------------------------------
# NLCD-to-Manning's n lookup (Chow 1959; Arcement & Schneider 1989)
# ---------------------------------------------------------------------------
NLCD_MANNINGS_N = {
    11: 0.025,   # Open Water
    21: 0.040,   # Developed, Open Space
    22: 0.100,   # Developed, Low Intensity
    23: 0.080,   # Developed, Medium Intensity
    24: 0.150,   # Developed, High Intensity
    31: 0.028,   # Barren Land
    41: 0.100,   # Deciduous Forest
    42: 0.120,   # Evergreen Forest
    43: 0.100,   # Mixed Forest
    52: 0.070,   # Shrub/Scrub
    71: 0.035,   # Grassland/Herbaceous
    81: 0.033,   # Pasture/Hay
    82: 0.038,   # Cultivated Crops
    90: 0.100,   # Woody Wetlands
    95: 0.070,   # Emergent Herbaceous Wetlands
}

# ---------------------------------------------------------------------------
# NLCD + Hydrologic Soil Group -> SCS Curve Number (TR-55, HEC-HMS)
# Soil groups: 1=A, 2=B, 3=C, 4=D.  "Good" hydrologic condition assumed.
# ---------------------------------------------------------------------------
NLCD_CURVE_NUMBER = {
    (11, 1): 100, (11, 2): 100, (11, 3): 100, (11, 4): 100,  # Open Water
    (21, 1):  39, (21, 2):  61, (21, 3):  74, (21, 4):  80,   # Dev Open
    (22, 1):  57, (22, 2):  72, (22, 3):  81, (22, 4):  86,   # Dev Low
    (23, 1):  77, (23, 2):  85, (23, 3):  90, (23, 4):  92,   # Dev Med
    (24, 1):  89, (24, 2):  92, (24, 3):  94, (24, 4):  95,   # Dev High
    (31, 1):  77, (31, 2):  86, (31, 3):  91, (31, 4):  94,   # Barren
    (41, 1):  30, (41, 2):  55, (41, 3):  70, (41, 4):  77,   # Deciduous
    (42, 1):  30, (42, 2):  55, (42, 3):  70, (42, 4):  77,   # Evergreen
    (43, 1):  30, (43, 2):  55, (43, 3):  70, (43, 4):  77,   # Mixed Forest
    (52, 1):  35, (52, 2):  56, (52, 3):  70, (52, 4):  77,   # Shrub
    (71, 1):  39, (71, 2):  61, (71, 3):  74, (71, 4):  80,   # Grassland
    (81, 1):  39, (81, 2):  61, (81, 3):  74, (81, 4):  80,   # Pasture
    (82, 1):  67, (82, 2):  78, (82, 3):  85, (82, 4):  89,   # Crops
    (90, 1):  30, (90, 2):  58, (90, 3):  71, (90, 4):  78,   # Woody Wetland
    (95, 1):  30, (95, 2):  58, (95, 3):  71, (95, 4):  78,   # Herb Wetland
}

# NDVI-to-Manning's n interpolation breakpoints
_NDVI_BREAKPOINTS = np.array([0.0, 0.1, 0.3, 0.6, 1.0])
_NDVI_MANNINGS_N = np.array([0.02, 0.03, 0.05, 0.10, 0.16])


# ---------------------------------------------------------------------------
# flood_depth
# ---------------------------------------------------------------------------

[docs] def flood_depth( hand_agg: xr.DataArray, water_level: float, name: str = 'flood_depth', ) -> xr.DataArray: """Compute flood depth from a HAND raster and a water level. ``depth = water_level - HAND`` where ``HAND <= water_level``, ``NaN`` elsewhere. Parameters ---------- hand_agg : xarray.DataArray 2D Height Above Nearest Drainage raster (output of ``hand()``). water_level : float Water surface elevation above the drainage network (>= 0). name : str, default 'flood_depth' Name of the output DataArray. Returns ------- xarray.DataArray 2D float64 grid of flood depths. ``NaN`` where not inundated or where the input is ``NaN``. """ _validate_raster(hand_agg, func_name='flood_depth', name='hand_agg') if not isinstance(water_level, (int, float)): raise TypeError("water_level must be numeric") if water_level < 0: raise ValueError("water_level must be >= 0") data = hand_agg.data if has_cuda_and_cupy() and is_dask_cupy(hand_agg): out = _flood_depth_dask_cupy(data, water_level) elif has_cuda_and_cupy() and is_cupy_array(data): out = _flood_depth_cupy(data, water_level) elif da is not None and isinstance(data, da.Array): out = _flood_depth_dask(data, water_level) elif isinstance(data, np.ndarray): out = _flood_depth_numpy(data, water_level) else: raise TypeError(f"Unsupported array type: {type(data)}") return xr.DataArray(out, name=name, coords=hand_agg.coords, dims=hand_agg.dims, attrs=hand_agg.attrs)
def _flood_depth_numpy(hand, wl): hand = np.asarray(hand, dtype=np.float64) depth = np.where(hand <= wl, wl - hand, np.nan) # preserve input NaN depth = np.where(np.isnan(hand), np.nan, depth) return depth def _flood_depth_cupy(hand, wl): import cupy as cp hand = cp.asarray(hand, dtype=cp.float64) depth = cp.where(hand <= wl, wl - hand, cp.nan) depth = cp.where(cp.isnan(hand), cp.nan, depth) return depth def _flood_depth_dask(hand, wl): import dask.array as _da depth = _da.where(hand <= wl, wl - hand, np.nan) depth = _da.where(_da.isnan(hand), np.nan, depth) return depth def _flood_depth_dask_cupy(hand, wl): import cupy as cp hand_np = hand.map_blocks( lambda b: b.get(), dtype=hand.dtype, meta=np.array((), dtype=hand.dtype), ) result = _flood_depth_dask(hand_np, wl) return result.map_blocks( cp.asarray, dtype=result.dtype, meta=cp.array((), dtype=result.dtype), ) # --------------------------------------------------------------------------- # inundation # ---------------------------------------------------------------------------
[docs] def inundation( hand_agg: xr.DataArray, water_level: float, name: str = 'inundation', ) -> xr.DataArray: """Create a binary inundation mask from a HAND raster. Returns ``1.0`` where ``HAND <= water_level``, ``0.0`` elsewhere. ``NaN`` cells in the input remain ``NaN``. Parameters ---------- hand_agg : xarray.DataArray 2D HAND raster (output of ``hand()``). water_level : float Water surface elevation above the drainage network (>= 0). name : str, default 'inundation' Name of the output DataArray. Returns ------- xarray.DataArray 2D float64 binary mask. """ _validate_raster(hand_agg, func_name='inundation', name='hand_agg') if not isinstance(water_level, (int, float)): raise TypeError("water_level must be numeric") if water_level < 0: raise ValueError("water_level must be >= 0") data = hand_agg.data if has_cuda_and_cupy() and is_dask_cupy(hand_agg): out = _inundation_dask_cupy(data, water_level) elif has_cuda_and_cupy() and is_cupy_array(data): out = _inundation_cupy(data, water_level) elif da is not None and isinstance(data, da.Array): out = _inundation_dask(data, water_level) elif isinstance(data, np.ndarray): out = _inundation_numpy(data, water_level) else: raise TypeError(f"Unsupported array type: {type(data)}") return xr.DataArray(out, name=name, coords=hand_agg.coords, dims=hand_agg.dims, attrs=hand_agg.attrs)
def _inundation_numpy(hand, wl): hand = np.asarray(hand, dtype=np.float64) mask = np.where(hand <= wl, 1.0, 0.0) mask = np.where(np.isnan(hand), np.nan, mask) return mask def _inundation_cupy(hand, wl): import cupy as cp hand = cp.asarray(hand, dtype=cp.float64) mask = cp.where(hand <= wl, 1.0, 0.0) mask = cp.where(cp.isnan(hand), cp.nan, mask) return mask def _inundation_dask(hand, wl): import dask.array as _da mask = _da.where(hand <= wl, 1.0, 0.0) mask = _da.where(_da.isnan(hand), np.nan, mask) return mask def _inundation_dask_cupy(hand, wl): import cupy as cp hand_np = hand.map_blocks( lambda b: b.get(), dtype=hand.dtype, meta=np.array((), dtype=hand.dtype), ) result = _inundation_dask(hand_np, wl) return result.map_blocks( cp.asarray, dtype=result.dtype, meta=cp.array((), dtype=result.dtype), ) # --------------------------------------------------------------------------- # curve_number_runoff # ---------------------------------------------------------------------------
[docs] def curve_number_runoff( rainfall: xr.DataArray, curve_number: Union[float, xr.DataArray], name: str = 'curve_number_runoff', ) -> xr.DataArray: """Estimate runoff depth using the SCS/NRCS curve number method. Computes:: S = (25400 / CN) - 254 Ia = 0.2 * S Q = (P - Ia)^2 / (P + 0.8 * S) where P > Ia Q = 0 where P <= Ia Parameters ---------- rainfall : xarray.DataArray 2D rainfall depth raster in millimetres (>= 0). curve_number : float or xarray.DataArray SCS curve number, range (0, 100]. A scalar applies uniformly; a DataArray allows spatially varying CN. name : str, default 'curve_number_runoff' Name of the output DataArray. Returns ------- xarray.DataArray 2D float64 runoff depth in millimetres. """ _validate_raster(rainfall, func_name='curve_number_runoff', name='rainfall') if isinstance(curve_number, xr.DataArray): cn_data = curve_number.data elif isinstance(curve_number, (int, float)): if curve_number <= 0 or curve_number > 100: raise ValueError("curve_number must be in (0, 100]") cn_data = float(curve_number) else: raise TypeError("curve_number must be numeric or an xarray.DataArray") data = rainfall.data if has_cuda_and_cupy() and is_dask_cupy(rainfall): out = _cn_runoff_dask_cupy(data, cn_data) elif has_cuda_and_cupy() and is_cupy_array(data): out = _cn_runoff_cupy(data, cn_data) elif da is not None and isinstance(data, da.Array): out = _cn_runoff_dask(data, cn_data) elif isinstance(data, np.ndarray): out = _cn_runoff_numpy(data, cn_data) else: raise TypeError(f"Unsupported array type: {type(data)}") return xr.DataArray(out, name=name, coords=rainfall.coords, dims=rainfall.dims, attrs=rainfall.attrs)
def _cn_runoff_numpy(p, cn): p = np.asarray(p, dtype=np.float64) cn = np.asarray(cn, dtype=np.float64) s = (25400.0 / cn) - 254.0 ia = 0.2 * s q = np.where(p > ia, (p - ia) ** 2 / (p + 0.8 * s), 0.0) # propagate NaN from rainfall or curve number q = np.where(np.isnan(p) | np.isnan(cn), np.nan, q) return q def _cn_runoff_cupy(p, cn): import cupy as cp p = cp.asarray(p, dtype=cp.float64) cn = cp.asarray(cn, dtype=cp.float64) s = (25400.0 / cn) - 254.0 ia = 0.2 * s q = cp.where(p > ia, (p - ia) ** 2 / (p + 0.8 * s), 0.0) q = cp.where(cp.isnan(p) | cp.isnan(cn), cp.nan, q) return q def _cn_runoff_dask(p, cn): import dask.array as _da s = (25400.0 / cn) - 254.0 ia = 0.2 * s q = _da.where(p > ia, (p - ia) ** 2 / (p + 0.8 * s), 0.0) q = _da.where(_da.isnan(p) | _da.isnan(cn), np.nan, q) return q def _cn_runoff_dask_cupy(p, cn): import cupy as cp p_np = p.map_blocks( lambda b: b.get(), dtype=p.dtype, meta=np.array((), dtype=p.dtype), ) if hasattr(cn, 'map_blocks'): cn_np = cn.map_blocks( lambda b: b.get(), dtype=cn.dtype, meta=np.array((), dtype=cn.dtype), ) else: cn_np = cn result = _cn_runoff_dask(p_np, cn_np) return result.map_blocks( cp.asarray, dtype=result.dtype, meta=cp.array((), dtype=result.dtype), ) # --------------------------------------------------------------------------- # travel_time # ---------------------------------------------------------------------------
[docs] def travel_time( flow_length_agg: xr.DataArray, slope_agg: xr.DataArray, mannings_n: Union[float, xr.DataArray], name: str = 'travel_time', ) -> xr.DataArray: """Estimate overland flow travel time via simplified Manning's equation. Velocity is estimated as ``v = (1/n) * sqrt(tan(slope))`` and travel time as ``flow_length / v``. Near-zero slopes are clamped to ``tan(0.001 deg)`` (same as TWI) to avoid division by zero. The *time of concentration* for a catchment is simply ``travel_time(...).max()``. Parameters ---------- flow_length_agg : xarray.DataArray 2D flow length raster (output of ``flow_length()``). slope_agg : xarray.DataArray 2D slope raster in degrees. mannings_n : float or xarray.DataArray Manning's roughness coefficient (> 0). A scalar applies uniformly; a DataArray allows spatially varying roughness. name : str, default 'travel_time' Name of the output DataArray. Returns ------- xarray.DataArray 2D float64 travel time grid (same time units as flow_length distance units and velocity units, typically seconds when flow_length is in metres). """ _validate_raster(flow_length_agg, func_name='travel_time', name='flow_length_agg') _validate_raster(slope_agg, func_name='travel_time', name='slope_agg') if isinstance(mannings_n, xr.DataArray): _validate_mannings_n_dataarray(mannings_n) n_data = mannings_n.data elif isinstance(mannings_n, (int, float)): if mannings_n <= 0: raise ValueError("mannings_n must be > 0") n_data = float(mannings_n) else: raise TypeError( "mannings_n must be numeric or an xarray.DataArray") fl_data = flow_length_agg.data sl_data = slope_agg.data if has_cuda_and_cupy() and is_dask_cupy(flow_length_agg): out = _travel_time_dask_cupy(fl_data, sl_data, n_data) elif has_cuda_and_cupy() and is_cupy_array(fl_data): out = _travel_time_cupy(fl_data, sl_data, n_data) elif da is not None and isinstance(fl_data, da.Array): out = _travel_time_dask(fl_data, sl_data, n_data) elif isinstance(fl_data, np.ndarray): out = _travel_time_numpy(fl_data, sl_data, n_data) else: raise TypeError(f"Unsupported array type: {type(fl_data)}") return xr.DataArray(out, name=name, coords=flow_length_agg.coords, dims=flow_length_agg.dims, attrs=flow_length_agg.attrs)
def _travel_time_numpy(fl, sl, n): fl = np.asarray(fl, dtype=np.float64) sl = np.asarray(sl, dtype=np.float64) n = np.asarray(n, dtype=np.float64) tan_slope = np.tan(np.radians(sl)) tan_slope = np.where(tan_slope < _TAN_MIN, _TAN_MIN, tan_slope) velocity = (1.0 / n) * np.sqrt(tan_slope) return fl / velocity def _travel_time_cupy(fl, sl, n): import cupy as cp fl = cp.asarray(fl, dtype=cp.float64) sl = cp.asarray(sl, dtype=cp.float64) n = cp.asarray(n, dtype=cp.float64) tan_slope = cp.tan(cp.radians(sl)) tan_slope = cp.where(tan_slope < _TAN_MIN, _TAN_MIN, tan_slope) velocity = (1.0 / n) * cp.sqrt(tan_slope) return fl / velocity def _travel_time_dask(fl, sl, n): import dask.array as _da tan_slope = _da.tan(_da.radians(sl)) tan_slope = _da.where(tan_slope < _TAN_MIN, _TAN_MIN, tan_slope) velocity = (1.0 / n) * _da.sqrt(tan_slope) return fl / velocity def _travel_time_dask_cupy(fl, sl, n): import cupy as cp fl_np = fl.map_blocks( lambda b: b.get(), dtype=fl.dtype, meta=np.array((), dtype=fl.dtype), ) sl_np = sl.map_blocks( lambda b: b.get(), dtype=sl.dtype, meta=np.array((), dtype=sl.dtype), ) if hasattr(n, 'map_blocks'): n_np = n.map_blocks( lambda b: b.get(), dtype=n.dtype, meta=np.array((), dtype=n.dtype), ) else: n_np = n result = _travel_time_dask(fl_np, sl_np, n_np) return result.map_blocks( cp.asarray, dtype=result.dtype, meta=cp.array((), dtype=result.dtype), ) # --------------------------------------------------------------------------- # vegetation_roughness # --------------------------------------------------------------------------- def vegetation_roughness( vegetation_agg: xr.DataArray, mode: str = 'nlcd', lookup: dict | None = None, name: str = 'mannings_n', ) -> xr.DataArray: """Derive Manning's n roughness from land cover or NDVI. Parameters ---------- vegetation_agg : xarray.DataArray 2D raster. For ``mode='nlcd'`` this should contain integer NLCD land cover codes. For ``mode='ndvi'`` this should contain continuous NDVI values (typically -1 to 1). mode : {'nlcd', 'ndvi'}, default 'nlcd' How to convert vegetation data to roughness. * ``'nlcd'``: categorical lookup from NLCD codes to Manning's n. * ``'ndvi'``: piecewise linear interpolation of NDVI to Manning's n. lookup : dict, optional Override the default NLCD-to-n mapping. Keys are integer NLCD codes, values are Manning's n floats. Only used when ``mode='nlcd'``. name : str, default 'mannings_n' Name of the output DataArray. Returns ------- xarray.DataArray 2D float64 Manning's n raster. Unrecognized NLCD codes and NaN inputs map to NaN. """ _validate_raster(vegetation_agg, func_name='vegetation_roughness', name='vegetation_agg') if mode not in ('nlcd', 'ndvi'): raise ValueError(f"mode must be 'nlcd' or 'ndvi', got {mode!r}") data = vegetation_agg.data if mode == 'nlcd': lut_dict = dict(NLCD_MANNINGS_N) if lookup is not None: lut_dict.update(lookup) max_code = max(lut_dict) lut = np.full(max_code + 1, np.nan, dtype=np.float64) for code, val in lut_dict.items(): lut[code] = val if has_cuda_and_cupy() and is_dask_cupy(vegetation_agg): out = _veg_roughness_nlcd_dask_cupy(data, lut) elif has_cuda_and_cupy() and is_cupy_array(data): out = _veg_roughness_nlcd_cupy(data, lut) elif da is not None and isinstance(data, da.Array): out = _veg_roughness_nlcd_dask(data, lut) elif isinstance(data, np.ndarray): out = _veg_roughness_nlcd_numpy(data, lut) else: raise TypeError(f"Unsupported array type: {type(data)}") else: if has_cuda_and_cupy() and is_dask_cupy(vegetation_agg): out = _veg_roughness_ndvi_dask_cupy(data) elif has_cuda_and_cupy() and is_cupy_array(data): out = _veg_roughness_ndvi_cupy(data) elif da is not None and isinstance(data, da.Array): out = _veg_roughness_ndvi_dask(data) elif isinstance(data, np.ndarray): out = _veg_roughness_ndvi_numpy(data) else: raise TypeError(f"Unsupported array type: {type(data)}") return xr.DataArray(out, name=name, coords=vegetation_agg.coords, dims=vegetation_agg.dims, attrs=vegetation_agg.attrs) def _veg_roughness_nlcd_numpy(data, lut): data = np.asarray(data) out = np.full(data.shape, np.nan, dtype=np.float64) not_nan = ~np.isnan(data) if data.dtype.kind == 'f' else True mask = (data >= 0) & (data < len(lut)) & not_nan idx = data[mask].astype(np.intp) out[mask] = lut[idx] return out def _veg_roughness_nlcd_cupy(data, lut): import cupy as cp lut_gpu = cp.asarray(lut) data = cp.asarray(data) out = cp.full(data.shape, cp.nan, dtype=cp.float64) not_nan = ~cp.isnan(data) if data.dtype.kind == 'f' else True mask = (data >= 0) & (data < len(lut_gpu)) & not_nan idx = data[mask].astype(cp.intp) out[mask] = lut_gpu[idx] return out def _veg_roughness_nlcd_dask(data, lut): import dask.array as _da def _apply_lut(block, lut=lut): return _veg_roughness_nlcd_numpy(block, lut) return _da.map_blocks( _apply_lut, data, dtype=np.float64, meta=np.array((), dtype=np.float64), ) def _veg_roughness_nlcd_dask_cupy(data, lut): import cupy as cp data_np = data.map_blocks( lambda b: b.get(), dtype=data.dtype, meta=np.array((), dtype=data.dtype), ) result = _veg_roughness_nlcd_dask(data_np, lut) return result.map_blocks( cp.asarray, dtype=result.dtype, meta=cp.array((), dtype=result.dtype), ) def _veg_roughness_ndvi_numpy(data): data = np.asarray(data, dtype=np.float64) out = np.interp(data, _NDVI_BREAKPOINTS, _NDVI_MANNINGS_N) out = np.where(np.isnan(data), np.nan, out) return out def _veg_roughness_ndvi_cupy(data): import cupy as cp data = cp.asarray(data, dtype=cp.float64) bp = cp.asarray(_NDVI_BREAKPOINTS) mn = cp.asarray(_NDVI_MANNINGS_N) out = cp.interp(data.ravel(), bp, mn).reshape(data.shape) out = cp.where(cp.isnan(data), cp.nan, out) return out def _veg_roughness_ndvi_dask(data): import dask.array as _da def _apply(block): return _veg_roughness_ndvi_numpy(block) return _da.map_blocks( _apply, data, dtype=np.float64, meta=np.array((), dtype=np.float64), ) def _veg_roughness_ndvi_dask_cupy(data): import cupy as cp data_np = data.map_blocks( lambda b: b.get(), dtype=data.dtype, meta=np.array((), dtype=data.dtype), ) result = _veg_roughness_ndvi_dask(data_np) return result.map_blocks( cp.asarray, dtype=result.dtype, meta=cp.array((), dtype=result.dtype), ) # --------------------------------------------------------------------------- # vegetation_curve_number # --------------------------------------------------------------------------- def vegetation_curve_number( landcover_agg: xr.DataArray, soil_group_agg: xr.DataArray, lookup: dict | None = None, name: str = 'curve_number', ) -> xr.DataArray: """Derive SCS curve number from NLCD land cover and hydrologic soil group. Parameters ---------- landcover_agg : xarray.DataArray 2D integer raster of NLCD land cover codes. soil_group_agg : xarray.DataArray 2D integer raster of hydrologic soil groups (1=A, 2=B, 3=C, 4=D). lookup : dict, optional Override the default ``(nlcd_code, soil_group) -> CN`` mapping. name : str, default 'curve_number' Name of the output DataArray. Returns ------- xarray.DataArray 2D float64 curve number raster. Unknown ``(code, group)`` pairs and NaN inputs map to NaN. """ _validate_raster(landcover_agg, func_name='vegetation_curve_number', name='landcover_agg') _validate_raster(soil_group_agg, func_name='vegetation_curve_number', name='soil_group_agg') lut_dict = dict(NLCD_CURVE_NUMBER) if lookup is not None: lut_dict.update(lookup) max_code = max(k[0] for k in lut_dict) max_sg = max(k[1] for k in lut_dict) lut = np.full((max_code + 1, max_sg + 1), np.nan, dtype=np.float64) for (code, sg), cn in lut_dict.items(): lut[code, sg] = cn lc_data = landcover_agg.data sg_data = soil_group_agg.data if has_cuda_and_cupy() and is_dask_cupy(landcover_agg): out = _veg_cn_dask_cupy(lc_data, sg_data, lut) elif has_cuda_and_cupy() and is_cupy_array(lc_data): out = _veg_cn_cupy(lc_data, sg_data, lut) elif da is not None and isinstance(lc_data, da.Array): out = _veg_cn_dask(lc_data, sg_data, lut) elif isinstance(lc_data, np.ndarray): out = _veg_cn_numpy(lc_data, sg_data, lut) else: raise TypeError(f"Unsupported array type: {type(lc_data)}") return xr.DataArray(out, name=name, coords=landcover_agg.coords, dims=landcover_agg.dims, attrs=landcover_agg.attrs) def _veg_cn_numpy(lc, sg, lut): lc = np.asarray(lc) sg = np.asarray(sg) max_code, max_sg = lut.shape out = np.full(lc.shape, np.nan, dtype=np.float64) lc_ok = ~np.isnan(lc) if lc.dtype.kind == 'f' else True sg_ok = ~np.isnan(sg) if sg.dtype.kind == 'f' else True mask = ( (lc >= 0) & (lc < max_code) & (sg >= 0) & (sg < max_sg) & lc_ok & sg_ok ) lc_idx = lc[mask].astype(np.intp) sg_idx = sg[mask].astype(np.intp) out[mask] = lut[lc_idx, sg_idx] return out def _veg_cn_cupy(lc, sg, lut): import cupy as cp lut_gpu = cp.asarray(lut) lc = cp.asarray(lc) sg = cp.asarray(sg) max_code, max_sg = lut_gpu.shape out = cp.full(lc.shape, cp.nan, dtype=cp.float64) lc_ok = ~cp.isnan(lc) if lc.dtype.kind == 'f' else True sg_ok = ~cp.isnan(sg) if sg.dtype.kind == 'f' else True mask = ( (lc >= 0) & (lc < max_code) & (sg >= 0) & (sg < max_sg) & lc_ok & sg_ok ) lc_idx = lc[mask].astype(cp.intp) sg_idx = sg[mask].astype(cp.intp) out[mask] = lut_gpu[lc_idx, sg_idx] return out def _veg_cn_dask(lc, sg, lut): import dask.array as _da def _apply(lc_block, sg_block, lut=lut): return _veg_cn_numpy(lc_block, sg_block, lut) return _da.map_blocks( _apply, lc, sg, dtype=np.float64, meta=np.array((), dtype=np.float64), ) def _veg_cn_dask_cupy(lc, sg, lut): import cupy as cp lc_np = lc.map_blocks( lambda b: b.get(), dtype=lc.dtype, meta=np.array((), dtype=lc.dtype), ) sg_np = sg.map_blocks( lambda b: b.get(), dtype=sg.dtype, meta=np.array((), dtype=sg.dtype), ) result = _veg_cn_dask(lc_np, sg_np, lut) return result.map_blocks( cp.asarray, dtype=result.dtype, meta=cp.array((), dtype=result.dtype), ) # --------------------------------------------------------------------------- # flood_depth_vegetation # --------------------------------------------------------------------------- def flood_depth_vegetation( hand_agg: xr.DataArray, slope_agg: xr.DataArray, mannings_n: Union[float, xr.DataArray], unit_discharge: float, name: str = 'flood_depth_vegetation', ) -> xr.DataArray: """Compute vegetation-adjusted flood depth via Manning's normal depth. For each cell, the equilibrium (normal) depth for steady uniform flow is:: h = (q * n / sqrt(tan(slope)))^(3/5) where *q* is unit discharge (m^2/s), *n* is Manning's roughness, and *slope* is terrain slope in degrees. The flood depth is ``h - HAND`` where ``h > HAND``, and NaN elsewhere. Higher roughness (denser vegetation) produces deeper, slower water. Parameters ---------- hand_agg : xarray.DataArray 2D HAND raster (output of ``hand()``). slope_agg : xarray.DataArray 2D slope raster in degrees. mannings_n : float or xarray.DataArray Manning's roughness coefficient. A scalar applies uniformly; a DataArray (e.g. output of ``vegetation_roughness()``) allows spatially varying roughness. unit_discharge : float Unit discharge in m^2/s (discharge per unit width, > 0). name : str, default 'flood_depth_vegetation' Name of the output DataArray. Returns ------- xarray.DataArray 2D float64 flood depth grid. NaN where not inundated or where inputs are NaN. """ _validate_raster(hand_agg, func_name='flood_depth_vegetation', name='hand_agg') _validate_raster(slope_agg, func_name='flood_depth_vegetation', name='slope_agg') if not isinstance(unit_discharge, (int, float)): raise TypeError("unit_discharge must be numeric") if unit_discharge <= 0: raise ValueError("unit_discharge must be > 0") if isinstance(mannings_n, xr.DataArray): _validate_mannings_n_dataarray(mannings_n) n_data = mannings_n.data elif isinstance(mannings_n, (int, float)): if mannings_n <= 0: raise ValueError("mannings_n must be > 0") n_data = float(mannings_n) else: raise TypeError( "mannings_n must be numeric or an xarray.DataArray") h_data = hand_agg.data sl_data = slope_agg.data q = float(unit_discharge) if has_cuda_and_cupy() and is_dask_cupy(hand_agg): out = _fdv_dask_cupy(h_data, sl_data, n_data, q) elif has_cuda_and_cupy() and is_cupy_array(h_data): out = _fdv_cupy(h_data, sl_data, n_data, q) elif da is not None and isinstance(h_data, da.Array): out = _fdv_dask(h_data, sl_data, n_data, q) elif isinstance(h_data, np.ndarray): out = _fdv_numpy(h_data, sl_data, n_data, q) else: raise TypeError(f"Unsupported array type: {type(h_data)}") return xr.DataArray(out, name=name, coords=hand_agg.coords, dims=hand_agg.dims, attrs=hand_agg.attrs) def _fdv_numpy(hand, sl, n, q): hand = np.asarray(hand, dtype=np.float64) sl = np.asarray(sl, dtype=np.float64) n = np.asarray(n, dtype=np.float64) tan_s = np.tan(np.radians(sl)) tan_s = np.where(tan_s < _TAN_MIN, _TAN_MIN, tan_s) h_normal = (q * n / np.sqrt(tan_s)) ** 0.6 depth = np.where(h_normal > hand, h_normal - hand, np.nan) # propagate NaN from any input any_nan = np.isnan(hand) | np.isnan(sl) | np.isnan(n) depth = np.where(any_nan, np.nan, depth) return depth def _fdv_cupy(hand, sl, n, q): import cupy as cp hand = cp.asarray(hand, dtype=cp.float64) sl = cp.asarray(sl, dtype=cp.float64) n = cp.asarray(n, dtype=cp.float64) tan_s = cp.tan(cp.radians(sl)) tan_s = cp.where(tan_s < _TAN_MIN, _TAN_MIN, tan_s) h_normal = (q * n / cp.sqrt(tan_s)) ** 0.6 depth = cp.where(h_normal > hand, h_normal - hand, cp.nan) any_nan = cp.isnan(hand) | cp.isnan(sl) | cp.isnan(n) depth = cp.where(any_nan, cp.nan, depth) return depth def _fdv_dask(hand, sl, n, q): import dask.array as _da tan_s = _da.tan(_da.radians(sl)) tan_s = _da.where(tan_s < _TAN_MIN, _TAN_MIN, tan_s) h_normal = (q * n / _da.sqrt(tan_s)) ** 0.6 depth = _da.where(h_normal > hand, h_normal - hand, np.nan) any_nan = _da.isnan(hand) | _da.isnan(sl) | _da.isnan(n) depth = _da.where(any_nan, np.nan, depth) return depth def _fdv_dask_cupy(hand, sl, n, q): import cupy as cp hand_np = hand.map_blocks( lambda b: b.get(), dtype=hand.dtype, meta=np.array((), dtype=hand.dtype), ) sl_np = sl.map_blocks( lambda b: b.get(), dtype=sl.dtype, meta=np.array((), dtype=sl.dtype), ) if hasattr(n, 'map_blocks'): n_np = n.map_blocks( lambda b: b.get(), dtype=n.dtype, meta=np.array((), dtype=n.dtype), ) else: n_np = n result = _fdv_dask(hand_np, sl_np, n_np, q) return result.map_blocks( cp.asarray, dtype=result.dtype, meta=cp.array((), dtype=result.dtype), )