diff --git a/examples/warehouse/Readme.md b/examples/warehouse/Readme.md new file mode 100644 index 00000000..150d18b5 --- /dev/null +++ b/examples/warehouse/Readme.md @@ -0,0 +1,42 @@ +# Pseudo-Warehouse Model (Meta-Agent Example) + +## Summary + +The purpose of this model is to demonstrate Mesa's meta-agent capability and some of its implementation approaches, not to be an accurate warehouse simulation. + +**Overview of meta agent:** Complex systems often have multiple levels of components. A city is not a single entity, but it is made of districts,neighborhoods, buildings, and people. A forest comprises an ecosystem of trees, plants, animals, and microorganisms. An organization is not one entity, but is made of departments, sub-departments, and people. A person is not a single entity, but it is made of micro biomes, organs and cells. + +This reality is the motivation for meta-agents. It allows users to represent these multiple levels, where each level can have agents with sub-agents. + +In this simulation, robots are given tasks to take retrieve inventory items and then take those items to the loading docks. + +Each `RobotAgent` is made up of sub-components that are treated as separate agents. For this simulation, each robot as a `SensorAgent`, `RouterAgent`, and `WorkerAgent`. + +This model demonstrates deliberate meta-agent creation. It shows the basics of meta-agent creation and different ways to use and reference sub-agent and meta-agent functions and attributes. (The alliance formation demonstrates emergent meta-agent creation.) + +In its current configuration, agents being part of multiple meta-agents is not supported + +An additional item of note is that to reference the RobotAgent created in model you will see `type(self.RobotAgent)` or `type(model.RobotAgent)` in various places. If you have any ideas for how to make this more user friendly please let us know or do a pull request. + +## Installation + +This model requires Mesa's recommended install + +``` + $ pip install mesa[rec] +``` + +## How to Run + +To run the model interactively, in this directory, run the following command + +``` + $ solara run app.py +``` + +## Files + +- `model.py`: Contains creation of agents, the network and management of agent execution. +- `agents.py`: Contains logic for forming alliances and creation of new agents +- `app.py`: Contains the code for the interactive Solara visualization. +- `make_warehouse`: Generates a warehouse numpy array with loading docks, inventory, and charging stations. \ No newline at end of file diff --git a/examples/warehouse/__init__.py b/examples/warehouse/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/warehouse/agents.py b/examples/warehouse/agents.py new file mode 100644 index 00000000..06b088b7 --- /dev/null +++ b/examples/warehouse/agents.py @@ -0,0 +1,163 @@ +from queue import PriorityQueue + +import mesa +from mesa.discrete_space import FixedAgent + + +class InventoryAgent(FixedAgent): + """ + Represents an inventory item in the warehouse. + """ + + def __init__(self, model, cell, item: str): + super().__init__(model) + self.cell = cell + self.item = item + self.quantity = 1000 # Default quantity + + +class RouteAgent(mesa.Agent): + """ + Handles path finding for agents in the warehouse. + + Intended to be a pseudo onboard GPS system for robots. + """ + + def __init__(self, model): + super().__init__(model) + + def find_path(self, start, goal) -> list[tuple[int, int, int]] | None: + """ + Determines the path for a robot to take using the A* algorithm. + """ + + def heuristic(a, b) -> int: + dx = abs(a[0] - b[0]) + dy = abs(a[1] - b[1]) + return dx + dy + + open_set = PriorityQueue() + open_set.put((0, start.coordinate)) + came_from = {} + g_score = {start.coordinate: 0} + + while not open_set.empty(): + _, current = open_set.get() + + if current[:2] == goal.coordinate[:2]: + path = [] + while current in came_from: + path.append(current) + current = came_from[current] + path.reverse() + path.insert(0, start.coordinate) + path.pop() # Remove the last location (inventory) + return path + + for n_cell in self.model.warehouse[current].neighborhood: + coord = n_cell.coordinate + + # Only consider orthogonal neighbors + if abs(coord[0] - current[0]) + abs(coord[1] - current[1]) != 1: + continue + + tentative_g_score = g_score[current] + 1 + if not n_cell.is_empty: + tentative_g_score += 50 # Penalty for non-empty cells + + if coord not in g_score or tentative_g_score < g_score[coord]: + g_score[coord] = tentative_g_score + f_score = tentative_g_score + heuristic(coord, goal.coordinate) + open_set.put((f_score, coord)) + came_from[coord] = current + + return None + + +class SensorAgent(mesa.Agent): + """ + Detects entities in the area and handles movement along a path. + + Intended to be a pseudo onboard sensor system for robot. + """ + + def __init__(self, model): + super().__init__(model) + + def move( + self, coord: tuple[int, int, int], path: list[tuple[int, int, int]] + ) -> str: + """ + Moves the agent along the given path. + """ + if coord not in path: + raise ValueError("Current coordinate not in path.") + + idx = path.index(coord) + if idx + 1 >= len(path): + return "movement complete" + + next_cell = self.model.warehouse[path[idx + 1]] + if next_cell.is_empty: + self.meta_agent.cell = next_cell + return "moving" + + # Handle obstacle + neighbors = self.model.warehouse[self.meta_agent.cell.coordinate].neighborhood + empty_neighbors = [n for n in neighbors if n.is_empty] + if empty_neighbors: + self.meta_agent.cell = self.random.choice(empty_neighbors) + + # Recalculate path + new_path = self.meta_agent.get_constituting_agent_instance( + RouteAgent + ).find_path(self.meta_agent.cell, self.meta_agent.item.cell) + self.meta_agent.path = new_path + return "recalculating" + + +class WorkerAgent(mesa.Agent): + """ + Represents a robot worker responsible for collecting and loading items. + """ + + def __init__(self, model, ld, cs): + super().__init__(model) + self.loading_dock = ld + self.charging_station = cs + self.path: list[tuple[int, int, int]] | None = None + self.carrying: str | None = None + self.item: InventoryAgent | None = None + + def initiate_task(self, item: InventoryAgent): + """ + Initiates a task for the robot to perform. + """ + self.item = item + self.path = self.find_path(self.cell, item.cell) + + def continue_task(self): + """ + Continues the task if the robot is able to perform it. + """ + status = self.meta_agent.get_constituting_agent_instance(SensorAgent).move( + self.cell.coordinate, self.path + ) + + if status == "movement complete" and self.meta_agent.status == "inventory": + # Pick up item and bring to loading dock + self.meta_agent.cell = self.model.warehouse[ + *self.meta_agent.cell.coordinate[:2], self.item.cell.coordinate[2] + ] + self.meta_agent.status = "loading" + self.carrying = self.item.item + self.item.quantity -= 1 + self.meta_agent.cell = self.model.warehouse[ + *self.meta_agent.cell.coordinate[:2], 0 + ] + self.path = self.find_path(self.cell, self.loading_dock) + + if status == "movement complete" and self.meta_agent.status == "loading": + # Load item onto truck and return to charging station + self.carrying = None + self.meta_agent.status = "open" diff --git a/examples/warehouse/app.py b/examples/warehouse/app.py new file mode 100644 index 00000000..11821f98 --- /dev/null +++ b/examples/warehouse/app.py @@ -0,0 +1,125 @@ +import matplotlib.pyplot as plt +import pandas as pd +import solara +from agents import InventoryAgent +from mesa.visualization import SolaraViz +from mesa.visualization.utils import update_counter +from model import WarehouseModel + +# Constants +LOADING_DOCKS = [(0, 0, 0), (0, 2, 0), (0, 4, 0), (0, 6, 0), (0, 8, 0)] +AXIS_LIMITS = {"x": (0, 22), "y": (0, 20), "z": (0, 5)} + +model_params = { + "seed": { + "type": "InputText", + "value": 42, + "label": "Random Seed", + }, +} + + +def prepare_agent_data(model, agent_type, agent_label): + """ + Prepare data for agents of a specific type. + + Args: + model: The WarehouseModel instance. + agent_type: The type of agent (e.g., "InventoryAgent", "RobotAgent"). + agent_label: The label for the agent type. + + Returns: + A list of dictionaries containing agent coordinates and type. + """ + return [ + { + "x": agent.cell.coordinate[0], + "y": agent.cell.coordinate[1], + "z": agent.cell.coordinate[2], + "type": agent_label, + } + for agent in model.agents_by_type[agent_type] + ] + + +@solara.component +def plot_warehouse(model): + """ + Visualize the warehouse model in a 3D scatter plot. + + Args: + model: The WarehouseModel instance. + """ + update_counter.get() + + # Prepare data for inventory and robot agents + inventory_data = prepare_agent_data(model, InventoryAgent, "Inventory") + robot_data = prepare_agent_data(model, type(model.RobotAgent), "Robot") + + # Combine data into a single DataFrame + data = pd.DataFrame(inventory_data + robot_data) + + # Create Matplotlib 3D scatter plot + fig = plt.figure(figsize=(8, 6)) + ax = fig.add_subplot(111, projection="3d") + + # Highlight loading dock cells + for i, dock in enumerate(LOADING_DOCKS): + ax.scatter( + dock[0], + dock[1], + dock[2], + c="yellow", + label="Loading Dock" + if i == 0 + else None, # Add label only to the first dock + s=300, + marker="o", + ) + + # Plot inventory agents + inventory = data[data["type"] == "Inventory"] + ax.scatter( + inventory["x"], + inventory["y"], + inventory["z"], + c="blue", + label="Inventory", + s=100, + marker="s", + ) + + # Plot robot agents + robots = data[data["type"] == "Robot"] + ax.scatter(robots["x"], robots["y"], robots["z"], c="red", label="Robot", s=200) + + # Set labels, title, and legend + ax.set_xlabel("X") + ax.set_ylabel("Y") + ax.set_zlabel("Z") + ax.set_title("Warehouse Visualization") + ax.legend() + + # Configure plot appearance + ax.grid(False) + ax.set_xlim(*AXIS_LIMITS["x"]) + ax.set_ylim(*AXIS_LIMITS["y"]) + ax.set_zlim(*AXIS_LIMITS["z"]) + ax.axis("off") + + # Render the plot in Solara + solara.FigureMatplotlib(fig) + + +# Create initial model instance +model = WarehouseModel() + +# Create the SolaraViz page +page = SolaraViz( + model, + components=[plot_warehouse], + model_params=model_params, + name="Pseudo-Warehouse Model", +) + +page # noqa diff --git a/examples/warehouse/make_warehouse.py b/examples/warehouse/make_warehouse.py new file mode 100644 index 00000000..bbd5a8d3 --- /dev/null +++ b/examples/warehouse/make_warehouse.py @@ -0,0 +1,52 @@ +import random +import string + +import numpy as np + +# Constants +DEFAULT_ROWS = 22 +DEFAULT_COLS = 20 +DEFAULT_HEIGHT = 4 +LOADING_DOCK_COORDS = [(0, i, 0) for i in range(0, 10, 2)] +CHARGING_STATION_COORDS = [(21, i, 0) for i in range(19, 10, -2)] + + +def generate_item_code() -> str: + """Generate a random item code (1 letter + 2 numbers).""" + letter = random.choice(string.ascii_uppercase) + number = random.randint(10, 99) + return f"{letter}{number}" + + +def make_warehouse( + rows: int = DEFAULT_ROWS, cols: int = DEFAULT_COLS, height: int = DEFAULT_HEIGHT +) -> np.ndarray: + """ + Generate a warehouse layout with designated LD, CS, and storage rows as a NumPy array. + + Args: + rows (int): Number of rows in the warehouse. + cols (int): Number of columns in the warehouse. + height (int): Number of levels in the warehouse. + + Returns: + np.ndarray: A 3D NumPy array representing the warehouse layout. + """ + # Initialize empty warehouse layout + warehouse = np.full((rows, cols, height), " ", dtype=object) + + # Place Loading Docks (LD) + for r, c, h in LOADING_DOCK_COORDS: + warehouse[r, c, h] = "LD" + + # Place Charging Stations (CS) + for r, c, h in CHARGING_STATION_COORDS: + warehouse[r, c, h] = "CS" + + # Fill storage rows with item codes + for r in range(3, rows - 2, 3): # Skip row 0,1,2 (LD) and row 17,18,19 (CS) + for c in range(2, cols, 3): # Leave 2 spaces between each item row + for h in range(height): + warehouse[r, c, h] = generate_item_code() + + return warehouse diff --git a/examples/warehouse/model.py b/examples/warehouse/model.py new file mode 100644 index 00000000..3fd84f8f --- /dev/null +++ b/examples/warehouse/model.py @@ -0,0 +1,109 @@ +import mesa +from agents import ( + InventoryAgent, + RouteAgent, + SensorAgent, + WorkerAgent, +) +from make_warehouse import make_warehouse +from mesa.discrete_space import OrthogonalMooreGrid +from mesa.discrete_space.cell_agent import CellAgent +from mesa.experimental.meta_agents.meta_agent import create_meta_agent + +# Constants for configuration +LOADING_DOCKS = [(0, 0, 0), (0, 2, 0), (0, 4, 0), (0, 6, 0), (0, 8, 0)] +CHARGING_STATIONS = [ + (21, 19, 0), + (21, 17, 0), + (21, 15, 0), + (21, 13, 0), + (21, 11, 0), +] +INVENTORY_START_ROW_OFFSET = 3 + + +class WarehouseModel(mesa.Model): + """ + Model for simulating warehouse management with autonomous systems where + each autonomous system (e.g., robot) is made of numerous smaller agents + (e.g., routing, sensors, etc.). + """ + + def __init__(self, seed=42): + """ + Initialize the model. + + Args: + seed (int): Random seed. + """ + super().__init__(seed=seed) + self.inventory = {} + self.loading_docks = LOADING_DOCKS + self.charging_stations = CHARGING_STATIONS + + # Create warehouse and instantiate grid + layout = make_warehouse() + self.warehouse = OrthogonalMooreGrid( + (layout.shape[0], layout.shape[1], layout.shape[2]), + torus=False, + capacity=1, + random=self.random, + ) + + # Create Inventory Agents + for row in range( + INVENTORY_START_ROW_OFFSET, layout.shape[0] - INVENTORY_START_ROW_OFFSET + ): + for col in range(layout.shape[1]): + for height in range(layout.shape[2]): + if layout[row][col][height].strip(): + item = layout[row][col][height] + InventoryAgent(self, self.warehouse[row, col, height], item) + + # Create Robot Agents + for idx in range(len(self.loading_docks)): + # Create constituting_agents + router = RouteAgent(self) + sensor = SensorAgent(self) + worker = WorkerAgent( + self, + self.warehouse[self.loading_docks[idx]], + self.warehouse[self.charging_stations[idx]], + ) + + # Create meta-agent and place in warehouse + self.RobotAgent = create_meta_agent( + self, + "RobotAgent", + [router, sensor, worker], + CellAgent, + meta_attributes={ + "cell": self.warehouse[self.charging_stations[idx]], + "status": "open", + }, + assume_constituting_agent_attributes=True, + assume_constituting_agent_methods=True, + ) + + def central_move(self, robot): + """ + Consolidates meta-agent behavior in the model class. + + Args: + robot: The robot meta-agent to move. + """ + robot.move(robot.cell.coordinate, robot.path) + + def step(self): + """ + Advance the model by one step. + """ + for robot in self.agents_by_type[type(self.RobotAgent)]: + if robot.status == "open": # Assign a task to the robot + item = self.random.choice(self.agents_by_type[InventoryAgent]) + if item.quantity > 0: + robot.initiate_task(item) + robot.status = "inventory" + self.central_move(robot) + else: + robot.continue_task()