Source code for xrspatial.classify

from __future__ import annotations

import warnings
from functools import partial
from typing import List, Optional

import cmath

import xarray as xr

try:
    import cupy
except ImportError:
    class cupy(object):
        ndarray = False

try:
    import dask
    import dask.array as da
except ImportError:
    dask = None
    da = None

import numba as nb
import numpy as np

from xrspatial.utils import (
    ArrayTypeFunctionMapping,
    _validate_raster,
    _validate_scalar,
    cuda_args,
    ngjit,
    not_implemented_func,
)
from xrspatial.dataset_support import supports_dataset


def _available_memory_bytes():
    """Best-effort estimate of available memory in bytes."""
    try:
        with open('/proc/meminfo', 'r') as f:
            for line in f:
                if line.startswith('MemAvailable:'):
                    return int(line.split()[1]) * 1024
    except (OSError, ValueError, IndexError):
        pass
    try:
        import psutil
        return psutil.virtual_memory().available
    except (ImportError, AttributeError):
        pass
    return 2 * 1024 ** 3


@ngjit
def _cpu_binary(data, values):
    out = np.empty(data.shape, dtype=data.dtype)
    out[:] = np.nan
    rows, cols = data.shape
    for y in range(0, rows):
        for x in range(0, cols):
            if np.any(values == data[y, x]):
                out[y, x] = 1
            elif np.isfinite(data[y, x]):
                out[y, x] = 0
    return out


def _run_numpy_binary(data, values):
    values = np.asarray(values)
    out = _cpu_binary(data, values)
    return out


def _run_dask_numpy_binary(data, values):
    _func = partial(_run_numpy_binary, values=values)
    out = data.map_blocks(_func)
    return out


@nb.cuda.jit(device=True)
def _gpu_binary(val, values):
    for v in values:
        if val == v:
            return 1
    return 0


@nb.cuda.jit
def _run_gpu_binary(data, values, out):
    i, j = nb.cuda.grid(2)
    if i >= 0 and i < out.shape[0] and j >= 0 and j < out.shape[1]:
        if cmath.isfinite(data[i, j]):
            out[i, j] = _gpu_binary(data[i, j], values)


def _run_cupy_binary(data, values):
    values_cupy = cupy.asarray(values)
    out = cupy.empty(data.shape, dtype='f4')
    out[:] = cupy.nan
    griddim, blockdim = cuda_args(data.shape)
    _run_gpu_binary[griddim, blockdim](data, values_cupy, out)
    return out


def _run_dask_cupy_binary(data, values_cupy):
    out = data.map_blocks(lambda da: _run_cupy_binary(da, values_cupy), meta=cupy.array(()))
    return out


[docs] @supports_dataset def binary(agg, values, name='binary'): """ Binarize a data array based on a set of values. Data that equals to a value in the set will be set to 1. In contrast, data that does not equal to any value in the set will be set to 0. Note that NaNs and infinite values will be set to NaNs. Parameters ---------- agg : xr.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask, or Cupy-backed Dask array of values to be reclassified. values : array-like object Values to keep in the binarized array. name : str, default='binary' Name of output aggregate array. Returns ------- binarized_agg : xr.DataArray or xr.Dataset 2D aggregate array of binarized data array. All other input attributes are preserved. If `agg` is a Dataset, returns a Dataset with each variable classified independently. Examples -------- Binary works with NumPy backed xarray DataArray .. sourcecode:: python >>> import numpy as np >>> import xarray as xr >>> from xrspatial.classify import binary >>> data = np.array([ [np.nan, 1., 2., 3., 4.], [5., 6., 7., 8., 9.], [10., 11., 12., 13., 14.], [15., 16., 17., 18., np.inf], ], dtype=np.float32) >>> agg = xr.DataArray(data) >>> values = [1, 2, 3] >>> agg_binary = binary(agg, values) >>> print(agg_binary) <xarray.DataArray 'binary' (dim_0: 4, dim_1: 5)> array([[np.nan, 1., 1., 1., 0.], [0., 0., 0., 0., 0.], [0., 0., 0., 0., 0.], [0., 0., 0., 0., np.nan]], dtype=float32) Dimensions without coordinates: dim_0, dim_1 """ _validate_raster(agg, func_name='binary', name='agg', ndim=None) mapper = ArrayTypeFunctionMapping(numpy_func=_run_numpy_binary, dask_func=_run_dask_numpy_binary, cupy_func=_run_cupy_binary, dask_cupy_func=_run_dask_cupy_binary) out = mapper(agg)(agg.data, values) return xr.DataArray(out, name=name, dims=agg.dims, coords=agg.coords, attrs=agg.attrs)
@ngjit def _cpu_bin(data, bins, new_values): out = np.zeros(data.shape, dtype=np.float32) out[:] = np.nan rows, cols = data.shape nbins = len(bins) for y in range(0, rows): for x in range(0, cols): val = data[y, x] val_bin = -1 # find bin if np.isfinite(val): if val <= bins[0]: val_bin = 0 elif val <= bins[nbins - 1]: start = 0 end = nbins - 1 mid = (end + start) // 2 while start <= end: if bins[mid] < val: start = mid + 1 elif val > bins[mid - 1]: break else: end = mid - 1 mid = (end + start) // 2 val_bin = mid if val_bin > -1: out[y, x] = new_values[val_bin] else: out[y, x] = np.nan return out def _run_numpy_bin(data, bins, new_values): bins = np.asarray(bins) new_values = np.asarray(new_values) out = _cpu_bin(data, bins, new_values) return out def _run_dask_numpy_bin(data, bins, new_values): _func = partial(_run_numpy_bin, bins=bins, new_values=new_values) out = data.map_blocks(_func) return out @nb.cuda.jit(device=True) def _gpu_bin(data, bins, new_values): nbins = len(bins) val = data[0, 0] val_bin = -1 # find bin for b in range(0, nbins): # first bin if b == 0: if val <= bins[b]: val_bin = b break else: if val > bins[b - 1] and val <= bins[b]: val_bin = b break if val_bin > -1: out = new_values[val_bin] else: out = np.nan return out @nb.cuda.jit def _run_gpu_bin(data, bins, new_values, out): i, j = nb.cuda.grid(2) if (i >= 0 and i < out.shape[0] and j >= 0 and j < out.shape[1]): out[i, j] = _gpu_bin(data[i:i+1, j:j+1], bins, new_values) def _run_cupy_bin(data, bins, new_values): # replace ±inf with nan in a single pass to avoid classifying outliers data = cupy.where(cupy.isinf(data), cupy.nan, data) bins_cupy = cupy.asarray(bins) new_values_cupy = cupy.asarray(new_values) out = cupy.empty(data.shape, dtype='f4') out[:] = cupy.nan griddim, blockdim = cuda_args(data.shape) _run_gpu_bin[griddim, blockdim](data, bins_cupy, new_values_cupy, out) return out def _run_dask_cupy_bin(data, bins_cupy, new_values_cupy): out = data.map_blocks(lambda da: _run_cupy_bin(da, bins_cupy, new_values_cupy), meta=cupy.array(())) return out def _bin(agg, bins, new_values): mapper = ArrayTypeFunctionMapping(numpy_func=_run_numpy_bin, dask_func=_run_dask_numpy_bin, cupy_func=_run_cupy_bin, dask_cupy_func=_run_dask_cupy_bin) out = mapper(agg)(agg.data, bins, new_values) return out
[docs] @supports_dataset def reclassify(agg: xr.DataArray, bins: List[int], new_values: List[int], name: Optional[str] = 'reclassify') -> xr.DataArray: """ Reclassifies data for array `agg` into new values based on user defined bins. Parameters ---------- agg : xr.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask, or Cupy-backed Dask array of values to be reclassified. bins : array-like object Values or ranges of values to be changed. new_values : array-like object New values for each bin. name : str, default='reclassify' Name of output aggregate array. Returns ------- reclass_agg : xr.DataArray or xr.Dataset 2D aggregate array of reclassified allocations. All other input attributes are preserved. If `agg` is a Dataset, returns a Dataset with each variable classified independently. References ---------- - PySAL: https://pysal.org/mapclassify/_modules/mapclassify/classifiers.html # noqa Examples -------- Reclassify works with NumPy backed xarray DataArray .. sourcecode:: python >>> import numpy as np >>> import xarray as xr >>> from xrspatial.classify import reclassify >>> data = np.array([ [np.nan, 1., 2., 3., 4.], [ 5., 6., 7., 8., 9.], [10., 11., 12., 13., 14.], [15., 16., 17., 18., np.inf], ]) >>> agg = xr.DataArray(data) >>> print(agg) <xarray.DataArray (dim_0: 4, dim_1: 5)> array([[nan, 1., 2., 3., 4.], [ 5., 6., 7., 8., 9.], [10., 11., 12., 13., 14.], [15., 16., 17., 18., inf]]) Dimensions without coordinates: dim_0, dim_1 >>> bins=[10, 15, np.inf] >>> new_values=[1, 2, 3] >>> agg_reclassify = reclassify(agg, bins=bins, new_values=new_values) >>> print(agg_reclassify) <xarray.DataArray 'reclassify' (dim_0: 4, dim_1: 5)> array([[nan, 1., 1., 1., 1.], [ 1., 2., 2., 2., 2.], [ 2., 2., 2., 2., 2.], [ 2., 3., 3., 3., 3.]], dtype=float32) Dimensions without coordinates: dim_0, dim_1 Reclassify works with Dask with NumPy backed xarray DataArray .. sourcecode:: python >>> import dask.array as da >>> data_da = da.from_array(data, chunks=(3, 3)) >>> agg_da = xr.DataArray(data_da, name='agg_da') >>> print(agg_da) <xarray.DataArray 'agg_da' (dim_0: 4, dim_1: 5)> dask.array<array, shape=(4, 5), dtype=float32, chunksize=(3, 3), chunktype=numpy.ndarray> Dimensions without coordinates: dim_0, dim_1 >>> agg_reclassify_da = reclassify(agg_da, bins=bins, new_values=new_values) # noqa >>> print(agg_reclassify_da) <xarray.DataArray 'reclassify' (dim_0: 4, dim_1: 5)> dask.array<_run_numpy_bin, shape=(4, 5), dtype=float32, chunksize=(3, 3), chunktype=numpy.ndarray> Dimensions without coordinates: dim_0, dim_1 >>> print(agg_reclassify_da.compute()) # print the computed the results <xarray.DataArray 'reclassify' (dim_0: 4, dim_1: 5)> array([[nan, 1., 1., 1., 1.], [ 1., 2., 2., 2., 2.], [ 2., 2., 2., 2., 2.], [ 2., 3., 3., 3., 3.]], dtype=float32) Dimensions without coordinates: dim_0, dim_1 Reclassify works with CuPy backed xarray DataArray. Make sure you have a GPU and CuPy installed to run this example. .. sourcecode:: python >>> import cupy >>> data_cupy = cupy.asarray(data) >>> agg_cupy = xr.DataArray(data_cupy) >>> agg_reclassify_cupy = reclassify(agg_cupy, bins, new_values) >>> print(type(agg_reclassify_cupy.data)) <class 'cupy.core.core.ndarray'> >>> print(agg_reclassify_cupy) <xarray.DataArray 'reclassify' (dim_0: 4, dim_1: 5)> array([[nan, 1., 1., 1., 1.], [ 1., 2., 2., 2., 2.], [ 2., 2., 2., 2., 2.], [ 2., 3., 3., 3., 3.]], dtype=float32) Dimensions without coordinates: dim_0, dim_1 Reclassify works with Dask with CuPy backed xarray DataArray. """ _validate_raster(agg, func_name='reclassify', name='agg', ndim=None) if len(bins) != len(new_values): raise ValueError( 'bins and new_values mismatch. Should have same length.' ) out = _bin(agg, bins, new_values) return xr.DataArray(out, name=name, dims=agg.dims, coords=agg.coords, attrs=agg.attrs)
def _run_quantile(data, num_sample, k, module): # num_sample ignored for in-memory backends w = 100.0 / k p = module.arange(w, 100 + w, w) if p[-1] > 100.0: p[-1] = 100.0 q = module.percentile(data[module.isfinite(data)], p) q = module.unique(q) return q def _run_dask_quantile(data, num_sample, k): # Avoid boolean fancy indexing (data[da.isfinite(data)]) which creates # unknown dask chunk sizes. Use sampling via indexed access to avoid # materialising the full array (#884). w = 100.0 / k p = np.arange(w, 100 + w, w) if p[-1] > 100.0: p[-1] = 100.0 clean = da.where(da.isinf(data), np.nan, data) num_data = data.size if num_sample is None or num_sample >= num_data: num_sample = num_data sample_idx = _generate_sample_indices(num_data, num_sample) values = np.asarray(clean.ravel()[sample_idx].compute()) values = values[np.isfinite(values)] q = np.percentile(values, p) q = np.unique(q) return q def _run_dask_cupy_quantile(data, num_sample, k): # Convert dask+cupy chunks to numpy, then same safe path as dask (#884). data_cpu = data.map_blocks(cupy.asnumpy, dtype=data.dtype, meta=np.array(())) return _run_dask_quantile(data_cpu, num_sample, k) def _quantile(agg, num_sample, k): mapper = ArrayTypeFunctionMapping( numpy_func=lambda *args: _run_quantile(*args, module=np), dask_func=_run_dask_quantile, cupy_func=lambda *args: _run_quantile(*args, module=cupy), dask_cupy_func=_run_dask_cupy_quantile ) out = mapper(agg)(agg.data, num_sample, k) return out
[docs] @supports_dataset def quantile(agg: xr.DataArray, k: int = 4, num_sample: Optional[int] = 20_000, name: Optional[str] = 'quantile') -> xr.DataArray: """ Reclassifies data for array `agg` into new values based on quantile groups of equal size. Parameters ---------- agg : xr.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask, or Cupy-backed Dask array of values to be reclassified. k : int, default=4 Number of quantiles to be produced. num_sample : int or None, default=20000 Number of sample data points used to compute percentile breakpoints. For dask-backed arrays the sample is drawn lazily to avoid materialising the entire array into RAM. ``None`` means use all data (safe for numpy/cupy, automatically capped for dask). name : str, default='quantile' Name of the output aggregate array. Returns ------- quantile_agg : xr.DataArray or xr.Dataset 2D aggregate array, of quantile allocations. All other input attributes are preserved. If `agg` is a Dataset, returns a Dataset with each variable classified independently. Notes ----- - Dask's percentile algorithm is approximate, while numpy's is exact. - This may cause some differences between results of vanilla numpy and dask version of the input agg. (https://github.com/dask/dask/issues/3099) # noqa References ---------- - PySAL: https://pysal.org/mapclassify/_modules/mapclassify/classifiers.html#Quantiles # noqa Examples -------- Quantile work with numpy backed xarray DataArray .. sourcecode:: python >>> import numpy as np >>> import xarray as xr >>> from xrspatial.classify import quantile >>> elevation = np.array([ [np.nan, 1., 2., 3., 4.], [ 5., 6., 7., 8., 9.], [10., 11., 12., 13., 14.], [15., 16., 17., 18., 19.], [20., 21., 22., 23., np.inf] ]) >>> agg_numpy = xr.DataArray(elevation, attrs={'res': (10.0, 10.0)}) >>> numpy_quantile = quantile(agg_numpy, k=5) >>> print(numpy_quantile) <xarray.DataArray 'quantile' (dim_0: 5, dim_1: 5)> array([[nan, 0., 0., 0., 0.], [ 0., 1., 1., 1., 1.], [ 2., 2., 2., 2., 2.], [ 3., 3., 3., 3., 4.], [ 4., 4., 4., 4., nan]], dtype=float32) Dimensions without coordinates: dim_0, dim_1 Attributes: res: (10.0, 10.0) """ _validate_raster(agg, func_name='quantile', name='agg', ndim=None) _validate_scalar(k, func_name='quantile', name='k', dtype=int, min_val=2) q = _quantile(agg, num_sample, k) k_q = q.shape[0] if k_q < k: print("Quantile Warning: Not enough unique values" "for k classes (using {} bins)".format(k_q)) k = k_q out = _bin(agg, bins=q, new_values=np.arange(k)) return xr.DataArray(out, name=name, dims=agg.dims, coords=agg.coords, attrs=agg.attrs)
@nb.jit(nopython=True) def _run_numpy_jenks_matrices(data, n_classes): n_data = data.shape[0] lower_class_limits = np.zeros( (n_data + 1, n_classes + 1), dtype=np.float64 ) lower_class_limits[1, 1:n_classes + 1] = 1.0 var_combinations = np.zeros( (n_data + 1, n_classes + 1), dtype=np.float64 ) var_combinations[2:n_data + 1, 1:n_classes + 1] = np.inf variance = 0.0 for l in range(2, n_data + 1): # noqa sum = 0.0 sum_squares = 0.0 w = 0.0 for m in range(l): # `III` originally lower_class_limit = l - m i4 = lower_class_limit - 1 val = data[i4] # here we're estimating variance for each potential classing # of the data, for each potential number of classes. `w` # is the number of data points considered so far. w += 1.0 # increase the current sum and sum-of-squares sum += val sum_squares += val * val # the variance at this point in the sequence is the difference # between the sum of squares and the total x 2, over the number # of samples. variance = sum_squares - (sum * sum) / w if i4 == 0: continue for j in range(2, n_classes + 1): # if adding this element to an existing class # will increase its variance beyond the limit, break # the class at this point, setting the lower_class_limit # at this point. new_variance = variance + var_combinations[i4, j-1] if var_combinations[l, j] >= new_variance: lower_class_limits[l, j] = lower_class_limit var_combinations[l, j] = new_variance lower_class_limits[l, 1] = 1. var_combinations[l, 1] = variance return lower_class_limits, var_combinations def _run_jenks(data, n_classes): # ported from existing cython implementation: # https://github.com/perrygeo/jenks/blob/master/jenks.pyx data.sort() lower_class_limits, _ = _run_numpy_jenks_matrices(data, n_classes) k = data.shape[0] kclass = np.zeros(n_classes + 1, dtype=np.float64) kclass[0] = data[0] kclass[-1] = data[-1] count_num = n_classes while count_num > 1: elt = int(lower_class_limits[k][count_num] - 2) kclass[count_num - 1] = data[elt] k = int(lower_class_limits[k][count_num] - 1) count_num -= 1 return kclass def _compute_natural_break_bins(data_flat_np, num_sample, k, max_data): """Shared helper: compute natural break bins from a flat numpy array. Returns (bins, uvk) where bins is a numpy array of bin edges and uvk is the number of unique values. """ num_data = data_flat_np.size if num_sample is not None and num_sample < num_data: # randomly select sample from the whole dataset # create a pseudo random number generator # Note: cupy and numpy generate different random numbers # use numpy.random to ensure the same result generator = np.random.RandomState(1234567890) idx = np.linspace( 0, num_data, num_data, endpoint=False, dtype=np.uint32 ) generator.shuffle(idx) sample_idx = idx[:num_sample] sample_data = data_flat_np[sample_idx] else: sample_data = data_flat_np # warning if number of total data points to fit the model bigger than 40k if sample_data.size >= 40000: with warnings.catch_warnings(): warnings.simplefilter('default') warnings.warn('natural_breaks Warning: Natural break ' 'classification (Jenks) has a complexity of O(n^2), ' 'your classification with {} data points may take ' 'a long time.'.format(sample_data.size), Warning) sample_data = np.asarray(sample_data) # only include finite values sample_data = sample_data[np.isfinite(sample_data)] # _run_jenks allocates two float64 matrices of shape # (n_data + 1, n_classes + 1). Reject inputs that would exceed half of # available memory so a num_sample=None call on a large raster fails # fast instead of OOMing the process. jenks_bytes = 2 * (sample_data.size + 1) * (k + 1) * 8 avail = _available_memory_bytes() if jenks_bytes > 0.5 * avail: raise MemoryError( 'natural_breaks would allocate ~{:.1f} GB for the Jenks ' 'matrices ({} data points, k={}), exceeding the memory budget ' 'of {:.1f} GB. Pass a smaller num_sample to subsample the ' 'input.'.format( jenks_bytes / 1024 ** 3, sample_data.size, k, 0.5 * avail / 1024 ** 3, ) ) uv = np.unique(sample_data) uvk = len(uv) if uvk < k: with warnings.catch_warnings(): warnings.simplefilter('default') warnings.warn('natural_breaks Warning: Not enough unique values ' 'in data array for {} classes. ' 'n_samples={} should be >= n_clusters={}. ' 'Using k={} instead.'.format(k, uvk, k, uvk), Warning) uv.sort() bins = uv else: centroids = _run_jenks(sample_data, k) bins = np.array(centroids[1:]) bins[-1] = max_data return bins, uvk def _run_natural_break(agg, num_sample, k): data = agg.data max_data = float(np.max(data[np.isfinite(data)])) data_flat_np = data.flatten() bins, uvk = _compute_natural_break_bins(data_flat_np, num_sample, k, max_data) out = _bin(agg, bins, np.arange(uvk)) return out def _run_cupy_natural_break(agg, num_sample, k): data = agg.data max_data = float(cupy.max(data[cupy.isfinite(data)]).get()) data_flat_np = cupy.asnumpy(data.ravel()) bins, uvk = _compute_natural_break_bins(data_flat_np, num_sample, k, max_data) out = _bin(agg, bins, np.arange(uvk)) return out def _generate_sample_indices(num_data, num_sample, seed=1234567890): """Generate sorted sample indices for natural breaks sampling. For small datasets (<=10M elements), uses the same linspace+shuffle approach as the numpy backend for exact reproducibility. For large datasets, uses memory-efficient RandomState.choice() which is O(num_sample) rather than O(num_data). """ generator = np.random.RandomState(seed) if num_data <= 10_000_000: idx = np.linspace( 0, num_data, num_data, endpoint=False, dtype=np.uint32 ) generator.shuffle(idx) sample_idx = idx[:num_sample] else: sample_idx = generator.choice( num_data, size=num_sample, replace=False ) # sort for efficient dask chunk access sample_idx.sort() return sample_idx def _run_dask_natural_break(agg, num_sample, k): data = agg.data # Avoid boolean fancy indexing which flattens dask arrays and # produces chunks of unknown size; use element-wise where instead max_data = float(da.nanmax(da.where(da.isinf(data), np.nan, data)).compute()) num_data = data.size if num_sample is None or num_sample >= num_data: num_sample = num_data # cap: still uses indexed access, never .compute() all sample_idx = _generate_sample_indices(num_data, num_sample) sample_data_np = np.asarray(data.ravel()[sample_idx].compute()) bins, uvk = _compute_natural_break_bins(sample_data_np, None, k, max_data) out = _bin(agg, bins, np.arange(uvk)) return out def _run_dask_cupy_natural_break(agg, num_sample, k): data = agg.data # Avoid boolean fancy indexing which flattens dask arrays and # produces chunks of unknown size; use element-wise where instead max_data = float(da.nanmax(da.where(da.isinf(data), np.nan, data)).compute().item()) num_data = data.size if num_sample is None or num_sample >= num_data: num_sample = num_data # cap: still uses indexed access, never .compute() all sample_idx = _generate_sample_indices(num_data, num_sample) sample_data = data.ravel()[sample_idx].compute() sample_data_np = cupy.asnumpy(sample_data) bins, uvk = _compute_natural_break_bins(sample_data_np, None, k, max_data) out = _bin(agg, bins, np.arange(uvk)) return out
[docs] @supports_dataset def natural_breaks(agg: xr.DataArray, num_sample: Optional[int] = 20000, name: Optional[str] = 'natural_breaks', k: int = 5) -> xr.DataArray: """ Reclassifies data for array `agg` into new values based on Natural Breaks or K-Means clustering method. Values are grouped so that similar values are placed in the same group and space between groups is maximized. Parameters ---------- agg : xr.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask, or CuPy-backed Dask array of values to be reclassified. num_sample : int, default=20000 Number of sample data points used to fit the model. Natural Breaks (Jenks) classification is indeed O(n²) complexity, where n is the total number of data points, i.e: `agg.size` When n is large, we should fit the model on a small sub-sample of the data instead of using the whole dataset. k : int, default=5 Number of classes to be produced. name : str, default='natural_breaks' Name of output aggregate. Returns ------- natural_breaks_agg : xr.DataArray or xr.Dataset 2D aggregate array of natural break allocations. All other input attributes are preserved. If `agg` is a Dataset, returns a Dataset with each variable classified independently. References ---------- - PySAL: https://pysal.org/mapclassify/_modules/mapclassify/classifiers.html#NaturalBreaks # noqa - jenks: https://github.com/perrygeo/jenks/blob/master/jenks.pyx Examples ------- natural_breaks() works with numpy backed xarray DataArray. .. sourcecode:: python >>> import numpy as np >>> import xarray as xr >>> from xrspatial.classify import natural_breaks >>> elevation = np.array([ [np.nan, 1., 2., 3., 4.], [ 5., 6., 7., 8., 9.], [10., 11., 12., 13., 14.], [15., 16., 17., 18., 19.], [20., 21., 22., 23., np.inf] ]) >>> agg_numpy = xr.DataArray(elevation, attrs={'res': (10.0, 10.0)}) >>> numpy_natural_breaks = natural_breaks(agg_numpy, k=5) >>> print(numpy_natural_breaks) <xarray.DataArray 'natural_breaks' (dim_0: 5, dim_1: 5)> array([[nan, 0., 0., 0., 0.], [ 1., 1., 1., 1., 2.], [ 2., 2., 2., 2., 3.], [ 3., 3., 3., 3., 4.], [ 4., 4., 4., 4., nan]], dtype=float32) Dimensions without coordinates: dim_0, dim_1 Attributes: res: (10.0, 10.0) natural_breaks() works with cupy backed xarray DataArray. .. sourcecode:: python >>> import cupy >>> agg_cupy = xr.DataArray(cupy.asarray(elevation)) >>> cupy_natural_breaks = natural_breaks(agg_cupy) >>> print(type(cupy_natural_breaks)) <class 'xarray.core.dataarray.DataArray'> >>> print(cupy_natural_breaks) <xarray.DataArray 'natural_breaks' (dim_0: 5, dim_1: 5)> array([[nan, 0., 0., 0., 0.], [ 1., 1., 1., 1., 2.], [ 2., 2., 2., 2., 3.], [ 3., 3., 3., 3., 4.], [ 4., 4., 4., 4., nan]], dtype=float32) Dimensions without coordinates: dim_0, dim_1 """ _validate_raster(agg, func_name='natural_breaks', name='agg', ndim=None) _validate_scalar(k, func_name='natural_breaks', name='k', dtype=int, min_val=2) mapper = ArrayTypeFunctionMapping( numpy_func=_run_natural_break, dask_func=_run_dask_natural_break, cupy_func=_run_cupy_natural_break, dask_cupy_func=_run_dask_cupy_natural_break, ) out = mapper(agg)(agg, num_sample, k) return xr.DataArray(out, name=name, coords=agg.coords, dims=agg.dims, attrs=agg.attrs)
def _run_equal_interval(agg, k, module): data = agg.data # Replace ±inf with nan in a single pass (no ravel needed) data_clean = module.where(module.isinf(data), np.nan, data) with warnings.catch_warnings(): warnings.filterwarnings('ignore', 'All-NaN slice', RuntimeWarning) min_lazy = module.nanmin(data_clean) max_lazy = module.nanmax(data_clean) if module == cupy: min_data = float(min_lazy.get()) max_data = float(max_lazy.get()) elif module == da: # Compute both in a single pass over the data min_data, max_data = dask.compute(min_lazy, max_lazy) min_data = float(min_data) max_data = float(max_data) else: min_data = float(min_lazy) max_data = float(max_lazy) # Degenerate data: all-NaN/inf, or every finite pixel has the same value. # Collapse to a single bin so finite pixels map to class 0 and any # non-finite pixels remain NaN (handled by _cpu_bin's isfinite check). if not np.isfinite(max_data - min_data) or max_data == min_data: cut_value = max_data if np.isfinite(max_data) else 0.0 cuts = np.array([cut_value], dtype=np.float64) out = _bin(agg, cuts, np.arange(1)) return out width = (max_data - min_data) / k # Build cuts as numpy — only k elements, no need for dask/cupy overhead cuts = np.arange(min_data + width, max_data + width, width) l_cuts = cuts.shape[0] if l_cuts > k: cuts = cuts[0:k] cuts[-1] = max_data out = _bin(agg, cuts, np.arange(l_cuts)) return out
[docs] @supports_dataset def equal_interval(agg: xr.DataArray, k: int = 5, name: Optional[str] = 'equal_interval') -> xr.DataArray: """ Reclassifies data for array `agg` into new values based on intervals of equal width. Parameters ---------- agg : xr.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask, or Cupy-backed Dask array of values to be reclassified. k : int, default=5 Number of classes to be produced. name : str, default='equal_interval' Name of output aggregate. Returns ------- equal_interval_agg : xr.DataArray or xr.Dataset 2D aggregate array of equal interval allocations. All other input attributes are preserved. If `agg` is a Dataset, returns a Dataset with each variable classified independently. References ---------- - PySAL: https://pysal.org/mapclassify/_modules/mapclassify/classifiers.html#EqualInterval # noqa - scikit-learn: https://scikit-learn.org/stable/auto_examples/classification/plot_classifier_comparison.html#sphx-glr-auto-examples-classification-plot-classifier-comparison-py # noqa Examples -------- .. sourcecode:: python >>> import numpy as np >>> import xarray as xr >>> from xrspatial.classify import equal_interval >>> elevation = np.array([ [np.nan, 1., 2., 3., 4.], [ 5., 6., 7., 8., 9.], [10., 11., 12., 13., 14.], [15., 16., 17., 18., 19.], [20., 21., 22., 23., np.inf] ]) >>> agg_numpy = xr.DataArray(elevation, attrs={'res': (10.0, 10.0)}) >>> numpy_equal_interval = equal_interval(agg_numpy, k=5) >>> print(numpy_equal_interval) <xarray.DataArray 'equal_interval' (dim_0: 5, dim_1: 5)> array([[nan, 0., 0., 0., 0.], [ 0., 0., 0., 0., 1.], [ 1., 1., 1., 1., 1.], [ 1., 2., 2., 2., 2.], [ 2., 2., 2., 2., nan]], dtype=float32) Dimensions without coordinates: dim_0, dim_1 Attributes: res: (10.0, 10.0) """ _validate_raster(agg, func_name='equal_interval', name='agg', ndim=None) _validate_scalar(k, func_name='equal_interval', name='k', dtype=int, min_val=1) mapper = ArrayTypeFunctionMapping( numpy_func=lambda *args: _run_equal_interval(*args, module=np), dask_func=lambda *args: _run_equal_interval(*args, module=da), cupy_func=lambda *args: _run_equal_interval(*args, module=cupy), dask_cupy_func=lambda *args: _run_equal_interval(*args, module=da) ) out = mapper(agg)(agg, k) return xr.DataArray(out, name=name, coords=agg.coords, dims=agg.dims, attrs=agg.attrs)
def _run_std_mean(agg, module): data = agg.data data_clean = module.where(module.isinf(data), np.nan, data) mean_lazy = module.nanmean(data_clean) std_lazy = module.nanstd(data_clean) max_lazy = module.nanmax(data_clean) if module == cupy: mean_v = float(mean_lazy.get()) std_v = float(std_lazy.get()) max_v = float(max_lazy.get()) elif module == da: mean_v, std_v, max_v = dask.compute(mean_lazy, std_lazy, max_lazy) mean_v, std_v, max_v = float(mean_v), float(std_v), float(max_v) else: mean_v, std_v, max_v = float(mean_lazy), float(std_lazy), float(max_lazy) bins = np.sort(np.unique([ mean_v - 2 * std_v, mean_v - std_v, mean_v + std_v, mean_v + 2 * std_v, max_v, ])) out = _bin(agg, bins, np.arange(len(bins))) return out
[docs] @supports_dataset def std_mean(agg: xr.DataArray, name: Optional[str] = 'std_mean') -> xr.DataArray: """ Classify data based on standard deviations from the mean. Creates bins at mean +/- 1 and 2 standard deviations. Parameters ---------- agg : xr.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask, or CuPy-backed Dask array of values to be classified. name : str, default='std_mean' Name of output aggregate array. Returns ------- std_mean_agg : xr.DataArray or xr.Dataset 2D aggregate array of standard deviation classifications. All other input attributes are preserved. If `agg` is a Dataset, returns a Dataset with each variable classified independently. References ---------- - PySAL: https://pysal.org/mapclassify/_modules/mapclassify/classifiers.html#StdMean """ _validate_raster(agg, func_name='std_mean', name='agg', ndim=None) mapper = ArrayTypeFunctionMapping( numpy_func=lambda *args: _run_std_mean(*args, module=np), dask_func=lambda *args: _run_std_mean(*args, module=da), cupy_func=lambda *args: _run_std_mean(*args, module=cupy), dask_cupy_func=lambda *args: _run_std_mean(*args, module=da), ) out = mapper(agg)(agg) return xr.DataArray(out, name=name, dims=agg.dims, coords=agg.coords, attrs=agg.attrs)
def _compute_head_tail_bins(values_np): """Compute head/tail break bins from flat finite numpy values.""" bins = [] data = values_np.copy() while len(data) > 1: mean_v = float(np.nanmean(data)) bins.append(mean_v) head = data[data > mean_v] if len(head) == 0 or len(head) / len(data) > 0.40: break data = head if not bins: bins = [float(np.nanmean(values_np))] bins.append(float(np.nanmax(values_np))) return np.array(bins) def _run_head_tail_breaks(agg, module): data = agg.data if module == cupy: finite = data[cupy.isfinite(data)] values_np = cupy.asnumpy(finite) else: finite = data[np.isfinite(data)] values_np = np.asarray(finite) bins = _compute_head_tail_bins(values_np) out = _bin(agg, bins, np.arange(len(bins))) return out def _run_dask_head_tail_breaks(agg): data = agg.data # Persist once so the iterative loop does not re-read from storage on # every scan. Fuse the three reductions per iteration into a single # dask.compute() to cut graph traversals from 3N+1 to N+1. data_clean = da.where(da.isinf(data), np.nan, data).persist() bins = [] mask = da.isfinite(data_clean) total_count_lazy = mask.sum() total_count = int(total_count_lazy.compute()) if total_count == 0: max_v = float(da.nanmax(data_clean).compute()) return _bin(agg, np.array([max_v]), np.arange(1)) current_total = total_count while True: current = da.where(mask, data_clean, np.nan) new_mask = mask & (data_clean > da.nanmean(current)) # Fuse mean and head-count into one graph evaluation. mean_v, head_count = dask.compute(da.nanmean(current), new_mask.sum()) mean_v = float(mean_v) head_count = int(head_count) bins.append(mean_v) if head_count == 0 or head_count / current_total > 0.40: break mask = new_mask current_total = head_count max_v = float(da.nanmax(data_clean).compute()) bins.append(max_v) bins = np.array(bins) out = _bin(agg, bins, np.arange(len(bins))) return out
[docs] @supports_dataset def head_tail_breaks(agg: xr.DataArray, name: Optional[str] = 'head_tail_breaks') -> xr.DataArray: """ Classify data using the Head/Tail Breaks algorithm. Iteratively partitions data around the mean. Values below the mean form a class, and values above continue to be split until the head proportion exceeds 40%. Parameters ---------- agg : xr.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask, or CuPy-backed Dask array of values to be classified. name : str, default='head_tail_breaks' Name of output aggregate array. Returns ------- head_tail_agg : xr.DataArray or xr.Dataset 2D aggregate array of head/tail break classifications. All other input attributes are preserved. If `agg` is a Dataset, returns a Dataset with each variable classified independently. References ---------- - PySAL: https://pysal.org/mapclassify/_modules/mapclassify/classifiers.html#HeadTailBreaks """ _validate_raster(agg, func_name='head_tail_breaks', name='agg', ndim=None) mapper = ArrayTypeFunctionMapping( numpy_func=lambda *args: _run_head_tail_breaks(*args, module=np), dask_func=_run_dask_head_tail_breaks, cupy_func=lambda *args: _run_head_tail_breaks(*args, module=cupy), dask_cupy_func=_run_dask_head_tail_breaks, ) out = mapper(agg)(agg) return xr.DataArray(out, name=name, dims=agg.dims, coords=agg.coords, attrs=agg.attrs)
def _run_percentiles(data, num_sample, pct, module): # num_sample ignored for in-memory backends q = module.percentile(data[module.isfinite(data)], pct) q = module.unique(q) return q def _run_dask_percentiles(data, num_sample, pct): # Avoid boolean fancy indexing (data[da.isfinite(data)]) which creates # unknown dask chunk sizes. Use sampling via indexed access to avoid # materialising the full array (#884). clean = da.where(da.isinf(data), np.nan, data) num_data = data.size if num_sample is None or num_sample >= num_data: num_sample = num_data sample_idx = _generate_sample_indices(num_data, num_sample) values = np.asarray(clean.ravel()[sample_idx].compute()) values = values[np.isfinite(values)] q = np.percentile(values, pct) q = np.unique(q) return q def _run_dask_cupy_percentiles(data, num_sample, pct): # Convert dask+cupy chunks to numpy, then same safe path as dask (#884). data_cpu = data.map_blocks(cupy.asnumpy, dtype=data.dtype, meta=np.array(())) return _run_dask_percentiles(data_cpu, num_sample, pct)
[docs] @supports_dataset def percentiles(agg: xr.DataArray, pct: Optional[List] = None, num_sample: Optional[int] = 20_000, name: Optional[str] = 'percentiles') -> xr.DataArray: """ Classify data based on percentile breakpoints. Parameters ---------- agg : xr.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask, or CuPy-backed Dask array of values to be classified. pct : list of float, default=[1, 10, 50, 90, 99] Percentile values to use as breakpoints. num_sample : int or None, default=20000 Number of sample data points used to compute percentile breakpoints. For dask-backed arrays the sample is drawn lazily to avoid materialising the entire array into RAM. ``None`` means use all data (safe for numpy/cupy, automatically capped for dask). name : str, default='percentiles' Name of output aggregate array. Returns ------- percentiles_agg : xr.DataArray or xr.Dataset 2D aggregate array of percentile classifications. All other input attributes are preserved. If `agg` is a Dataset, returns a Dataset with each variable classified independently. References ---------- - PySAL: https://pysal.org/mapclassify/_modules/mapclassify/classifiers.html#Percentiles """ _validate_raster(agg, func_name='percentiles', name='agg', ndim=None) if pct is None: pct = [1, 10, 50, 90, 99] mapper = ArrayTypeFunctionMapping( numpy_func=lambda *args: _run_percentiles(*args, module=np), dask_func=_run_dask_percentiles, cupy_func=lambda *args: _run_percentiles(*args, module=cupy), dask_cupy_func=_run_dask_cupy_percentiles, ) q = mapper(agg)(agg.data, num_sample, pct) # Materialize bin edges to numpy if hasattr(q, 'compute'): q_np = np.asarray(q.compute()) elif hasattr(q, 'get'): q_np = np.asarray(q.get()) else: q_np = np.asarray(q) # Append max of finite data so all values get classified data = agg.data if hasattr(data, 'compute'): max_v = float(da.nanmax(da.where(da.isinf(data), np.nan, data)).compute()) elif hasattr(data, 'get'): clean = cupy.where(cupy.isinf(data), cupy.nan, data) max_v = float(cupy.nanmax(clean).get()) else: clean = np.where(np.isinf(data), np.nan, data) max_v = float(np.nanmax(clean)) bins = np.sort(np.unique(np.append(q_np, max_v))) k = len(bins) out = _bin(agg, bins, np.arange(k)) return xr.DataArray(out, name=name, dims=agg.dims, coords=agg.coords, attrs=agg.attrs)
def _compute_maximum_break_bins(values_np, k): """Compute maximum breaks bins from sorted finite numpy values.""" uv = np.unique(values_np) if len(uv) < k: return uv diffs = np.diff(uv) n_gaps = min(k - 1, len(diffs)) top_indices = np.argsort(diffs, kind='stable')[-n_gaps:] top_indices.sort() bins = np.array([(uv[i] + uv[i + 1]) / 2.0 for i in top_indices]) bins = np.append(bins, float(uv[-1])) return bins def _run_maximum_breaks(agg, num_sample, k, module): # num_sample ignored for in-memory backends data = agg.data if module == cupy: finite = data[cupy.isfinite(data)] values_np = cupy.asnumpy(finite) else: finite = data[np.isfinite(data)] values_np = np.asarray(finite) bins = _compute_maximum_break_bins(values_np, k) out = _bin(agg, bins, np.arange(len(bins))) return out def _run_dask_maximum_breaks(agg, num_sample, k): data = agg.data data_clean = da.where(da.isinf(data), np.nan, data) num_data = data.size if num_sample is None or num_sample >= num_data: num_sample = num_data sample_idx = _generate_sample_indices(num_data, num_sample) values_np = np.asarray(data_clean.ravel()[sample_idx].compute()) values_np = values_np[np.isfinite(values_np)] bins = _compute_maximum_break_bins(values_np, k) out = _bin(agg, bins, np.arange(len(bins))) return out def _run_dask_cupy_maximum_breaks(agg, num_sample, k): data = agg.data data_clean = da.where(da.isinf(data), np.nan, data) data_cpu = data_clean.map_blocks(cupy.asnumpy, dtype=data.dtype, meta=np.array(())) num_data = data.size if num_sample is None or num_sample >= num_data: num_sample = num_data sample_idx = _generate_sample_indices(num_data, num_sample) values_np = np.asarray(data_cpu.ravel()[sample_idx].compute()) values_np = values_np[np.isfinite(values_np)] bins = _compute_maximum_break_bins(values_np, k) out = _bin(agg, bins, np.arange(len(bins))) return out
[docs] @supports_dataset def maximum_breaks(agg: xr.DataArray, k: int = 5, num_sample: Optional[int] = 20_000, name: Optional[str] = 'maximum_breaks') -> xr.DataArray: """ Classify data using the Maximum Breaks algorithm. Finds the k-1 largest gaps between sorted unique values and uses midpoints of those gaps as bin edges. Parameters ---------- agg : xr.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask, or CuPy-backed Dask array of values to be classified. k : int, default=5 Number of classes to be produced. num_sample : int or None, default=20000 Number of sample data points used to fit the model. For dask-backed arrays the sample is drawn lazily to avoid materialising the entire array into RAM. ``None`` means use all data (safe for numpy/cupy, automatically capped for dask). name : str, default='maximum_breaks' Name of output aggregate array. Returns ------- max_breaks_agg : xr.DataArray or xr.Dataset 2D aggregate array of maximum break classifications. All other input attributes are preserved. If `agg` is a Dataset, returns a Dataset with each variable classified independently. References ---------- - PySAL: https://pysal.org/mapclassify/_modules/mapclassify/classifiers.html#MaximumBreaks """ _validate_raster(agg, func_name='maximum_breaks', name='agg', ndim=None) _validate_scalar(k, func_name='maximum_breaks', name='k', dtype=int, min_val=2) mapper = ArrayTypeFunctionMapping( numpy_func=lambda *args: _run_maximum_breaks(*args, module=np), dask_func=_run_dask_maximum_breaks, cupy_func=lambda *args: _run_maximum_breaks(*args, module=cupy), dask_cupy_func=_run_dask_cupy_maximum_breaks, ) out = mapper(agg)(agg, num_sample, k) return xr.DataArray(out, name=name, dims=agg.dims, coords=agg.coords, attrs=agg.attrs)
_BOX_PLOT_DEFAULT_SAMPLE = 200_000 def _box_plot_bins_from_sample(finite_np, hinge, max_v): q1 = float(np.percentile(finite_np, 25)) q2 = float(np.percentile(finite_np, 50)) q3 = float(np.percentile(finite_np, 75)) iqr = q3 - q1 raw_bins = [q1 - hinge * iqr, q1, q2, q3, q3 + hinge * iqr, max_v] bins = np.sort(np.unique(raw_bins)) bins = bins[bins <= max_v] if bins[-1] < max_v: bins = np.append(bins, max_v) return bins def _run_box_plot(agg, hinge, module): data = agg.data data_clean = module.where(module.isinf(data), np.nan, data) finite_data = data_clean[module.isfinite(data_clean)] if module == cupy: q1 = float(cupy.percentile(finite_data, 25).get()) q2 = float(cupy.percentile(finite_data, 50).get()) q3 = float(cupy.percentile(finite_data, 75).get()) max_v = float(cupy.nanmax(finite_data).get()) iqr = q3 - q1 raw_bins = [q1 - hinge * iqr, q1, q2, q3, q3 + hinge * iqr, max_v] bins = np.sort(np.unique(raw_bins)) bins = bins[bins <= max_v] if bins[-1] < max_v: bins = np.append(bins, max_v) else: # numpy finite_np = np.asarray(finite_data) max_v = float(np.nanmax(finite_np)) bins = _box_plot_bins_from_sample(finite_np, hinge, max_v) out = _bin(agg, bins, np.arange(len(bins))) return out def _run_dask_box_plot(agg, hinge): """Dask+numpy box_plot. Avoids boolean fancy indexing on a dask array (which produces unknown chunk sizes and forces a chunk-size compute pass). Samples the data via the same seeded index sampler used by natural_breaks, then computes percentiles on the finite portion of the sample in numpy. """ data = agg.data data_clean = da.where(da.isinf(data), np.nan, data) num_data = data_clean.size num_sample = min(_BOX_PLOT_DEFAULT_SAMPLE, num_data) sample_idx = _generate_sample_indices(num_data, num_sample) sample_lazy = data_clean.ravel()[sample_idx] max_lazy = da.nanmax(data_clean) sample_np, max_v = dask.compute(sample_lazy, max_lazy) sample_np = np.asarray(sample_np) finite_np = sample_np[np.isfinite(sample_np)] max_v = float(max_v) bins = _box_plot_bins_from_sample(finite_np, hinge, max_v) return _bin(agg, bins, np.arange(len(bins))) def _run_dask_cupy_box_plot(agg, hinge): """Dask+cupy box_plot: sample on-device, transfer only the sample.""" data = agg.data data_clean = da.where(da.isinf(data), np.nan, data) num_data = data_clean.size num_sample = min(_BOX_PLOT_DEFAULT_SAMPLE, num_data) sample_idx = _generate_sample_indices(num_data, num_sample) sample_lazy = data_clean.ravel()[sample_idx] max_lazy = da.nanmax(data_clean) sample_cp, max_v = dask.compute(sample_lazy, max_lazy) sample_np = cupy.asnumpy(sample_cp) finite_np = sample_np[np.isfinite(sample_np)] max_v = float(cupy.asnumpy(max_v).item()) if hasattr(max_v, 'get') else float(max_v) bins = _box_plot_bins_from_sample(finite_np, hinge, max_v) return _bin(agg, bins, np.arange(len(bins)))
[docs] @supports_dataset def box_plot(agg: xr.DataArray, hinge: float = 1.5, name: Optional[str] = 'box_plot') -> xr.DataArray: """ Classify data using box plot breakpoints. Uses Q1, median (Q2), Q3, and the whiskers (Q1 - hinge*IQR, Q3 + hinge*IQR) as class boundaries. Parameters ---------- agg : xr.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask, or CuPy-backed Dask array of values to be classified. hinge : float, default=1.5 Multiplier for the IQR to determine whisker extent. name : str, default='box_plot' Name of output aggregate array. Returns ------- box_plot_agg : xr.DataArray or xr.Dataset 2D aggregate array of box plot classifications. All other input attributes are preserved. If `agg` is a Dataset, returns a Dataset with each variable classified independently. References ---------- - PySAL: https://pysal.org/mapclassify/_modules/mapclassify/classifiers.html#BoxPlot """ _validate_raster(agg, func_name='box_plot', name='agg', ndim=None) mapper = ArrayTypeFunctionMapping( numpy_func=lambda *args: _run_box_plot(*args, module=np), dask_func=_run_dask_box_plot, cupy_func=lambda *args: _run_box_plot(*args, module=cupy), dask_cupy_func=_run_dask_cupy_box_plot, ) out = mapper(agg)(agg, hinge) return xr.DataArray(out, name=name, dims=agg.dims, coords=agg.coords, attrs=agg.attrs)