Source code for xrspatial.hydro.flow_direction_d8

from __future__ import annotations

import math
from functools import partial
from typing import Union

try:
    import cupy
except ImportError:
    class cupy(object):
        ndarray = False

try:
    import dask.array as da
except ImportError:
    da = None

import numpy as np
import xarray as xr
from numba import cuda

from xrspatial.dataset_support import supports_dataset
from xrspatial.utils import (ArrayTypeFunctionMapping, _boundary_to_dask, _pad_array,
                             _validate_boundary, _validate_raster, cuda_args,
                             get_dataarray_resolution, ngjit)

# =====================================================================
# CPU kernel
# =====================================================================


@ngjit
def _cpu(data, cellsize_x, cellsize_y):
    out = np.empty(data.shape, np.float64)
    out[:] = np.nan
    rows, cols = data.shape

    # 8 neighbor offsets: E, SE, S, SW, W, NW, N, NE
    dy = np.array([0, 1, 1, 1, 0, -1, -1, -1])
    dx = np.array([1, 1, 0, -1, -1, -1, 0, 1])
    codes = np.array([1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0])

    diag = math.sqrt(cellsize_x * cellsize_x + cellsize_y * cellsize_y)
    # distances: E=cx, SE=diag, S=cy, SW=diag, W=cx, NW=diag, N=cy, NE=diag
    dists = np.array([cellsize_x, diag, cellsize_y, diag,
                      cellsize_x, diag, cellsize_y, diag])

    for y in range(1, rows - 1):
        for x in range(1, cols - 1):
            center = data[y, x]
            if center != center:  # NaN check
                continue

            has_nan = False
            for k in range(8):
                v = data[y + dy[k], x + dx[k]]
                if v != v:
                    has_nan = True
                    break
            if has_nan:
                continue

            max_slope = -math.inf
            direction = 0.0
            for k in range(8):
                v = data[y + dy[k], x + dx[k]]
                grad = (center - v) / dists[k]
                if grad > max_slope:
                    max_slope = grad
                    direction = codes[k]

            if max_slope <= 0.0:
                out[y, x] = 0.0
            else:
                out[y, x] = direction

    return out


# =====================================================================
# GPU kernels
# =====================================================================

@cuda.jit(device=True)
def _gpu(arr, cellsize_x, cellsize_y):
    center = arr[1, 1]
    if center != center:
        return center  # NaN

    cx = cellsize_x[0]
    cy = cellsize_y[0]
    diag = (cx * cx + cy * cy) ** 0.5

    max_slope = -1.0e308
    direction = 0.0

    # E: arr[1, 2], distance = cx, code = 1
    v = arr[1, 2]
    if v != v:
        return v
    grad = (center - v) / cx
    if grad > max_slope:
        max_slope = grad
        direction = 1.0

    # SE: arr[2, 2], distance = diag, code = 2
    v = arr[2, 2]
    if v != v:
        return v
    grad = (center - v) / diag
    if grad > max_slope:
        max_slope = grad
        direction = 2.0

    # S: arr[2, 1], distance = cy, code = 4
    v = arr[2, 1]
    if v != v:
        return v
    grad = (center - v) / cy
    if grad > max_slope:
        max_slope = grad
        direction = 4.0

    # SW: arr[2, 0], distance = diag, code = 8
    v = arr[2, 0]
    if v != v:
        return v
    grad = (center - v) / diag
    if grad > max_slope:
        max_slope = grad
        direction = 8.0

    # W: arr[1, 0], distance = cx, code = 16
    v = arr[1, 0]
    if v != v:
        return v
    grad = (center - v) / cx
    if grad > max_slope:
        max_slope = grad
        direction = 16.0

    # NW: arr[0, 0], distance = diag, code = 32
    v = arr[0, 0]
    if v != v:
        return v
    grad = (center - v) / diag
    if grad > max_slope:
        max_slope = grad
        direction = 32.0

    # N: arr[0, 1], distance = cy, code = 64
    v = arr[0, 1]
    if v != v:
        return v
    grad = (center - v) / cy
    if grad > max_slope:
        max_slope = grad
        direction = 64.0

    # NE: arr[0, 2], distance = diag, code = 128
    v = arr[0, 2]
    if v != v:
        return v
    grad = (center - v) / diag
    if grad > max_slope:
        max_slope = grad
        direction = 128.0

    if max_slope <= 0.0:
        return 0.0

    return direction


@cuda.jit
def _run_gpu(arr, cellsize_x_arr, cellsize_y_arr, out):
    i, j = cuda.grid(2)
    di = 1
    dj = 1
    if (i - di >= 0 and i + di < out.shape[0] and
            j - dj >= 0 and j + dj < out.shape[1]):
        out[i, j] = _gpu(arr[i - di:i + di + 1, j - dj:j + dj + 1],
                         cellsize_x_arr,
                         cellsize_y_arr)


# =====================================================================
# Backend wrappers
# =====================================================================

def _run_numpy(data: np.ndarray,
               cellsize_x: Union[int, float],
               cellsize_y: Union[int, float],
               boundary: str = 'nan') -> np.ndarray:
    data = data.astype(np.float64)
    if boundary == 'nan':
        return _cpu(data, cellsize_x, cellsize_y)
    padded = _pad_array(data, 1, boundary)
    result = _cpu(padded, cellsize_x, cellsize_y)
    return result[1:-1, 1:-1]


def _run_dask_numpy(data: da.Array,
                    cellsize_x: Union[int, float],
                    cellsize_y: Union[int, float],
                    boundary: str = 'nan') -> da.Array:
    data = data.astype(np.float64)
    _func = partial(_cpu,
                    cellsize_x=cellsize_x,
                    cellsize_y=cellsize_y)

    out = data.map_overlap(_func,
                           depth=(1, 1),
                           boundary=_boundary_to_dask(boundary),
                           meta=np.array(()))
    return out


def _run_cupy(data: cupy.ndarray,
              cellsize_x: Union[int, float],
              cellsize_y: Union[int, float],
              boundary: str = 'nan') -> cupy.ndarray:
    if boundary != 'nan':
        padded = _pad_array(data, 1, boundary)
        result = _run_cupy(padded, cellsize_x, cellsize_y)
        return result[1:-1, 1:-1]

    cellsize_x_arr = cupy.array([float(cellsize_x)], dtype='f8')
    cellsize_y_arr = cupy.array([float(cellsize_y)], dtype='f8')
    data = data.astype(cupy.float64)

    griddim, blockdim = cuda_args(data.shape)
    out = cupy.empty(data.shape, dtype='f8')
    out[:] = cupy.nan

    _run_gpu[griddim, blockdim](data,
                                cellsize_x_arr,
                                cellsize_y_arr,
                                out)
    return out


def _run_dask_cupy(data: da.Array,
                   cellsize_x: Union[int, float],
                   cellsize_y: Union[int, float],
                   boundary: str = 'nan') -> da.Array:
    data = data.astype(cupy.float64)
    _func = partial(_run_cupy,
                    cellsize_x=cellsize_x,
                    cellsize_y=cellsize_y)

    out = data.map_overlap(_func,
                           depth=(1, 1),
                           boundary=_boundary_to_dask(boundary, is_cupy=True),
                           meta=cupy.array(()))
    return out


# =====================================================================
# Public API
# =====================================================================

[docs] @supports_dataset def flow_direction_d8(agg: xr.DataArray, name: str = 'flow_direction', boundary: str = 'nan') -> xr.DataArray: """Compute D8 flow direction for each cell. Determines which of the 8 neighbors has the steepest downhill gradient from the center cell. Uses the standard power-of-two D8 direction encoding:: 32 64 128 16 0 1 8 4 2 Parameters ---------- agg : xarray.DataArray or xr.Dataset 2D NumPy, CuPy, NumPy-backed Dask, or CuPy-backed Dask xarray DataArray of elevation values. If a Dataset is passed, the operation is applied to each data variable independently. name : str, default='flow_direction' Name of output DataArray. boundary : str, default='nan' How to handle edges where the kernel extends beyond the raster. ``'nan'`` - fill missing neighbours with NaN (default). ``'nearest'`` - repeat edge values. ``'reflect'`` - mirror at boundary. ``'wrap'`` - periodic / toroidal. Returns ------- xarray.DataArray or xr.Dataset 2D array of D8 flow direction codes. Valid values are ``{0, 1, 2, 4, 8, 16, 32, 64, 128}`` for interior cells (0 indicates a pit or flat with no downhill neighbor). Edge cells and cells with NaN in their 3x3 window are NaN. References ---------- Jenson, S.K. and Domingue, J.O. (1988). Extracting Topographic Structure from Digital Elevation Data for Geographic Information System Analysis. Photogrammetric Engineering and Remote Sensing, 54(11), 1593-1600. """ _validate_raster(agg, func_name='flow_direction', name='agg') _validate_boundary(boundary) cellsize_x, cellsize_y = get_dataarray_resolution(agg) if not (np.isfinite(cellsize_x) and cellsize_x != 0 and np.isfinite(cellsize_y) and cellsize_y != 0): raise ValueError( f"flow_direction(): cellsize must be finite and non-zero " f"(got cellsize_x={cellsize_x}, cellsize_y={cellsize_y}). " f"Ensure agg has at least 2 cells per spatial dimension " f"with finite coords." ) mapper = ArrayTypeFunctionMapping( numpy_func=_run_numpy, cupy_func=_run_cupy, dask_func=_run_dask_numpy, dask_cupy_func=_run_dask_cupy, ) out = mapper(agg)(agg.data, cellsize_x, cellsize_y, boundary) return xr.DataArray(out, name=name, coords=agg.coords, dims=agg.dims, attrs=agg.attrs)