Skip to content

Commit

Permalink
Limit A* node expansions
Browse files Browse the repository at this point in the history
Closes #27
  • Loading branch information
MKuranowski committed Jul 21, 2024
1 parent c181222 commit b4a6cb4
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 2 deletions.
9 changes: 8 additions & 1 deletion pyroutelib3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,21 @@

from . import osm, protocols
from .distance import euclidean_distance, haversine_earth_distance, taxicab_distance
from .router import find_route, find_route_without_turn_around
from .router import (
DEFAULT_STEP_LIMIT,
StepLimitExceeded,
find_route,
find_route_without_turn_around,
)

__all__ = [
"DEFAULT_STEP_LIMIT",
"euclidean_distance",
"find_route_without_turn_around",
"find_route",
"haversine_earth_distance",
"osm",
"protocols",
"StepLimitExceeded",
"taxicab_distance",
]
40 changes: 40 additions & 0 deletions pyroutelib3/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
from .distance import haversine_earth_distance
from .protocols import DistanceFunction, ExternalNodeLike, GraphLike, NodeLike

DEFAULT_STEP_LIMIT = 1_000_000
"""Default number of allowed node expansions in :py:func:`find_route` and
:py:func:`find_route_without_turn_around`."""


@dataclass(frozen=True, order=True)
class _AStarQueueItem:
Expand All @@ -21,11 +25,24 @@ class _NodeAndBefore:
external_id_before: Optional[int] = None


class StepLimitExceeded(ValueError):
"""Exception used when a route search has exceeded its limit of steps.
Either the nodes are really far apart, or no route exists.
Concluding that no route exists requires traversing the whole graph,
which on real OpenStreetMap data may require going through the whole planet,
hence this exception.
"""

pass


def find_route(
g: GraphLike[NodeLike],
start: int,
end: int,
distance: DistanceFunction = haversine_earth_distance,
step_limit: Optional[int] = DEFAULT_STEP_LIMIT,
) -> List[int]:
"""find_route uses the `A* algorithm <https://en.wikipedia.org/wiki/A*_search_algorithm>`_
to find the shortest route between two nodes in the provided graph.
Expand All @@ -35,11 +52,18 @@ def find_route(
For graphs with turn restrictions, use :py:func:`find_route_without_turn_around`,
as this implementation will generate instructions with immediate turn-arounds
(A-B-A) to circumvent any restrictions.
``step_limit`` (if not None) limits how many nodes may be expanded during the search
before raising :py:exc:`StepLimitExceeded`. Concluding that no route exists requires
expanding all nodes accessible from the start, which is usually very time-consuming,
especially on large datasets (like the whole planet). Defaults to
:py:const:`DEFAULT_STEP_LIMIT`. Only set to ``None`` on small, contained graphs.
"""
queue: List[_AStarQueueItem] = []
came_from: Dict[int, int] = {}
known_costs: Dict[int, float] = {}
end_position = g.get_node(end).position
steps = 0

# Push the start element onto the queue
queue.append(
Expand All @@ -65,6 +89,10 @@ def find_route(
if item.cost > known_costs.get(item.node_id, inf):
continue

steps += 1
if step_limit is not None and steps > step_limit:
raise StepLimitExceeded()

for neighbor_id, cost in g.get_edges(item.node_id):
neighbor_cost = item.cost + cost
if neighbor_cost < known_costs.get(neighbor_id, inf):
Expand All @@ -82,6 +110,7 @@ def find_route_without_turn_around(
start: int,
end: int,
distance: DistanceFunction = haversine_earth_distance,
step_limit: Optional[int] = DEFAULT_STEP_LIMIT,
) -> List[int]:
"""find_route_without_turn_around uses the `A* algorithm <https://en.wikipedia.org/wiki/A*_search_algorithm>`_
to find the shortest route between two points in the provided graph.
Expand All @@ -92,11 +121,18 @@ def find_route_without_turn_around(
as it runs faster. ``find_route_without_turn_around`` has an extra search dimension -
it needs to not only consider the node, but also what was the previous node to prevent
A-B-A immediate turn-around instructions.
``step_limit`` (if not None) limits how many nodes may be expanded during the search
before raising :py:exc:`StepLimitExceeded`. Concluding that no route exists requires
expanding all nodes accessible from the start, which is usually very time-consuming,
especially on large datasets (like the whole planet). Defaults to
:py:const:`DEFAULT_STEP_LIMIT`. Only set to ``None`` on small, contained graphs.
"""
queue: List[_AStarQueueItem] = []
came_from: Dict[_NodeAndBefore, _NodeAndBefore] = {}
known_costs: Dict[_NodeAndBefore, float] = {}
end_position = g.get_node(end).position
steps = 0

# Push the start element onto the queue
queue.append(
Expand All @@ -123,6 +159,10 @@ def find_route_without_turn_around(
if item.cost > known_costs.get(item_key, inf):
continue

steps += 1
if step_limit is not None and steps > step_limit:
raise StepLimitExceeded()

item_external_id = g.get_node(item.node_id).external_id

for neighbor_id, cost in g.get_edges(item.node_id):
Expand Down
52 changes: 51 additions & 1 deletion pyroutelib3/test_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from .distance import euclidean_distance
from .protocols import GraphLike, Position
from .router import find_route, find_route_without_turn_around
from .router import StepLimitExceeded, find_route, find_route_without_turn_around


@dataclass
Expand Down Expand Up @@ -99,6 +99,31 @@ def test_shortest_not_optimal(self) -> None:
[1, 2, 3, 6, 9, 8],
)

def test_step_limit(self) -> None:
# (20) (20) (20)
# 1─────2─────3─────4
# └─────5─────┘
# (10) (10)
g = Graph(
nodes={
1: Node(1, (1, 1)),
2: Node(2, (2, 1)),
3: Node(3, (3, 1)),
4: Node(4, (4, 1)),
5: Node(5, (3, 0)),
},
edges={
1: {2: 20},
2: {1: 20, 3: 20, 5: 10},
3: {2: 20, 4: 20},
4: {3: 20, 5: 10},
5: {2: 10, 4: 10},
},
)

with self.assertRaises(StepLimitExceeded):
find_route(g, 1, 4, distance=euclidean_distance, step_limit=2)


class TestFindRouteWithoutTurnAround(TestCase):
def test(self) -> None:
Expand Down Expand Up @@ -140,3 +165,28 @@ def test(self) -> None:
find_route_without_turn_around(g, 1, 3, distance=euclidean_distance),
[1, 20, 4, 5, 3],
)

def test_step_limit(self) -> None:
# (20) (20) (20)
# 1─────2─────3─────4
# └─────5─────┘
# (10) (10)
g = Graph(
nodes={
1: Node(1, (1, 1)),
2: Node(2, (2, 1)),
3: Node(3, (3, 1)),
4: Node(4, (4, 1)),
5: Node(5, (3, 0)),
},
edges={
1: {2: 20},
2: {1: 20, 3: 20, 5: 10},
3: {2: 20, 4: 20},
4: {3: 20, 5: 10},
5: {2: 10, 4: 10},
},
)

with self.assertRaises(StepLimitExceeded):
find_route_without_turn_around(g, 1, 4, distance=euclidean_distance, step_limit=2)

0 comments on commit b4a6cb4

Please sign in to comment.