Source code for xrspatial.zonal

from __future__ import annotations

# standard library
import copy
from math import sqrt
from typing import Callable, Dict, List, Optional, Union

# 3rd-party
try:
    import dask
    import dask.array as da
except ImportError:
    dask = None
    da = None

try:
    import dask.dataframe as dd
except ImportError:
    dd = None

try:
    from dask import delayed
except ImportError:
    delayed = lambda x: None  # noqa

import numpy as np
import pandas as pd
import xarray as xr
from xarray import DataArray

try:
    import cupy
except ImportError:
    class cupy(object):
        ndarray = False

# local modules
from xrspatial.utils import (ArrayTypeFunctionMapping, _classify_backend, _validate_raster,
                             cuda_args, has_cuda_and_cupy, has_dask_array, is_cupy_array,
                             is_dask_cupy, ngjit, validate_arrays)

TOTAL_COUNT = '_total_count'

_DEFAULT_STATS_NUMPY = [
    "mean", "max", "min", "sum", "std", "var", "count", "majority",
]

# 'majority' cannot be computed block-by-block, so it is omitted from the
# dask default list and rejected when explicitly requested on a dask input.
_DEFAULT_STATS_DASK = [
    "mean", "max", "min", "sum", "std", "var", "count",
]


def _maybe_rasterize_zones(zones, values, column=None, rasterize_kw=None):
    """If *zones* is vector data, rasterize it using *values* as the template.

    Accepts:
    - ``GeoDataFrame`` (requires *column* to identify the zone-ID field)
    - list of ``(geometry, value)`` pairs

    Returns a 2-D ``xr.DataArray`` of zone IDs aligned to *values*.
    If *zones* is already a DataArray it is returned unchanged.
    """
    if isinstance(zones, xr.DataArray):
        return zones

    # list-of-pairs: [(geom, value), ...]
    is_pairs = (
        isinstance(zones, (list, tuple))
        and len(zones) > 0
        and isinstance(zones[0], (list, tuple))
        and len(zones[0]) == 2
    )

    # GeoDataFrame
    is_gdf = False
    try:
        import geopandas as gpd
        is_gdf = isinstance(zones, gpd.GeoDataFrame)
    except ImportError:
        pass

    if not is_pairs and not is_gdf:
        return zones

    from .rasterize import rasterize

    # Build the template from values (first 2D variable if Dataset)
    if isinstance(values, xr.Dataset):
        for var in values.data_vars:
            da_var = values[var]
            if da_var.ndim >= 2 and 'y' in da_var.dims and 'x' in da_var.dims:
                like = da_var
                break
        else:
            raise ValueError(
                "values Dataset has no 2D variable with 'y' and 'x' "
                "dimensions to use as rasterize template"
            )
    elif isinstance(values, xr.DataArray):
        if values.ndim >= 2:
            like = values
        else:
            raise ValueError(
                "values must be at least 2D to use as rasterize template"
            )
    else:
        raise TypeError(
            f"values must be an xr.DataArray or xr.Dataset, got {type(values)}"
        )

    kw = dict(rasterize_kw or {})
    kw['like'] = like

    if is_gdf:
        if column is None:
            raise ValueError(
                "column is required when zones is a GeoDataFrame. "
                "Specify which column contains zone IDs."
            )
        kw['column'] = column
    elif is_pairs:
        if column is not None:
            raise ValueError(
                "column should not be set when zones is a list of "
                "(geometry, value) pairs"
            )

    return rasterize(zones, **kw)


def _unique_finite_zones(arr):
    """Sorted unique finite values from *arr* without full materialisation.

    For dask arrays uses ``da.unique`` (per-chunk reduction) so the full
    array is never pulled into RAM.
    """
    if da is not None and isinstance(arr, da.Array):
        uniq = da.unique(arr).compute()
        return uniq[np.isfinite(uniq)]
    return np.unique(arr[np.isfinite(arr)])


def _unique_finite_cats(arr, nodata_values):
    """Sorted unique values excluding NaN, Inf, and *nodata_values*.

    Dask-safe: uses ``da.unique`` so the full array is never materialised.
    """
    if da is not None and isinstance(arr, da.Array):
        uniq = da.unique(arr).compute()
        mask = np.isfinite(uniq)
        if nodata_values is not None:
            mask &= (uniq != nodata_values)
        return uniq[mask]
    mask = np.isfinite(arr)
    if nodata_values is not None:
        mask &= (arr != nodata_values)
    return np.unique(arr[mask])


def _stats_count(data):
    if isinstance(data, np.ndarray):
        # numpy case
        stats_count = np.ma.count(data)
    elif isinstance(data, cupy.ndarray):
        # cupy case
        stats_count = np.prod(data.shape)
    else:
        # dask case
        stats_count = data.size - da.ma.getmaskarray(data).sum()
    return stats_count


def _stats_majority(data):
    if isinstance(data, np.ndarray):
        # numpy case
        values, counts = np.unique(data, return_counts=True)
        return values[np.argmax(counts)]
    elif isinstance(data, cupy.ndarray):
        # cupy case
        values, counts = cupy.unique(data, return_counts=True)
        return values[cupy.argmax(counts)]
    else:
        # dask case
        values, counts = da.unique(data, return_counts=True)
        return values[da.argmax(counts)]


_DEFAULT_STATS = dict(
    mean=lambda z: z.mean(),
    max=lambda z: z.max(),
    min=lambda z: z.min(),
    sum=lambda z: z.sum(),
    std=lambda z: z.std(),
    var=lambda z: z.var(),
    count=lambda z: _stats_count(z),
    majority=lambda z: _stats_majority(z),
)


_DASK_BLOCK_STATS = dict(
    max=lambda z: z.max(),
    min=lambda z: z.min(),
    sum=lambda z: z.sum(),
    count=lambda z: _stats_count(z),
    sum_squares=lambda z: ((z - z.mean()) ** 2).sum()  # block-level M2
)


def _nanreduce_preserve_allnan(blocks, func):
    """Reduce across blocks, returning NaN when ALL blocks are NaN for a zone.

    ``np.nansum`` returns 0 for all-NaN input; we want NaN so that zones
    with no valid values propagate NaN, consistent with the numpy backend.
    """
    import warnings
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', RuntimeWarning)
        result = func(blocks, axis=0)
    all_nan = np.all(np.isnan(blocks), axis=0)
    result[all_nan] = np.nan
    return result


def _count_reduce(blocks):
    """Sum per-block counts. An empty zone (NaN in every block) totals 0.

    Unlike the other reducers, count does not preserve all-NaN as NaN: an
    empty zone has zero valid cells, so its count is 0, not undefined.
    """
    import warnings
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', RuntimeWarning)
        return np.nansum(blocks, axis=0)


_DASK_STATS = dict(
    max=lambda blocks: _nanreduce_preserve_allnan(blocks, np.nanmax),
    min=lambda blocks: _nanreduce_preserve_allnan(blocks, np.nanmin),
    sum=lambda blocks: _nanreduce_preserve_allnan(blocks, np.nansum),
    count=_count_reduce,
    sum_squares=lambda blocks: _nanreduce_preserve_allnan(blocks, np.nansum),
)


def _dask_mean(sums, counts):  # noqa
    return sums / counts


def _parallel_variance(block_counts, block_sums, block_m2s):
    """Population variance via Chan-Golub-LeVeque parallel merge.

    Each input is (n_blocks, n_zones).  ``block_m2s`` contains
    per-block M2 values (sum of squared deviations from the block mean),
    NOT raw sum-of-squares.  Returns (n_zones,) population variance,
    with NaN for zones that have no valid values in any block.
    """
    n_blocks = block_counts.shape[0]
    n_zones = block_counts.shape[1]

    n_acc = np.zeros(n_zones, dtype=np.float64)
    mean_acc = np.zeros(n_zones, dtype=np.float64)
    m2_acc = np.zeros(n_zones, dtype=np.float64)

    for i in range(n_blocks):
        nc = np.asarray(block_counts[i], dtype=np.float64)
        sc = np.asarray(block_sums[i], dtype=np.float64)
        m2_b = np.asarray(block_m2s[i], dtype=np.float64)

        has_data = np.isfinite(nc) & (nc > 0)
        nc_safe = np.where(has_data, nc, 1.0)  # avoid /0

        with np.errstate(invalid='ignore', divide='ignore'):
            mean_b = sc / nc_safe

        nc = np.where(has_data, nc, 0.0)
        n_ab = n_acc + nc

        delta = mean_b - mean_acc
        with np.errstate(invalid='ignore', divide='ignore'):
            n_ab_safe = np.where(n_ab > 0, n_ab, 1.0)
            correction = delta ** 2 * n_acc * nc / n_ab_safe
            new_mean = mean_acc + delta * nc / n_ab_safe

        m2_acc = np.where(has_data, m2_acc + m2_b + correction, m2_acc)
        mean_acc = np.where(has_data, new_mean, mean_acc)
        n_acc = np.where(has_data, n_ab, n_acc)

    with np.errstate(invalid='ignore', divide='ignore'):
        var = np.where(n_acc > 0, m2_acc / n_acc, np.nan)
    return var


@ngjit
def _strides(flatten_zones, unique_zones):
    num_elements = flatten_zones.shape[0]
    num_zones = len(unique_zones)
    strides = np.zeros(len(unique_zones), dtype=np.int64)

    count = 0
    for i in range(num_zones):
        while (count < num_elements) and (
                flatten_zones[count] == unique_zones[i]):
            count += 1
        strides[i] = count

    return strides


def _sort_and_stride(zones, values, unique_zones):
    flatten_zones = zones.ravel()
    sorted_indices = np.argsort(flatten_zones)
    sorted_zones = flatten_zones[sorted_indices]

    values_shape = values.shape
    if len(values_shape) == 3:
        values_by_zones = copy.deepcopy(values).reshape(
            values_shape[0], values_shape[1] * values_shape[2])
        for i in range(values_shape[0]):
            values_by_zones[i] = values_by_zones[i][sorted_indices]
    else:
        values_by_zones = values.ravel()[sorted_indices]

    # exclude nans from calculation
    # flatten_zones is already sorted, NaN elements (if any) are at the end
    # of the array, removing them will not affect data before them
    sorted_zones = sorted_zones[np.isfinite(sorted_zones)]
    zone_breaks = _strides(sorted_zones, unique_zones)

    return sorted_indices, values_by_zones, zone_breaks


def _empty_zone_value(stat_name: str) -> float:
    # 'count' is a cardinality: an empty zone has zero valid cells, so its
    # count is 0. Every other built-in stat (and any custom callable) is
    # undefined over no values, so it stays NaN.
    return 0.0 if stat_name == 'count' else np.nan


def _calc_stats(
    values_by_zones: np.array,
    zone_breaks: np.array,
    unique_zones: np.array,
    zone_ids: np.array,
    func: Callable,
    nodata_values: Union[int, float] = None,
    empty_zone_value: float = np.nan,
):
    # An "empty" zone exists in the zones raster but has no valid values
    # (all NaN, or all equal to nodata_values). Most stats leave NaN there
    # (empty_zone_value defaults to NaN), but count passes 0 because the
    # count of no values is a cardinality of zero, not undefined.
    start = 0
    results = np.full(unique_zones.shape, empty_zone_value, dtype=np.float64)
    for i in range(len(unique_zones)):
        end = zone_breaks[i]
        if unique_zones[i] in zone_ids:
            zone_values = values_by_zones[start:end]
            # filter out non-finite and nodata_values
            mask = np.isfinite(zone_values)
            if nodata_values is not None:
                mask = mask & (zone_values != nodata_values)
            zone_values = zone_values[mask]
            if len(zone_values) > 0:
                results[i] = func(zone_values)
        start = end
    return results


@delayed
def _single_stats_func(
    zones_block: np.array,
    values_block: np.array,
    unique_zones: np.array,
    zone_ids: np.array,
    func: Callable,
    nodata_values: Union[int, float] = None,
) -> pd.DataFrame:

    _, values_by_zones, zone_breaks = _sort_and_stride(zones_block, values_block, unique_zones)
    results = _calc_stats(values_by_zones, zone_breaks, unique_zones, zone_ids, func, nodata_values)
    return results


def _stats_dask_numpy(
    zones: da.Array,
    values: da.Array,
    zone_ids: List[Union[int, float]],
    stats_funcs: Dict,
    nodata_values: Union[int, float],
) -> pd.DataFrame:

    # find ids for all zones
    unique_zones = _unique_finite_zones(zones)

    select_all_zones = False
    # selecte zones to do analysis
    if zone_ids is None:
        zone_ids = unique_zones
        select_all_zones = True

    zones_blocks = zones.to_delayed().ravel()
    values_blocks = values.to_delayed().ravel()

    stats_dict = {}
    stats_dict["zone"] = da.from_delayed(  # zone column
        delayed(lambda x: x)(unique_zones),
        shape=(np.nan,), dtype=unique_zones.dtype,
    )

    compute_sum_squares = False
    compute_sum = False
    compute_count = False

    if any(s in stats_funcs for s in ('mean', 'std', 'var')):
        compute_sum = True
        compute_count = True

    if any(s in stats_funcs for s in ('std', 'var')):
        compute_sum_squares = True

    basis_stats = [s for s in _DASK_BLOCK_STATS if s in stats_funcs]
    if compute_count and 'count' not in basis_stats:
        basis_stats.append('count')
    if compute_sum and 'sum' not in basis_stats:
        basis_stats.append('sum')
    if compute_sum_squares:
        basis_stats.append('sum_squares')

    dask_dtypes = dict(
        max=values.dtype,
        min=values.dtype,
        sum=values.dtype,
        count=np.int64,
        sum_squares=values.dtype,
    )

    # Keep per-block stacked arrays for the parallel variance merge
    stacked_blocks = {}

    for s in basis_stats:
        if s == 'sum_squares' and not compute_sum_squares:
            continue
        stats_func = _DASK_BLOCK_STATS.get(s)
        stats_by_block = [
            da.from_delayed(
                delayed(_single_stats_func)(
                    z, v, unique_zones, zone_ids, stats_func, nodata_values
                ), shape=(np.nan,), dtype=dask_dtypes[s]
            )
            for z, v in zip(zones_blocks, values_blocks)
        ]
        zonal_stats = da.stack(stats_by_block, allow_unknown_chunksizes=True)

        if compute_sum_squares and s in ('count', 'sum', 'sum_squares'):
            stacked_blocks[s] = zonal_stats

        stats_func_by_block = delayed(_DASK_STATS[s])
        stats_dict[s] = da.from_delayed(
            stats_func_by_block(zonal_stats), shape=(np.nan,), dtype=np.float64
        )

    if 'mean' in stats_funcs:
        stats_dict['mean'] = _dask_mean(stats_dict['sum'], stats_dict['count'])

    if 'std' in stats_funcs or 'var' in stats_funcs:
        var_result = da.from_delayed(
            delayed(_parallel_variance)(
                stacked_blocks['count'],
                stacked_blocks['sum'],
                stacked_blocks['sum_squares'],
            ),
            shape=(np.nan,), dtype=np.float64,
        )
        if 'var' in stats_funcs:
            stats_dict['var'] = var_result
        if 'std' in stats_funcs:
            stats_dict['std'] = da.from_delayed(
                delayed(np.sqrt)(var_result),
                shape=(np.nan,), dtype=np.float64,
            )

    # generate dask dataframe
    stats_df = dd.concat(
        [dd.from_dask_array(s) for s in stats_dict.values()],
        axis=1, ignore_unknown_divisions=True,
    )
    # name columns
    stats_df.columns = stats_dict.keys()
    # select columns (only include stats that were actually computed)
    computed_stats = [s for s in stats_funcs.keys() if s in stats_dict]
    stats_df = stats_df[['zone'] + computed_stats]

    if not select_all_zones:
        # Filter to requested zones using boolean indexing (avoids
        # iterrows() which materializes every row one at a time).
        zone_set = set(zone_ids)
        stats_df = stats_df[stats_df['zone'].isin(zone_set)]

    return stats_df


def _stats_numpy(
    zones: xr.DataArray,
    values: xr.DataArray,
    zone_ids: List[Union[int, float]],
    stats_funcs: Dict,
    nodata_values: Union[int, float],
    return_type: str,
) -> Union[pd.DataFrame, np.ndarray]:

    # find ids for all zones
    unique_zones = _unique_finite_zones(zones)
    # selected zones to do analysis
    if zone_ids is None:
        zone_ids = unique_zones
    else:
        zone_ids = np.unique(zone_ids)
        # remove zones that do not exist in `zones` raster
        zone_ids = [z for z in zone_ids if z in unique_zones]

    sorted_indices, values_by_zones, zone_breaks = _sort_and_stride(zones, values, unique_zones)
    if return_type == 'pandas.DataFrame':
        stats_dict = {}
        stats_dict["zone"] = zone_ids
        selected_indexes = [i for i, z in enumerate(unique_zones) if z in zone_ids]
        for stats in stats_funcs:
            func = stats_funcs.get(stats)
            stats_dict[stats] = _calc_stats(
                values_by_zones, zone_breaks,
                unique_zones, zone_ids, func, nodata_values,
                empty_zone_value=_empty_zone_value(stats),
            )
            stats_dict[stats] = stats_dict[stats][selected_indexes]
        result = pd.DataFrame(stats_dict)

    else:
        _check_stats_dataarray_memory(len(stats_funcs), values.shape)
        result = np.full((len(stats_funcs), values.size), np.nan)
        zone_ids_map = {z: i for i, z in enumerate(unique_zones) if z in zone_ids}
        stats_id = 0
        for stats in stats_funcs:
            func = stats_funcs.get(stats)
            stats_results = _calc_stats(
                values_by_zones, zone_breaks,
                unique_zones, zone_ids, func, nodata_values,
                empty_zone_value=_empty_zone_value(stats),
            )
            for zone in zone_ids:
                iz = zone_ids_map[zone]  # position of zone in unique_zones
                if iz == 0:
                    zs = sorted_indices[: zone_breaks[iz]]
                else:
                    zs = sorted_indices[zone_breaks[iz-1]: zone_breaks[iz]]
                result[stats_id][zs] = stats_results[iz]
            stats_id += 1
        result = result.reshape(len(stats_funcs), *values.shape)
    return result


def _stats_cupy(
    orig_zones: xr.DataArray,
    orig_values: xr.DataArray,
    zone_ids: List[Union[int, float]],
    stats_funcs: Dict,
    nodata_values: Union[int, float],
) -> pd.DataFrame:

    # TODO add support for 3D input
    if len(orig_values.shape) > 2:
        raise TypeError('3D inputs not supported for cupy backend')

    zones = cupy.ravel(orig_zones)
    values = cupy.ravel(orig_values)

    # Sort by zone so each zone's values occupy a contiguous range. Build
    # unique_zones from the raw zones array (finite zone IDs only) BEFORE
    # filtering values: a zone whose values are all NaN or all nodata must
    # still appear in the output with NaN stats, matching the numpy path.
    sorted_indices = cupy.argsort(zones)
    sorted_zones = zones[sorted_indices]
    values_by_zone = values[sorted_indices]

    finite_zone_mask = cupy.isfinite(sorted_zones)
    sorted_zones = sorted_zones[finite_zone_mask]
    values_by_zone = values_by_zone[finite_zone_mask]

    unique_zones, unique_index, unique_counts = cupy.unique(
        sorted_zones, return_index=True, return_counts=True)

    # Transfer to the host
    unique_index = unique_index.get()
    unique_counts = unique_counts.get()
    unique_zones = unique_zones.get()

    if zone_ids is not None:
        # Match the numpy path: deduplicate / sort with np.unique and drop
        # zone_ids that don't exist in the raster so the zone column and
        # stats columns stay aligned and the row order matches numpy.
        zone_ids = np.unique(zone_ids)
        unique_index_lst = []
        unique_counts_lst = []
        kept_zones = []
        unique_zones_lst = list(unique_zones)
        for z in zone_ids:
            try:
                idx = unique_zones_lst.index(z)
            except ValueError:
                continue
            kept_zones.append(z)
            unique_index_lst.append(unique_index[idx])
            unique_counts_lst.append(unique_counts[idx])
        unique_zones = kept_zones
        unique_counts = unique_counts_lst
        unique_index = unique_index_lst

    # stats columns
    stats_dict = {'zone': []}
    for stats in stats_funcs:
        stats_dict[stats] = []

    for i in range(len(unique_zones)):
        zone_id = unique_zones[i]
        # skip zone_id == nodata_zones, and non-finite zone ids
        if not np.isfinite(zone_id):
            continue

        stats_dict['zone'].append(zone_id)

        # extract zone_values, then filter per-zone for non-finite values
        # and the nodata sentinel. If the zone has no valid values left,
        # emit the empty-zone value for every stat instead of dropping the
        # zone: 0 for count (a cardinality), NaN for everything else.
        zone_values = values_by_zone[unique_index[i]:unique_index[i]+unique_counts[i]]
        zone_mask = cupy.isfinite(zone_values)
        if nodata_values is not None:
            zone_mask = zone_mask & (zone_values != nodata_values)
        zone_values = zone_values[zone_mask]

        if zone_values.size == 0:
            for stats in stats_funcs:
                stats_dict[stats].append(_empty_zone_value(stats))
            continue

        # apply stats on the zone data
        for j, stats in enumerate(stats_funcs):
            stats_func = stats_funcs.get(stats)
            if not callable(stats_func):
                raise ValueError(stats)
            result = stats_func(zone_values)

            assert (len(result.shape) == 0)

            stats_dict[stats].append(cupy.float_(result))

    stats_df = pd.DataFrame(stats_dict)
    return stats_df


def _stats_dask_cupy(
    zones,
    values,
    zone_ids,
    stats_funcs,
    nodata_values,
):
    zones_cpu = zones.map_blocks(
        lambda x: x.get(), dtype=zones.dtype, meta=np.array(()),
    )
    values_cpu = values.map_blocks(
        lambda x: x.get(), dtype=values.dtype, meta=np.array(()),
    )
    return _stats_dask_numpy(
        zones_cpu, values_cpu, zone_ids, stats_funcs, nodata_values,
    )


[docs] def stats( zones, values: xr.DataArray, zone_ids: Optional[List[Union[int, float]]] = None, stats_funcs: Optional[Union[Dict, List]] = None, nodata_values: Union[int, float] = None, return_type: str = 'pandas.DataFrame', column: Optional[str] = None, rasterize_kw: Optional[dict] = None, ) -> Union[pd.DataFrame, dd.DataFrame, xr.DataArray]: """ Calculate summary statistics for each zone defined by a `zones` dataset, based on `values` aggregate. A single output value is computed for every zone in the input `zones` dataset. This function currently supports numpy backed, and dask with numpy backed xarray DataArrays. Parameters ---------- zones : xr.DataArray, GeoDataFrame, or list of (geometry, value) pairs Zone definitions. Can be: - A 2D xarray DataArray of numeric zone IDs. - A ``geopandas.GeoDataFrame`` (requires *column*). - A list of ``(shapely geometry, zone_id)`` pairs. When vector input is provided, ``rasterize()`` is called internally using *values* as the template grid. Results depend on raster resolution. values : xr.DataArray or xr.Dataset values is a 2D xarray DataArray of numeric values (integers or floats). The input `values` raster contains the input values used in calculating the output statistic for each zone. In dask case, the chunksizes of `zones` and `values` should be matching. If not, `values` will be rechunked to be the same as of `zones`. When a Dataset is passed, stats are computed for each variable and columns are prefixed with the variable name (e.g. ``elevation_mean``). For 3D time-series DataArrays, convert to a Dataset first using ``.to_dataset(dim='time')`` and pass the resulting Dataset. zone_ids : list of ints, or floats List of zones to be included in calculation. If no zone_ids provided, all zones will be used. stats_funcs : dict, or list of strings, optional The statistics to calculate for each zone. If a list, possible choices are subsets of the default options. In the dictionary case, all of its values must be callable. Function takes only one argument that is the `values` raster. The key becomes the column name in the output DataFrame. Defaults: ``['mean', 'max', 'min', 'sum', 'std', 'var', 'count', 'majority']`` for numpy/cupy and ``['mean', 'max', 'min', 'sum', 'std', 'var', 'count']`` for dask-backed inputs. ``'majority'`` cannot be computed block-by-block so requesting it on a dask input raises ``ValueError`` instead of being silently dropped. Note that if `zones` and `values` are dask-backed DataArrays, `stats_funcs` must be provided as a list (or left unset). nodata_values: int, float, default=None Nodata value in `values` raster. Cells with `nodata_values` do not belong to any zone, and thus excluded from calculation. return_type: str, default='pandas.DataFrame' Format of returned data. Must be one of: - ``'pandas.DataFrame'``: one row per zone, one column per statistic (plus a ``zone`` column). For dask-backed inputs the result is a ``dask.dataframe.DataFrame``. This is the only value supported for cupy, dask, and Dataset inputs. - ``'xarray.DataArray'``: a DataArray whose first dimension is ``'stats'`` and whose remaining dims match ``values``. Cells outside the requested zones are filled with NaN. Only supported for numpy-backed DataArray inputs. Any other value raises ``ValueError``. column : str, optional Column name in the GeoDataFrame that contains zone IDs. Required when *zones* is a GeoDataFrame; must not be set for list-of-pairs or DataArray input. rasterize_kw : dict, optional Extra keyword arguments forwarded to ``rasterize()`` when *zones* is vector input (e.g. ``{'all_touched': True}``). Notes ----- Empty zones. A zone that exists in ``zones`` but has no valid values (every cell is NaN, or every cell equals ``nodata_values``) still appears as a row in the output. For such a zone, ``count`` is ``0`` because the number of valid cells is zero. Every other statistic (``mean``, ``min``, ``max``, ``sum``, ``std``, ``var``, ``majority``, and any custom callable) is ``NaN``, since those values are undefined over an empty set. This holds across the numpy, cupy, and dask backends. Returns ------- stats_df : Union[pandas.DataFrame, dask.dataframe.DataFrame] A pandas DataFrame, or a dask DataFrame where each column is a statistic and each row is a zone with zone id. When ``values`` is a Dataset, the returned DataFrame has columns prefixed by the variable name (e.g. ``elevation_mean``, ``elevation_max``), and ``return_type`` must be ``'pandas.DataFrame'``. Examples -------- stats() works with NumPy backed DataArray .. sourcecode:: python >>> import numpy as np >>> import xarray as xr >>> from xrspatial.zonal import stats >>> height, width = 10, 10 >>> values_data = np.array([ [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [20, 21, 22, 23, 24, 25, 26, 27, 28, 29], [30, 31, 32, 33, 34, 35, 36, 37, 38, 39], [40, 41, 42, 43, 44, 45, 46, 47, 48, 49], [50, 51, 52, 53, 54, 55, 56, 57, 58, 59], [60, 61, 62, 63, 64, 65, 66, 67, 68, 69], [70, 71, 72, 73, 74, 75, 76, 77, 78, 79], [80, 81, 82, 83, 84, 85, 86, 87, 88, 89], [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]]) >>> values = xr.DataArray(values_data) >>> zones_data = np.array([ [ 0., 0., 0., 0., 0., 10., 10., 10., 10., 10.], [ 0., 0., 0., 0., 0., 10., 10., 10., 10., 10.], [ 0., 0., 0., 0., 0., 10., 10., 10., 10., 10.], [ 0., 0., 0., 0., 0., 10., 10., 10., 10., 10.], [ 0., 0., 0., 0., 0., 10., 10., 10., 10., 10.], [20., 20., 20., 20., 20., 30., 30., 30., 30., 30.], [20., 20., 20., 20., 20., 30., 30., 30., 30., 30.], [20., 20., 20., 20., 20., 30., 30., 30., 30., 30.], [20., 20., 20., 20., 20., 30., 30., 30., 30., 30.], [20., 20., 20., 20., 20., 30., 30., 30., 30., 30.]]) >>> zones = xr.DataArray(zones_data) >>> # Calculate Stats >>> stats_df = stats(zones=zones, values=values) >>> print(stats_df) zone mean max min sum std var count 0 0 22.0 44 0 550 14.21267 202.0 25 1 10 27.0 49 5 675 14.21267 202.0 25 2 20 72.0 94 50 1800 14.21267 202.0 25 3 30 77.0 99 55 1925 14.21267 202.0 25 >>> # Custom Stats >>> custom_stats ={'double_sum': lambda val: val.sum()*2} >>> custom_stats_df = stats(zones=zones, values=values, stats_funcs=custom_stats) >>> print(custom_stats_df) zone double_sum 0 0 1100 1 10 1350 2 20 3600 3 30 3850 stats() works with Dask with NumPy backed DataArray >>> import dask.array as da >>> import dask.array as da >>> values_dask = xr.DataArray(da.from_array(values_data, chunks=(3, 3))) >>> zones_dask = xr.DataArray(da.from_array(zones_data, chunks=(3, 3))) >>> # Calculate Stats with dask backed xarray DataArrays >>> dask_stats_df = stats(zones=zones_dask, values=values_dask) >>> print(type(dask_stats_df)) <class 'dask.dataframe.core.DataFrame'> >>> print(dask_stats_df.compute()) zone mean max min sum std var count 0 0 22.0 44 0 550 14.21267 202.0 25 1 10 27.0 49 5 675 14.21267 202.0 25 2 20 72.0 94 50 1800 14.21267 202.0 25 3 30 77.0 99 55 1925 14.21267 202.0 25 stats() works with 3D time-series DataArrays via Dataset conversion .. sourcecode:: python >>> # Convert a 3D time-series DataArray to a Dataset, >>> # then pass to stats() to get per-timestep statistics. >>> values_3d = xr.DataArray( ... np.random.rand(2, 10, 10), ... dims=['time', 'dim_0', 'dim_1'], ... coords={'time': [2020, 2021]}) >>> ds = values_3d.to_dataset(dim='time') >>> stats_df = stats(zones=zones, values=ds) >>> # Columns: zone, 2020_mean, 2020_max, ..., 2021_mean, 2021_max, ... """ # Validate return_type up front. The internal _stats_numpy path silently # returns its raw ndarray buffer for anything that is not # 'pandas.DataFrame', so an unrecognised value (typo, stale name) used to # leak that undocumented intermediate to the caller. _allowed_return_types = ('pandas.DataFrame', 'xarray.DataArray') if return_type not in _allowed_return_types: raise ValueError( f"return_type={return_type!r} is not supported. " f"Allowed values: {list(_allowed_return_types)!r}." ) zones = _maybe_rasterize_zones(zones, values, column=column, rasterize_kw=rasterize_kw) # Dataset support: run stats per variable and merge into one DataFrame if isinstance(values, xr.Dataset): if return_type != 'pandas.DataFrame': raise ValueError( "return_type must be 'pandas.DataFrame' when values is a Dataset" ) if len(values.data_vars) == 0: raise ValueError( "values Dataset has no data variables to compute statistics " "over. Pass a Dataset with at least one data variable." ) dfs = [] for var_name in values.data_vars: df = stats( zones, values[var_name], zone_ids, stats_funcs, nodata_values, 'pandas.DataFrame', ) df = df.rename( columns={c: f'{var_name}_{c}' for c in df.columns if c != 'zone'} ) dfs.append(df) result = dfs[0] for df in dfs[1:]: result = result.merge(df, on='zone', how='outer') return result _validate_raster(zones, func_name='stats', name='zones', ndim=2) _validate_raster(values, func_name='stats', name='values', ndim=(2, 3)) validate_arrays(zones, values) is_dask_values = has_dask_array() and isinstance(values.data, da.Array) is_cupy_values = is_cupy_array(values.data) # Only the pure-numpy backend computes the (n_stats, *shape) buffer # that the xarray.DataArray return type needs. The cupy and dask # backends produce a DataFrame instead, so wrapping that in # xr.DataArray downstream would crash or silently misalign. Reject # 'xarray.DataArray' for non-numpy backends with a clear message # instead of letting it fail deep in the dispatch. if return_type == 'xarray.DataArray' and (is_dask_values or is_cupy_values): backend = 'dask-backed' if is_dask_values else 'cupy-backed' raise ValueError( f"return_type='xarray.DataArray' is not supported for " f"{backend} input. Use 'pandas.DataFrame' instead." ) # Resolve the default stats_funcs based on backend. The dask path cannot # compute 'majority' block-by-block, so its default list omits it. Using # None as the sentinel default also avoids the mutable-default pitfall. if stats_funcs is None: stats_funcs = ( list(_DEFAULT_STATS_DASK) if is_dask_values else list(_DEFAULT_STATS_NUMPY) ) # validate stats_funcs if is_dask_values and not isinstance(stats_funcs, list): raise ValueError( "Got dask-backed DataArray as `values` aggregate. " "`stats_funcs` must be a list that is a subset of " f"{_DEFAULT_STATS_DASK!r}." ) if is_dask_values and isinstance(stats_funcs, list): unsupported = [s for s in stats_funcs if s not in _DEFAULT_STATS_DASK] if unsupported: raise ValueError( f"stats_funcs={unsupported!r} not supported on dask-backed " f"input. Supported on dask: {_DEFAULT_STATS_DASK!r}." ) if isinstance(stats_funcs, list): # create a dict of stats stats_funcs_dict = {} for stat_name in stats_funcs: func = _DEFAULT_STATS.get(stat_name, None) if func is None: err_str = f"Invalid stat name. {stat_name} option not supported." raise ValueError(err_str) stats_funcs_dict[stat_name] = func elif isinstance(stats_funcs, dict): stats_funcs_dict = stats_funcs.copy() mapper = ArrayTypeFunctionMapping( numpy_func=lambda *args: _stats_numpy(*args, return_type=return_type), dask_func=_stats_dask_numpy, cupy_func=_stats_cupy, dask_cupy_func=_stats_dask_cupy, ) result = mapper(values)( zones.data, values.data, zone_ids, stats_funcs_dict, nodata_values, ) if return_type == 'xarray.DataArray': return xr.DataArray( result, coords={'stats': list(stats_funcs_dict.keys()), **values.coords}, dims=('stats', *values.dims), attrs=values.attrs ) return result
def _find_cats(values, cat_ids, nodata_values): if len(values.shape) == 2: # 2D case unique_cats = _unique_finite_cats(values.data, nodata_values) else: # 3D case unique_cats = values[values.dims[0]].data if cat_ids is None: cat_ids = unique_cats else: if isinstance(values.data, np.ndarray) or is_cupy_array(values.data): # remove cats that do not exist in `values` raster cat_ids = [c for c in cat_ids if c in unique_cats] else: cat_ids = _select_ids(unique_cats, cat_ids) return unique_cats, cat_ids def _get_zone_values(values_by_zones, start, end): if len(values_by_zones.shape) == 1: # 1D flatten, i.e, original data is 2D return values_by_zones[start:end] else: # 2D flatten, i.e, original data is 3D return values_by_zones[:, start:end] def _single_zone_crosstab_2d( zone_values, unique_cats, cat_ids, nodata_values, crosstab_dict, ): # 1D flatten zone_values, i.e, original data is 2D # filter out non-finite and nodata_values mask = np.isfinite(zone_values) if nodata_values is not None: mask = mask & (zone_values != nodata_values) zone_values = zone_values[mask] total_count = zone_values.shape[0] crosstab_dict[TOTAL_COUNT].append(total_count) sorted_zone_values = np.sort(zone_values) zone_cat_breaks = _strides(sorted_zone_values, unique_cats) # cat_start must advance for every unique category, not only those # in cat_ids. If we only advanced for selected categories, filtering # out an earlier category would leave cat_start behind and inflate # the next selected category's count. See issue #2560. cat_start = 0 for j, cat in enumerate(unique_cats): if cat in cat_ids: count = zone_cat_breaks[j] - cat_start crosstab_dict[cat].append(count) cat_start = zone_cat_breaks[j] def _single_zone_crosstab_3d( zone_values, unique_cats, cat_ids, nodata_values, crosstab_dict, stats_func ): # 2D flatten `zone_values`, i.e, original data is 3D for j, cat in enumerate(unique_cats): if cat in cat_ids: zone_cat_data = zone_values[j] # filter out non-finite and nodata_values cat_mask = np.isfinite(zone_cat_data) if nodata_values is not None: cat_mask = cat_mask & (zone_cat_data != nodata_values) zone_cat_data = zone_cat_data[cat_mask] crosstab_dict[cat].append(stats_func(zone_cat_data)) def _crosstab_numpy( zones: np.ndarray, values: np.ndarray, zone_ids: List[Union[int, float]], unique_cats: np.ndarray, cat_ids: Union[List, np.ndarray], nodata_values: Union[int, float], agg: str, ) -> pd.DataFrame: # find ids for all zones unique_zones = _unique_finite_zones(zones) # selected zones to do analysis if zone_ids is None: zone_ids = unique_zones else: # remove zones that do not exist in `zones` raster zone_ids = [z for z in zone_ids if z in unique_zones] crosstab_dict = {} crosstab_dict["zone"] = zone_ids if len(values.shape) == 2: crosstab_dict[TOTAL_COUNT] = [] for cat in cat_ids: crosstab_dict[cat] = [] _, values_by_zones, zone_breaks = _sort_and_stride( zones, values, unique_zones ) start = 0 for i in range(len(unique_zones)): end = zone_breaks[i] if unique_zones[i] in zone_ids: # get data for zone unique_zones[i] zone_values = _get_zone_values(values_by_zones, start, end) if len(values.shape) == 2: _single_zone_crosstab_2d( zone_values, unique_cats, cat_ids, nodata_values, crosstab_dict ) else: _single_zone_crosstab_3d( zone_values, unique_cats, cat_ids, nodata_values, crosstab_dict, _DEFAULT_STATS[agg] # noqa ) start = end if TOTAL_COUNT in crosstab_dict: crosstab_dict[TOTAL_COUNT] = np.array( crosstab_dict[TOTAL_COUNT], dtype=np.float32 ) for cat in cat_ids: crosstab_dict[cat] = np.array(crosstab_dict[cat]) # construct output dataframe if agg == 'percentage': # replace 0s with nans to avoid dividing by 0 error crosstab_dict[TOTAL_COUNT][crosstab_dict[TOTAL_COUNT] == 0] = np.nan for cat in cat_ids: crosstab_dict[cat] = crosstab_dict[cat] / crosstab_dict[TOTAL_COUNT] * 100 # noqa crosstab_df = pd.DataFrame(crosstab_dict) crosstab_df = crosstab_df[['zone'] + list(cat_ids)] return crosstab_df @delayed def _single_chunk_crosstab( zones_block: np.array, values_block: np.array, unique_zones: np.array, zone_ids: np.array, unique_cats, cat_ids, nodata_values: Union[int, float], ): results = {} if len(values_block.shape) == 2: results[TOTAL_COUNT] = [] for cat in cat_ids: results[cat] = [] _, values_by_zones, zone_breaks = _sort_and_stride( zones_block, values_block, unique_zones ) start = 0 for i in range(len(unique_zones)): end = zone_breaks[i] if unique_zones[i] in zone_ids: # get data for zone unique_zones[i] zone_values = _get_zone_values(values_by_zones, start, end) if len(values_block.shape) == 2: _single_zone_crosstab_2d( zone_values, unique_cats, cat_ids, nodata_values, results ) else: _single_zone_crosstab_3d( zone_values, unique_cats, cat_ids, nodata_values, results, _DEFAULT_STATS['count'] # noqa ) start = end if TOTAL_COUNT in results: results[TOTAL_COUNT] = np.array(results[TOTAL_COUNT], dtype=np.float32) for cat in cat_ids: results[cat] = np.array(results[cat]) return results @delayed def _select_ids(unique_ids, ids): selected_ids = [] for i in ids: if i in unique_ids: selected_ids.append(i) return selected_ids @delayed def _crosstab_df_dask(crosstab_by_block, zone_ids, cat_ids, agg): result = crosstab_by_block[0] for i in range(1, len(crosstab_by_block)): for k in crosstab_by_block[i]: result[k] += crosstab_by_block[i][k] if agg == 'percentage': # replace 0s with nans to avoid dividing by 0 error result[TOTAL_COUNT][result[TOTAL_COUNT] == 0] = np.nan for cat in cat_ids: result[cat] = result[cat] / result[TOTAL_COUNT] * 100 df = pd.DataFrame(result) df['zone'] = zone_ids columns = ['zone'] + list(cat_ids) df = df[columns] return df def _crosstab_dask_numpy( zones: np.ndarray, values: np.ndarray, zone_ids: List[Union[int, float]], unique_cats: np.ndarray, cat_ids: Union[List, np.ndarray], nodata_values: Union[int, float], agg: str, ): # find ids for all zones unique_zones = _unique_finite_zones(zones) if zone_ids is None: zone_ids = unique_zones else: zone_ids = _select_ids(unique_zones, zone_ids) cat_ids = _select_ids(unique_cats, cat_ids) zones_blocks = zones.to_delayed().ravel() values_blocks = values.to_delayed().ravel() crosstab_by_block = [ _single_chunk_crosstab( z, v, unique_zones, zone_ids, unique_cats, cat_ids, nodata_values ) for z, v in zip(zones_blocks, values_blocks) ] crosstab_df = _crosstab_df_dask( crosstab_by_block, zone_ids, cat_ids, agg ) return dd.from_delayed(crosstab_df) def _crosstab_cupy( zones: np.ndarray, values: np.ndarray, zone_ids, unique_cats, cat_ids, nodata_values, agg: str, ): # unique_cats / cat_ids may be cupy arrays from _find_cats if is_cupy_array(unique_cats): unique_cats = cupy.asnumpy(unique_cats) if is_cupy_array(cat_ids): cat_ids = cupy.asnumpy(cat_ids) return _crosstab_numpy( cupy.asnumpy(zones), cupy.asnumpy(values), zone_ids, unique_cats, cat_ids, nodata_values, agg, ) def _crosstab_dask_cupy( zones, values, zone_ids, unique_cats, cat_ids, nodata_values, agg: str, ): zones_cpu = zones.map_blocks( lambda x: x.get(), dtype=zones.dtype, meta=np.array(()), ) values_cpu = values.map_blocks( lambda x: x.get(), dtype=values.dtype, meta=np.array(()), ) # unique_cats / cat_ids may be cupy arrays from _find_cats if is_cupy_array(unique_cats): unique_cats = cupy.asnumpy(unique_cats) if is_cupy_array(cat_ids): cat_ids = cupy.asnumpy(cat_ids) return _crosstab_dask_numpy( zones_cpu, values_cpu, zone_ids, unique_cats, cat_ids, nodata_values, agg, )
[docs] def crosstab( zones, values: xr.DataArray, zone_ids: List[Union[int, float]] = None, cat_ids: List[Union[int, float]] = None, layer: Optional[int] = None, agg: Optional[str] = "count", nodata_values: Optional[Union[int, float]] = None, column: Optional[str] = None, rasterize_kw: Optional[dict] = None, ) -> Union[pd.DataFrame, dd.DataFrame]: """ Calculate cross-tabulated (categorical stats) areas between two datasets: a zone dataset `zones`, a value dataset `values` (a value raster). Infinite and NaN values in `zones` and `values` will be ignored. Outputs a pandas DataFrame if `zones` and `values` are numpy backed. Outputs a dask DataFrame if `zones` and `values` are dask with numpy-backed xarray DataArrays. Requires a DataArray with a single data dimension, here called the "values", indexed using either 2D or 3D coordinates. DataArrays with 3D coordinates are expected to contain values distributed over different categories that are indexed by the additional coordinate. Such an array would reduce to the 2D-coordinate case if collapsed across the categories (e.g. if one did ``aggc.sum(dim='cat')`` for a categorical dimension ``cat``). Parameters ---------- zones : xr.DataArray, GeoDataFrame, or list of (geometry, value) pairs Zone definitions. Can be: - A 2D xarray DataArray of integers or floats. - A ``geopandas.GeoDataFrame`` (requires *column*). - A list of ``(shapely geometry, zone_id)`` pairs. When vector input is provided, ``rasterize()`` is called internally using *values* as the template grid. values : xr.DataArray 2D or 3D data array of integers or floats. The input value raster contains the input values used in calculating the categorical statistic for each zone. zone_ids: List of ints, or floats List of zones to be included in calculation. If no zone_ids provided, all zones will be used. cat_ids: List of ints, or floats List of categories to be included in calculation. If no cat_ids provided, all categories will be used. layer: int, optional, default=None Index of the categorical dimension layer inside the `values` DataArray. When left as ``None``, layer 0 is used. agg: str, default = 'count' Aggregation method. If the `values` data is 2D, available options are: `percentage`, and `count`. If `values` is 3D and is numpy-backed, available options are: `min`, `max`, `mean`, `sum`, `std`, `var`, and `count`. If `values` is 3D and is dask with numpy-backed, the only available option is `count`. nodata_values: int, float, default=None Nodata value in `values` raster. Cells with `nodata` do not belong to any zone, and thus excluded from calculation. column : str, optional Column name in the GeoDataFrame that contains zone IDs. Required when *zones* is a GeoDataFrame. rasterize_kw : dict, optional Extra keyword arguments forwarded to ``rasterize()`` when *zones* is vector input (e.g. ``{'all_touched': True}``). Returns ------- crosstab_df : Union[pandas.DataFrame, dask.dataframe.DataFrame] A pandas DataFrame, or an uncomputed dask DataFrame, where each column is a categorical value and each row is a zone with zone id. Each entry presents the statistics, which computed using the specified aggregation method, of the category over the zone. Examples -------- crosstab() works with NumPy backed DataArray. .. sourcecode:: python >>> import numpy as np >>> import xarray as xr >>> from xrspatial.zonal import crosstab >>> values_data = np.asarray([ [0, 0, 10, 20], [0, 0, 0, 10], [0, np.nan, 20, 50], [10, 30, 40, np.inf], [10, 10, 50, 0]]) >>> values = xr.DataArray(values_data) >>> zones_data = np.asarray([ [1, 1, 6, 6], [1, np.nan, 6, 6], [3, 5, 6, 6], [3, 5, 7, np.nan], [3, 7, 7, 0]]) >>> zones = xr.DataArray(zones_data) >>> # Calculate Crosstab, numpy case >>> df = crosstab(zones=zones, values=values) >>> print(df) zone 0.0 10.0 20.0 30.0 40.0 50.0 0 0 1 0 0 0 0 0 1 1 3 0 0 0 0 0 2 3 1 2 0 0 0 0 3 5 0 0 0 1 0 0 4 6 1 2 2 0 0 1 5 7 0 1 0 0 1 1 crosstab() works with Dask with NumPy backed DataArray. .. sourcecode:: python >>> import dask.array as da >>> values_dask = xr.DataArray(da.from_array(values_data, chunks=(3, 3))) >>> zones_dask = xr.DataArray(da.from_array(zones_data, chunks=(3, 3))) >>> dask_df = crosstab(zones=zones_dask, values=values_dask) >>> print(dask_df) Dask DataFrame Structure: zone 0.0 10.0 20.0 30.0 40.0 50.0 npartitions=5 0 float64 int64 int64 int64 int64 int64 int64 1 ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... 4 ... ... ... ... ... ... ... 5 ... ... ... ... ... ... ... Dask Name: astype, 1186 tasks >>> print(dask_df.compute()) zone 0.0 10.0 20.0 30.0 40.0 50.0 0 0 1 0 0 0 0 0 1 1 3 0 0 0 0 0 2 3 1 2 0 0 0 0 3 5 0 0 0 1 0 0 4 6 1 2 2 0 0 1 5 7 0 1 0 0 1 1 """ zones = _maybe_rasterize_zones(zones, values, column=column, rasterize_kw=rasterize_kw) _validate_raster(zones, func_name='crosstab', name='zones', ndim=2) _validate_raster(values, func_name='crosstab', name='values', ndim=(2, 3)) # For 2D values, validate and align chunks between zones and values # This is critical for dask arrays that may come from different sources # (e.g., xarray Datasets via to_array().sel()) if values.ndim == 2: validate_arrays(zones, values) else: # 3D values: validate_arrays() requires equal shapes, so it cannot be # used here (zones is 2D, values is 3D). Check backend compatibility up # front instead, otherwise a mixed-backend call (e.g. numpy zones with # dask values) fails deep inside the dask/numba machinery with an # opaque error like "'NoneType' object is not subscriptable". zones_backend = _classify_backend(zones) values_backend = _classify_backend(values) if zones_backend != values_backend: raise ValueError( "`zones` and `values` must share the same backend; got " "'{}' (zones) and '{}' (values)".format( zones_backend, values_backend ) ) agg_2d = ["percentage", "count"] agg_3d_numpy = _DEFAULT_STATS.keys() agg_3d_dask = ["count"] if values.ndim == 2 and agg not in agg_2d: raise ValueError( f"`agg` method for 2D data array must be one of following {agg_2d}" ) if values.ndim == 3: if isinstance(values.data, np.ndarray) and agg not in agg_3d_numpy: raise ValueError( f"`agg` method for 3D numpy backed data array must be one of following {agg_3d_numpy}" # noqa ) if has_dask_array() and isinstance(values.data, da.Array) and agg not in agg_3d_dask: raise ValueError( f"`agg` method for 3D dask backed data array must be one of following {agg_3d_dask}" ) if len(values.shape) == 3: # 3D case if layer is None: layer = 0 try: values.indexes[values.dims[layer]].values except (IndexError, KeyError): raise ValueError("Invalid `layer`") dims = values.dims reshape_dims = [dims[layer]] + [d for d in dims if d != dims[layer]] # transpose by that category dimension values = values.transpose(*reshape_dims) if zones.shape != values.shape[1:]: raise ValueError("Incompatible shapes") if has_dask_array() and isinstance(values.data, da.Array): # dask case, rechunk if necessary zones_chunks = zones.chunks expected_values_chunks = { 0: (values.shape[0],), 1: zones_chunks[0], 2: zones_chunks[1], } actual_values_chunks = { i: values.chunks[i] for i in range(3) } if actual_values_chunks != expected_values_chunks: values.data = values.data.rechunk(expected_values_chunks) # find categories unique_cats, cat_ids = _find_cats(values, cat_ids, nodata_values) mapper = ArrayTypeFunctionMapping( numpy_func=_crosstab_numpy, dask_func=_crosstab_dask_numpy, cupy_func=_crosstab_cupy, dask_cupy_func=_crosstab_dask_cupy, ) crosstab_df = mapper(values)( zones.data, values.data, zone_ids, unique_cats, cat_ids, nodata_values, agg ) return crosstab_df
# --------------------------------------------------------------------------- # Hypsometric integral # --------------------------------------------------------------------------- def _hi_numpy(zones_data, values_data, nodata): """Numpy backend for hypsometric integral.""" unique_zones = np.unique(zones_data[np.isfinite(zones_data)]) if nodata is not None: unique_zones = unique_zones[unique_zones != nodata] out = np.full(values_data.shape, np.nan, dtype=np.float64) for z in unique_zones: mask = (zones_data == z) & np.isfinite(values_data) if not np.any(mask): continue vals = values_data[mask] mn, mx = vals.min(), vals.max() if mx == mn: continue # flat zone -> NaN hi = (vals.mean() - mn) / (mx - mn) out[mask] = hi return out def _hi_cupy(zones_data, values_data, nodata): """CuPy backend for hypsometric integral — transfer to host, compute, return.""" import cupy as cp result_np = _hi_numpy(cp.asnumpy(zones_data), cp.asnumpy(values_data), nodata) return cp.asarray(result_np) @delayed def _hi_block_stats(z_block, v_block, nodata): """Per-chunk: return dict mapping local zone IDs to (min, max, sum, count). Each block discovers its own zones, so the driver never has to compute a global unique-zone set up front. Sparse zones (geographic) stay sparse in the returned dict instead of being padded to a full (n_zones, 4) array. """ finite_v = np.isfinite(v_block) finite_z = np.isfinite(z_block) valid = finite_z & finite_v if not np.any(valid): return {} z_valid = z_block[valid] v_valid = v_block[valid] uzones = np.unique(z_valid) result = {} for z in uzones: if nodata is not None and z == nodata: continue mask = z_valid == z vals = v_valid[mask] if vals.size == 0: continue result[z.item() if hasattr(z, 'item') else z] = ( float(vals.min()), float(vals.max()), float(vals.sum()), int(vals.size), ) return result @delayed def _hi_reduce(partials_list): """Stream-merge per-block dicts into global hi_lookup. Scheduler peak memory is O(n_zones) for the merged dict, rather than O(n_blocks * n_zones) from a stacked array. Per-block partials arrive as a Python list but are iterated once and can be released. """ merged = {} for partial in partials_list: for z, (mn, mx, s, c) in partial.items(): if z in merged: om, oM, os_, oc = merged[z] merged[z] = (min(om, mn), max(oM, mx), os_ + s, oc + c) else: merged[z] = (mn, mx, s, c) hi_lookup = {} for z, (mn, mx, s, c) in merged.items(): if c == 0 or mx == mn: hi_lookup[z] = np.nan else: hi_lookup[z] = (s / c - mn) / (mx - mn) return hi_lookup @delayed def _hi_lookup_as_object_array(hi_lookup): """Wrap the HI lookup dict in a 0-d numpy object array. `map_blocks` only threads dask-array positional args through the graph lazily. Wrapping the dict in a 0-d object array lets us hand the delayed lookup to every paint chunk without computing it up front. """ arr = np.empty((), dtype=object) arr[()] = hi_lookup return arr def _hi_dask_numpy(zones_data, values_data, nodata): """Dask+numpy backend for hypsometric integral. Single graph evaluation: each block computes its local (zone -> stats) dict, a streaming reduce merges them into a lookup table, and map_blocks paints the result. No up-front `_unique_finite_zones` compute and no O(n_blocks * n_zones) scheduler-side stack. The lookup table stays lazy (a 0-d dask object array) so the whole pipeline is lazy: nothing runs until the caller computes the output. """ zones_blocks = zones_data.to_delayed().ravel() values_blocks = values_data.to_delayed().ravel() partials = [ _hi_block_stats(zb, vb, nodata) for zb, vb in zip(zones_blocks, values_blocks) ] hi_lookup_delayed = _hi_lookup_as_object_array(_hi_reduce(partials)) hi_lookup_arr = da.from_delayed( hi_lookup_delayed, shape=(), dtype=object, meta=np.array((), dtype=object), ) def _paint(zones_chunk, values_chunk, hi_map_arr): # hi_map_arr is the 0-d object array carrying the lookup dict. hi_map = hi_map_arr[()] out = np.full(zones_chunk.shape, np.nan, dtype=np.float64) for z, hi_val in hi_map.items(): mask = (zones_chunk == z) & np.isfinite(values_chunk) out[mask] = hi_val return out return da.map_blocks( _paint, zones_data, values_data, hi_lookup_arr, dtype=np.float64, meta=np.array(()), ) def _hi_dask_cupy(zones_data, values_data, nodata): """Dask+cupy backend: convert chunks to numpy, delegate, then re-wrap as cupy. The per-zone HI lookup table is a Python dict so the reduce step has to pass through host memory. The painted output is wrapped back as cupy chunks so the returned dask graph yields cupy arrays, keeping the backend consistent with the dask+cupy input (issue #2525). """ zones_cpu = zones_data.map_blocks( lambda x: x.get(), dtype=zones_data.dtype, meta=np.array(()), ) values_cpu = values_data.map_blocks( lambda x: x.get(), dtype=values_data.dtype, meta=np.array(()), ) result_cpu = _hi_dask_numpy(zones_cpu, values_cpu, nodata) # Re-wrap each numpy chunk as cupy so dispatch downstream stays on GPU. return result_cpu.map_blocks( cupy.asarray, dtype=result_cpu.dtype, meta=cupy.array(()), ) def hypsometric_integral( zones, values: xr.DataArray, nodata: Optional[int] = 0, column: Optional[str] = None, rasterize_kw: Optional[dict] = None, name: str = 'hypsometric_integral', ) -> xr.DataArray: """Hypsometric integral (HI) per zone, painted back to a raster. HI measures geomorphic maturity: ``(mean - min) / (max - min)`` computed over elevations within each zone. Values range from 0 to 1. Parameters ---------- zones : xr.DataArray, GeoDataFrame, or list of (geometry, value) pairs Zone definitions. Integer zone IDs. GeoDataFrame and list-of-pairs inputs are rasterized using *values* as the template grid. values : xr.DataArray 2D elevation raster (float), same shape as *zones*. nodata : int or None, default 0 Zone ID that means "no zone". Excluded from computation; those cells get NaN in the output. Set to ``None`` to include all IDs. column : str, optional Column in a GeoDataFrame containing zone IDs. rasterize_kw : dict, optional Extra keyword arguments for ``rasterize()`` when *zones* is vector. name : str, default ``'hypsometric_integral'`` Name for the output DataArray. Returns ------- xr.DataArray Float64 raster, same shape/dims/coords as *values*. Each cell holds the HI of its zone. NaN for nodata zones, non-finite elevation cells, and flat zones (elevation range = 0). """ zones = _maybe_rasterize_zones(zones, values, column=column, rasterize_kw=rasterize_kw) validate_arrays(zones, values) _nodata = nodata # capture for closures mapper = ArrayTypeFunctionMapping( numpy_func=lambda z, v: _hi_numpy(z, v, _nodata), cupy_func=lambda z, v: _hi_cupy(z, v, _nodata), dask_func=lambda z, v: _hi_dask_numpy(z, v, _nodata), dask_cupy_func=lambda z, v: _hi_dask_cupy(z, v, _nodata), ) out = mapper(zones)(zones.data, values.data) return xr.DataArray( out, name=name, dims=values.dims, coords=values.coords, attrs=values.attrs, ) def _apply_numpy(zones_data, values_data, func, nodata): out = values_data.copy() if nodata is not None: zone_mask = zones_data != nodata else: zone_mask = np.ones(zones_data.shape, dtype=bool) vfunc = np.vectorize(func) if values_data.ndim == 2: out[zone_mask] = vfunc(values_data[zone_mask]) else: # 3D for k in range(values_data.shape[2]): out[:, :, k][zone_mask] = vfunc(values_data[:, :, k][zone_mask]) return out def _make_apply_kernel(func): """Build a CUDA kernel that applies *func* element-wise.""" from numba import cuda as nb_cuda device_func = nb_cuda.jit(device=True)(func) @nb_cuda.jit def _kernel(zones, values, out, nodata_val, has_nodata): y, x = nb_cuda.grid(2) if y < zones.shape[0] and x < zones.shape[1]: if has_nodata and zones[y, x] == nodata_val: return out[y, x] = device_func(values[y, x]) return _kernel def _apply_cupy_gpu(zones_data, values_data, kernel, nodata): """Run the CUDA apply kernel on cupy arrays.""" out = values_data.copy() has_nodata = nodata is not None nodata_val = nodata if has_nodata else 0 griddim, blockdim = cuda_args(values_data.shape[:2]) if values_data.ndim == 2: kernel[griddim, blockdim]( zones_data, values_data, out, nodata_val, has_nodata, ) else: for k in range(values_data.shape[2]): kernel[griddim, blockdim]( zones_data, values_data[:, :, k], out[:, :, k], nodata_val, has_nodata, ) return out def _apply_cupy(zones_data, values_data, func, nodata): try: kernel = _make_apply_kernel(func) return _apply_cupy_gpu(zones_data, values_data, kernel, nodata) except Exception: result_np = _apply_numpy(zones_data.get(), values_data.get(), func, nodata) return cupy.asarray(result_np) def _apply_dask_numpy(zones_data, values_data, func, nodata): def _chunk_fn(zones_chunk, values_chunk): return _apply_numpy(zones_chunk, values_chunk, func, nodata) if values_data.ndim == 2: return da.map_blocks( _chunk_fn, zones_data, values_data, dtype=values_data.dtype, meta=np.array(()), ) else: layers = [] for k in range(values_data.shape[2]): layer = values_data[:, :, k].rechunk(zones_data.chunks) layers.append(da.map_blocks( _chunk_fn, zones_data, layer, dtype=values_data.dtype, meta=np.array(()), )) stacked = da.stack(layers, axis=2) # da.stack produces unit chunks along the new axis; merge back # to the input chunking so downstream ops see the same shape. return stacked.rechunk({2: values_data.chunks[2]}) def _apply_dask_cupy(zones_data, values_data, func, nodata): # Try GPU: build kernel once, reuse across all chunks try: kernel = _make_apply_kernel(func) gpu_ok = True except Exception: gpu_ok = False if gpu_ok: def _chunk_fn(zones_chunk, values_chunk): try: return _apply_cupy_gpu(zones_chunk, values_chunk, kernel, nodata) except Exception: result_np = _apply_numpy( zones_chunk.get(), values_chunk.get(), func, nodata, ) return cupy.asarray(result_np) else: def _chunk_fn(zones_chunk, values_chunk): result_np = _apply_numpy( zones_chunk.get(), values_chunk.get(), func, nodata, ) return cupy.asarray(result_np) if values_data.ndim == 2: return da.map_blocks( _chunk_fn, zones_data, values_data, dtype=values_data.dtype, meta=cupy.array(()), ) else: layers = [] for k in range(values_data.shape[2]): layer = values_data[:, :, k].rechunk(zones_data.chunks) layers.append(da.map_blocks( _chunk_fn, zones_data, layer, dtype=values_data.dtype, meta=cupy.array(()), )) stacked = da.stack(layers, axis=2) # da.stack produces unit chunks along the new axis; merge back # to the input chunking so downstream ops see the same shape. return stacked.rechunk({2: values_data.chunks[2]})
[docs] def apply( zones, values: xr.DataArray, func: Callable, nodata: Optional[int] = 0, column: Optional[str] = None, rasterize_kw: Optional[dict] = None, name: Optional[str] = None, ) -> xr.DataArray: """ Apply a function to the `values` agg within zones in `zones` agg. Returns a new DataArray with the function applied. Parameters ---------- zones : xr.DataArray, GeoDataFrame, or list of (geometry, value) pairs Zone definitions. Can be: - A 2D xarray DataArray of integers. - A ``geopandas.GeoDataFrame`` (requires *column*). - A list of ``(shapely geometry, zone_id)`` pairs. When vector input is provided, ``rasterize()`` is called internally using *values* as the template grid. Use ``rasterize_kw={'dtype': int, 'fill': 0}`` to produce integer zones (required by ``apply``); ``rasterize`` rejects an integer dtype with the default NaN fill (#2504). values : xr.DataArray values.data is either a 2D or 3D array of integers or floats. The input value raster. func : callable function to apply. nodata: int, default=0 Nodata value in `zones` raster. Cells with `nodata` does not belong to any zone, and thus excluded from calculation. Set to None to apply func to all cells. column : str, optional Column name in the GeoDataFrame that contains zone IDs. Required when *zones* is a GeoDataFrame. rasterize_kw : dict, optional Extra keyword arguments forwarded to ``rasterize()`` when *zones* is vector input. name : str, optional Output ``xr.DataArray.name`` property. Defaults to ``None``, which is the same across every backend (without it the dask backends inherit an internal task name instead). Returns ------- result : xr.DataArray A new DataArray with the same shape, dims, coords, and attrs as `values`, with `func` applied to cells within zones. Examples -------- .. sourcecode:: python >>> import numpy as np >>> import xarray as xr >>> from xrspatial.zonal import apply >>> zones_val = np.array([ [1, 1, 0, 2], [0, 2, 1, 2]]) >>> zones = xr.DataArray(zones_val) >>> values_val = np.array([ [2, -1, 5, 3], [3, np.nan, 20, 10]]) >>> agg = xr.DataArray(values_val) >>> func = lambda x: 0 >>> result = apply(zones, agg, func) >>> result array([[0, 0, 5, 0], [3, np.nan, 0, 0]]) """ zones = _maybe_rasterize_zones(zones, values, column=column, rasterize_kw=rasterize_kw) _validate_raster(zones, func_name='apply', name='zones', ndim=2, integer_only=True) _validate_raster(values, func_name='apply', name='values', ndim=(2, 3)) if zones.shape != values.shape[:2]: raise ValueError("Incompatible shapes between `zones` and `values`") # align chunks for 2D values if values.ndim == 2: validate_arrays(zones, values) else: # 3D values: validate_arrays can't be used because it requires equal # full shapes (a 2D zones never equals a 3D values). Check backend # compatibility directly so mixed dask/numpy inputs fail here with a # clear error instead of crashing in the dask backend with an # AttributeError or silently returning eager numpy output. zones_backend = _classify_backend(zones) values_backend = _classify_backend(values) if zones_backend != values_backend: # Wording mirrors validate_arrays() in utils.py so the two stay # greppable together; the labels replace its "array 0"/"array N". raise ValueError( "input arrays must share the same backend; got " f"'{zones_backend}' (zones) and '{values_backend}' (values)" ) mapper = ArrayTypeFunctionMapping( numpy_func=_apply_numpy, dask_func=_apply_dask_numpy, cupy_func=_apply_cupy, dask_cupy_func=_apply_dask_cupy, ) out = mapper(values)(zones.data, values.data, func, nodata) result = xr.DataArray( out, dims=values.dims, coords=values.coords, attrs=values.attrs, ) # Assign .name after construction. When `out` is a dask array, # xr.DataArray(..., name=None) inherits the dask graph's task name, # so the result name would differ from the numpy/cupy backends and # change between runs. Setting it here forces a deterministic name # (None by default) across all four backends. See issue #2611. result.name = name return result
[docs] def get_full_extent(crs): """ Returns the full extent of a map projection, available projections are 'Mercator' and 'Geographic'. Parameters ---------- crs : str Input projection. Returns ------- extent : tuple Projection extent of form ((min_x, max_x), (min_y, max_y)). Examples -------- .. sourcecode:: python >>> from xrspatial.zonal import get_full_extent >>> extent = get_full_extent('Mercator') >>> print(extent) ((-20000000.0, 20000000.0), (-20000000.0, 20000000.0)) """ Mercator = (-20e6, 20e6), (-20e6, 20e6) Geographic = (-180, 180), (-90, 90) def _crs_code_mapping(): CRS_CODES = {} CRS_CODES["Mercator"] = Mercator CRS_CODES["Geographic"] = Geographic return CRS_CODES CRS_CODES = _crs_code_mapping() return CRS_CODES[crs]
[docs] def suggest_zonal_canvas( smallest_area: Union[int, float], x_range: Union[tuple, list], y_range: Union[tuple, list], crs: str = "Mercator", min_pixels: int = 25, ) -> tuple: """ Given a coordinate reference system (crs), a set of polygons with corresponding x range and y range, calculate the height and width of canvas so that the smallest polygon (polygon with smallest area) is rasterized with at least min pixels. Currently, we assume that the smallest polygon does not intersect others. One should note that a polygon can have different shapes when it is rasterized in canvases of different size. Thus, we cannot 100% guarantee the actual number of pixels after rasterization. It is recommended to add an additional of 5% to @min_pixels parameter. Parameters ---------- x_range : tuple or list of float or int The full x extent of the polygon GeoDataFrame. y_range : tuple or list of float or int The full y extent of the polygon GeoDataFrame. smallest_area : float or int Area of the smallest polygon. crs : str, default='Mercator' Name of the coordinate reference system. min_pixels : int, default=25 Expected number of pixels of the polygon with smallest area when the whole dataframe is rasterized. Returns ------- height, width: int Height and width of the canvas in pixel space. Examples -------- .. sourcecode:: python >>> # Imports (datashader is optional: pip install datashader) >>> from spatialpandas import GeoDataFrame >>> import geopandas as gpd >>> import datashader as ds >>> from xrspatial.zonal import suggest_zonal_canvas >>> df = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres')) >>> df = df.to_crs("EPSG:3857") >>> df = df[df.continent != 'Antarctica'] >>> df['id'] = [i for i in range(len(df.index))] >>> xmin, ymin, xmax, ymax = ( df.bounds.minx.min(), df.bounds.miny.min(), df.bounds.maxx.max(), df.bounds.maxy.max(), ) >>> x_range = (xmin, xmax) >>> y_range = (ymin, ymax) >>> smallest_area = df.area.min() >>> min_pixels = 20 >>> height, width = suggest_zonal_canvas( x_range=x_range, y_range=y_range, smallest_area=smallest_area, crs='Mercator', min_pixels=min_pixels, ) >>> height, width (1537, 2376) >>> cvs = ds.Canvas(x_range=x_range, y_range=y_range, >>> plot_height=height, plot_width=width) >>> spatial_df = GeoDataFrame(df, geometry='geometry') >>> agg = cvs.polygons(spatial_df, 'geometry', agg=ds.max('id')) >>> min_poly_id = df.area.argmin() >>> actual_min_pixels = len(np.where(agg.data==min_poly_id)[0]) >>> actual_min_pixels 22 """ full_xrange, full_yrange = get_full_extent(crs) xmin, xmax = full_xrange ymin, ymax = full_yrange aspect_ratio = (xmax - xmin) / (ymax - ymin) # area that a pixel stands for pixel_area = smallest_area / min_pixels # total_area of whole GeoDataFrame total_area = (xmax - xmin) * (ymax - ymin) # total pixels needed total_pixels = total_area / pixel_area # We have, h * w = total_pixels # and, w / h = aspect_ratio # Thus, aspect_ratio * h**2 = total_pixels h = sqrt(total_pixels / aspect_ratio) w = aspect_ratio * h canvas_h = int(h * (y_range[1] - y_range[0]) / (ymax - ymin)) canvas_w = int(w * (x_range[1] - x_range[0]) / (xmax - xmin)) return canvas_h, canvas_w
def _regions_numpy(data, neighborhood): """Connected-component labeling using scipy.ndimage.label (union-find).""" from scipy.ndimage import label if neighborhood == 4: structure = np.array([[0, 1, 0], [1, 1, 1], [0, 1, 0]]) else: structure = np.ones((3, 3), dtype=int) is_float = np.issubdtype(data.dtype, np.floating) valid = ~np.isnan(data) if is_float else np.ones(data.shape, dtype=bool) unique_vals = np.unique(data[valid]) out = np.full(data.shape, np.nan, dtype=np.float64) uid = 1 for v in unique_vals: mask = (data == v) if is_float: mask &= valid labeled, n_features = label(mask, structure=structure) for region_id in range(1, n_features + 1): out[labeled == region_id] = uid uid += 1 return out def _available_memory_bytes(): """Best-effort estimate of available memory in bytes.""" try: with open('/proc/meminfo', 'r') as f: for line in f: if line.startswith('MemAvailable:'): return int(line.split()[1]) * 1024 except (OSError, ValueError, IndexError): pass try: import psutil return psutil.virtual_memory().available except (ImportError, AttributeError): pass return 2 * 1024 ** 3 def _check_stats_dataarray_memory(n_stats, values_shape): """Guard the (n_stats, H*W) float64 buffer in ``_stats_numpy``. The ``return_type='xarray.DataArray'`` branch allocates a same-shape output replicated per requested statistic, so peak memory scales linearly with ``len(stats_funcs)``. Refuse the request when the buffer would exceed half of available RAM. """ n_cells = 1 for s in values_shape: n_cells *= int(s) required = n_stats * n_cells * 8 # float64 avail = _available_memory_bytes() if required > 0.5 * avail: raise MemoryError( f"stats(return_type='xarray.DataArray') needs " f"~{required / 1e9:.1f} GB for an " f"({n_stats}, {n_cells}) float64 result buffer " f"but only ~{avail / 1e9:.1f} GB is available. " "Reduce `stats_funcs`, use a smaller raster, or call " "stats(..., return_type='pandas.DataFrame') instead." ) def _regions_dask(data, neighborhood): """Dask backend: compute to numpy and run scipy label.""" avail = _available_memory_bytes() # Estimate without touching .nbytes (which can trigger graph inspection # on large arrays). The algorithm needs the full array in RAM plus # scratch space (~5x). estimated_bytes = np.prod(data.shape) * data.dtype.itemsize if estimated_bytes * 5 > 0.5 * avail: raise MemoryError( f"regions() needs the full array in memory (~{estimated_bytes * 5 / 1e9:.1f} GB) " f"but only ~{avail / 1e9:.1f} GB is available. " f"Connected-component labeling is a global operation that cannot be " f"chunked. Consider downsampling or tiling the input manually." ) np_data = data.compute() result = _regions_numpy(np_data, neighborhood) return da.from_array(result, chunks=data.chunks) def _regions_cupy(data, neighborhood): """CuPy GPU backend using cupyx.scipy.ndimage.label.""" import cupy as cp from cupyx.scipy.ndimage import label as cp_label if neighborhood == 4: structure = cp.array([[0, 1, 0], [1, 1, 1], [0, 1, 0]]) else: structure = cp.ones((3, 3), dtype=int) is_float = cp.issubdtype(data.dtype, cp.floating) valid = ~cp.isnan(data) if is_float else cp.ones(data.shape, dtype=bool) unique_vals = cp.unique(data[valid]) out = cp.full(data.shape, cp.nan, dtype=cp.float64) uid = 1 for v in unique_vals: mask = (data == v) if is_float: mask &= valid labeled, n_features = cp_label(mask, structure=structure) for region_id in range(1, n_features + 1): out[labeled == region_id] = uid uid += 1 return out def _regions_dask_cupy(data, neighborhood): """Dask+CuPy backend: compute to cupy and run GPU label.""" estimated_bytes = np.prod(data.shape) * data.dtype.itemsize try: import cupy as cp free_gpu, _total_gpu = cp.cuda.Device().mem_info if estimated_bytes * 5 > 0.5 * free_gpu: raise MemoryError( f"regions() needs the full array on GPU (~{estimated_bytes * 5 / 1e9:.1f} GB) " f"but only ~{free_gpu / 1e9:.1f} GB free. " f"Connected-component labeling is a global operation that cannot be " f"chunked. Consider downsampling or tiling the input manually." ) except (ImportError, AttributeError): pass cp_data = data.compute() result = _regions_cupy(cp_data, neighborhood) return da.from_array(result, chunks=data.chunks)
[docs] def regions( raster: xr.DataArray, neighborhood: int = 4, name: str = "regions" ) -> xr.DataArray: """ Create unique regions of raster based on pixel value connectivity. Connectivity can be based on either 4 or 8-pixel neighborhoods. Output raster contain a unique int for each connected region. Parameters ---------- raster : xr.DataArray neighborhood : int, default=4 4 or 8 pixel-based connectivity. name : str, default='regions' Output xr.DataArray.name property. Returns ------- regions_agg : xarray.DataArray References ---------- - Tomislav Hengl: http://spatial-analyst.net/ILWIS/htm/ilwisapp/areanumbering_algorithm.htm # noqa Examples -------- .. sourcecode:: python >>> import numpy as np >>> import xarray as xr >>> from xrspatial.zonal import regions >>> # Create a raster with distinct value regions >>> arr = np.array([[1, 1, 0, 2, 2], ... [1, 1, 0, 2, 2], ... [0, 0, 0, 0, 0], ... [3, 3, 0, 3, 3], ... [3, 3, 0, 3, 3]], dtype=np.float64) >>> raster = xr.DataArray(arr, dims=['y', 'x']) >>> print(raster.values) [[1. 1. 0. 2. 2.] [1. 1. 0. 2. 2.] [0. 0. 0. 0. 0.] [3. 3. 0. 3. 3.] [3. 3. 0. 3. 3.]] >>> # With 4-connectivity, each group of connected same-value >>> # pixels becomes a unique region >>> result = regions(raster, neighborhood=4) >>> print(result.values) [[1. 1. 2. 3. 3.] [1. 1. 2. 3. 3.] [2. 2. 2. 2. 2.] [5. 5. 2. 6. 6.] [5. 5. 2. 6. 6.]] >>> # Note: The two bottom-corner 3-regions are separate because >>> # they are not connected (the zero-valued cross separates them) >>> print(f"Number of unique regions: {len(np.unique(result.values))}") Number of unique regions: 5 >>> # With 8-connectivity, diagonal neighbors also connect regions >>> diagonal = np.array([[1, 0, 1], ... [0, 1, 0], ... [1, 0, 1]], dtype=np.float64) >>> raster_diag = xr.DataArray(diagonal, dims=['y', 'x']) >>> result_8 = regions(raster_diag, neighborhood=8) >>> print(result_8.values) [[1. 2. 1.] [2. 1. 2.] [1. 2. 1.]] >>> # All 1s are connected diagonally into one region, >>> # all 0s form another region >>> print(f"Number of unique regions: {len(np.unique(result_8.values))}") Number of unique regions: 2 """ _validate_raster(raster, func_name='regions', name='raster', ndim=2) if neighborhood not in (4, 8): raise ValueError("`neighborhood` value must be either 4 or 8)") data = raster.data if isinstance(data, np.ndarray): out = _regions_numpy(data, neighborhood) elif has_cuda_and_cupy() and is_cupy_array(data): out = _regions_cupy(data, neighborhood) elif da is not None and isinstance(data, da.Array): if is_dask_cupy(raster): out = _regions_dask_cupy(data, neighborhood) else: out = _regions_dask(data, neighborhood) else: raise TypeError( f"Unsupported array type {type(data).__name__} for regions()" ) return DataArray( out, name=name, dims=raster.dims, coords=raster.coords, attrs=raster.attrs )
def _bool_crop(arr, rows_flags, cols_flags): top = np.argwhere(rows_flags).flatten()[0] bottom = np.argwhere(rows_flags).flatten()[-1] left = np.argwhere(cols_flags).flatten()[0] right = np.argwhere(cols_flags).flatten()[-1] return arr[top: bottom + 1, left: right + 1] @ngjit def _trim(data, excludes): rows, cols = data.shape # find empty top rows top = 0 scan_complete = False for y in range(rows): if scan_complete: break top = y for x in range(cols): val = data[y, x] is_nodata = False for e in excludes: if e == val: is_nodata = True break if not is_nodata: scan_complete = True break # find empty bottom rows bottom = 0 scan_complete = False for y in range(rows - 1, -1, -1): if scan_complete: break bottom = y for x in range(cols): val = data[y, x] is_nodata = False for e in excludes: if e == val: is_nodata = True break if not is_nodata: scan_complete = True break # find empty left cols left = 0 scan_complete = False for x in range(cols): if scan_complete: break left = x for y in range(rows): val = data[y, x] is_nodata = False for e in excludes: if e == val: is_nodata = True break if not is_nodata: scan_complete = True break # find empty right cols right = 0 scan_complete = False for x in range(cols - 1, -1, -1): if scan_complete: break right = x for y in range(rows): val = data[y, x] is_nodata = False for e in excludes: if e == val: is_nodata = True break if not is_nodata: scan_complete = True break return top, bottom, left, right def _trim_bounds_dask(data, excludes): """Find trim bounds using lazy dask reductions (O(rows+cols) memory).""" excluded = da.zeros_like(data, dtype=bool) for v in excludes: if isinstance(v, float) and np.isnan(v): excluded = excluded | da.isnan(data) else: excluded = excluded | (data == v) all_excl_rows = excluded.all(axis=1) all_excl_cols = excluded.all(axis=0) row_mask, col_mask = dask.compute(all_excl_rows, all_excl_cols) # dask+cupy computes to cupy arrays; move to numpy for np.where if is_cupy_array(row_mask): row_mask = row_mask.get() if is_cupy_array(col_mask): col_mask = col_mask.get() data_rows = np.where(~np.asarray(row_mask))[0] data_cols = np.where(~np.asarray(col_mask))[0] if len(data_rows) == 0 or len(data_cols) == 0: return 0, -1, 0, -1 # empty slice return (int(data_rows[0]), int(data_rows[-1]), int(data_cols[0]), int(data_cols[-1])) def _split_nan_excludes(values): """Return (finite_excludes_array, has_nan). NaN sentinels cannot be matched by the numba kernel's ``e == val`` test (``NaN == NaN`` is False) and ``np.isnan`` is not callable on integer dtypes inside numba, so NaN matching is handled in the wrapper instead of inside ``_trim``. """ has_nan = False finite = [] for v in values: if isinstance(v, float) and np.isnan(v): has_nan = True else: finite.append(v) return np.asarray(finite), has_nan def _trim_bounds_numpy(data, excludes): """Find trim bounds using a numpy row/col reduction (handles NaN).""" finite, has_nan = _split_nan_excludes(excludes) excluded = np.zeros(data.shape, dtype=bool) if has_nan and np.issubdtype(data.dtype, np.floating): excluded |= np.isnan(data) for v in finite: excluded |= (data == v) all_excl_rows = excluded.all(axis=1) all_excl_cols = excluded.all(axis=0) data_rows = np.where(~all_excl_rows)[0] data_cols = np.where(~all_excl_cols)[0] if len(data_rows) == 0 or len(data_cols) == 0: return 0, -1, 0, -1 # empty slice return (int(data_rows[0]), int(data_rows[-1]), int(data_cols[0]), int(data_cols[-1]))
[docs] def trim( raster: xr.DataArray, values: Union[list, tuple] = (np.nan,), name: str = "trim" ) -> xr.DataArray: """ Trim scans from the edges and eliminates rows / cols which only contain the values supplied. Parameters ---------- raster: xr.DataArray values: list or tuple, default=(np.nan) List of zone ids to trim from raster edge. name: str, default='trim' Output xr.DataArray.name property. Returns ------- trim_agg : xarray.DataArray Notes ----- - This operation will change the output size of the raster. Examples -------- .. plot:: :include-source: import matplotlib.pyplot as plt import numpy as np import xarray as xr from xrspatial import generate_terrain from xrspatial.zonal import trim # Generate Example Terrain W = 500 H = 300 template_terrain = xr.DataArray(np.zeros((H, W))) x_range=(-20e6, 20e6) y_range=(-20e6, 20e6) terrain_agg = generate_terrain( template_terrain, x_range=x_range, y_range=y_range ) # Edit Attributes terrain_agg = terrain_agg.assign_attrs( { 'Description': 'Example Terrain', 'units': 'km', 'Max Elevation': '4000', } ) terrain_agg = terrain_agg.rename({'x': 'lon', 'y': 'lat'}) terrain_agg = terrain_agg.rename('Elevation') terrain_agg = terrain_agg.astype('int') # Trim Image trimmed_agg = trim(raster = terrain_agg, values = [0]) # Edit Attributes trimmed_agg = trimmed_agg.assign_attrs({'Description': 'Example Trim'}) # Plot Terrain terrain_agg.plot(cmap = 'terrain', aspect = 2, size = 4) plt.title("Terrain") plt.ylabel("latitude") plt.xlabel("longitude") # Plot Trimmed Terrain trimmed_agg.plot(cmap = 'terrain', aspect = 2, size = 4) plt.title("Trim") plt.ylabel("latitude") plt.xlabel("longitude") .. sourcecode:: python >>> print(terrain_agg.shape) (300, 500) >>> print(terrain_agg.attrs) { 'res': (80000.0, 133333.3333333333), 'Description': 'Example Terrain', 'units': 'km', 'Max Elevation': '4000', } >>> print(trimmed_agg.shape) (268, 500) >>> print(trimmed_agg.attrs) { 'res': (80000.0, 133333.3333333333), 'Description': 'Example Trim', 'units': 'km', 'Max Elevation': '4000', } """ _validate_raster(raster, func_name='trim', name='raster', ndim=2) data = raster.data if has_dask_array() and isinstance(data, da.Array): top, bottom, left, right = _trim_bounds_dask(data, values) else: if is_cupy_array(data): data = data.get() finite, has_nan = _split_nan_excludes(values) # NaN sentinels cannot be matched by the numba kernel # (NaN == NaN is False). Route to the numpy bounds helper # whenever NaN matching is needed. if has_nan: top, bottom, left, right = _trim_bounds_numpy(data, values) else: top, bottom, left, right = _trim(data, finite) arr = raster[top: bottom + 1, left: right + 1] arr.name = name return arr
@ngjit def _crop(data, values): """Scan-based crop bounds. Returns ------- top, bottom, left, right, found : ints ``found`` is 1 if any cell in *data* matches one of *values*, else 0. When ``found == 0`` the bounds are meaningless and the caller must treat the result as an empty crop. """ rows, cols = data.shape top = 0 bottom = 0 left = 0 right = 0 found = 0 # find empty top rows scan_complete = False for y in range(rows): if scan_complete: break top = y for x in range(cols): val = data[y, x] for v in values: if v == val: scan_complete = True found = 1 break else: continue if scan_complete: break if found == 0: return 0, 0, 0, 0, 0 # find empty bottom rows scan_complete = False for y in range(rows - 1, -1, -1): if scan_complete: break bottom = y for x in range(cols): val = data[y, x] for e in values: if e == val: scan_complete = True break else: continue if scan_complete: break # find empty left cols scan_complete = False for x in range(cols): if scan_complete: break left = x for y in range(rows): val = data[y, x] for e in values: if e == val: scan_complete = True break else: continue if scan_complete: break # find empty right cols scan_complete = False for x in range(cols - 1, -1, -1): if scan_complete: break right = x for y in range(rows): val = data[y, x] for e in values: if e == val: scan_complete = True break else: continue if scan_complete: break return top, bottom, left, right, found def _crop_bounds_dask(data, target_values): """Find crop bounds using lazy dask reductions (O(rows+cols) memory). Returns ------- top, bottom, left, right, found : ints ``found`` is 1 if any cell in *data* matches one of *target_values*, else 0. When ``found == 0`` the bounds are meaningless and the caller must treat the result as an empty crop. Matches the contract of :func:`_crop`. """ matched = da.zeros_like(data, dtype=bool) for v in target_values: matched = matched | (data == v) any_match_rows = matched.any(axis=1) any_match_cols = matched.any(axis=0) row_mask, col_mask = dask.compute(any_match_rows, any_match_cols) # dask+cupy computes to cupy arrays; move to numpy for np.where if is_cupy_array(row_mask): row_mask = row_mask.get() if is_cupy_array(col_mask): col_mask = col_mask.get() match_rows = np.where(np.asarray(row_mask))[0] match_cols = np.where(np.asarray(col_mask))[0] if len(match_rows) == 0 or len(match_cols) == 0: return 0, 0, 0, 0, 0 return (int(match_rows[0]), int(match_rows[-1]), int(match_cols[0]), int(match_cols[-1]), 1)
[docs] def crop( zones, values: xr.DataArray, zone_ids: Optional[Union[list, tuple]] = None, name: str = "crop", column: Optional[str] = None, rasterize_kw: Optional[dict] = None, zones_ids: Optional[Union[list, tuple]] = None, ): """ Crop scans from edges and eliminates rows / cols until one of the input values is found. Parameters ---------- zones : xr.DataArray, GeoDataFrame, or list of (geometry, value) pairs Zone definitions. Can be a 2D DataArray, a GeoDataFrame (requires *column*), or a list of ``(geometry, zone_id)`` pairs. values: xr.DataArray Input values raster. zone_ids : list or tuple List of zone ids to crop raster. Matches the ``zone_ids`` parameter of :func:`stats` and :func:`crosstab`. name: str, default='crop' Output xr.DataArray.name property. column : str, optional Column name in the GeoDataFrame that contains zone IDs. Required when *zones* is a GeoDataFrame. rasterize_kw : dict, optional Extra keyword arguments forwarded to ``rasterize()`` when *zones* is vector input. zones_ids : list or tuple, optional Deprecated alias for ``zone_ids``. Will emit a ``DeprecationWarning`` and be removed in a future release. Returns ------- crop_agg : xarray.DataArray Notes ----- - This operation will change the output size of the raster. - ``zones`` and ``values`` must have the same shape and backend; otherwise a ``ValueError`` is raised (consistent with :func:`stats` and :func:`crosstab`). - If none of the requested ``zone_ids`` are present in ``zones``, the returned DataArray has shape ``(0, 0)``. This behaviour is the same across all backends (numpy, cupy, dask+numpy, dask+cupy). Examples -------- .. plot:: :include-source: import matplotlib.pyplot as plt import numpy as np import xarray as xr from xrspatial import generate_terrain from xrspatial.zonal import crop # Generate Example Terrain W = 500 H = 300 template_terrain = xr.DataArray(np.zeros((H, W))) x_range=(-20e6, 20e6) y_range=(-20e6, 20e6) terrain_agg = generate_terrain( template_terrain, x_range=x_range, y_range=y_range ) # Edit Attributes terrain_agg = terrain_agg.assign_attrs( { 'Description': 'Example Terrain', 'units': 'km', 'Max Elevation': '4000', } ) terrain_agg = terrain_agg.rename({'x': 'lon', 'y': 'lat'}) terrain_agg = terrain_agg.rename('Elevation') # Create a simple zone raster (0 = below median, 1 = above) zones_agg = (terrain_agg > terrain_agg.median()).astype(int) zones_agg.attrs = terrain_agg.attrs zones_agg = zones_agg.rename('Zone') # Crop to keep only the above-median zone values_agg = terrain_agg[0:300, 0:250] zones_sub = zones_agg[0:300, 0:250] cropped_agg = crop( zones=zones_sub, values=values_agg, zone_ids=[1], ) # Edit Attributes cropped_agg = cropped_agg.assign_attrs({'Description': 'Example Crop'}) # Plot Terrain terrain_agg.plot(cmap = 'terrain', aspect = 2, size = 4) plt.title("Terrain") plt.ylabel("latitude") plt.xlabel("longitude") # Plot Cropped Terrain cropped_agg.plot(cmap = 'terrain', aspect = 2, size = 4) plt.title("Crop") plt.ylabel("latitude") plt.xlabel("longitude") .. sourcecode:: python >>> print(terrain_agg.shape) (300, 500) >>> print(terrain_agg.attrs) { 'res': (80000.0, 133333.3333333333), 'Description': 'Example Terrain', 'units': 'km', 'Max Elevation': '4000', } >>> print(cropped_agg.shape) (300, 250) >>> print(cropped_agg.attrs) { 'res': (80000.0, 133333.3333333333), 'Description': 'Example Crop', 'units': 'km', 'Max Elevation': '4000', } """ # Backwards-compatible alias: stats() and crosstab() use `zone_ids`, # crop() historically used `zones_ids` (extra 's'). Accept both, # emit a DeprecationWarning on the old name, raise if both are passed. if zones_ids is not None: import warnings if zone_ids is not None: raise TypeError( "crop() received both `zone_ids` and `zones_ids`; pass " "only `zone_ids` (the canonical name)." ) warnings.warn( "crop(zones_ids=...) is deprecated and will be removed in a " "future release; use `zone_ids=...` to match stats() and " "crosstab().", DeprecationWarning, stacklevel=2, ) zone_ids = zones_ids if zone_ids is None: raise TypeError( "crop() missing required argument `zone_ids` (list or tuple " "of zone ids to crop to)." ) zones = _maybe_rasterize_zones(zones, values, column=column, rasterize_kw=rasterize_kw) _validate_raster(zones, func_name='crop', name='zones', ndim=2) _validate_raster(values, func_name='crop', name='values', ndim=2) # Guard against mismatched shapes / backends, consistent with stats() # and crosstab(). Without this, a zones/values shape mismatch silently # produces a nonsense crop instead of raising (GH #2638). validate_arrays(zones, values) data = zones.data if has_dask_array() and isinstance(data, da.Array): top, bottom, left, right, found = _crop_bounds_dask(data, zone_ids) else: if is_cupy_array(data): data = data.get() top, bottom, left, right, found = _crop(data, np.asarray(zone_ids)) if not found: # No requested zone exists in `zones`; return an empty (0, 0) slice # so all backends agree (see GH #2561). Slicing with `0:0` preserves # the underlying array type (numpy/cupy/dask) and the dim names. arr = values[0:0, 0:0] else: arr = values[top: bottom + 1, left: right + 1] arr.name = name return arr