"""GLCM (Gray-Level Co-occurrence Matrix) texture metrics.
Computes Haralick texture features over a sliding window on a raster.
Supports numpy, cupy, dask+numpy, and dask+cupy backends.
"""
from __future__ import annotations
import numpy as np
import xarray as xr
try:
import dask.array as da
except ImportError:
da = None
try:
import cupy
except ImportError:
class cupy:
ndarray = False
from xrspatial.utils import (
ArrayTypeFunctionMapping,
_validate_raster,
_validate_scalar,
is_cupy_array,
ngjit,
not_implemented_func,
)
VALID_METRICS = ('contrast', 'dissimilarity', 'homogeneity',
'energy', 'correlation', 'entropy')
# Offset vectors for the four standard GLCM angles (row_offset, col_offset).
_ANGLE_OFFSETS = {
0: (0, 1),
45: (-1, 1),
90: (-1, 0),
135: (-1, -1),
}
[docs]
def glcm_texture(
agg,
metric='contrast',
window_size=7,
levels=64,
distance=1,
angle=None,
):
"""Compute GLCM texture metrics over a sliding window.
Parameters
----------
agg : xr.DataArray
2-D input raster with numeric dtype.
metric : str or list of str
One or more of: 'contrast', 'dissimilarity', 'homogeneity',
'energy', 'correlation', 'entropy'.
window_size : int
Side length of the sliding window. Must be odd, >= 3, and
<= min(rows, cols) of the input raster.
levels : int
Number of gray levels for quantization (2-256).
distance : int
Pixel pair distance. Must be >= 1 and <= window_size // 2 so
both pixels of a pair can fit inside the window.
angle : int or None
Co-occurrence angle in degrees: 0, 45, 90, or 135.
If None, averages over all four angles.
Returns
-------
xr.DataArray
If a single metric is requested, returns a 2-D DataArray.
If multiple metrics are requested, returns a 3-D DataArray
with a leading 'metric' dimension.
"""
_validate_raster(agg, func_name='glcm_texture', ndim=2)
# Cap window_size at min(rows, cols). A window larger than the raster
# cannot sample anything new, and an arbitrarily large cap would let a
# caller pass e.g. window_size=1e6 and spin the per-pixel window_size**2
# numba loop for hours (CPU-time DoS). On the dask backends the window
# also drives map_overlap depth, so an uncapped value triggers oversize
# per-chunk padding allocations (memory DoS).
rows, cols = agg.shape[-2:]
max_window = max(3, min(rows, cols))
_validate_scalar(window_size, func_name='glcm_texture', name='window_size',
dtype=int, min_val=3, max_val=max_window)
if window_size % 2 == 0:
raise ValueError("glcm_texture(): `window_size` must be odd, "
f"got {window_size}")
_validate_scalar(levels, func_name='glcm_texture', name='levels',
dtype=int, min_val=2, max_val=256)
# distance must fit inside the window; otherwise every (pixel, neighbor)
# pair falls outside and no GLCM entries are populated.
max_distance = max(1, window_size // 2)
_validate_scalar(distance, func_name='glcm_texture', name='distance',
dtype=int, min_val=1, max_val=max_distance)
if angle is not None:
if angle not in _ANGLE_OFFSETS:
raise ValueError(
f"glcm_texture(): `angle` must be one of "
f"{list(_ANGLE_OFFSETS.keys())} or None, got {angle}"
)
single_metric = isinstance(metric, str)
if single_metric:
metrics = [metric]
else:
metrics = list(metric)
for m in metrics:
if m not in VALID_METRICS:
raise ValueError(
f"glcm_texture(): unknown metric {m!r}, "
f"must be one of {VALID_METRICS}"
)
# Sort metrics to match the kernel's output order (VALID_METRICS order).
# Without this, coordinate labels would be wrong when the user requests
# metrics in a different order (e.g. ['entropy', 'contrast']).
metrics = _sorted_metrics(metrics)
mapper = ArrayTypeFunctionMapping(
numpy_func=_glcm_numpy,
cupy_func=_glcm_cupy,
dask_func=_glcm_dask_numpy,
dask_cupy_func=_glcm_dask_cupy,
)
func = mapper(agg)
result = func(agg, metrics, window_size, levels, distance, angle)
if single_metric:
result = result.isel(metric=0, drop=True)
return result
# ---------------------------------------------------------------------------
# Quantization
# ---------------------------------------------------------------------------
def _quantize(data, levels, dmin=None, dmax=None):
"""Quantize data to integer levels [0, levels-1]. NaN maps to -1."""
xp = np
if is_cupy_array(data):
xp = cupy
result = xp.full(data.shape, -1, dtype=np.int32)
valid = ~xp.isnan(data)
if not xp.any(valid):
return result
if dmin is None:
dmin = float(xp.nanmin(data))
if dmax is None:
dmax = float(xp.nanmax(data))
if dmin == dmax:
result[valid] = 0
return result
scale = (levels - 1) / (dmax - dmin)
result[valid] = xp.clip(
((data[valid] - dmin) * scale).astype(np.int32),
0, levels - 1,
)
return result
# ---------------------------------------------------------------------------
# Numba kernel (shared by all backends)
# ---------------------------------------------------------------------------
@ngjit
def _glcm_numba_kernel(quantized, out, metric_flags, levels, half, dy, dx):
"""Numba-jitted GLCM kernel for a single angle.
metric_flags: length-6 bool array in VALID_METRICS order.
"""
h = quantized.shape[0]
w = quantized.shape[1]
for r in range(h):
for c in range(w):
glcm = np.zeros((levels, levels), dtype=np.float64)
count = 0.0
for wy in range(r - half, r + half + 1):
for wx in range(c - half, c + half + 1):
ny = wy + dy
nx = wx + dx
if (0 <= wy < h and 0 <= wx < w and
0 <= ny < h and 0 <= nx < w):
i_val = quantized[wy, wx]
j_val = quantized[ny, nx]
if i_val >= 0 and j_val >= 0:
glcm[i_val, j_val] += 1.0
count += 1.0
if count == 0.0:
continue
for i in range(levels):
for j in range(levels):
glcm[i, j] /= count
need_corr = metric_flags[4]
mu_i = 0.0
mu_j = 0.0
std_i = 0.0
std_j = 0.0
if need_corr:
for i in range(levels):
row_sum = 0.0
col_sum = 0.0
for j in range(levels):
row_sum += glcm[i, j]
col_sum += glcm[j, i]
mu_i += i * row_sum
mu_j += i * col_sum
var_i = 0.0
var_j = 0.0
for i in range(levels):
row_sum = 0.0
col_sum = 0.0
for j in range(levels):
row_sum += glcm[i, j]
col_sum += glcm[j, i]
var_i += (i - mu_i) ** 2 * row_sum
var_j += (i - mu_j) ** 2 * col_sum
std_i = var_i ** 0.5
std_j = var_j ** 0.5
v_contrast = 0.0
v_dissimilarity = 0.0
v_homogeneity = 0.0
v_energy = 0.0
v_correlation = 0.0
v_entropy = 0.0
for i in range(levels):
for j in range(levels):
p = glcm[i, j]
if p == 0.0:
continue
diff = i - j
if diff < 0:
diff = -diff
if metric_flags[0]:
v_contrast += p * diff * diff
if metric_flags[1]:
v_dissimilarity += p * diff
if metric_flags[2]:
v_homogeneity += p / (1.0 + diff * diff)
if metric_flags[3]:
v_energy += p * p
if need_corr and std_i > 0 and std_j > 0:
v_correlation += (
p * (i - mu_i) * (j - mu_j) / (std_i * std_j)
)
if metric_flags[5]:
v_entropy -= p * np.log(p)
idx = 0
if metric_flags[0]:
out[idx, r, c] = v_contrast
idx += 1
if metric_flags[1]:
out[idx, r, c] = v_dissimilarity
idx += 1
if metric_flags[2]:
out[idx, r, c] = v_homogeneity
idx += 1
if metric_flags[3]:
out[idx, r, c] = v_energy
idx += 1
if metric_flags[4]:
if std_i > 0 and std_j > 0:
out[idx, r, c] = v_correlation
else:
out[idx, r, c] = np.nan
idx += 1
if metric_flags[5]:
out[idx, r, c] = v_entropy
idx += 1
def _metric_flags(metrics):
"""Convert metric names to a boolean flag array in VALID_METRICS order."""
flags = np.zeros(len(VALID_METRICS), dtype=np.bool_)
for m in metrics:
flags[VALID_METRICS.index(m)] = True
return flags
def _sorted_metrics(metrics):
"""Return *metrics* sorted in VALID_METRICS order.
The numba kernel always writes output slots in VALID_METRICS order,
so coordinate labels must follow the same ordering.
"""
order = {m: i for i, m in enumerate(VALID_METRICS)}
return sorted(metrics, key=lambda m: order[m])
def _run_glcm_on_quantized(quantized, metrics, window_size, levels,
distance, angle):
"""Run GLCM computation on pre-quantized int32 data.
Returns (n_metrics, H, W) float64 array.
"""
h, w = quantized.shape
n_metrics = len(metrics)
half = window_size // 2
flags = _metric_flags(metrics)
if angle is not None:
out = np.full((n_metrics, h, w), np.nan, dtype=np.float64)
dy, dx = _ANGLE_OFFSETS[angle]
dy *= distance
dx *= distance
_glcm_numba_kernel(quantized, out, flags, levels, half, dy, dx)
else:
# Average across all four angles using nanmean-style accumulation:
# only count angles that produced a valid (non-NaN) value at each
# pixel. If no angle produced a valid value (e.g. all-NaN window,
# 1x1 raster), the output stays NaN -- otherwise the user could
# not distinguish "metric is 0" from "no valid data".
sums = np.zeros((n_metrics, h, w), dtype=np.float64)
counts = np.zeros((n_metrics, h, w), dtype=np.int32)
for a in _ANGLE_OFFSETS:
tmp = np.full((n_metrics, h, w), np.nan, dtype=np.float64)
dy, dx = _ANGLE_OFFSETS[a]
dy *= distance
dx *= distance
_glcm_numba_kernel(quantized, tmp, flags, levels, half, dy, dx)
valid = ~np.isnan(tmp)
sums[valid] += tmp[valid]
counts[valid] += 1
out = np.full((n_metrics, h, w), np.nan, dtype=np.float64)
any_valid = counts > 0
out[any_valid] = sums[any_valid] / counts[any_valid]
return out
# ---------------------------------------------------------------------------
# NumPy backend
# ---------------------------------------------------------------------------
def _glcm_numpy(agg, metrics, window_size, levels, distance, angle):
data = agg.values.astype(np.float64)
quantized = _quantize(data, levels)
result = _run_glcm_on_quantized(quantized, metrics, window_size,
levels, distance, angle)
coords = dict(agg.coords)
dims = ('metric',) + agg.dims
return xr.DataArray(
result, dims=dims,
coords={'metric': list(metrics), **coords},
attrs=agg.attrs,
)
# ---------------------------------------------------------------------------
# Dask + NumPy backend
# ---------------------------------------------------------------------------
def _glcm_dask_numpy(agg, metrics, window_size, levels, distance, angle):
if da is None:
raise ImportError("dask is required for the dask+numpy backend")
data = agg.data.astype(np.float64)
depth = window_size // 2 + distance
# Global min/max for consistent quantization across chunks
dmin = float(da.nanmin(data))
dmax = float(da.nanmax(data))
quantized = _dask_quantize(data, levels, dmin, dmax)
# Compute each metric individually via map_overlap then stack
layers = []
for m in metrics:
single = [m]
def _chunk_func(block, _single=single):
return _run_glcm_on_quantized(block, _single, window_size,
levels, distance, angle)[0]
layer = da.map_overlap(
_chunk_func, quantized,
depth=depth, boundary=-1, dtype=np.float64,
)
layers.append(layer)
result = da.stack(layers, axis=0)
coords = dict(agg.coords)
dims = ('metric',) + agg.dims
return xr.DataArray(
result, dims=dims,
coords={'metric': list(metrics), **coords},
attrs=agg.attrs,
)
def _dask_quantize(data, levels, dmin, dmax):
"""Quantize a dask array to int32 levels. NaN maps to -1."""
if dmin == dmax:
return da.where(da.isnan(data), -1, 0).astype(np.int32)
scale = (levels - 1) / (dmax - dmin)
result = da.clip(((data - dmin) * scale).astype(np.int32),
0, levels - 1)
return da.where(da.isnan(data), -1, result).astype(np.int32)
# ---------------------------------------------------------------------------
# CuPy backend (CPU fallback via cupy.asnumpy)
# ---------------------------------------------------------------------------
def _glcm_cupy(agg, metrics, window_size, levels, distance, angle):
data = cupy.asnumpy(agg.data).astype(np.float64)
quantized = _quantize(data, levels)
result_np = _run_glcm_on_quantized(quantized, metrics, window_size,
levels, distance, angle)
result_cp = cupy.asarray(result_np)
coords = dict(agg.coords)
dims = ('metric',) + agg.dims
return xr.DataArray(
result_cp, dims=dims,
coords={'metric': list(metrics), **coords},
attrs=agg.attrs,
)
# ---------------------------------------------------------------------------
# Dask + CuPy backend (CPU fallback per chunk)
# ---------------------------------------------------------------------------
def _glcm_dask_cupy(agg, metrics, window_size, levels, distance, angle):
if da is None:
raise ImportError("dask is required for the dask+cupy backend")
data = agg.data
depth = window_size // 2 + distance
dmin = float(da.nanmin(data))
dmax = float(da.nanmax(data))
quantized = _dask_quantize(data, levels, dmin, dmax)
layers = []
for m in metrics:
single = [m]
def _chunk_func(block, _single=single):
block_np = cupy.asnumpy(block)
result = _run_glcm_on_quantized(block_np, _single, window_size,
levels, distance, angle)[0]
return cupy.asarray(result)
layer = da.map_overlap(
_chunk_func, quantized,
depth=depth, boundary=-1, dtype=np.float64,
)
layers.append(layer)
result = da.stack(layers, axis=0)
coords = dict(agg.coords)
dims = ('metric',) + agg.dims
return xr.DataArray(
result, dims=dims,
coords={'metric': list(metrics), **coords},
attrs=agg.attrs,
)