Pathfinding#

Xarray-spatial provides A* pathfinding for finding optimal routes through raster surfaces. Paths can be computed using geometric distance alone (unweighted) or weighted by a friction/cost surface, which makes the algorithm find the least-cost path rather than the geometrically shortest one.

multi_stop_search extends A* to visit a sequence of waypoints, stitching segments into a single cumulative-cost surface. It can optionally reorder interior waypoints to minimize total travel cost (TSP).

All four backends are supported:

Backend

Strategy

NumPy

Numba-jitted kernel (fast, in-memory)

Dask

Sparse Python A* with LRU chunk cache — loads chunks on demand so the full grid never needs to fit in RAM

CuPy

CPU fallback (transfers to numpy, runs the numba kernel, transfers back)

Dask+CuPy

Same sparse A* as Dask, with automatic cupy-to-numpy chunk conversion

Note: snap_start and snap_goal are not supported with Dask-backed arrays because the brute-force nearest-pixel scan is O(h*w). Ensure the start and goal pixels are valid before calling a_star_search.

Importing Packages#

[1]:
import numpy as np
import pandas as pd
import xarray as xr

import datashader as ds
from datashader.transfer_functions import shade
from datashader.transfer_functions import stack
from datashader.transfer_functions import dynspread
from datashader.transfer_functions import set_background
from datashader.colors import Elevation

import xrspatial
from xrspatial import a_star_search, multi_stop_search, generate_terrain, slope, cost_distance
---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
Cell In[1], line 5
      1 import numpy as np
      2 import pandas as pd
      3 import xarray as xr
      4
----> 5 import datashader as ds
      6 from datashader.transfer_functions import shade
      7 from datashader.transfer_functions import stack
      8 from datashader.transfer_functions import dynspread

ModuleNotFoundError: No module named 'datashader'

Unweighted A*#

A* is an informed search algorithm that finds the shortest path from a start to a goal through a graph. In the unweighted case, edge costs are purely geometric (Euclidean distance scaled by cellsize).

We’ll demonstrate this on a synthetic line raster with barriers.

Create a line raster#

[2]:
# define range of x and y
xrange = (0, 4)
yrange = (0, 4)

# create line raster
ys = [1, 1, 3, 3, 1, 1, np.nan, 1, 3, np.nan, 1, 3, np.nan, 1, 3, np.nan, 2, 2]
xs = [1, 3, 3, 1, 1, 3, np.nan, 1, 3, np.nan, 3, 1, np.nan, 1, 2, np.nan, 1, 3]
line_df = pd.DataFrame(dict(x=xs, y=ys))

W = 800
H = 600
cvs = ds.Canvas(plot_width=W, plot_height=H, x_range=xrange, y_range=yrange)
line_agg = cvs.line(line_df, x="x", y="y").astype(int)
line_shaded = dynspread(shade(line_agg, cmap=["black", "salmon"]))

# pick start and goal locations
start = (3, 1)
goal = (1, 3)

start_df = pd.DataFrame({"x": [start[1]], "y": [start[0]]})
start_agg = cvs.points(start_df, "x", "y")
start_shaded = dynspread(shade(start_agg, cmap=["red"]), threshold=1, max_px=5)

goal_df = pd.DataFrame({"x": [goal[1]], "y": [goal[0]]})
goal_agg = cvs.points(goal_df, "x", "y")
goal_shaded = dynspread(shade(goal_agg, cmap=["lime"]), threshold=1, max_px=5)

set_background(stack(line_shaded, start_shaded, goal_shaded), "black")
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[2], line 12
      8 line_df = pd.DataFrame(dict(x=xs, y=ys))
      9
     10 W = 800
     11 H = 600
---> 12 cvs = ds.Canvas(plot_width=W, plot_height=H, x_range=xrange, y_range=yrange)
     13 line_agg = cvs.line(line_df, x="x", y="y").astype(int)
     14 line_shaded = dynspread(shade(line_agg, cmap=["black", "salmon"]))
     15

NameError: name 'ds' is not defined

8-connectivity shortest path#

Cells with value 0 (off the lines) are barriers. The path is highlighted in green.

[3]:
path_agg_8 = a_star_search(
    line_agg, start, goal, barriers=[0], snap_start=True, snap_goal=True
)

path_shaded = dynspread(shade(path_agg_8, cmap=["green"]))
set_background(stack(line_shaded, path_shaded, start_shaded, goal_shaded), "black")
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[3], line 1
----> 1 path_agg_8 = a_star_search(
      2     line_agg, start, goal, barriers=[0], snap_start=True, snap_goal=True
      3 )
      4

NameError: name 'a_star_search' is not defined

4-connectivity shortest path#

[4]:
path_agg_4 = a_star_search(
    line_agg, start, goal, barriers=[0],
    snap_start=True, snap_goal=True, connectivity=4
)

path_shaded = dynspread(shade(path_agg_4, cmap=["green"]))
set_background(stack(line_shaded, path_shaded, start_shaded, goal_shaded), "black")
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[4], line 1
----> 1 path_agg_4 = a_star_search(
      2     line_agg, start, goal, barriers=[0],
      3     snap_start=True, snap_goal=True, connectivity=4
      4 )

NameError: name 'a_star_search' is not defined

Weighted A*#

When a friction surface is provided, A* finds the least-cost path rather than the geometrically shortest one. Each edge cost becomes geometric_distance * mean_friction_of_endpoints, the same cost model used by cost_distance().

This is useful for terrain routing: a path over flat ground may be longer in distance but much cheaper than a short path over steep slopes.

Generate terrain and friction surface#

[5]:
# Generate a terrain raster
W, H = 600, 400
template = xr.DataArray(np.zeros((H, W)), dims=['y', 'x'])
terrain = generate_terrain(template, seed=12345)

shade(terrain, cmap=Elevation, how="linear")
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[5], line 4
      1 # Generate a terrain raster
      2 W, H = 600, 400
      3 template = xr.DataArray(np.zeros((H, W)), dims=['y', 'x'])
----> 4 terrain = generate_terrain(template, seed=12345)
      5
      6 shade(terrain, cmap=Elevation, how="linear")

NameError: name 'generate_terrain' is not defined
[6]:
# Derive friction from slope: steeper = higher cost
terrain_slope = slope(terrain)

# slope() produces NaN at edges; fill with 0 so friction = 1 there (passable, flat)
# Add 1 so flat areas have friction=1 rather than 0
friction = terrain_slope.fillna(0.0) + 1.0

shade(friction, cmap=["lightgreen", "yellow", "red"], how="eq_hist")
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[6], line 2
      1 # Derive friction from slope: steeper = higher cost
----> 2 terrain_slope = slope(terrain)
      3
      4 # slope() produces NaN at edges; fill with 0 so friction = 1 there (passable, flat)
      5 # Add 1 so flat areas have friction=1 rather than 0

NameError: name 'slope' is not defined

Find paths: unweighted vs weighted#

We pick a start and goal on opposite sides of a ridge, then compare the path found with and without friction weighting.

[7]:
# Choose start and goal coordinates
start_coord = (400.0, 50.0)
goal_coord = (100.0, 450.0)

# Unweighted path (geometric distance only)
path_unweighted = a_star_search(terrain, start_coord, goal_coord)

# Weighted path (friction from slope)
path_weighted = a_star_search(terrain, start_coord, goal_coord, friction=friction)

print(f"Unweighted path cost at goal: {np.nanmax(path_unweighted):.2f}")
print(f"Weighted path cost at goal:   {np.nanmax(path_weighted):.2f}")
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[7], line 6
      2 start_coord = (400.0, 50.0)
      3 goal_coord = (100.0, 450.0)
      4
      5 # Unweighted path (geometric distance only)
----> 6 path_unweighted = a_star_search(terrain, start_coord, goal_coord)
      7
      8 # Weighted path (friction from slope)
      9 path_weighted = a_star_search(terrain, start_coord, goal_coord, friction=friction)

NameError: name 'a_star_search' is not defined

Visualise both paths on the terrain#

Red = unweighted (geometrically shortest), green = weighted (least-cost). Notice how the weighted path avoids steep ridges even if it’s longer.

[8]:
terrain_shaded = shade(terrain, cmap=Elevation, how="linear", alpha=180)

unweighted_shaded = dynspread(
    shade(path_unweighted, cmap=["red", "red"], how="linear", min_alpha=255),
    threshold=1, max_px=2,
)
weighted_shaded = dynspread(
    shade(path_weighted, cmap=["lime", "lime"], how="linear", min_alpha=255),
    threshold=1, max_px=2,
)

stack(terrain_shaded, unweighted_shaded, weighted_shaded)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[8], line 1
----> 1 terrain_shaded = shade(terrain, cmap=Elevation, how="linear", alpha=180)
      2
      3 unweighted_shaded = dynspread(
      4     shade(path_unweighted, cmap=["red", "red"], how="linear", min_alpha=255),

NameError: name 'shade' is not defined

Validate against cost_distance#

For a single source, the A* path cost at the goal should match the cost_distance() value at that same pixel. This confirms both algorithms use the same cost model.

[9]:
# Create a source raster with a single target at the start pixel
source_raster = terrain.copy(data=np.zeros_like(terrain.data))
# Find the start pixel index
from xrspatial.pathfinding import _get_pixel_id
spy, spx = _get_pixel_id(start_coord, terrain)
source_raster.data[spy, spx] = 1.0

cd = cost_distance(source_raster, friction)

# Compare: A* cost at goal vs cost_distance at goal
gpy, gpx = _get_pixel_id(goal_coord, terrain)
cd_val = float(cd.values[gpy, gpx])
astar_val = float(path_weighted.values[gpy, gpx])

print(f"cost_distance at goal: {cd_val:.4f}")
print(f"A* path cost at goal:  {astar_val:.4f}")
print(f"Match: {np.isclose(cd_val, astar_val, rtol=1e-5)}")
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[9], line 2
      1 # Create a source raster with a single target at the start pixel
----> 2 source_raster = terrain.copy(data=np.zeros_like(terrain.data))
      3 # Find the start pixel index
      4 from xrspatial.pathfinding import _get_pixel_id
      5 spy, spx = _get_pixel_id(start_coord, terrain)

NameError: name 'terrain' is not defined

References#

Dask (out-of-core)#

For rasters that don’t fit in memory, wrap your data in a Dask array. A* explores only the pixels it needs (typically O(path_length) to O(path_length^2)), loading chunks on demand through an LRU cache. The output is a lazy Dask array of the same chunk structure.

[12]:
import dask.array as da

# Re-use the terrain and friction from above, but as dask arrays
terrain_dask = terrain.copy()
terrain_dask.data = da.from_array(terrain.data, chunks=(200, 300))

friction_dask = friction.copy()
friction_dask.data = da.from_array(friction.data, chunks=(200, 300))

# Run A* — chunks are loaded on demand, not all at once
path_dask = a_star_search(terrain_dask, start_coord, goal_coord, friction=friction_dask)
print(f"Result type: {type(path_dask.data)}")
print(f"Chunks: {path_dask.data.chunks}")

# Compute to verify it matches the in-memory result
dask_cost = float(path_dask.values[gpy, gpx])
print(f"Dask A* cost at goal:   {dask_cost:.4f}")
print(f"NumPy A* cost at goal:  {astar_val:.4f}")
print(f"Match: {np.isclose(dask_cost, astar_val, rtol=1e-5)}")
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[12], line 4
      1 import dask.array as da
      2
      3 # Re-use the terrain and friction from above, but as dask arrays
----> 4 terrain_dask = terrain.copy()
      5 terrain_dask.data = da.from_array(terrain.data, chunks=(200, 300))
      6
      7 friction_dask = friction.copy()

NameError: name 'terrain' is not defined