Source code for xrspatial.mcda.sensitivity

"""Sensitivity analysis for MCDA weight stability."""

from __future__ import annotations

import numpy as np
import xarray as xr

from xrspatial.mcda.combine import (
    _check_wpm_positive,
    _validate_criteria,
    _validate_weights,
    wlc,
    wpm,
)
from xrspatial.mcda.standardize import _get_xp
from xrspatial.utils import _dask_task_name_kwargs


[docs] def sensitivity( criteria: xr.Dataset, weights: dict[str, float], method: str = "one_at_a_time", combine_method: str = "wlc", delta: float = 0.05, n_samples: int = 1000, seed: int = 42, name: str = "sensitivity", ) -> xr.DataArray | xr.Dataset: """Assess how weight changes affect the suitability surface. Parameters ---------- criteria : xr.Dataset Standardized criterion layers. weights : dict Base weights (must sum to ~1.0). method : str ``"one_at_a_time"`` or ``"monte_carlo"``. combine_method : str Combination function: ``"wlc"`` or ``"wpm"``. delta : float Perturbation magnitude for one-at-a-time (default 0.05). n_samples : int Number of random weight vectors for Monte Carlo (default 1000). Must be >= 1 when ``method="monte_carlo"``. seed : int Random seed for Monte Carlo reproducibility. name : str Name prefix for output arrays. Returns ------- xr.Dataset or xr.DataArray For ``"one_at_a_time"``: Dataset with per-criterion DataArrays showing the absolute score difference under perturbation. For ``"monte_carlo"``: DataArray of per-pixel coefficient of variation across random weight samples. Dask-backed criteria produce a lazy dask-backed result; the sample loop runs inside each chunk, so memory stays bounded by chunk size. """ combine_fn = {"wlc": wlc, "wpm": wpm}.get(combine_method) if combine_fn is None: raise ValueError( f"Unknown combine_method {combine_method!r}. " "Choose 'wlc' or 'wpm'" ) if method == "one_at_a_time": return _oat(criteria, weights, combine_fn, delta, name) elif method == "monte_carlo": if not isinstance(n_samples, (int, np.integer)) or n_samples < 1: raise ValueError( f"n_samples must be a positive integer >= 1, got {n_samples!r}" ) return _monte_carlo( criteria, weights, combine_method, int(n_samples), seed, name ) else: raise ValueError( f"Unknown method {method!r}. " "Choose 'one_at_a_time' or 'monte_carlo'" )
def _perturb_weights( weights: dict[str, float], target: str, delta: float ) -> tuple[dict[str, float], dict[str, float]]: """Create +delta and -delta weight variants for one criterion. The target criterion's weight shifts by delta; remaining weights are rescaled proportionally to maintain a sum of 1.0. """ names = list(weights.keys()) base_w = weights[target] others = [k for k in names if k != target] other_sum = sum(weights[k] for k in others) # Single criterion: weight must stay at 1.0, perturbation is a no-op if not others: copy = dict(weights) return copy, dict(copy) results = [] for sign in (+1, -1): new_target = max(0.0, min(1.0, base_w + sign * delta)) remainder = 1.0 - new_target perturbed = {} for k in names: if k == target: perturbed[k] = new_target else: if other_sum > 0: perturbed[k] = weights[k] / other_sum * remainder else: perturbed[k] = remainder / len(others) results.append(perturbed) return results[0], results[1] def _oat(criteria, weights, combine_fn, delta, name): """One-at-a-time sensitivity: perturb each weight +/- delta.""" result_vars = {} for criterion in weights: w_plus, w_minus = _perturb_weights(weights, criterion, delta) score_plus = combine_fn(criteria, w_plus) score_minus = combine_fn(criteria, w_minus) diff = abs(score_plus - score_minus) diff.name = f"{name}_{criterion}" result_vars[criterion] = diff return xr.Dataset(result_vars) def _is_dask_dataset(criteria): """Check if any variable in the Dataset is backed by dask.""" try: import dask.array as da return any( isinstance(criteria[v].data, da.Array) for v in criteria.data_vars ) except ImportError: return False def _mc_block(block, weight_matrix, combine_method): """Run the Monte Carlo sample loop on one in-memory block. ``block`` has shape ``(n_criteria,) + grid`` with weight columns in the same criterion order. Welford's online algorithm keeps only the running mean and M2 buffers alive, so peak memory per block is a few times the block size regardless of ``n_samples``. Works on numpy and cupy blocks alike. """ xp = _get_xp(block) n_samples = weight_matrix.shape[0] n_criteria = block.shape[0] shape = block.shape[1:] running_mean = xp.zeros(shape, dtype=np.float64) running_m2 = xp.zeros(shape, dtype=np.float64) for i in range(n_samples): w = weight_matrix[i] # Same per-pixel arithmetic as combine.wlc / combine.wpm (the # canonical definitions), inlined on raw arrays so the loop # runs inside a single dask chunk. Keep in sync with those. if combine_method == "wpm": score = block[0] ** float(w[0]) for j in range(1, n_criteria): score = score * block[j] ** float(w[j]) else: score = block[0] * float(w[0]) for j in range(1, n_criteria): score = score + block[j] * float(w[j]) delta_val = score - running_mean running_mean = running_mean + delta_val / (i + 1) delta2 = score - running_mean running_m2 = running_m2 + delta_val * delta2 variance = running_m2 / n_samples std_dev = xp.sqrt(variance) # Coefficient of variation with np.errstate(divide="ignore", invalid="ignore"): return xp.where( running_mean != 0, std_dev / xp.abs(running_mean), 0.0 ) def _monte_carlo(criteria, weights, combine_method, n_samples, seed, name): """Monte Carlo sensitivity: random weight vectors, measure CV.""" rng = np.random.default_rng(seed) n_criteria = len(weights) criteria_names = list(weights.keys()) # Generate random weight vectors using Dirichlet distribution # Use base weights as concentration parameters (scaled up) alpha = np.array([weights[k] for k in criteria_names]) * n_criteria * 5 alpha = np.maximum(alpha, 0.5) # floor to avoid zero concentrations # Draw sequentially so a given seed reproduces the sample stream of # the previous per-iteration implementation. weight_matrix = np.stack( [rng.dirichlet(alpha) for _ in range(n_samples)] ) # Validate once up front with the first sampled vector (it is # finite and sums to 1, so only key-set mismatches can fire) the # way the first combine call used to. _validate_criteria(criteria) probe = {k: float(weight_matrix[0][j]) for j, k in enumerate(criteria_names)} _validate_weights(probe, criteria) if combine_method == "wpm": _check_wpm_positive(criteria) first_var = list(criteria.data_vars)[0] template = criteria[first_var] # Stack criterion layers in data_vars order (the order wlc/wpm # combine in) and permute the weight columns to match. var_order = list(criteria.data_vars) col = {k: j for j, k in enumerate(criteria_names)} ordered_weights = weight_matrix[:, [col[v] for v in var_order]] if _is_dask_dataset(criteria): import dask.array as da base_chunks = next( criteria[v].data.chunks for v in var_order if isinstance(criteria[v].data, da.Array) ) layers = [] for v in var_order: arr = criteria[v].data if isinstance(arr, da.Array): arr = arr.rechunk(base_chunks) else: arr = da.from_array(arr, chunks=base_chunks) layers.append(arr) # The sample loop runs inside one chunk function, so the graph # stays at one task per chunk and nothing materializes the full # criteria stack. stacked = da.stack(layers).rechunk({0: -1}) cv_vals = da.map_blocks( _mc_block, stacked, weight_matrix=ordered_weights, combine_method=combine_method, drop_axis=0, dtype=np.float64, meta=stacked._meta.sum(axis=0), **_dask_task_name_kwargs('xrspatial.sensitivity'), ) else: stacked = np.stack([criteria[v].data for v in var_order]) cv_vals = _mc_block(stacked, ordered_weights, combine_method) result = xr.DataArray( cv_vals, name=name, dims=template.dims, coords=template.coords, attrs=template.attrs, ) return result