Source code for xrspatial.mcda.combine

"""Combination methods for MCDA criterion layers.

All functions take an ``xr.Dataset`` of standardized (0-1) criterion
layers and return a single composite ``xr.DataArray``.
"""

from __future__ import annotations

import numpy as np
import xarray as xr


def _validate_criteria(criteria: xr.Dataset) -> None:
    if not isinstance(criteria, xr.Dataset):
        raise TypeError(
            f"criteria must be an xr.Dataset, got {type(criteria).__name__}"
        )
    if len(criteria.data_vars) == 0:
        raise ValueError("criteria Dataset has no variables")


def _validate_weights(
    weights: dict[str, float], criteria: xr.Dataset
) -> None:
    criteria_names = set(criteria.data_vars)
    weight_names = set(weights)
    missing = criteria_names - weight_names
    if missing:
        raise ValueError(f"Missing weights for criteria: {missing}")
    extra = weight_names - criteria_names
    if extra:
        raise ValueError(
            f"Weights contain keys not in criteria Dataset: {extra}"
        )
    total = sum(weights.values())
    if abs(total - 1.0) > 0.01:
        raise ValueError(
            f"Weights must sum to ~1.0, got {total:.4f}"
        )


[docs] def wlc( criteria: xr.Dataset, weights: dict[str, float], name: str = "wlc", ) -> xr.DataArray: """Weighted Linear Combination. Fully compensatory: per-pixel ``sum(w_i * x_i)``. Parameters ---------- criteria : xr.Dataset Standardized criterion layers (0-1). weights : dict ``{criterion_name: weight}``, must sum to ~1.0. name : str Name of the output DataArray. Returns ------- xr.DataArray Composite suitability surface. """ _validate_criteria(criteria) _validate_weights(weights, criteria) result = None for var_name in criteria.data_vars: w = weights[var_name] term = criteria[var_name] * w result = term if result is None else result + term result.name = name return result
[docs] def wpm( criteria: xr.Dataset, weights: dict[str, float], name: str = "wpm", ) -> xr.DataArray: """Weighted Product Model. Multiplicative combination: per-pixel ``product(x_i ^ w_i)``. More sensitive to low scores than WLC. Parameters ---------- criteria : xr.Dataset Standardized criterion layers (0-1). weights : dict ``{criterion_name: weight}``, must sum to ~1.0. name : str Name of the output DataArray. Returns ------- xr.DataArray Composite suitability surface. """ _validate_criteria(criteria) _validate_weights(weights, criteria) result = None for var_name in criteria.data_vars: w = weights[var_name] term = criteria[var_name] ** w result = term if result is None else result * term result.name = name return result
[docs] def owa( criteria: xr.Dataset, criterion_weights: dict[str, float], order_weights: list[float], name: str = "owa", ) -> xr.DataArray: """Ordered Weighted Averaging. Generalizes WLC by adding position-based order weights that control risk attitude. Order weights are applied to criterion values sorted by magnitude at each pixel, not by criterion identity. Parameters ---------- criteria : xr.Dataset Standardized criterion layers (0-1). criterion_weights : dict ``{criterion_name: weight}``, must sum to ~1.0. order_weights : list of float Weights applied by rank position (index 0 = highest value). Must have the same length as the number of criteria and sum to ~1.0. name : str Name of the output DataArray. Returns ------- xr.DataArray Composite suitability surface. """ _validate_criteria(criteria) _validate_weights(criterion_weights, criteria) n = len(criteria.data_vars) if len(order_weights) != n: raise ValueError( f"order_weights length ({len(order_weights)}) must match " f"number of criteria ({n})" ) ow_sum = sum(order_weights) if abs(ow_sum - 1.0) > 0.01: raise ValueError( f"order_weights must sum to ~1.0, got {ow_sum:.4f}" ) order_weights_arr = np.array(order_weights, dtype=np.float64) # First apply criterion weights weighted_layers = [] for var_name in criteria.data_vars: w = criterion_weights[var_name] weighted_layers.append(criteria[var_name] * w * n) # Stack, sort descending along criterion axis, apply order weights stacked = xr.concat(weighted_layers, dim="__mcda_criterion") sorted_data = _sort_descending(stacked.data, axis=0) # Apply order weights along the criterion axis # Reshape order weights for broadcasting shape = [n] + [1] * (sorted_data.ndim - 1) try: import dask.array as da if isinstance(sorted_data, da.Array): ow = da.from_array(order_weights_arr.reshape(shape), chunks=-1) else: ow = order_weights_arr.reshape(shape) except ImportError: ow = order_weights_arr.reshape(shape) result_data = (sorted_data * ow).sum(axis=0) # Pick dims/coords from first data var first_var = list(criteria.data_vars)[0] template = criteria[first_var] return xr.DataArray( result_data, name=name, dims=template.dims, coords=template.coords, attrs=template.attrs, )
def _sort_descending(data, axis): """Sort array descending along axis, supporting dask.""" try: import dask.array as da if isinstance(data, da.Array): # Negate, sort, negate back to get descending order return -da.sort(-data, axis=axis) except ImportError: pass try: import cupy if isinstance(data, cupy.ndarray): return -cupy.sort(-data, axis=axis) except ImportError: pass return -np.sort(-data, axis=axis)
[docs] def fuzzy_overlay( criteria: xr.Dataset, operator: str = "and", gamma: float = 0.9, name: str = "fuzzy_overlay", ) -> xr.DataArray: """Combine criteria using fuzzy set operators. No explicit weights. All criteria contribute equally through the chosen fuzzy operator. Parameters ---------- criteria : xr.Dataset Standardized criterion layers (0-1). operator : str Fuzzy operator: ``"and"`` (min), ``"or"`` (max), ``"sum"``, ``"product"``, or ``"gamma"``. gamma : float Gamma parameter for the ``"gamma"`` operator (0 = product, 1 = sum). Only used when ``operator="gamma"``. name : str Name of the output DataArray. Returns ------- xr.DataArray Composite suitability surface. """ _validate_criteria(criteria) var_names = list(criteria.data_vars) layers = [criteria[v] for v in var_names] if operator == "and": result = layers[0] for layer in layers[1:]: result = np.minimum(result, layer) elif operator == "or": result = layers[0] for layer in layers[1:]: result = np.maximum(result, layer) elif operator == "product": result = layers[0] for layer in layers[1:]: result = result * layer elif operator == "sum": # Fuzzy algebraic sum: 1 - product(1 - x_i) complement = layers[0] * 0.0 + 1.0 for layer in layers: complement = complement * (1.0 - layer) result = 1.0 - complement elif operator == "gamma": if not 0 <= gamma <= 1: raise ValueError(f"gamma must be in [0, 1], got {gamma}") # product part prod = layers[0] for layer in layers[1:]: prod = prod * layer # sum part complement = layers[0] * 0.0 + 1.0 for layer in layers: complement = complement * (1.0 - layer) fuzzy_sum = 1.0 - complement result = (fuzzy_sum ** gamma) * (prod ** (1.0 - gamma)) else: raise ValueError( f"Unknown operator {operator!r}. Choose from " "'and', 'or', 'sum', 'product', 'gamma'" ) result.name = name return result
[docs] def boolean_overlay( criteria: dict[str, xr.DataArray], operator: str = "and", name: str = "boolean_overlay", ) -> xr.DataArray: """Combine binary (boolean) criterion masks. Parameters ---------- criteria : dict of str to xr.DataArray Named boolean/binary criterion masks. operator : str ``"and"`` (intersection) or ``"or"`` (union). name : str Name of the output DataArray. Returns ------- xr.DataArray Binary suitability mask. """ if not criteria: raise ValueError("criteria dict is empty") # Cast to bool to handle numeric input (0/1, float thresholds) layers = [v.astype(bool) for v in criteria.values()] if operator == "and": result = layers[0] for layer in layers[1:]: result = result & layer elif operator == "or": result = layers[0] for layer in layers[1:]: result = result | layer else: raise ValueError( f"Unknown operator {operator!r}. Choose from 'and', 'or'" ) result.name = name return result