Source code for xrspatial.fire

"""Fire behavior, burn severity, and fire danger functions.

Provides per-cell raster operations for wildfire analysis:

Burn severity
    - :func:`dnbr` -- differenced Normalized Burn Ratio
    - :func:`rdnbr` -- relative differenced Normalized Burn Ratio
    - :func:`burn_severity_class` -- USGS 7-class burn severity

Fire behavior
    - :func:`fireline_intensity` -- Byram's fireline intensity (kW/m)
    - :func:`flame_length` -- flame length from intensity (m)
    - :func:`rate_of_spread` -- simplified Rothermel spread rate (m/min)

Fire danger
    - :func:`kbdi` -- Keetch-Byram Drought Index (single time-step)
"""
from __future__ import annotations

from math import exp, log, sqrt, tan

import numpy as np
import xarray as xr
from numba import cuda

from xrspatial.dataset_support import supports_dataset
from xrspatial.utils import (
    ArrayTypeFunctionMapping,
    _validate_raster,
    _validate_scalar,
    cuda_args,
    ngjit,
    validate_arrays,
)

# 3rd-party (optional)
try:
    import cupy
except ImportError:
    class cupy:
        ndarray = False

try:
    import dask.array as da
except ImportError:
    da = None


# ============================================================================
# Anderson 13 fuel model table (metric units)
# ============================================================================
# Columns: fuel_load (kg/m^2), SAV (1/m), bed_depth (m), moisture_ext
#           (fraction), heat_content (kJ/kg), S_T, S_e, rho_p (kg/m^3)
#
# Converted from Anderson (1982) "Aids to Determining Fuel Models for
# Estimating Fire Behavior", USDA Forest Service Gen. Tech. Rep. INT-122.
# Original imperial values converted: tons/acre -> kg/m^2 (* 0.22417),
# ft -> m (* 0.3048), Btu/lb -> kJ/kg (* 2.326).
ANDERSON_13 = np.array([
    # Model  1: short grass (1 ft)
    [0.1662, 11483., 0.3048, 0.12, 18608., 0.0555, 0.010, 513.],
    # Model  2: timber (grass and understory)
    [0.8983, 9186., 0.3048, 0.15, 18608., 0.0555, 0.010, 513.],
    # Model  3: tall grass (2.5 ft)
    [0.6730, 4921., 0.7620, 0.25, 18608., 0.0555, 0.010, 513.],
    # Model  4: chaparral (6 ft)
    [2.6810, 6562., 1.8288, 0.20, 18608., 0.0555, 0.010, 513.],
    # Model  5: brush (2 ft)
    [0.2242, 6562., 0.6096, 0.20, 18608., 0.0555, 0.010, 513.],
    # Model  6: dormant brush / hardwood slash
    [0.3362, 5741., 0.7620, 0.25, 18608., 0.0555, 0.010, 513.],
    # Model  7: southern rough
    [0.2578, 5741., 0.7620, 0.40, 18608., 0.0555, 0.010, 513.],
    # Model  8: closed timber litter
    [0.2242, 6562., 0.0610, 0.30, 18608., 0.0555, 0.010, 513.],
    # Model  9: hardwood litter
    [0.1541, 8202., 0.0610, 0.25, 18608., 0.0555, 0.010, 513.],
    # Model 10: timber (litter and understory)
    [0.6730, 6562., 0.3048, 0.25, 18608., 0.0555, 0.010, 513.],
    # Model 11: light logging slash
    [0.3362, 4921., 0.3048, 0.15, 18608., 0.0555, 0.010, 513.],
    # Model 12: medium logging slash
    [0.8983, 4921., 0.7010, 0.20, 18608., 0.0555, 0.010, 513.],
    # Model 13: heavy logging slash
    [1.5714, 4921., 0.9144, 0.25, 18608., 0.0555, 0.010, 513.],
], dtype=np.float64)


# ============================================================================
# Rothermel pre-computation (scalar fuel-model params -> derived constants)
# ============================================================================
def _rothermel_fuel_constants(fuel_model: int):
    """Pre-compute Rothermel constants from an Anderson-13 fuel model row.

    Returns a tuple of 12 float64 scalars that get passed into the per-pixel
    kernel so only moisture/wind/slope vary per cell.
    """
    row = ANDERSON_13[fuel_model - 1]
    w_0 = row[0]       # fuel load, kg/m^2
    sigma = row[1]      # SAV ratio, 1/m
    delta = row[2]      # bed depth, m
    M_x = row[3]        # moisture of extinction, fraction
    h = row[4]          # heat content, kJ/kg
    S_T = row[5]        # total mineral content
    S_e = row[6]        # effective mineral content
    rho_p = row[7]      # oven-dry particle density, kg/m^3

    # packing ratio
    rho_b = w_0 / delta if delta > 0 else 0.0
    beta = rho_b / rho_p if rho_p > 0 else 0.0
    beta_op = 3.348 * sigma ** (-0.8189)
    ratio = beta / beta_op if beta_op > 0 else 0.0

    # maximum reaction velocity
    A = 133.0 * sigma ** (-0.7913)
    Gamma_max = sigma ** 1.5 / (495.0 + 0.0594 * sigma ** 1.5)
    Gamma = Gamma_max * (ratio ** A) * exp(A * (1.0 - ratio))

    # mineral damping
    eta_s = max(0.174 * S_e ** (-0.19), 0.0)
    if eta_s > 1.0:
        eta_s = 1.0

    # propagating flux ratio
    xi = exp((0.792 + 0.681 * sigma ** 0.5) * (beta + 0.1)) / \
        (192.0 + 0.2595 * sigma)

    # heat of pre-ignition
    epsilon = exp(-138.0 / sigma) if sigma > 0 else 0.0

    # wind factor exponents
    C_w = 7.47 * exp(-0.133 * sigma ** 0.55)
    B_w = 0.02526 * sigma ** 0.54
    E_w = 0.715 * exp(-3.59e-4 * sigma)

    return (w_0, h, M_x, beta, rho_b, Gamma, eta_s, xi, epsilon,
            C_w, B_w, E_w)


# ============================================================================
# dNBR
# ============================================================================
@ngjit
def _dnbr_cpu(pre_data, post_data):
    out = np.full(pre_data.shape, np.nan, dtype=np.float32)
    rows, cols = pre_data.shape
    for y in range(rows):
        for x in range(cols):
            pre = pre_data[y, x]
            post = post_data[y, x]
            if pre != pre or post != post:
                continue
            out[y, x] = pre - post
    return out


@cuda.jit
def _dnbr_gpu(pre_data, post_data, out):
    y, x = cuda.grid(2)
    if y < out.shape[0] and x < out.shape[1]:
        pre = pre_data[y, x]
        post = post_data[y, x]
        if pre == pre and post == post:
            out[y, x] = pre - post


def _dnbr_dask(pre_data, post_data):
    return da.map_blocks(_dnbr_cpu, pre_data, post_data, meta=np.array(()))


def _dnbr_cupy(pre_data, post_data):
    griddim, blockdim = cuda_args(pre_data.shape)
    out = cupy.empty(pre_data.shape, dtype='f4')
    out[:] = cupy.nan
    _dnbr_gpu[griddim, blockdim](pre_data, post_data, out)
    return out


def _dnbr_dask_cupy(pre_data, post_data):
    return da.map_blocks(_dnbr_cupy, pre_data, post_data,
                         dtype=cupy.float32, meta=cupy.array(()))


[docs] def dnbr(pre_nbr_agg: xr.DataArray, post_nbr_agg: xr.DataArray, name: str = 'dnbr') -> xr.DataArray: """Differenced Normalized Burn Ratio (dNBR). Computes ``pre_nbr - post_nbr``. Higher values indicate greater burn severity. Parameters ---------- pre_nbr_agg : xr.DataArray Pre-fire NBR values. post_nbr_agg : xr.DataArray Post-fire NBR values. name : str, default='dnbr' Name of output DataArray. Returns ------- xr.DataArray dNBR values (float32). """ _validate_raster(pre_nbr_agg, func_name='dnbr', name='pre_nbr_agg') _validate_raster(post_nbr_agg, func_name='dnbr', name='post_nbr_agg') validate_arrays(pre_nbr_agg, post_nbr_agg) mapper = ArrayTypeFunctionMapping( numpy_func=_dnbr_cpu, cupy_func=_dnbr_cupy, dask_func=_dnbr_dask, dask_cupy_func=_dnbr_dask_cupy, ) out = mapper(pre_nbr_agg)( pre_nbr_agg.data.astype('f4'), post_nbr_agg.data.astype('f4'), ) return xr.DataArray(out, name=name, coords=pre_nbr_agg.coords, dims=pre_nbr_agg.dims, attrs=pre_nbr_agg.attrs)
# ============================================================================ # RdNBR # ============================================================================ @ngjit def _rdnbr_cpu(dnbr_data, pre_data): out = np.full(dnbr_data.shape, np.nan, dtype=np.float32) rows, cols = dnbr_data.shape for y in range(rows): for x in range(cols): d = dnbr_data[y, x] p = pre_data[y, x] if d != d or p != p: continue denom = abs(p / 1000.0) if denom < 1e-10: continue out[y, x] = d / sqrt(denom) return out @cuda.jit def _rdnbr_gpu(dnbr_data, pre_data, out): y, x = cuda.grid(2) if y < out.shape[0] and x < out.shape[1]: d = dnbr_data[y, x] p = pre_data[y, x] if d == d and p == p: denom = abs(p / 1000.0) if denom >= 1e-10: out[y, x] = d / (denom ** 0.5) def _rdnbr_dask(dnbr_data, pre_data): return da.map_blocks(_rdnbr_cpu, dnbr_data, pre_data, meta=np.array(())) def _rdnbr_cupy(dnbr_data, pre_data): griddim, blockdim = cuda_args(dnbr_data.shape) out = cupy.empty(dnbr_data.shape, dtype='f4') out[:] = cupy.nan _rdnbr_gpu[griddim, blockdim](dnbr_data, pre_data, out) return out def _rdnbr_dask_cupy(dnbr_data, pre_data): return da.map_blocks(_rdnbr_cupy, dnbr_data, pre_data, dtype=cupy.float32, meta=cupy.array(()))
[docs] def rdnbr(dnbr_agg: xr.DataArray, pre_nbr_agg: xr.DataArray, name: str = 'rdnbr') -> xr.DataArray: """Relative differenced Normalized Burn Ratio (RdNBR). Computes ``dNBR / sqrt(abs(pre_NBR / 1000))``. Normalises dNBR by pre-fire vegetation density so that burn severity is comparable across different vegetation types. Parameters ---------- dnbr_agg : xr.DataArray dNBR values (e.g. from :func:`dnbr`). pre_nbr_agg : xr.DataArray Pre-fire NBR values. name : str, default='rdnbr' Name of output DataArray. Returns ------- xr.DataArray RdNBR values (float32). Pixels where ``abs(pre_NBR) < 1e-7`` are set to NaN to avoid division by near-zero. """ _validate_raster(dnbr_agg, func_name='rdnbr', name='dnbr_agg') _validate_raster(pre_nbr_agg, func_name='rdnbr', name='pre_nbr_agg') validate_arrays(dnbr_agg, pre_nbr_agg) mapper = ArrayTypeFunctionMapping( numpy_func=_rdnbr_cpu, cupy_func=_rdnbr_cupy, dask_func=_rdnbr_dask, dask_cupy_func=_rdnbr_dask_cupy, ) out = mapper(dnbr_agg)( dnbr_agg.data.astype('f4'), pre_nbr_agg.data.astype('f4'), ) return xr.DataArray(out, name=name, coords=dnbr_agg.coords, dims=dnbr_agg.dims, attrs=dnbr_agg.attrs)
# ============================================================================ # Burn severity classification (USGS 7-class) # ============================================================================ # Class thresholds from USGS: # 1: Enhanced regrowth (high) dNBR < -0.251 # 2: Enhanced regrowth (low) -0.251 <= dNBR < -0.101 # 3: Unburned -0.101 <= dNBR < 0.099 # 4: Low severity 0.099 <= dNBR < 0.269 # 5: Moderate-low severity 0.269 <= dNBR < 0.439 # 6: Moderate-high severity 0.439 <= dNBR < 0.659 # 7: High severity 0.659 <= dNBR # 0: nodata (NaN input) _BSC_THRESHOLDS = np.array([-0.251, -0.101, 0.099, 0.269, 0.439, 0.659], dtype=np.float64) @ngjit def _bsc_cpu(data): out = np.zeros(data.shape, dtype=np.int8) rows, cols = data.shape for y in range(rows): for x in range(cols): v = data[y, x] if v != v: continue # 0 = nodata if v < -0.251: out[y, x] = 1 elif v < -0.101: out[y, x] = 2 elif v < 0.099: out[y, x] = 3 elif v < 0.269: out[y, x] = 4 elif v < 0.439: out[y, x] = 5 elif v < 0.659: out[y, x] = 6 else: out[y, x] = 7 return out @cuda.jit def _bsc_gpu(data, out): y, x = cuda.grid(2) if y < out.shape[0] and x < out.shape[1]: v = data[y, x] if v == v: if v < -0.251: out[y, x] = 1 elif v < -0.101: out[y, x] = 2 elif v < 0.099: out[y, x] = 3 elif v < 0.269: out[y, x] = 4 elif v < 0.439: out[y, x] = 5 elif v < 0.659: out[y, x] = 6 else: out[y, x] = 7 def _bsc_dask(data): return da.map_blocks(_bsc_cpu, data, dtype=np.int8, meta=np.array(())) def _bsc_cupy(data): griddim, blockdim = cuda_args(data.shape) out = cupy.zeros(data.shape, dtype=np.int8) _bsc_gpu[griddim, blockdim](data, out) return out def _bsc_dask_cupy(data): return da.map_blocks(_bsc_cupy, data, dtype=np.int8, meta=cupy.array(()))
[docs] @supports_dataset def burn_severity_class(dnbr_agg: xr.DataArray, name: str = 'burn_severity_class') -> xr.DataArray: """Classify dNBR into USGS 7-class burn severity. Classes: 1 = enhanced regrowth (high), 2 = enhanced regrowth (low), 3 = unburned, 4 = low severity, 5 = moderate-low, 6 = moderate-high, 7 = high severity. 0 = nodata. Parameters ---------- dnbr_agg : xr.DataArray or xr.Dataset dNBR values (e.g. from :func:`dnbr`). name : str, default='burn_severity_class' Name of output DataArray. Returns ------- xr.DataArray int8 class labels (0-7). """ _validate_raster(dnbr_agg, func_name='burn_severity_class', name='dnbr_agg') mapper = ArrayTypeFunctionMapping( numpy_func=_bsc_cpu, cupy_func=_bsc_cupy, dask_func=_bsc_dask, dask_cupy_func=_bsc_dask_cupy, ) out = mapper(dnbr_agg)(dnbr_agg.data.astype('f4')) return xr.DataArray(out, name=name, coords=dnbr_agg.coords, dims=dnbr_agg.dims, attrs=dnbr_agg.attrs)
# ============================================================================ # Fireline intensity (Byram) # ============================================================================ @ngjit def _fli_cpu(fuel_data, spread_data, heat_content): out = np.full(fuel_data.shape, np.nan, dtype=np.float32) rows, cols = fuel_data.shape for y in range(rows): for x in range(cols): w = fuel_data[y, x] r = spread_data[y, x] if w != w or r != r: continue out[y, x] = heat_content * w * r return out @cuda.jit def _fli_gpu(fuel_data, spread_data, heat_content, out): y, x = cuda.grid(2) if y < out.shape[0] and x < out.shape[1]: w = fuel_data[y, x] r = spread_data[y, x] if w == w and r == r: out[y, x] = heat_content * w * r def _fli_dask(fuel_data, spread_data, heat_content): return da.map_blocks(_fli_cpu, fuel_data, spread_data, heat_content, meta=np.array(())) def _fli_cupy(fuel_data, spread_data, heat_content): griddim, blockdim = cuda_args(fuel_data.shape) out = cupy.empty(fuel_data.shape, dtype='f4') out[:] = cupy.nan _fli_gpu[griddim, blockdim](fuel_data, spread_data, heat_content, out) return out def _fli_dask_cupy(fuel_data, spread_data, heat_content): return da.map_blocks(_fli_cupy, fuel_data, spread_data, heat_content, dtype=cupy.float32, meta=cupy.array(()))
[docs] def fireline_intensity(fuel_consumed_agg: xr.DataArray, spread_rate_agg: xr.DataArray, heat_content: float = 18000.0, name: str = 'fireline_intensity') -> xr.DataArray: """Byram's fireline intensity. ``I = H * w * R`` where *H* is heat content (kJ/kg), *w* is fuel consumed (kg/m^2), and *R* is rate of spread (m/s). Parameters ---------- fuel_consumed_agg : xr.DataArray Fuel consumed per unit area (kg/m^2). spread_rate_agg : xr.DataArray Rate of spread (m/s). heat_content : float, default=18000 Heat content of fuel (kJ/kg). name : str, default='fireline_intensity' Name of output DataArray. Returns ------- xr.DataArray Fireline intensity in kW/m (float32). """ _validate_raster(fuel_consumed_agg, func_name='fireline_intensity', name='fuel_consumed_agg') _validate_raster(spread_rate_agg, func_name='fireline_intensity', name='spread_rate_agg') validate_arrays(fuel_consumed_agg, spread_rate_agg) _validate_scalar(heat_content, func_name='fireline_intensity', name='heat_content', dtype=(int, float), min_val=0) mapper = ArrayTypeFunctionMapping( numpy_func=_fli_cpu, cupy_func=_fli_cupy, dask_func=_fli_dask, dask_cupy_func=_fli_dask_cupy, ) out = mapper(fuel_consumed_agg)( fuel_consumed_agg.data.astype('f4'), spread_rate_agg.data.astype('f4'), float(heat_content), ) return xr.DataArray(out, name=name, coords=fuel_consumed_agg.coords, dims=fuel_consumed_agg.dims, attrs=fuel_consumed_agg.attrs)
# ============================================================================ # Flame length # ============================================================================ @ngjit def _fl_cpu(intensity_data): out = np.full(intensity_data.shape, np.nan, dtype=np.float32) rows, cols = intensity_data.shape for y in range(rows): for x in range(cols): v = intensity_data[y, x] if v != v: continue if v <= 0.0: out[y, x] = 0.0 else: out[y, x] = 0.0775 * v ** 0.46 return out @cuda.jit def _fl_gpu(intensity_data, out): y, x = cuda.grid(2) if y < out.shape[0] and x < out.shape[1]: v = intensity_data[y, x] if v == v: if v <= 0.0: out[y, x] = 0.0 else: out[y, x] = 0.0775 * v ** 0.46 def _fl_dask(intensity_data): return da.map_blocks(_fl_cpu, intensity_data, meta=np.array(())) def _fl_cupy(intensity_data): griddim, blockdim = cuda_args(intensity_data.shape) out = cupy.empty(intensity_data.shape, dtype='f4') out[:] = cupy.nan _fl_gpu[griddim, blockdim](intensity_data, out) return out def _fl_dask_cupy(intensity_data): return da.map_blocks(_fl_cupy, intensity_data, dtype=cupy.float32, meta=cupy.array(()))
[docs] @supports_dataset def flame_length(intensity_agg: xr.DataArray, name: str = 'flame_length') -> xr.DataArray: """Flame length from fireline intensity. Uses Byram's equation: ``L = 0.0775 * I^0.46``. Negative or zero intensity yields zero flame length. Parameters ---------- intensity_agg : xr.DataArray or xr.Dataset Fireline intensity (kW/m). name : str, default='flame_length' Name of output DataArray. Returns ------- xr.DataArray Flame length in metres (float32). """ _validate_raster(intensity_agg, func_name='flame_length', name='intensity_agg') mapper = ArrayTypeFunctionMapping( numpy_func=_fl_cpu, cupy_func=_fl_cupy, dask_func=_fl_dask, dask_cupy_func=_fl_dask_cupy, ) out = mapper(intensity_agg)(intensity_agg.data.astype('f4')) return xr.DataArray(out, name=name, coords=intensity_agg.coords, dims=intensity_agg.dims, attrs=intensity_agg.attrs)
# ============================================================================ # Rate of spread (simplified Rothermel) # ============================================================================ @ngjit def _ros_cpu(slope_data, wind_data, moisture_data, w_0, h, M_x, beta, rho_b, Gamma, eta_s, xi, epsilon, C_w, B_w, E_w): out = np.full(slope_data.shape, np.nan, dtype=np.float32) rows, cols = slope_data.shape # pre-ignition heat (constant) Q_ig = 250.0 + 1116.0 * 0.0 # moisture term added per-pixel for y in range(rows): for x in range(cols): slp = slope_data[y, x] wind = wind_data[y, x] moist = moisture_data[y, x] if slp != slp or wind != wind or moist != moist: continue # moisture ratio rm = moist / M_x if M_x > 0 else 0.0 if rm < 0.0: rm = 0.0 # moisture damping coefficient eta_M = 1.0 - 2.59 * rm + 5.11 * rm * rm - 3.52 * rm * rm * rm if eta_M < 0.0: eta_M = 0.0 if eta_M > 1.0: eta_M = 1.0 # reaction intensity (kJ/m^2/min) I_R = Gamma * w_0 * h * eta_M * eta_s # wind factor (convert km/h to m/min: * 1000/60) U_mmin = wind * 1000.0 / 60.0 if U_mmin < 0.0: U_mmin = 0.0 phi_w = C_w * U_mmin ** B_w if beta > 0.0: phi_w = phi_w * beta ** (-E_w) # slope factor tan_slp = tan(slp * 3.141592653589793 / 180.0) phi_s = 5.275 * (beta ** (-0.3)) * tan_slp * tan_slp if beta > 0 else 0.0 # heat of pre-ignition Q_ig = 250.0 + 1116.0 * moist # rate of spread (m/min) denom = rho_b * epsilon * Q_ig if denom > 0.0: R = I_R * xi * (1.0 + phi_w + phi_s) / denom else: R = 0.0 if R < 0.0: R = 0.0 out[y, x] = R return out @cuda.jit def _ros_gpu(slope_data, wind_data, moisture_data, w_0, h, M_x, beta, rho_b, Gamma, eta_s, xi, epsilon, C_w, B_w, E_w, out): y, x = cuda.grid(2) if y < out.shape[0] and x < out.shape[1]: slp = slope_data[y, x] wind = wind_data[y, x] moist = moisture_data[y, x] if slp != slp or wind != wind or moist != moist: return rm = moist / M_x if M_x > 0 else 0.0 if rm < 0.0: rm = 0.0 eta_M = 1.0 - 2.59 * rm + 5.11 * rm * rm - 3.52 * rm * rm * rm if eta_M < 0.0: eta_M = 0.0 if eta_M > 1.0: eta_M = 1.0 I_R = Gamma * w_0 * h * eta_M * eta_s U_mmin = wind * 1000.0 / 60.0 if U_mmin < 0.0: U_mmin = 0.0 phi_w = C_w * U_mmin ** B_w if beta > 0.0: phi_w = phi_w * beta ** (-E_w) tan_slp = tan(slp * 3.141592653589793 / 180.0) phi_s = 5.275 * (beta ** (-0.3)) * tan_slp * tan_slp if beta > 0 else 0.0 Q_ig = 250.0 + 1116.0 * moist denom = rho_b * epsilon * Q_ig if denom > 0.0: R = I_R * xi * (1.0 + phi_w + phi_s) / denom else: R = 0.0 if R < 0.0: R = 0.0 out[y, x] = R def _ros_dask(slope_data, wind_data, moisture_data, w_0, h, M_x, beta, rho_b, Gamma, eta_s, xi, epsilon, C_w, B_w, E_w): return da.map_blocks( _ros_cpu, slope_data, wind_data, moisture_data, w_0, h, M_x, beta, rho_b, Gamma, eta_s, xi, epsilon, C_w, B_w, E_w, meta=np.array(()), ) def _ros_cupy(slope_data, wind_data, moisture_data, w_0, h, M_x, beta, rho_b, Gamma, eta_s, xi, epsilon, C_w, B_w, E_w): griddim, blockdim = cuda_args(slope_data.shape) out = cupy.empty(slope_data.shape, dtype='f4') out[:] = cupy.nan _ros_gpu[griddim, blockdim]( slope_data, wind_data, moisture_data, w_0, h, M_x, beta, rho_b, Gamma, eta_s, xi, epsilon, C_w, B_w, E_w, out, ) return out def _ros_dask_cupy(slope_data, wind_data, moisture_data, w_0, h, M_x, beta, rho_b, Gamma, eta_s, xi, epsilon, C_w, B_w, E_w): return da.map_blocks( _ros_cupy, slope_data, wind_data, moisture_data, w_0, h, M_x, beta, rho_b, Gamma, eta_s, xi, epsilon, C_w, B_w, E_w, dtype=cupy.float32, meta=cupy.array(()), )
[docs] def rate_of_spread(slope_agg: xr.DataArray, wind_speed_agg: xr.DataArray, fuel_moisture_agg: xr.DataArray, fuel_model: int = 1, name: str = 'rate_of_spread') -> xr.DataArray: """Rate of spread using a simplified Rothermel model. Uses the Anderson 13 fuel model lookup table and computes per-pixel spread rate from slope, wind speed, and fuel moisture. Parameters ---------- slope_agg : xr.DataArray Terrain slope in degrees. wind_speed_agg : xr.DataArray Mid-flame wind speed in km/h. fuel_moisture_agg : xr.DataArray Dead fuel moisture content as a fraction (0-1). fuel_model : int, default=1 Anderson 13 fuel model number (1-13). name : str, default='rate_of_spread' Name of output DataArray. Returns ------- xr.DataArray Rate of spread in m/min (float32). """ _validate_raster(slope_agg, func_name='rate_of_spread', name='slope_agg') _validate_raster(wind_speed_agg, func_name='rate_of_spread', name='wind_speed_agg') _validate_raster(fuel_moisture_agg, func_name='rate_of_spread', name='fuel_moisture_agg') validate_arrays(slope_agg, wind_speed_agg, fuel_moisture_agg) _validate_scalar(fuel_model, func_name='rate_of_spread', name='fuel_model', dtype=int, min_val=1, max_val=13) constants = _rothermel_fuel_constants(fuel_model) mapper = ArrayTypeFunctionMapping( numpy_func=_ros_cpu, cupy_func=_ros_cupy, dask_func=_ros_dask, dask_cupy_func=_ros_dask_cupy, ) out = mapper(slope_agg)( slope_agg.data.astype('f4'), wind_speed_agg.data.astype('f4'), fuel_moisture_agg.data.astype('f4'), *constants, ) return xr.DataArray(out, name=name, coords=slope_agg.coords, dims=slope_agg.dims, attrs=slope_agg.attrs)
# ============================================================================ # KBDI (Keetch-Byram Drought Index, single time-step) # ============================================================================ @ngjit def _kbdi_cpu(kbdi_prev_data, max_temp_data, precip_data, annual_precip): out = np.full(kbdi_prev_data.shape, np.nan, dtype=np.float32) rows, cols = kbdi_prev_data.shape for y in range(rows): for x in range(cols): Q_prev = kbdi_prev_data[y, x] T = max_temp_data[y, x] P = precip_data[y, x] if Q_prev != Q_prev or T != T or P != P: continue # net precipitation (subtract 5.08 mm canopy interception) net_P = P - 5.08 if net_P < 0.0: net_P = 0.0 # reduce drought index by net precip Q = Q_prev - net_P if Q < 0.0: Q = 0.0 # drought factor (only accumulates when T > 50 F = 10 C) if T > 10.0: # convert Celsius to Fahrenheit for empirical formula T_f = T * 9.0 / 5.0 + 32.0 R = annual_precip # mm # KBDI drought factor (metric form) numer = (800.0 - Q) * (0.968 * exp(0.0486 * T_f) - 8.30) denom = 1.0 + 10.46 * exp(-0.0116 * R) dQ = numer * 0.001 / denom if dQ < 0.0: dQ = 0.0 Q = Q + dQ # clamp to 0-800 if Q < 0.0: Q = 0.0 if Q > 800.0: Q = 800.0 out[y, x] = Q return out @cuda.jit def _kbdi_gpu(kbdi_prev_data, max_temp_data, precip_data, annual_precip, out): y, x = cuda.grid(2) if y < out.shape[0] and x < out.shape[1]: Q_prev = kbdi_prev_data[y, x] T = max_temp_data[y, x] P = precip_data[y, x] if Q_prev != Q_prev or T != T or P != P: return net_P = P - 5.08 if net_P < 0.0: net_P = 0.0 Q = Q_prev - net_P if Q < 0.0: Q = 0.0 if T > 10.0: T_f = T * 9.0 / 5.0 + 32.0 R = annual_precip numer = (800.0 - Q) * (0.968 * exp(0.0486 * T_f) - 8.30) denom = 1.0 + 10.46 * exp(-0.0116 * R) dQ = numer * 0.001 / denom if dQ < 0.0: dQ = 0.0 Q = Q + dQ if Q < 0.0: Q = 0.0 if Q > 800.0: Q = 800.0 out[y, x] = Q def _kbdi_dask(kbdi_prev_data, max_temp_data, precip_data, annual_precip): return da.map_blocks( _kbdi_cpu, kbdi_prev_data, max_temp_data, precip_data, annual_precip, meta=np.array(()), ) def _kbdi_cupy(kbdi_prev_data, max_temp_data, precip_data, annual_precip): griddim, blockdim = cuda_args(kbdi_prev_data.shape) out = cupy.empty(kbdi_prev_data.shape, dtype='f4') out[:] = cupy.nan _kbdi_gpu[griddim, blockdim]( kbdi_prev_data, max_temp_data, precip_data, annual_precip, out, ) return out def _kbdi_dask_cupy(kbdi_prev_data, max_temp_data, precip_data, annual_precip): return da.map_blocks( _kbdi_cupy, kbdi_prev_data, max_temp_data, precip_data, annual_precip, dtype=cupy.float32, meta=cupy.array(()), )
[docs] def kbdi(kbdi_prev_agg: xr.DataArray, max_temp_agg: xr.DataArray, precip_agg: xr.DataArray, annual_precip: float, name: str = 'kbdi') -> xr.DataArray: """Keetch-Byram Drought Index (single time-step update). Advances the KBDI by one day given previous KBDI, daily max temperature, and daily precipitation. Parameters ---------- kbdi_prev_agg : xr.DataArray Previous day's KBDI values (0-800 mm deficit). max_temp_agg : xr.DataArray Daily maximum temperature in degrees Celsius. precip_agg : xr.DataArray Daily precipitation in mm. annual_precip : float Mean annual precipitation in mm (scalar, constant across domain). name : str, default='kbdi' Name of output DataArray. Returns ------- xr.DataArray Updated KBDI values (float32), clamped to 0-800. """ _validate_raster(kbdi_prev_agg, func_name='kbdi', name='kbdi_prev_agg') _validate_raster(max_temp_agg, func_name='kbdi', name='max_temp_agg') _validate_raster(precip_agg, func_name='kbdi', name='precip_agg') validate_arrays(kbdi_prev_agg, max_temp_agg, precip_agg) _validate_scalar(annual_precip, func_name='kbdi', name='annual_precip', dtype=(int, float), min_val=0, min_exclusive=True) mapper = ArrayTypeFunctionMapping( numpy_func=_kbdi_cpu, cupy_func=_kbdi_cupy, dask_func=_kbdi_dask, dask_cupy_func=_kbdi_dask_cupy, ) out = mapper(kbdi_prev_agg)( kbdi_prev_agg.data.astype('f4'), max_temp_agg.data.astype('f4'), precip_agg.data.astype('f4'), float(annual_precip), ) return xr.DataArray(out, name=name, coords=kbdi_prev_agg.coords, dims=kbdi_prev_agg.dims, attrs=kbdi_prev_agg.attrs)