Skip to content

add meta-agent warehouse examples #254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions examples/warehouse/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Pseudo-Warehouse Model (Meta-Agent Example)

## Summary

The purpose of this model is to demonstrate Mesa's meta-agent capability and some of its implementation approaches, not to be an accurate warehouse simulation.

**Overview of meta agent:** Complex systems often have multiple levels of components. A city is not a single entity, but it is made of districts,neighborhoods, buildings, and people. A forest comprises an ecosystem of trees, plants, animals, and microorganisms. An organization is not one entity, but is made of departments, sub-departments, and people. A person is not a single entity, but it is made of micro biomes, organs and cells.

This reality is the motivation for meta-agents. It allows users to represent these multiple levels, where each level can have agents with sub-agents.

In this simulation, robots are given tasks to take retrieve inventory items and then take those items to the loading docks.

Each `RobotAgent` is made up of sub-components that are treated as separate agents. For this simulation, each robot as a `SensorAgent`, `RouterAgent`, and `WorkerAgent`.

This model demonstrates deliberate meta-agent creation. It shows the basics of meta-agent creation and different ways to use and reference sub-agent and meta-agent functions and attributes. (The alliance formation demonstrates emergent meta-agent creation.)

In its current configuration, agents being part of multiple meta-agents is not supported

An additional item of note is that to reference the RobotAgent created in model you will see `type(self.RobotAgent)` or `type(model.RobotAgent)` in various places. If you have any ideas for how to make this more user friendly please let us know or do a pull request.

## Installation

This model requires Mesa's recommended install

```
$ pip install mesa[rec]
```

## How to Run

To run the model interactively, in this directory, run the following command

```
$ solara run app.py
```

## Files

- `model.py`: Contains creation of agents, the network and management of agent execution.
- `agents.py`: Contains logic for forming alliances and creation of new agents
- `app.py`: Contains the code for the interactive Solara visualization.
- `make_warehouse`: Generates a warehouse numpy array with loading docks, inventory, and charging stations.
Empty file added examples/warehouse/__init__.py
Empty file.
163 changes: 163 additions & 0 deletions examples/warehouse/agents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
from queue import PriorityQueue

import mesa
from mesa.discrete_space import FixedAgent


class InventoryAgent(FixedAgent):
"""
Represents an inventory item in the warehouse.
"""

def __init__(self, model, cell, item: str):
super().__init__(model)
self.cell = cell
self.item = item
self.quantity = 1000 # Default quantity


class RouteAgent(mesa.Agent):
"""
Handles path finding for agents in the warehouse.

Intended to be a pseudo onboard GPS system for robots.
"""

def __init__(self, model):
super().__init__(model)

def find_path(self, start, goal) -> list[tuple[int, int, int]] | None:
"""
Determines the path for a robot to take using the A* algorithm.
"""

def heuristic(a, b) -> int:
dx = abs(a[0] - b[0])
dy = abs(a[1] - b[1])
return dx + dy

open_set = PriorityQueue()
open_set.put((0, start.coordinate))
came_from = {}
g_score = {start.coordinate: 0}

while not open_set.empty():
_, current = open_set.get()

if current[:2] == goal.coordinate[:2]:
path = []
while current in came_from:
path.append(current)
current = came_from[current]
path.reverse()
path.insert(0, start.coordinate)
path.pop() # Remove the last location (inventory)
return path

for n_cell in self.model.warehouse[current].neighborhood:
coord = n_cell.coordinate

# Only consider orthogonal neighbors
if abs(coord[0] - current[0]) + abs(coord[1] - current[1]) != 1:
continue

tentative_g_score = g_score[current] + 1
if not n_cell.is_empty:
tentative_g_score += 50 # Penalty for non-empty cells

if coord not in g_score or tentative_g_score < g_score[coord]:
g_score[coord] = tentative_g_score
f_score = tentative_g_score + heuristic(coord, goal.coordinate)
open_set.put((f_score, coord))
came_from[coord] = current

return None


class SensorAgent(mesa.Agent):
"""
Detects entities in the area and handles movement along a path.

Intended to be a pseudo onboard sensor system for robot.
"""

def __init__(self, model):
super().__init__(model)

def move(
self, coord: tuple[int, int, int], path: list[tuple[int, int, int]]
) -> str:
"""
Moves the agent along the given path.
"""
if coord not in path:
raise ValueError("Current coordinate not in path.")

idx = path.index(coord)
if idx + 1 >= len(path):
return "movement complete"

next_cell = self.model.warehouse[path[idx + 1]]
if next_cell.is_empty:
self.meta_agent.cell = next_cell
return "moving"

# Handle obstacle
neighbors = self.model.warehouse[self.meta_agent.cell.coordinate].neighborhood
empty_neighbors = [n for n in neighbors if n.is_empty]
if empty_neighbors:
self.meta_agent.cell = self.random.choice(empty_neighbors)

# Recalculate path
new_path = self.meta_agent.get_constituting_agent_instance(
RouteAgent
).find_path(self.meta_agent.cell, self.meta_agent.item.cell)
self.meta_agent.path = new_path
return "recalculating"


class WorkerAgent(mesa.Agent):
"""
Represents a robot worker responsible for collecting and loading items.
"""

def __init__(self, model, ld, cs):
super().__init__(model)
self.loading_dock = ld
self.charging_station = cs
self.path: list[tuple[int, int, int]] | None = None
self.carrying: str | None = None
self.item: InventoryAgent | None = None

def initiate_task(self, item: InventoryAgent):
"""
Initiates a task for the robot to perform.
"""
self.item = item
self.path = self.find_path(self.cell, item.cell)

def continue_task(self):
"""
Continues the task if the robot is able to perform it.
"""
status = self.meta_agent.get_constituting_agent_instance(SensorAgent).move(
self.cell.coordinate, self.path
)

if status == "movement complete" and self.meta_agent.status == "inventory":
# Pick up item and bring to loading dock
self.meta_agent.cell = self.model.warehouse[
*self.meta_agent.cell.coordinate[:2], self.item.cell.coordinate[2]
]
self.meta_agent.status = "loading"
self.carrying = self.item.item
self.item.quantity -= 1
self.meta_agent.cell = self.model.warehouse[
*self.meta_agent.cell.coordinate[:2], 0
]
self.path = self.find_path(self.cell, self.loading_dock)

if status == "movement complete" and self.meta_agent.status == "loading":
# Load item onto truck and return to charging station
self.carrying = None
self.meta_agent.status = "open"
125 changes: 125 additions & 0 deletions examples/warehouse/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import matplotlib.pyplot as plt
import pandas as pd
import solara
from agents import InventoryAgent
from mesa.visualization import SolaraViz
from mesa.visualization.utils import update_counter
from model import WarehouseModel

# Constants
LOADING_DOCKS = [(0, 0, 0), (0, 2, 0), (0, 4, 0), (0, 6, 0), (0, 8, 0)]
AXIS_LIMITS = {"x": (0, 22), "y": (0, 20), "z": (0, 5)}

model_params = {
"seed": {
"type": "InputText",
"value": 42,
"label": "Random Seed",
},
}


def prepare_agent_data(model, agent_type, agent_label):
"""
Prepare data for agents of a specific type.

Args:
model: The WarehouseModel instance.
agent_type: The type of agent (e.g., "InventoryAgent", "RobotAgent").
agent_label: The label for the agent type.

Returns:
A list of dictionaries containing agent coordinates and type.
"""
return [
{
"x": agent.cell.coordinate[0],
"y": agent.cell.coordinate[1],
"z": agent.cell.coordinate[2],
"type": agent_label,
}
for agent in model.agents_by_type[agent_type]
]


@solara.component
def plot_warehouse(model):
"""
Visualize the warehouse model in a 3D scatter plot.

Args:
model: The WarehouseModel instance.
"""
update_counter.get()

# Prepare data for inventory and robot agents
inventory_data = prepare_agent_data(model, InventoryAgent, "Inventory")
robot_data = prepare_agent_data(model, type(model.RobotAgent), "Robot")

# Combine data into a single DataFrame
data = pd.DataFrame(inventory_data + robot_data)

# Create Matplotlib 3D scatter plot
fig = plt.figure(figsize=(8, 6))
ax = fig.add_subplot(111, projection="3d")

# Highlight loading dock cells
for i, dock in enumerate(LOADING_DOCKS):
ax.scatter(
dock[0],
dock[1],
dock[2],
c="yellow",
label="Loading Dock"
if i == 0
else None, # Add label only to the first dock
s=300,
marker="o",
)

# Plot inventory agents
inventory = data[data["type"] == "Inventory"]
ax.scatter(
inventory["x"],
inventory["y"],
inventory["z"],
c="blue",
label="Inventory",
s=100,
marker="s",
)

# Plot robot agents
robots = data[data["type"] == "Robot"]
ax.scatter(robots["x"], robots["y"], robots["z"], c="red", label="Robot", s=200)

# Set labels, title, and legend
ax.set_xlabel("X")
ax.set_ylabel("Y")
ax.set_zlabel("Z")
ax.set_title("Warehouse Visualization")
ax.legend()

# Configure plot appearance
ax.grid(False)
ax.set_xlim(*AXIS_LIMITS["x"])
ax.set_ylim(*AXIS_LIMITS["y"])
ax.set_zlim(*AXIS_LIMITS["z"])
ax.axis("off")

# Render the plot in Solara
solara.FigureMatplotlib(fig)


# Create initial model instance
model = WarehouseModel()

# Create the SolaraViz page
page = SolaraViz(
model,
components=[plot_warehouse],
model_params=model_params,
name="Pseudo-Warehouse Model",
)

page # noqa
52 changes: 52 additions & 0 deletions examples/warehouse/make_warehouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import random
import string

import numpy as np

# Constants
DEFAULT_ROWS = 22
DEFAULT_COLS = 20
DEFAULT_HEIGHT = 4
LOADING_DOCK_COORDS = [(0, i, 0) for i in range(0, 10, 2)]
CHARGING_STATION_COORDS = [(21, i, 0) for i in range(19, 10, -2)]


def generate_item_code() -> str:
"""Generate a random item code (1 letter + 2 numbers)."""
letter = random.choice(string.ascii_uppercase)
number = random.randint(10, 99)
return f"{letter}{number}"


def make_warehouse(
rows: int = DEFAULT_ROWS, cols: int = DEFAULT_COLS, height: int = DEFAULT_HEIGHT
) -> np.ndarray:
"""
Generate a warehouse layout with designated LD, CS, and storage rows as a NumPy array.

Args:
rows (int): Number of rows in the warehouse.
cols (int): Number of columns in the warehouse.
height (int): Number of levels in the warehouse.

Returns:
np.ndarray: A 3D NumPy array representing the warehouse layout.
"""
# Initialize empty warehouse layout
warehouse = np.full((rows, cols, height), " ", dtype=object)

# Place Loading Docks (LD)
for r, c, h in LOADING_DOCK_COORDS:
warehouse[r, c, h] = "LD"

# Place Charging Stations (CS)
for r, c, h in CHARGING_STATION_COORDS:
warehouse[r, c, h] = "CS"

# Fill storage rows with item codes
for r in range(3, rows - 2, 3): # Skip row 0,1,2 (LD) and row 17,18,19 (CS)
for c in range(2, cols, 3): # Leave 2 spaces between each item row
for h in range(height):
warehouse[r, c, h] = generate_item_code()

return warehouse
Loading
Loading