Pathfinding#
Xarray-spatial provides A* pathfinding for finding optimal routes through raster surfaces. Paths can be computed using geometric distance alone (unweighted) or weighted by a friction/cost surface, which makes the algorithm find the least-cost path rather than the geometrically shortest one.
multi_stop_search extends A* to visit a sequence of waypoints, stitching segments into a single cumulative-cost surface. It can optionally reorder interior waypoints to minimize total travel cost (TSP).
All four backends are supported:
Backend |
Strategy |
|---|---|
NumPy |
Numba-jitted kernel (fast, in-memory) |
Dask |
Sparse Python A* with LRU chunk cache — loads chunks on demand so the full grid never needs to fit in RAM |
CuPy |
CPU fallback (transfers to numpy, runs the numba kernel, transfers back) |
Dask+CuPy |
Same sparse A* as Dask, with automatic cupy-to-numpy chunk conversion |
Note: snap_start and snap_goal are not supported with Dask-backed arrays because the brute-force nearest-pixel scan is O(h*w). Ensure the start and goal pixels are valid before calling a_star_search.
Unweighted A*: find the shortest geometric path through a line network
Weighted A*: find the least-cost path through a friction surface
Multi-Stop Search: route through multiple waypoints with optional reordering
Dask (out-of-core): pathfinding on datasets that don’t fit in RAM
Importing Packages#
[1]:
import numpy as np
import pandas as pd
import xarray as xr
import datashader as ds
from datashader.transfer_functions import shade
from datashader.transfer_functions import stack
from datashader.transfer_functions import dynspread
from datashader.transfer_functions import set_background
from datashader.colors import Elevation
import xrspatial
from xrspatial import a_star_search, multi_stop_search, generate_terrain, slope, cost_distance
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
Cell In[1], line 5
1 import numpy as np
2 import pandas as pd
3 import xarray as xr
4
----> 5 import datashader as ds
6 from datashader.transfer_functions import shade
7 from datashader.transfer_functions import stack
8 from datashader.transfer_functions import dynspread
ModuleNotFoundError: No module named 'datashader'
Unweighted A*#
A* is an informed search algorithm that finds the shortest path from a start to a goal through a graph. In the unweighted case, edge costs are purely geometric (Euclidean distance scaled by cellsize).
We’ll demonstrate this on a synthetic line raster with barriers.
Create a line raster#
[2]:
# define range of x and y
xrange = (0, 4)
yrange = (0, 4)
# create line raster
ys = [1, 1, 3, 3, 1, 1, np.nan, 1, 3, np.nan, 1, 3, np.nan, 1, 3, np.nan, 2, 2]
xs = [1, 3, 3, 1, 1, 3, np.nan, 1, 3, np.nan, 3, 1, np.nan, 1, 2, np.nan, 1, 3]
line_df = pd.DataFrame(dict(x=xs, y=ys))
W = 800
H = 600
cvs = ds.Canvas(plot_width=W, plot_height=H, x_range=xrange, y_range=yrange)
line_agg = cvs.line(line_df, x="x", y="y").astype(int)
line_shaded = dynspread(shade(line_agg, cmap=["black", "salmon"]))
# pick start and goal locations
start = (3, 1)
goal = (1, 3)
start_df = pd.DataFrame({"x": [start[1]], "y": [start[0]]})
start_agg = cvs.points(start_df, "x", "y")
start_shaded = dynspread(shade(start_agg, cmap=["red"]), threshold=1, max_px=5)
goal_df = pd.DataFrame({"x": [goal[1]], "y": [goal[0]]})
goal_agg = cvs.points(goal_df, "x", "y")
goal_shaded = dynspread(shade(goal_agg, cmap=["lime"]), threshold=1, max_px=5)
set_background(stack(line_shaded, start_shaded, goal_shaded), "black")
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[2], line 12
8 line_df = pd.DataFrame(dict(x=xs, y=ys))
9
10 W = 800
11 H = 600
---> 12 cvs = ds.Canvas(plot_width=W, plot_height=H, x_range=xrange, y_range=yrange)
13 line_agg = cvs.line(line_df, x="x", y="y").astype(int)
14 line_shaded = dynspread(shade(line_agg, cmap=["black", "salmon"]))
15
NameError: name 'ds' is not defined
8-connectivity shortest path#
Cells with value 0 (off the lines) are barriers. The path is highlighted in green.
[3]:
path_agg_8 = a_star_search(
line_agg, start, goal, barriers=[0], snap_start=True, snap_goal=True
)
path_shaded = dynspread(shade(path_agg_8, cmap=["green"]))
set_background(stack(line_shaded, path_shaded, start_shaded, goal_shaded), "black")
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[3], line 1
----> 1 path_agg_8 = a_star_search(
2 line_agg, start, goal, barriers=[0], snap_start=True, snap_goal=True
3 )
4
NameError: name 'a_star_search' is not defined
4-connectivity shortest path#
[4]:
path_agg_4 = a_star_search(
line_agg, start, goal, barriers=[0],
snap_start=True, snap_goal=True, connectivity=4
)
path_shaded = dynspread(shade(path_agg_4, cmap=["green"]))
set_background(stack(line_shaded, path_shaded, start_shaded, goal_shaded), "black")
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[4], line 1
----> 1 path_agg_4 = a_star_search(
2 line_agg, start, goal, barriers=[0],
3 snap_start=True, snap_goal=True, connectivity=4
4 )
NameError: name 'a_star_search' is not defined
Weighted A*#
When a friction surface is provided, A* finds the least-cost path rather than the geometrically shortest one. Each edge cost becomes geometric_distance * mean_friction_of_endpoints, the same cost model used by cost_distance().
This is useful for terrain routing: a path over flat ground may be longer in distance but much cheaper than a short path over steep slopes.
Generate terrain and friction surface#
[5]:
# Generate a terrain raster
W, H = 600, 400
template = xr.DataArray(np.zeros((H, W)), dims=['y', 'x'])
terrain = generate_terrain(template, seed=12345)
shade(terrain, cmap=Elevation, how="linear")
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[5], line 4
1 # Generate a terrain raster
2 W, H = 600, 400
3 template = xr.DataArray(np.zeros((H, W)), dims=['y', 'x'])
----> 4 terrain = generate_terrain(template, seed=12345)
5
6 shade(terrain, cmap=Elevation, how="linear")
NameError: name 'generate_terrain' is not defined
[6]:
# Derive friction from slope: steeper = higher cost
terrain_slope = slope(terrain)
# slope() produces NaN at edges; fill with 0 so friction = 1 there (passable, flat)
# Add 1 so flat areas have friction=1 rather than 0
friction = terrain_slope.fillna(0.0) + 1.0
shade(friction, cmap=["lightgreen", "yellow", "red"], how="eq_hist")
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[6], line 2
1 # Derive friction from slope: steeper = higher cost
----> 2 terrain_slope = slope(terrain)
3
4 # slope() produces NaN at edges; fill with 0 so friction = 1 there (passable, flat)
5 # Add 1 so flat areas have friction=1 rather than 0
NameError: name 'slope' is not defined
Find paths: unweighted vs weighted#
We pick a start and goal on opposite sides of a ridge, then compare the path found with and without friction weighting.
[7]:
# Choose start and goal coordinates
start_coord = (400.0, 50.0)
goal_coord = (100.0, 450.0)
# Unweighted path (geometric distance only)
path_unweighted = a_star_search(terrain, start_coord, goal_coord)
# Weighted path (friction from slope)
path_weighted = a_star_search(terrain, start_coord, goal_coord, friction=friction)
print(f"Unweighted path cost at goal: {np.nanmax(path_unweighted):.2f}")
print(f"Weighted path cost at goal: {np.nanmax(path_weighted):.2f}")
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[7], line 6
2 start_coord = (400.0, 50.0)
3 goal_coord = (100.0, 450.0)
4
5 # Unweighted path (geometric distance only)
----> 6 path_unweighted = a_star_search(terrain, start_coord, goal_coord)
7
8 # Weighted path (friction from slope)
9 path_weighted = a_star_search(terrain, start_coord, goal_coord, friction=friction)
NameError: name 'a_star_search' is not defined
Visualise both paths on the terrain#
Red = unweighted (geometrically shortest), green = weighted (least-cost). Notice how the weighted path avoids steep ridges even if it’s longer.
[8]:
terrain_shaded = shade(terrain, cmap=Elevation, how="linear", alpha=180)
unweighted_shaded = dynspread(
shade(path_unweighted, cmap=["red", "red"], how="linear", min_alpha=255),
threshold=1, max_px=2,
)
weighted_shaded = dynspread(
shade(path_weighted, cmap=["lime", "lime"], how="linear", min_alpha=255),
threshold=1, max_px=2,
)
stack(terrain_shaded, unweighted_shaded, weighted_shaded)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[8], line 1
----> 1 terrain_shaded = shade(terrain, cmap=Elevation, how="linear", alpha=180)
2
3 unweighted_shaded = dynspread(
4 shade(path_unweighted, cmap=["red", "red"], how="linear", min_alpha=255),
NameError: name 'shade' is not defined
Validate against cost_distance#
For a single source, the A* path cost at the goal should match the cost_distance() value at that same pixel. This confirms both algorithms use the same cost model.
[9]:
# Create a source raster with a single target at the start pixel
source_raster = terrain.copy(data=np.zeros_like(terrain.data))
# Find the start pixel index
from xrspatial.pathfinding import _get_pixel_id
spy, spx = _get_pixel_id(start_coord, terrain)
source_raster.data[spy, spx] = 1.0
cd = cost_distance(source_raster, friction)
# Compare: A* cost at goal vs cost_distance at goal
gpy, gpx = _get_pixel_id(goal_coord, terrain)
cd_val = float(cd.values[gpy, gpx])
astar_val = float(path_weighted.values[gpy, gpx])
print(f"cost_distance at goal: {cd_val:.4f}")
print(f"A* path cost at goal: {astar_val:.4f}")
print(f"Match: {np.isclose(cd_val, astar_val, rtol=1e-5)}")
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[9], line 2
1 # Create a source raster with a single target at the start pixel
----> 2 source_raster = terrain.copy(data=np.zeros_like(terrain.data))
3 # Find the start pixel index
4 from xrspatial.pathfinding import _get_pixel_id
5 spy, spx = _get_pixel_id(start_coord, terrain)
NameError: name 'terrain' is not defined
Multi-Stop Search#
multi_stop_search routes through a list of waypoints in order, calling A* for each consecutive pair and stitching the segments into one cumulative-cost surface.
Set optimize_order=True to let the solver reorder the interior waypoints (keeping the first and last fixed) to minimize total travel cost. For up to 12 waypoints it uses exact Held-Karp; beyond that it falls back to nearest-neighbor + 2-opt.
Route through three waypoints#
We pick three waypoints across the terrain and let multi_stop_search find the least-cost route visiting them in order.
[10]:
# Three waypoints across the terrain (re-using the terrain and friction from above)
wp_start = (400.0, 50.0)
wp_mid = (250.0, 250.0)
wp_end = (100.0, 450.0)
# Naive order
path_multi = multi_stop_search(
terrain, [wp_start, wp_mid, wp_end], friction=friction
)
print(f"Segment costs: {path_multi.attrs['segment_costs']}")
print(f"Total cost: {path_multi.attrs['total_cost']:.2f}")
# Visualise the multi-stop path
terrain_shaded = shade(terrain, cmap=Elevation, how="linear", alpha=180)
multi_shaded = dynspread(
shade(path_multi, cmap=["cyan", "cyan"], how="linear", min_alpha=255),
threshold=1, max_px=2,
)
stack(terrain_shaded, multi_shaded)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[10], line 7
3 wp_mid = (250.0, 250.0)
4 wp_end = (100.0, 450.0)
5
6 # Naive order
----> 7 path_multi = multi_stop_search(
8 terrain, [wp_start, wp_mid, wp_end], friction=friction
9 )
10
NameError: name 'multi_stop_search' is not defined
Optimize waypoint order#
With four or more waypoints, the given order may not be the cheapest. optimize_order=True solves a TSP over the pairwise A* costs, keeping the first and last waypoints fixed.
[11]:
# Four waypoints in a deliberately suboptimal order (zigzag)
wp_a = (400.0, 50.0) # start (fixed)
wp_b = (100.0, 450.0) # far corner
wp_c = (350.0, 200.0) # back near start
wp_d = (100.0, 350.0) # end (fixed)
waypoints = [wp_a, wp_b, wp_c, wp_d]
# Without optimization
path_naive = multi_stop_search(terrain, waypoints, friction=friction)
# With optimization
path_opt = multi_stop_search(
terrain, waypoints, friction=friction, optimize_order=True
)
print(f"Naive order: {path_naive.attrs['waypoint_order']}")
print(f"Naive cost: {path_naive.attrs['total_cost']:.2f}")
print(f"Optimized order: {path_opt.attrs['waypoint_order']}")
print(f"Optimized cost: {path_opt.attrs['total_cost']:.2f}")
print(f"Savings: {path_naive.attrs['total_cost'] - path_opt.attrs['total_cost']:.2f}")
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[11], line 10
6
7 waypoints = [wp_a, wp_b, wp_c, wp_d]
8
9 # Without optimization
---> 10 path_naive = multi_stop_search(terrain, waypoints, friction=friction)
11
12 # With optimization
13 path_opt = multi_stop_search(
NameError: name 'multi_stop_search' is not defined
References#
A* search algorithm: https://en.wikipedia.org/wiki/A*_search_algorithm
Red Blob Games — A* implementation: https://www.redblobgames.com/pathfinding/a-star/implementation.html
Cost distance (weighted proximity):
xrspatial.cost_distance
Dask (out-of-core)#
For rasters that don’t fit in memory, wrap your data in a Dask array. A* explores only the pixels it needs (typically O(path_length) to O(path_length^2)), loading chunks on demand through an LRU cache. The output is a lazy Dask array of the same chunk structure.
[12]:
import dask.array as da
# Re-use the terrain and friction from above, but as dask arrays
terrain_dask = terrain.copy()
terrain_dask.data = da.from_array(terrain.data, chunks=(200, 300))
friction_dask = friction.copy()
friction_dask.data = da.from_array(friction.data, chunks=(200, 300))
# Run A* — chunks are loaded on demand, not all at once
path_dask = a_star_search(terrain_dask, start_coord, goal_coord, friction=friction_dask)
print(f"Result type: {type(path_dask.data)}")
print(f"Chunks: {path_dask.data.chunks}")
# Compute to verify it matches the in-memory result
dask_cost = float(path_dask.values[gpy, gpx])
print(f"Dask A* cost at goal: {dask_cost:.4f}")
print(f"NumPy A* cost at goal: {astar_val:.4f}")
print(f"Match: {np.isclose(dask_cost, astar_val, rtol=1e-5)}")
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[12], line 4
1 import dask.array as da
2
3 # Re-use the terrain and friction from above, but as dask arrays
----> 4 terrain_dask = terrain.copy()
5 terrain_dask.data = da.from_array(terrain.data, chunks=(200, 300))
6
7 friction_dask = friction.copy()
NameError: name 'terrain' is not defined