Source code for xrspatial.mcda.sensitivity

"""Sensitivity analysis for MCDA weight stability."""

from __future__ import annotations

import numpy as np
import xarray as xr

from xrspatial.mcda.combine import wlc, wpm


[docs] def sensitivity( criteria: xr.Dataset, weights: dict[str, float], method: str = "one_at_a_time", combine_method: str = "wlc", delta: float = 0.05, n_samples: int = 1000, seed: int = 42, name: str = "sensitivity", ) -> xr.DataArray | xr.Dataset: """Assess how weight changes affect the suitability surface. Parameters ---------- criteria : xr.Dataset Standardized criterion layers. weights : dict Base weights (must sum to ~1.0). method : str ``"one_at_a_time"`` or ``"monte_carlo"``. combine_method : str Combination function: ``"wlc"`` or ``"wpm"``. delta : float Perturbation magnitude for one-at-a-time (default 0.05). n_samples : int Number of random weight vectors for Monte Carlo (default 1000). Must be >= 1 when ``method="monte_carlo"``. seed : int Random seed for Monte Carlo reproducibility. name : str Name prefix for output arrays. Returns ------- xr.Dataset or xr.DataArray For ``"one_at_a_time"``: Dataset with per-criterion DataArrays showing the absolute score difference under perturbation. For ``"monte_carlo"``: DataArray of per-pixel coefficient of variation across random weight samples. """ combine_fn = {"wlc": wlc, "wpm": wpm}.get(combine_method) if combine_fn is None: raise ValueError( f"Unknown combine_method {combine_method!r}. " "Choose 'wlc' or 'wpm'" ) if method == "one_at_a_time": return _oat(criteria, weights, combine_fn, delta, name) elif method == "monte_carlo": if not isinstance(n_samples, (int, np.integer)) or n_samples < 1: raise ValueError( f"n_samples must be a positive integer >= 1, got {n_samples!r}" ) return _monte_carlo( criteria, weights, combine_fn, int(n_samples), seed, name ) else: raise ValueError( f"Unknown method {method!r}. " "Choose 'one_at_a_time' or 'monte_carlo'" )
def _perturb_weights( weights: dict[str, float], target: str, delta: float ) -> tuple[dict[str, float], dict[str, float]]: """Create +delta and -delta weight variants for one criterion. The target criterion's weight shifts by delta; remaining weights are rescaled proportionally to maintain a sum of 1.0. """ names = list(weights.keys()) base_w = weights[target] others = [k for k in names if k != target] other_sum = sum(weights[k] for k in others) # Single criterion: weight must stay at 1.0, perturbation is a no-op if not others: copy = dict(weights) return copy, dict(copy) results = [] for sign in (+1, -1): new_target = max(0.0, min(1.0, base_w + sign * delta)) remainder = 1.0 - new_target perturbed = {} for k in names: if k == target: perturbed[k] = new_target else: if other_sum > 0: perturbed[k] = weights[k] / other_sum * remainder else: perturbed[k] = remainder / len(others) results.append(perturbed) return results[0], results[1] def _oat(criteria, weights, combine_fn, delta, name): """One-at-a-time sensitivity: perturb each weight +/- delta.""" result_vars = {} for criterion in weights: w_plus, w_minus = _perturb_weights(weights, criterion, delta) score_plus = combine_fn(criteria, w_plus) score_minus = combine_fn(criteria, w_minus) diff = abs(score_plus - score_minus) diff.name = f"{name}_{criterion}" result_vars[criterion] = diff return xr.Dataset(result_vars) def _is_dask_dataset(criteria): """Check if any variable in the Dataset is backed by dask.""" try: import dask.array as da return any( isinstance(criteria[v].data, da.Array) for v in criteria.data_vars ) except ImportError: return False def _monte_carlo(criteria, weights, combine_fn, n_samples, seed, name): """Monte Carlo sensitivity: random weight vectors, measure CV.""" rng = np.random.default_rng(seed) n_criteria = len(weights) criteria_names = list(weights.keys()) # Generate random weight vectors using Dirichlet distribution # Use base weights as concentration parameters (scaled up) alpha = np.array([weights[k] for k in criteria_names]) * n_criteria * 5 alpha = np.maximum(alpha, 0.5) # floor to avoid zero concentrations first_var = list(criteria.data_vars)[0] template = criteria[first_var] use_dask = _is_dask_dataset(criteria) # Accumulate running mean and M2 for Welford's online algorithm # to avoid storing all n_samples surfaces in memory running_mean = template.values * 0.0 if not use_dask else np.zeros(template.shape) running_m2 = running_mean.copy() # For dask inputs: compute each score eagerly to avoid graph # explosion (each lazy iteration would chain ~35 graph nodes; # 1000 iterations = 35000 chained tasks with no parallelism). # The criterion layers are small enough per-pixel that the # bottleneck is the combination, not I/O. if use_dask: criteria_np = criteria.compute() else: criteria_np = criteria for i in range(n_samples): raw = rng.dirichlet(alpha) sample_weights = { k: float(raw[j]) for j, k in enumerate(criteria_names) } score = combine_fn(criteria_np, sample_weights) score_vals = score.values delta_val = score_vals - running_mean running_mean = running_mean + delta_val / (i + 1) delta2 = score_vals - running_mean running_m2 = running_m2 + delta_val * delta2 variance = running_m2 / n_samples std_dev = np.sqrt(variance) # Coefficient of variation with np.errstate(divide="ignore", invalid="ignore"): cv_vals = np.where(running_mean != 0, std_dev / np.abs(running_mean), 0.0) result = xr.DataArray( cv_vals, name=name, dims=template.dims, coords=template.coords, attrs=template.attrs, ) return result