Source code for xrspatial.dasymetric

"""Raster-based dasymetric mapping.

Dasymetric mapping redistributes aggregate zone-level data (e.g. population
per census tract) onto a finer raster grid using ancillary weight information
(land cover, nighttime lights, etc.).  This is the spatial inverse of
``zonal.stats``.

Three redistribution methods are supported via :func:`disaggregate`:

* ``'binary'`` -- binarize weights (nonzero -> 1), split value equally among
  nonzero-weight pixels in each zone.
* ``'weighted'`` (default) -- distribute proportionally:
  ``pixel = zone_value * pixel_weight / zone_weight_sum``.
* ``'limiting_variable'`` -- three-class dasymetric with per-class density
  caps and iterative redistribution (numpy-only).

Additionally, :func:`pycnophylactic` implements Tobler's smooth
pycnophylactic interpolation, which preserves zone totals through iterative
Laplacian smoothing with mass correction.

:func:`validate_disaggregation` checks that zone totals are preserved within
tolerance after any redistribution.

All four backends (numpy, cupy, dask+numpy, dask+cupy) are supported for the
binary and weighted methods of ``disaggregate`` and for
``validate_disaggregation``.  ``pycnophylactic`` supports numpy and cupy
(via CPU fallback).
"""

from __future__ import annotations

from typing import Union

import numpy as np
import pandas as pd
import xarray as xr

try:
    import dask
    import dask.array as da
    from dask import delayed
except ImportError:
    dask = None
    da = None
    delayed = None

try:
    import cupy
except ImportError:
    class cupy:
        ndarray = False

from xrspatial.utils import (
    ArrayTypeFunctionMapping,
    has_cuda_and_cupy,
    has_dask_array,
    is_cupy_array,
    is_dask_cupy,
)

VALID_METHODS = ('binary', 'weighted', 'limiting_variable')


# ---------------------------------------------------------------------------
# helpers
# ---------------------------------------------------------------------------

def _available_memory_bytes():
    """Best-effort estimate of available memory in bytes."""
    try:
        with open('/proc/meminfo', 'r') as f:
            for line in f:
                if line.startswith('MemAvailable:'):
                    return int(line.split()[1]) * 1024
    except (OSError, ValueError, IndexError):
        pass
    try:
        import psutil
        return psutil.virtual_memory().available
    except (ImportError, AttributeError):
        pass
    return 2 * 1024 ** 3


def _check_disaggregate_memory(shape, method, n_zones):
    """Guard the peak working memory of the numpy disaggregate paths.

    ``method='weighted'`` and ``method='binary'`` keep ~5 full-shape arrays
    alive (zones_arr, weight_arr, invalid bool, mask bool, result float64).
    ``method='limiting_variable'`` adds ``pixel_class`` (int32) and per-zone
    masks during the inner loop, peaking around 7-8x input size.

    Raises MemoryError before the first allocation when the projected
    footprint exceeds 50% of available RAM.
    """
    n_pixels = int(shape[0]) * int(shape[1])
    # bytes per pixel: each float64 array is 8, each bool array is 1,
    # int32 array is 4.  See module docstring for the breakdown.
    if method == 'limiting_variable':
        # 2 float64 (zones_arr, weight_arr, result share buffers)
        # 4 bool full-shape (invalid, zmask, cls_mask, overflow_mask)
        # 1 int32 (pixel_class)
        # 1 float64 (result)
        bytes_per_pixel = 3 * 8 + 4 * 1 + 4
    else:
        bytes_per_pixel = 3 * 8 + 2 * 1
    required_bytes = n_pixels * bytes_per_pixel
    available = _available_memory_bytes()
    if required_bytes > 0.5 * available:
        raise MemoryError(
            f"disaggregate() with shape={shape}, method={method!r} "
            f"requires ~{required_bytes / 1e9:.1f} GB of working memory, "
            f"but only {available / 1e9:.1f} GB is available.  "
            f"Use a smaller raster or rechunk a dask-backed input."
        )


def _check_pycnophylactic_memory(shape, n_zones):
    """Guard the peak working memory of pycnophylactic.

    ``_pycnophylactic_numpy`` keeps four float64 full-shape arrays
    (surface, smoothed, neighbour_sum, neighbour_count), two bool
    arrays (valid, has_neighbours), plus one full-shape bool mask per
    zone in ``zone_masks``.  For 1000 zones on a 10000x10000 raster the
    masks alone come to ~100 GB.

    Raises MemoryError before the first allocation when the projected
    footprint exceeds 50% of available RAM.
    """
    n_pixels = int(shape[0]) * int(shape[1])
    # 4 float64 + 2 bool + n_zones bool masks
    bytes_per_pixel = 4 * 8 + 2 * 1 + max(0, int(n_zones)) * 1
    required_bytes = n_pixels * bytes_per_pixel
    available = _available_memory_bytes()
    if required_bytes > 0.5 * available:
        raise MemoryError(
            f"pycnophylactic() with shape={shape}, n_zones={n_zones} "
            f"requires ~{required_bytes / 1e9:.1f} GB of working memory "
            f"(per-zone masks scale with len(values)), "
            f"but only {available / 1e9:.1f} GB is available.  "
            f"Use a smaller raster or fewer zones."
        )


def _normalize_values(values):
    """Convert *values* (dict or pd.Series) to ``dict[int, float]``."""
    if isinstance(values, pd.Series):
        return {int(k): float(v) for k, v in values.items()}
    if isinstance(values, dict):
        return {int(k): float(v) for k, v in values.items()}
    raise TypeError(
        f"'values' must be a dict or pd.Series, got {type(values).__name__}"
    )


# ---------------------------------------------------------------------------
# numpy backend
# ---------------------------------------------------------------------------

def _disaggregate_numpy(zones, weight, values_dict, method, nodata_zone):
    """Core numpy implementation.  Returns a float64 ndarray."""
    zones_arr = np.asarray(zones, dtype=np.float64)
    weight_arr = np.asarray(weight, dtype=np.float64)

    # clamp negative weights to 0
    weight_arr = np.where(weight_arr < 0, 0.0, weight_arr)

    # binarize for binary method
    if method == 'binary':
        weight_arr = np.where(weight_arr != 0, 1.0, 0.0)

    # mask: NaN in zones or weight, or nodata_zone
    invalid = np.isnan(zones_arr) | np.isnan(weight_arr)
    if nodata_zone is not None:
        invalid |= (zones_arr == nodata_zone)

    # compute zone weight sums
    zone_ids = np.array(sorted(values_dict.keys()), dtype=np.float64)
    zone_sums = {}
    for zid in zone_ids:
        mask = (~invalid) & (zones_arr == zid)
        zone_sums[int(zid)] = float(np.nansum(weight_arr[mask]))

    return _disaggregate_numpy_with_sums(
        zones_arr, weight_arr, values_dict, zone_sums, invalid,
    )


def _disaggregate_numpy_with_sums(
    zones_arr, weight_arr, values_dict, zone_sums, invalid,
):
    """Distribute using precomputed zone weight sums.

    Used directly by numpy and also called per-chunk by the dask backend.
    """
    result = np.full(zones_arr.shape, np.nan, dtype=np.float64)

    for zid, zval in values_dict.items():
        mask = (~invalid) & (zones_arr == zid)
        wsum = zone_sums.get(int(zid), 0.0)
        if wsum == 0.0:
            # zero total weight -> NaN
            result[mask] = np.nan
        else:
            result[mask] = zval * weight_arr[mask] / wsum

    return result


# ---------------------------------------------------------------------------
# cupy backend -- CPU fallback
# ---------------------------------------------------------------------------

def _disaggregate_cupy(zones, weight, values_dict, method, nodata_zone):
    """CuPy fallback: transfer to CPU, run numpy, transfer back."""
    zones_np = zones.get()
    weight_np = weight.get()
    result_np = _disaggregate_numpy(zones_np, weight_np, values_dict,
                                    method, nodata_zone)
    return cupy.asarray(result_np)


# ---------------------------------------------------------------------------
# dask+numpy backend
# ---------------------------------------------------------------------------

def _compute_zone_sums_dask(zones_da, weight_da, zone_ids, method,
                            nodata_zone):
    """Eagerly compute per-zone weight sums across all dask chunks.

    Returns a plain ``dict[int, float]``.
    """
    zone_ids_set = set(int(z) for z in zone_ids)

    def _chunk_sums(z_block, w_block):
        z = np.asarray(z_block, dtype=np.float64)
        w = np.asarray(w_block, dtype=np.float64)
        w = np.where(w < 0, 0.0, w)
        if method == 'binary':
            w = np.where(w != 0, 1.0, 0.0)
        invalid = np.isnan(z) | np.isnan(w)
        if nodata_zone is not None:
            invalid |= (z == nodata_zone)
        sums = {}
        for zid in zone_ids_set:
            mask = (~invalid) & (z == zid)
            sums[zid] = float(np.nansum(w[mask]))
        return sums

    # collect delayed chunk sums
    z_blocks = zones_da.to_delayed().ravel()
    w_blocks = weight_da.to_delayed().ravel()
    chunk_results = dask.compute(*[
        delayed(_chunk_sums)(zb, wb) for zb, wb in zip(z_blocks, w_blocks)
    ])

    # merge across chunks
    merged = {int(zid): 0.0 for zid in zone_ids_set}
    for cr in chunk_results:
        for zid, s in cr.items():
            merged[int(zid)] += s
    return merged


def _distribute_chunk(z_block, w_block, values_dict, zone_sums, method,
                      nodata_zone):
    """Map-blocks worker: distribute within a single chunk."""
    z = np.asarray(z_block, dtype=np.float64)
    w = np.asarray(w_block, dtype=np.float64)
    w = np.where(w < 0, 0.0, w)
    if method == 'binary':
        w = np.where(w != 0, 1.0, 0.0)
    invalid = np.isnan(z) | np.isnan(w)
    if nodata_zone is not None:
        invalid |= (z == nodata_zone)
    return _disaggregate_numpy_with_sums(z, w, values_dict, zone_sums,
                                         invalid)


def _disaggregate_dask_numpy(zones_da, weight_da, values_dict, method,
                             nodata_zone):
    """Dask+numpy: eagerly compute zone sums, lazily distribute."""
    zone_ids = list(values_dict.keys())
    zone_sums = _compute_zone_sums_dask(zones_da, weight_da, zone_ids,
                                        method, nodata_zone)

    result = da.map_blocks(
        _distribute_chunk,
        zones_da,
        weight_da,
        values_dict=values_dict,
        zone_sums=zone_sums,
        method=method,
        nodata_zone=nodata_zone,
        dtype=np.float64,
    )
    return result


# ---------------------------------------------------------------------------
# dask+cupy backend
# ---------------------------------------------------------------------------

def _cupy_to_numpy_block(block):
    """Convert a cupy array block to numpy."""
    if hasattr(block, 'get'):
        return block.get()
    return np.asarray(block)


def _disaggregate_dask_cupy(zones_da, weight_da, values_dict, method,
                            nodata_zone):
    """Dask+cupy: convert chunks to numpy, delegate to dask+numpy path."""
    zones_np = da.map_blocks(_cupy_to_numpy_block, zones_da,
                             dtype=zones_da.dtype,
                             meta=np.array((), dtype=zones_da.dtype))
    weight_np = da.map_blocks(_cupy_to_numpy_block, weight_da,
                              dtype=weight_da.dtype,
                              meta=np.array((), dtype=weight_da.dtype))
    return _disaggregate_dask_numpy(zones_np, weight_np, values_dict,
                                   method, nodata_zone)


# ---------------------------------------------------------------------------
# limiting variable (numpy-only)
# ---------------------------------------------------------------------------

def _disaggregate_limiting_numpy(zones, weight, values_dict, nodata_zone,
                                 class_breaks=(0.0,), density_caps=None):
    """Three-class dasymetric with density caps and iterative redistribution.

    Parameters
    ----------
    zones, weight : numpy arrays
    values_dict : dict mapping zone_id -> value
    nodata_zone : int or None
    class_breaks : tuple of floats
        Thresholds that split weight values into classes.  For example
        ``(0.0,)`` creates two classes: zero-weight and nonzero-weight.
    density_caps : sequence of floats or None
        Maximum density (value per pixel) for each class.  If None,
        no capping is applied and the result equals the weighted method.
    """
    zones_arr = np.asarray(zones, dtype=np.float64)
    weight_arr = np.asarray(weight, dtype=np.float64)
    weight_arr = np.where(weight_arr < 0, 0.0, weight_arr)

    invalid = np.isnan(zones_arr) | np.isnan(weight_arr)
    if nodata_zone is not None:
        invalid |= (zones_arr == nodata_zone)

    # classify pixels
    breaks = sorted(class_breaks)
    n_classes = len(breaks) + 1
    pixel_class = np.zeros(zones_arr.shape, dtype=np.int32)
    for i, b in enumerate(breaks):
        pixel_class[weight_arr > b] = i + 1

    if density_caps is None:
        # Class 0 captures zero-weight (uninhabitable) pixels, so its cap
        # must be 0 to prevent population landing on water/parks/etc.
        density_caps = [0.0] + [np.inf] * (n_classes - 1)

    result = np.full(zones_arr.shape, np.nan, dtype=np.float64)

    for zid, zval in values_dict.items():
        zmask = (~invalid) & (zones_arr == zid)
        remaining = zval

        for cls in range(n_classes):
            cls_mask = zmask & (pixel_class == cls)
            n_pixels = int(cls_mask.sum())
            if n_pixels == 0:
                continue
            cap = density_caps[cls]
            allocated = min(remaining, n_pixels * cap)
            if n_pixels > 0:
                result[cls_mask] = allocated / n_pixels
            remaining -= allocated
            if remaining <= 0:
                break

        # distribute any leftover to habitable pixels only (cap > 0)
        if remaining > 0:
            overflow_mask = zmask.copy()
            for cls in range(n_classes):
                if density_caps[cls] <= 0:
                    overflow_mask &= ~(pixel_class == cls)
            n_overflow = int(overflow_mask.sum())
            if n_overflow > 0:
                result[overflow_mask] += remaining / n_overflow

    return result


# ---------------------------------------------------------------------------
# public API
# ---------------------------------------------------------------------------

[docs] def disaggregate( zones: xr.DataArray, values: Union[dict, pd.Series], weight: xr.DataArray, method: str = 'weighted', nodata_zone: Union[int, None] = None, class_breaks: Union[tuple, None] = None, density_caps: Union[list, None] = None, name: str = 'disaggregate', ) -> xr.DataArray: """Redistribute zone-level values onto a raster using ancillary weights. This is the spatial inverse of :func:`~xrspatial.zonal.stats`: given aggregate values per zone (e.g. population per census tract) and a weight surface (e.g. land cover), ``disaggregate`` distributes each zone's total across its pixels proportionally to their weights. Parameters ---------- zones : xr.DataArray 2-D integer raster identifying zone membership for each pixel. values : dict or pd.Series Mapping of ``zone_id -> value`` to redistribute. weight : xr.DataArray 2-D float raster of ancillary weights (e.g. land-cover suitability). Negative weights are clamped to zero. method : str, default ``'weighted'`` Redistribution method: * ``'binary'`` -- binarize weights (nonzero -> 1), split equally. * ``'weighted'`` -- proportional: ``pixel = zone_value * pixel_weight / zone_weight_sum``. * ``'limiting_variable'`` -- three-class dasymetric with per-class density caps (numpy-only). nodata_zone : int or None, default None Zone ID to treat as no-data (output NaN). class_breaks : tuple of float or None, default None Only used when ``method='limiting_variable'``. Thresholds that split weight values into classes. For example ``(0.0,)`` creates two classes: zero-weight and nonzero-weight. Defaults to ``(0.0,)`` if None. density_caps : list of float or None, default None Only used when ``method='limiting_variable'``. Maximum density (value per pixel) for each class. Must have ``len(class_breaks)+1`` entries. If None, the first class (zero-weight pixels) gets cap 0 and all others get unlimited capacity. name : str, default ``'disaggregate'`` Name for the output DataArray. Returns ------- xr.DataArray Float raster with the same shape, coordinates, and attributes as *zones*. The sum of output pixels within each zone equals the input zone value (conservation property), except where all weights are zero (result is NaN). Examples -------- >>> import numpy as np, xarray as xr >>> from xrspatial import disaggregate >>> zones = xr.DataArray(np.array([[1, 1], [2, 2]]), dims=['y', 'x']) >>> weight = xr.DataArray(np.array([[1.0, 3.0], [2.0, 2.0]]), dims=['y', 'x']) >>> result = disaggregate(zones, {1: 100, 2: 50}, weight) >>> result.values array([[25., 75.], [25., 25.]]) """ # --- validation --- if not isinstance(zones, xr.DataArray): raise TypeError( f"'zones' must be an xr.DataArray, got {type(zones).__name__}" ) if not isinstance(weight, xr.DataArray): raise TypeError( f"'weight' must be an xr.DataArray, got {type(weight).__name__}" ) if zones.ndim != 2: raise ValueError( f"'zones' must be 2-D, got {zones.ndim}-D" ) if weight.ndim != 2: raise ValueError( f"'weight' must be 2-D, got {weight.ndim}-D" ) if zones.shape != weight.shape: raise ValueError( f"'zones' and 'weight' must have the same shape, " f"got {zones.shape} and {weight.shape}" ) method = method.lower() if method not in VALID_METHODS: raise ValueError( f"Invalid method {method!r}. Must be one of {VALID_METHODS}" ) values_dict = _normalize_values(values) # --- limiting_variable: numpy-only --- if method == 'limiting_variable': _data = zones.data if has_dask_array() and isinstance(_data, da.Array): raise NotImplementedError( "method='limiting_variable' is not supported for dask arrays" ) if has_cuda_and_cupy() and is_cupy_array(_data): raise NotImplementedError( "method='limiting_variable' is not supported for cupy arrays" ) # peak working memory check before any allocation _check_disaggregate_memory(zones.shape, method, len(values_dict)) lv_breaks = class_breaks if class_breaks is not None else (0.0,) result_data = _disaggregate_limiting_numpy( zones.data, weight.data, values_dict, nodata_zone, class_breaks=lv_breaks, density_caps=density_caps, ) return xr.DataArray( result_data, dims=zones.dims, coords=zones.coords, attrs=zones.attrs, name=name, ) # peak working memory check for in-RAM backends before any allocation; # dask paths are bounded per chunk so we skip the guard there. _data_in = zones.data materializes = isinstance(_data_in, np.ndarray) or ( has_cuda_and_cupy() and is_cupy_array(_data_in) ) if materializes: _check_disaggregate_memory(zones.shape, method, len(values_dict)) # --- dispatch by backend --- mapper = ArrayTypeFunctionMapping( numpy_func=_disaggregate_numpy, cupy_func=_disaggregate_cupy, dask_func=_disaggregate_dask_numpy, dask_cupy_func=_disaggregate_dask_cupy, ) func = mapper(zones) result_data = func(zones.data, weight.data, values_dict, method, nodata_zone) return xr.DataArray( result_data, dims=zones.dims, coords=zones.coords, attrs=zones.attrs, name=name, )
# --------------------------------------------------------------------------- # pycnophylactic interpolation # --------------------------------------------------------------------------- def _pycnophylactic_numpy(zones_arr, values_dict, nodata_zone, max_iterations, convergence): """Tobler's pycnophylactic interpolation (numpy). Algorithm --------- 1. Initialise: each pixel gets ``zone_value / zone_pixel_count``. 2. Smooth: replace each pixel with the mean of its 4-connected neighbours (edge pixels use available neighbours only). 3. Correct: scale every pixel in each zone so the zone sum matches the original value exactly. 4. Repeat 2-3 until the maximum per-pixel change is below *convergence*. """ zones = np.asarray(zones_arr, dtype=np.float64) nrows, ncols = zones.shape # identify valid pixels per zone and build initial surface invalid = np.isnan(zones) if nodata_zone is not None: invalid |= (zones == nodata_zone) surface = np.full(zones.shape, np.nan, dtype=np.float64) zone_masks = {} zone_counts = {} zone_vals = {} for zid, zval in values_dict.items(): mask = (~invalid) & (zones == zid) n = int(mask.sum()) if n == 0: continue zone_masks[zid] = mask zone_counts[zid] = n zone_vals[zid] = zval surface[mask] = zval / n # pixels that belong to some zone (valid for smoothing) valid = ~np.isnan(surface) for _ in range(max_iterations): # Laplacian smoothing: mean of 4-connected neighbours smoothed = np.full_like(surface, np.nan) neighbour_sum = np.zeros_like(surface) neighbour_count = np.zeros_like(surface) # shift in each direction, accumulate if nrows > 1: neighbour_sum[1:, :] += np.where(valid[:-1, :], surface[:-1, :], 0) neighbour_count[1:, :] += valid[:-1, :].astype(np.float64) neighbour_sum[:-1, :] += np.where(valid[1:, :], surface[1:, :], 0) neighbour_count[:-1, :] += valid[1:, :].astype(np.float64) if ncols > 1: neighbour_sum[:, 1:] += np.where(valid[:, :-1], surface[:, :-1], 0) neighbour_count[:, 1:] += valid[:, :-1].astype(np.float64) neighbour_sum[:, :-1] += np.where(valid[:, 1:], surface[:, 1:], 0) neighbour_count[:, :-1] += valid[:, 1:].astype(np.float64) has_neighbours = neighbour_count > 0 smoothed[valid & has_neighbours] = ( neighbour_sum[valid & has_neighbours] / neighbour_count[valid & has_neighbours] ) # pixels with no valid neighbours keep their current value smoothed[valid & ~has_neighbours] = surface[valid & ~has_neighbours] # mass correction: rescale each zone to preserve total for zid in zone_masks: mask = zone_masks[zid] current_sum = np.nansum(smoothed[mask]) if current_sum != 0: smoothed[mask] *= zone_vals[zid] / current_sum else: # all smoothed to zero -- revert to uniform smoothed[mask] = zone_vals[zid] / zone_counts[zid] # convergence check max_change = np.nanmax(np.abs(smoothed[valid] - surface[valid])) surface = smoothed if max_change < convergence: break return surface
[docs] def pycnophylactic( zones: xr.DataArray, values: Union[dict, pd.Series], max_iterations: int = 100, convergence: float = 1e-5, nodata_zone: Union[int, None] = None, name: str = 'pycnophylactic', ) -> xr.DataArray: """Tobler's pycnophylactic interpolation preserving zone totals. Produces a smooth surface by iteratively applying Laplacian smoothing and then rescaling each zone so its pixel sum matches the original value. Unlike :func:`disaggregate`, no ancillary weight raster is needed -- smoothness alone drives the redistribution. Parameters ---------- zones : xr.DataArray 2-D integer raster identifying zone membership for each pixel. values : dict or pd.Series Mapping of ``zone_id -> value`` to redistribute. max_iterations : int, default 100 Maximum number of smoothing-correction iterations. convergence : float, default 1e-5 Stop when the largest per-pixel change is below this threshold. nodata_zone : int or None, default None Zone ID to treat as no-data (output NaN). name : str, default ``'pycnophylactic'`` Name for the output DataArray. Returns ------- xr.DataArray Float raster with the same shape, coordinates, and attributes as *zones*. Zone totals are preserved (pycnophylactic property). Notes ----- Currently supports numpy and cupy (via CPU fallback) backends. Dask arrays raise ``NotImplementedError`` because the algorithm is inherently iterative and requires global zone sums each iteration. References ---------- Tobler, W. R. (1979). "Smooth Pycnophylactic Interpolation for Geographical Regions". *Journal of the American Statistical Association*, 74(367), 519-530. Examples -------- >>> import numpy as np, xarray as xr >>> from xrspatial.dasymetric import pycnophylactic >>> zones = xr.DataArray(np.array([[1, 1], [2, 2]]), dims=['y', 'x']) >>> result = pycnophylactic(zones, {1: 100, 2: 50}) >>> float(np.nansum(result.values[:1, :])) # zone 1 sum 100.0 """ # --- validation --- if not isinstance(zones, xr.DataArray): raise TypeError( f"'zones' must be an xr.DataArray, got {type(zones).__name__}" ) if zones.ndim != 2: raise ValueError( f"'zones' must be 2-D, got {zones.ndim}-D" ) values_dict = _normalize_values(values) _data = zones.data # dask not supported (iterative algorithm needs global state each step) if has_dask_array() and isinstance(_data, da.Array): raise NotImplementedError( "pycnophylactic is not supported for dask arrays. " "Compute zones first with zones.compute()." ) # peak working memory check before any allocation; per-zone masks # scale linearly with len(values). _check_pycnophylactic_memory(zones.shape, len(values_dict)) # cupy: CPU fallback if has_cuda_and_cupy() and is_cupy_array(_data): zones_np = _data.get() result_data = _pycnophylactic_numpy( zones_np, values_dict, nodata_zone, max_iterations, convergence, ) result_data = cupy.asarray(result_data) else: result_data = _pycnophylactic_numpy( _data, values_dict, nodata_zone, max_iterations, convergence, ) return xr.DataArray( result_data, dims=zones.dims, coords=zones.coords, attrs=zones.attrs, name=name, )
# --------------------------------------------------------------------------- # validation helper # ---------------------------------------------------------------------------
[docs] def validate_disaggregation( result: xr.DataArray, zones: xr.DataArray, values: Union[dict, pd.Series], tolerance: float = 1e-6, ) -> bool: """Check that zone totals in *result* match *values* within tolerance. Parameters ---------- result : xr.DataArray Output of :func:`disaggregate` or :func:`pycnophylactic`. zones : xr.DataArray The zone raster used to produce *result*. values : dict or pd.Series The original ``zone_id -> value`` mapping. tolerance : float, default 1e-6 Maximum allowed relative error per zone. Zones whose total weight is zero (result all NaN) are skipped. Returns ------- bool ``True`` if all zone totals are within tolerance. Raises ------ ValueError If any zone total exceeds the tolerance, with details on which zones failed. Examples -------- >>> from xrspatial.dasymetric import disaggregate, validate_disaggregation >>> result = disaggregate(zones, values, weight) >>> validate_disaggregation(result, zones, values) True """ values_dict = _normalize_values(values) # Memory guard: validation requires both arrays in RAM. for arr, label in [(result.data, 'result'), (zones.data, 'zones')]: if has_dask_array() and isinstance(arr, da.Array): est = np.prod(arr.shape) * arr.dtype.itemsize try: from xrspatial.zonal import _available_memory_bytes avail = _available_memory_bytes() except ImportError: avail = 2 * 1024**3 if est * 2 > 0.8 * avail: raise MemoryError( f"validate_disaggregation needs to materialize the " f"{label} array (~{est / 1e9:.1f} GB) but only " f"~{avail / 1e9:.1f} GB available." ) # extract numpy arrays from any backend result_np = _to_numpy(result.data) zones_np = _to_numpy(zones.data) failures = [] for zid, expected in values_dict.items(): mask = zones_np == zid if not np.any(mask): continue actual = float(np.nansum(result_np[mask])) # skip zones that are all NaN (zero-weight zones) if np.all(np.isnan(result_np[mask])): continue if expected == 0: if abs(actual) > tolerance: failures.append((zid, expected, actual)) else: rel_err = abs(actual - expected) / abs(expected) if rel_err > tolerance: failures.append((zid, expected, actual)) if failures: lines = [f" zone {z}: expected={e}, actual={a}" for z, e, a in failures] raise ValueError( f"Zone totals not preserved (tolerance={tolerance}):\n" + "\n".join(lines) ) return True
def _to_numpy(data): """Convert array data from any backend to a numpy ndarray.""" if has_dask_array() and isinstance(data, da.Array): data = data.compute() if has_cuda_and_cupy() and is_cupy_array(data): data = data.get() return np.asarray(data)