Source code for xrspatial.curvature

from __future__ import annotations

# std lib
from functools import partial
from typing import Optional, Union

# 3rd-party
try:
    import cupy
except ImportError:
    class cupy(object):
        ndarray = False

try:
    import dask.array as da
except ImportError:
    da = None

import numpy as np
import xarray as xr
from numba import cuda

# local modules
from xrspatial.utils import ArrayTypeFunctionMapping
from xrspatial.utils import _boundary_to_dask
from xrspatial.utils import _pad_array
from xrspatial.utils import _validate_boundary
from xrspatial.utils import _validate_raster
from xrspatial.utils import cuda_args
from xrspatial.utils import get_dataarray_resolution
from xrspatial.utils import ngjit
from xrspatial.dataset_support import supports_dataset


@ngjit
def _cpu(data, cellsize):
    out = np.empty(data.shape, np.float32)
    out[:] = np.nan
    rows, cols = data.shape
    for y in range(1, rows - 1):
        for x in range(1, cols - 1):
            d = (data[y + 1, x] + data[y - 1, x]) / 2 - data[y, x]
            e = (data[y, x + 1] + data[y, x - 1]) / 2 - data[y, x]
            out[y, x] = -2 * (d + e) * 100 / (cellsize * cellsize)
    return out


def _run_numpy(data: np.ndarray,
               cellsize: Union[int, float],
               boundary: str = 'nan') -> np.ndarray:
    data = data.astype(np.float32)
    if boundary == 'nan':
        return _cpu(data, cellsize)
    padded = _pad_array(data, 1, boundary)
    result = _cpu(padded, cellsize)
    return result[1:-1, 1:-1]


def _run_dask_numpy(data: da.Array,
                    cellsize: Union[int, float],
                    boundary: str = 'nan') -> da.Array:
    data = data.astype(np.float32)
    _func = partial(_cpu, cellsize=cellsize)
    out = data.map_overlap(_func,
                           depth=(1, 1),
                           boundary=_boundary_to_dask(boundary),
                           meta=np.array(()))
    return out


@cuda.jit(device=True)
def _gpu(arr, cellsize):
    d = (arr[1, 0] + arr[1, 2]) / 2 - arr[1, 1]
    e = (arr[0, 1] + arr[2, 1]) / 2 - arr[1, 1]
    curv = -2 * (d + e) * 100 / (cellsize[0] * cellsize[0])
    return curv


@cuda.jit
def _run_gpu(arr, cellsize, out):
    i, j = cuda.grid(2)
    di = 1
    dj = 1
    if (i - di >= 0 and i + di <= out.shape[0] - 1 and
            j - dj >= 0 and j + dj <= out.shape[1] - 1):
        out[i, j] = _gpu(arr[i - di:i + di + 1, j - dj:j + dj + 1], cellsize)


def _run_cupy(data: cupy.ndarray,
              cellsize: Union[int, float],
              boundary: str = 'nan') -> cupy.ndarray:
    if boundary != 'nan':
        padded = _pad_array(data, 1, boundary)
        result = _run_cupy(padded, cellsize)
        return result[1:-1, 1:-1]

    data = data.astype(cupy.float32)
    cellsize_arr = cupy.array([float(cupy.asnumpy(cellsize).item())], dtype='f4')

    griddim, blockdim = cuda_args(data.shape)
    out = cupy.empty(data.shape, dtype='f4')
    out[:] = cupy.nan

    _run_gpu[griddim, blockdim](data, cellsize_arr, out)

    return out


def _run_dask_cupy(data: da.Array,
                   cellsize: Union[int, float],
                   boundary: str = 'nan') -> da.Array:
    data = data.astype(cupy.float32)
    cellsize_arr = cupy.array([float(cupy.asnumpy(cellsize).item())], dtype='f4')

    _func = partial(_run_cupy, cellsize=cellsize_arr)

    out = data.map_overlap(_func,
                           depth=(1, 1),
                           boundary=_boundary_to_dask(boundary, is_cupy=True),
                           meta=cupy.array(()))
    return out


[docs] @supports_dataset def curvature(agg: xr.DataArray, name: Optional[str] = 'curvature', boundary: str = 'nan') -> xr.DataArray: """ Calculates, for all cells in the array, the curvature (second derivative) of each cell based on the elevation of its neighbors in a 3x3 grid. A positive curvature indicates the surface is upwardly convex. A negative value indicates it is upwardly concave. A value of 0 indicates a flat surface. Units of the curvature output raster are one hundredth (1/100) of a z-unit. Parameters ---------- agg : xarray.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask xarray DataArray of elevation values. Must contain `res` attribute. If a Dataset is passed, the operation is applied to each data variable independently. name : str, default='curvature' Name of output DataArray. boundary : str, default='nan' How to handle edges where the kernel extends beyond the raster. ``'nan'`` — fill missing neighbours with NaN (default). ``'nearest'`` — repeat edge values. ``'reflect'`` — mirror at boundary. ``'wrap'`` — periodic / toroidal. Returns ------- curvature_agg : xarray.DataArray or xr.Dataset If `agg` is a DataArray, returns a DataArray of the same type. If `agg` is a Dataset, returns a Dataset with curvature computed for each data variable. 2D aggregate array of curvature values. All other input attributes are preserved. References ---------- - arcgis: https://pro.arcgis.com/en/pro-app/latest/tool-reference/spatial-analyst/how-curvature-works.htm # noqa Examples -------- Curvature works with NumPy backed xarray DataArray .. sourcecode:: python >>> import numpy as np >>> import dask.array as da >>> import xarray as xr >>> from xrspatial import curvature >>> flat_data = np.zeros((5, 5), dtype=np.float32) >>> flat_raster = xr.DataArray(flat_data, attrs={'res': (1, 1)}) >>> flat_curv = curvature(flat_raster) >>> print(flat_curv) <xarray.DataArray 'curvature' (dim_0: 5, dim_1: 5)> array([[nan, nan, nan, nan, nan], [nan, -0., -0., -0., nan], [nan, -0., -0., -0., nan], [nan, -0., -0., -0., nan], [nan, nan, nan, nan, nan]]) Dimensions without coordinates: dim_0, dim_1 Attributes: res: (1, 1) Curvature works with Dask with NumPy backed xarray DataArray .. sourcecode:: python >>> convex_data = np.array([ [0, 0, 0, 0, 0], [0, 0, 0, 0, 0], [0, 0, -1, 0, 0], [0, 0, 0, 0, 0], [0, 0, 0, 0, 0]], dtype=np.float32) >>> convex_raster = xr.DataArray( da.from_array(convex_data, chunks=(3, 3)), attrs={'res': (10, 10)}, name='convex_dask_numpy_raster') >>> print(convex_raster) <xarray.DataArray 'convex_dask_numpy_raster' (dim_0: 5, dim_1: 5)> dask.array<array, shape=(5, 5), dtype=float32, chunksize=(3, 3), chunktype=numpy.ndarray> Dimensions without coordinates: dim_0, dim_1 Attributes: res: (10, 10) >>> convex_curv = curvature(convex_raster, name='convex_curvature') >>> print(convex_curv) # return a xarray DataArray with Dask-backed array <xarray.DataArray 'convex_curvature' (dim_0: 5, dim_1: 5)> dask.array<_trim, shape=(5, 5), dtype=float32, chunksize=(3, 3), chunktype=numpy.ndarray> Dimensions without coordinates: dim_0, dim_1 Attributes: res: (10, 10) >>> print(convex_curv.compute()) <xarray.DataArray 'convex_curvature' (dim_0: 5, dim_1: 5)> array([[nan, nan, nan, nan, nan], [nan, -0., 1., -0., nan], [nan, 1., -4., 1., nan], [nan, -0., 1., -0., nan], [nan, nan, nan, nan, nan]]) Dimensions without coordinates: dim_0, dim_1 Attributes: res: (10, 10) Curvature works with CuPy backed xarray DataArray. .. sourcecode:: python >>> import cupy >>> concave_data = np.array([ [0, 0, 0, 0, 0], [0, 0, 0, 0, 0], [0, 0, 1, 0, 0], [0, 0, 0, 0, 0], [0, 0, 0, 0, 0]], dtype=np.float32) >>> concave_raster = xr.DataArray( cupy.asarray(concave_data), attrs={'res': (10, 10)}, name='concave_cupy_raster') >>> concave_curv = curvature(concave_raster) >>> print(type(concave_curv.data)) <class 'cupy.core.core.ndarray'> >>> print(concave_curv) <xarray.DataArray 'curvature' (dim_0: 5, dim_1: 5)> array([[nan, nan, nan, nan, nan], [nan, -0., -1., -0., nan], [nan, -1., 4., -1., nan], [nan, -0., -1., -0., nan], [nan, nan, nan, nan, nan]], dtype=float32) Dimensions without coordinates: dim_0, dim_1 Attributes: res: (10, 10) """ _validate_raster(agg, func_name='curvature', name='agg') cellsize_x, cellsize_y = get_dataarray_resolution(agg) if (not np.isfinite(cellsize_x) or not np.isfinite(cellsize_y) or cellsize_x == 0 or cellsize_y == 0): raise ValueError( "curvature() requires a non-zero, finite cell size on both axes; " f"got cellsize_x={cellsize_x!r}, cellsize_y={cellsize_y!r}. " "Set agg.attrs['res'] to a (x, y) tuple of non-zero floats, " "or attach numeric x/y coordinates to the DataArray." ) cellsize = (cellsize_x + cellsize_y) / 2 _validate_boundary(boundary) mapper = ArrayTypeFunctionMapping( numpy_func=_run_numpy, cupy_func=_run_cupy, dask_func=_run_dask_numpy, dask_cupy_func=_run_dask_cupy ) out = mapper(agg)(agg.data, cellsize, boundary) return xr.DataArray(out, name=name, coords=agg.coords, dims=agg.dims, attrs=agg.attrs)