"""Combination methods for MCDA criterion layers.
All functions take an ``xr.Dataset`` of standardized (0-1) criterion
layers and return a single composite ``xr.DataArray``.
"""
from __future__ import annotations
import numpy as np
import xarray as xr
# =====================================================================
# Memory guards
# =====================================================================
#
# ``owa()`` builds a temporary stack of every weighted criterion layer
# along a new ``__mcda_criterion`` axis, then sorts that stack
# descending along axis 0. Peak working set on the eager numpy path
# is dominated by the float64 stack itself:
#
# stack : float64 -> 8 bytes per pixel per criterion
#
# So bytes per output pixel scale with the criterion count rather than
# being a fixed constant, which is why this module's check takes
# ``n_criteria`` as input instead of using a per-pixel constant.
_BYTES_PER_VALUE = 8
def _available_memory_bytes():
"""Best-effort estimate of available host memory in bytes."""
try:
with open('/proc/meminfo', 'r') as f:
for line in f:
if line.startswith('MemAvailable:'):
return int(line.split()[1]) * 1024 # kB -> bytes
except (OSError, ValueError, IndexError):
pass
try:
import psutil
return psutil.virtual_memory().available
except (ImportError, AttributeError):
pass
return 2 * 1024 ** 3
def _check_owa_memory(n_criteria, shape):
"""Raise MemoryError if the OWA stack would exceed 50% of RAM.
The eager numpy path materializes the full
``(n_criteria,) + shape`` float64 stack, which is the dominant
working buffer. The dask path is bounded per chunk and skips this
check at the call site.
"""
pixels = 1
for dim in shape:
pixels *= int(dim)
required = int(n_criteria) * pixels * _BYTES_PER_VALUE
available = _available_memory_bytes()
if required > 0.5 * available:
shape_str = "x".join(str(int(d)) for d in shape)
raise MemoryError(
f"owa with {int(n_criteria)} criteria on a {shape_str} "
f"grid requires ~{required / 1e9:.1f} GB of working memory "
f"but only ~{available / 1e9:.1f} GB is available. Use a "
f"dask-backed Dataset for out-of-core processing."
)
def _validate_criteria(criteria: xr.Dataset) -> None:
if not isinstance(criteria, xr.Dataset):
raise TypeError(
f"criteria must be an xr.Dataset, got {type(criteria).__name__}"
)
if len(criteria.data_vars) == 0:
raise ValueError("criteria Dataset has no variables")
def _check_finite_weights(weights: dict[str, float], label: str) -> None:
"""Reject weight dicts that contain NaN or infinite values.
Without this guard, a single NaN weight slips past the sum-to-1 check
(``abs(NaN - 1.0) > 0.01`` is False) and propagates through every
pixel, returning an all-NaN raster with no error.
"""
bad = []
for name, value in weights.items():
try:
v = float(value)
except (TypeError, ValueError):
raise ValueError(
f"{label}[{name!r}] must be a real number, got {value!r}"
)
if not np.isfinite(v):
bad.append((name, v))
if bad:
details = ", ".join(f"{n!r}={v}" for n, v in bad)
raise ValueError(
f"{label} must be finite; got non-finite value(s): {details}"
)
def _validate_weights(
weights: dict[str, float], criteria: xr.Dataset
) -> None:
criteria_names = set(criteria.data_vars)
weight_names = set(weights)
missing = criteria_names - weight_names
if missing:
raise ValueError(f"Missing weights for criteria: {missing}")
extra = weight_names - criteria_names
if extra:
raise ValueError(
f"Weights contain keys not in criteria Dataset: {extra}"
)
_check_finite_weights(weights, "weights")
total = sum(weights.values())
if abs(total - 1.0) > 0.01:
raise ValueError(
f"Weights must sum to ~1.0, got {total:.4f}"
)
[docs]
def wlc(
criteria: xr.Dataset,
weights: dict[str, float],
name: str = "wlc",
) -> xr.DataArray:
"""Weighted Linear Combination.
Fully compensatory: per-pixel ``sum(w_i * x_i)``.
Parameters
----------
criteria : xr.Dataset
Standardized criterion layers (0-1).
weights : dict
``{criterion_name: weight}``, must sum to ~1.0.
name : str
Name of the output DataArray.
Returns
-------
xr.DataArray
Composite suitability surface.
"""
_validate_criteria(criteria)
_validate_weights(weights, criteria)
result = None
for var_name in criteria.data_vars:
w = weights[var_name]
term = criteria[var_name] * w
result = term if result is None else result + term
result.name = name
return result
def _check_wpm_positive(criteria: xr.Dataset) -> None:
"""Reject criterion layers with zero or negative values.
``wpm`` computes ``prod(x_i ** w_i)``. For non-integer weights this is
only well-defined when ``x_i > 0``: a single zero collapses the
product to zero (masking upstream bugs) and a negative base produces
NaN with no error. NaN values are allowed through so the documented
NaN-propagation behaviour is preserved.
"""
bad = []
for var_name in criteria.data_vars:
arr = criteria[var_name].data
# Mask NaN so they pass through; we only want to flag <= 0.
try:
import dask.array as da
if isinstance(arr, da.Array):
# Compute once; cheap relative to the full product pass.
min_val = float(da.nanmin(arr).compute())
else:
min_val = float(np.nanmin(arr))
except ImportError:
min_val = float(np.nanmin(arr))
except ValueError:
# All-NaN slice; nothing to flag.
continue
if not np.isnan(min_val) and min_val <= 0.0:
bad.append((var_name, min_val))
if bad:
details = ", ".join(f"{n!r} (min={v})" for n, v in bad)
raise ValueError(
"wpm requires strictly positive criterion values; got "
f"non-positive value(s) in: {details}"
)
[docs]
def wpm(
criteria: xr.Dataset,
weights: dict[str, float],
name: str = "wpm",
) -> xr.DataArray:
"""Weighted Product Model.
Multiplicative combination: per-pixel ``product(x_i ^ w_i)``.
More sensitive to low scores than WLC.
Parameters
----------
criteria : xr.Dataset
Standardized criterion layers. Values must be strictly positive
(``> 0``); zero or negative inputs raise ``ValueError`` because
``x ** w`` is undefined or collapses to zero for non-integer
weights. NaN values propagate through to the output.
weights : dict
``{criterion_name: weight}``, must sum to ~1.0.
name : str
Name of the output DataArray.
Returns
-------
xr.DataArray
Composite suitability surface.
"""
_validate_criteria(criteria)
_validate_weights(weights, criteria)
_check_wpm_positive(criteria)
result = None
for var_name in criteria.data_vars:
w = weights[var_name]
term = criteria[var_name] ** w
result = term if result is None else result * term
result.name = name
return result
[docs]
def owa(
criteria: xr.Dataset,
criterion_weights: dict[str, float],
order_weights: list[float],
name: str = "owa",
) -> xr.DataArray:
"""Ordered Weighted Averaging.
Generalizes WLC by adding position-based order weights that control
risk attitude. Order weights are applied to criterion values sorted
by magnitude at each pixel, not by criterion identity.
Parameters
----------
criteria : xr.Dataset
Standardized criterion layers (0-1).
criterion_weights : dict
``{criterion_name: weight}``, must sum to ~1.0.
order_weights : list of float
Weights applied by rank position (index 0 = highest value).
Must have the same length as the number of criteria and
sum to ~1.0.
name : str
Name of the output DataArray.
Returns
-------
xr.DataArray
Composite suitability surface.
"""
_validate_criteria(criteria)
_validate_weights(criterion_weights, criteria)
n = len(criteria.data_vars)
if len(order_weights) != n:
raise ValueError(
f"order_weights length ({len(order_weights)}) must match "
f"number of criteria ({n})"
)
# Reject NaN/Inf entries before the sum check, since
# ``abs(NaN - 1.0) > 0.01`` is False and would let the bad value through.
bad_ow = []
for i, value in enumerate(order_weights):
try:
v = float(value)
except (TypeError, ValueError):
raise ValueError(
f"order_weights[{i}] must be a real number, got {value!r}"
)
if not np.isfinite(v):
bad_ow.append((i, v))
if bad_ow:
details = ", ".join(f"[{i}]={v}" for i, v in bad_ow)
raise ValueError(
f"order_weights must be finite; got non-finite value(s): "
f"{details}"
)
ow_sum = sum(order_weights)
if abs(ow_sum - 1.0) > 0.01:
raise ValueError(
f"order_weights must sum to ~1.0, got {ow_sum:.4f}"
)
order_weights_arr = np.array(order_weights, dtype=np.float64)
# Memory guard: the eager path materializes the full
# (n, *shape) float64 stack via xr.concat below, so refuse early
# when that buffer would dominate available RAM. Dask-backed
# inputs are bounded per chunk and skip the check.
first_var = list(criteria.data_vars)[0]
first_data = criteria[first_var].data
is_dask = False
try:
import dask.array as _da
is_dask = isinstance(first_data, _da.Array)
except ImportError:
pass
if not is_dask:
_check_owa_memory(n, criteria[first_var].shape)
# First apply criterion weights
weighted_layers = []
for var_name in criteria.data_vars:
w = criterion_weights[var_name]
weighted_layers.append(criteria[var_name] * w * n)
# Stack, sort descending along criterion axis, apply order weights
stacked = xr.concat(weighted_layers, dim="__mcda_criterion")
sorted_data = _sort_descending(stacked.data, axis=0)
# Apply order weights along the criterion axis
# Reshape order weights for broadcasting
shape = [n] + [1] * (sorted_data.ndim - 1)
try:
import dask.array as da
if isinstance(sorted_data, da.Array):
ow = da.from_array(order_weights_arr.reshape(shape),
chunks=-1)
else:
ow = order_weights_arr.reshape(shape)
except ImportError:
ow = order_weights_arr.reshape(shape)
result_data = (sorted_data * ow).sum(axis=0)
# Pick dims/coords from first data var
template = criteria[first_var]
return xr.DataArray(
result_data, name=name, dims=template.dims,
coords=template.coords, attrs=template.attrs,
)
def _sort_descending(data, axis):
"""Sort array descending along axis, supporting dask."""
try:
import dask.array as da
if isinstance(data, da.Array):
# Negate, sort, negate back to get descending order
return -da.sort(-data, axis=axis)
except ImportError:
pass
try:
import cupy
if isinstance(data, cupy.ndarray):
return -cupy.sort(-data, axis=axis)
except ImportError:
pass
return -np.sort(-data, axis=axis)
[docs]
def fuzzy_overlay(
criteria: xr.Dataset,
operator: str = "and",
gamma: float = 0.9,
name: str = "fuzzy_overlay",
) -> xr.DataArray:
"""Combine criteria using fuzzy set operators.
No explicit weights. All criteria contribute equally through the
chosen fuzzy operator.
Parameters
----------
criteria : xr.Dataset
Standardized criterion layers (0-1).
operator : str
Fuzzy operator: ``"and"`` (min), ``"or"`` (max),
``"sum"``, ``"product"``, or ``"gamma"``.
gamma : float
Gamma parameter for the ``"gamma"`` operator (0 = product,
1 = sum). Only used when ``operator="gamma"``.
name : str
Name of the output DataArray.
Returns
-------
xr.DataArray
Composite suitability surface.
"""
_validate_criteria(criteria)
var_names = list(criteria.data_vars)
layers = [criteria[v] for v in var_names]
if operator == "and":
result = layers[0]
for layer in layers[1:]:
result = np.minimum(result, layer)
elif operator == "or":
result = layers[0]
for layer in layers[1:]:
result = np.maximum(result, layer)
elif operator == "product":
result = layers[0]
for layer in layers[1:]:
result = result * layer
elif operator == "sum":
# Fuzzy algebraic sum: 1 - product(1 - x_i)
complement = layers[0] * 0.0 + 1.0
for layer in layers:
complement = complement * (1.0 - layer)
result = 1.0 - complement
elif operator == "gamma":
if not 0 <= gamma <= 1:
raise ValueError(f"gamma must be in [0, 1], got {gamma}")
# product part
prod = layers[0]
for layer in layers[1:]:
prod = prod * layer
# sum part
complement = layers[0] * 0.0 + 1.0
for layer in layers:
complement = complement * (1.0 - layer)
fuzzy_sum = 1.0 - complement
result = (fuzzy_sum ** gamma) * (prod ** (1.0 - gamma))
else:
raise ValueError(
f"Unknown operator {operator!r}. Choose from "
"'and', 'or', 'sum', 'product', 'gamma'"
)
result.name = name
return result
[docs]
def boolean_overlay(
criteria: dict[str, xr.DataArray],
operator: str = "and",
name: str = "boolean_overlay",
) -> xr.DataArray:
"""Combine binary (boolean) criterion masks.
Parameters
----------
criteria : dict of str to xr.DataArray
Named boolean/binary criterion masks.
operator : str
``"and"`` (intersection) or ``"or"`` (union).
name : str
Name of the output DataArray.
Returns
-------
xr.DataArray
Binary suitability mask.
"""
if not criteria:
raise ValueError("criteria dict is empty")
# Cast to bool to handle numeric input (0/1, float thresholds)
layers = [v.astype(bool) for v in criteria.values()]
if operator == "and":
result = layers[0]
for layer in layers[1:]:
result = result & layer
elif operator == "or":
result = layers[0]
for layer in layers[1:]:
result = result | layer
else:
raise ValueError(
f"Unknown operator {operator!r}. Choose from 'and', 'or'"
)
result.name = name
return result