From c2c71e93e12f196b3a4c5043aa2cd0af79cf9039 Mon Sep 17 00:00:00 2001 From: Abraham Flaxman Date: Wed, 1 Jan 2025 10:51:30 -0800 Subject: [PATCH] isort, black --- .../components/boundaries.py | 190 +++++++------ .../components/particles.py | 260 +++++++++++------- .../components/visualizer.py | 220 ++++++++------- 3 files changed, 383 insertions(+), 287 deletions(-) diff --git a/src/vivarium_eye_vessels/components/boundaries.py b/src/vivarium_eye_vessels/components/boundaries.py index 5316af0..fd5df74 100644 --- a/src/vivarium_eye_vessels/components/boundaries.py +++ b/src/vivarium_eye_vessels/components/boundaries.py @@ -1,104 +1,115 @@ from typing import Dict, List, Protocol + import numpy as np import pandas as pd from vivarium import Component from vivarium.framework.engine import Builder + class ForceCalculator(Protocol): """Protocol defining the interface for force calculation strategies""" + def calculate_force_magnitude(self, distances: np.ndarray) -> np.ndarray: """Calculate force magnitudes based on distances""" pass + class HookeanForce: """Implements Hooke's law force calculation""" + def __init__(self, spring_constant: float): self.spring_constant = spring_constant - + def calculate_force_magnitude(self, distances: np.ndarray) -> np.ndarray: return self.spring_constant * distances + class MagneticForce: """Implements inverse square law force calculation""" + def __init__(self, magnetic_strength: float, min_distance: float): self.magnetic_strength = magnetic_strength self.min_distance = min_distance - + def calculate_force_magnitude(self, distances: np.ndarray) -> np.ndarray: capped_distances = np.maximum(distances, self.min_distance) return self.magnetic_strength / (capped_distances * capped_distances) + class BaseForceComponent(Component): """Base class for force-based components with shared caching logic""" - + @property def columns_required(self) -> List[str]: return ["x", "y", "z", "frozen"] - + @property def filter_str(self) -> str: return "not frozen" - + def setup(self, builder: Builder) -> None: self.force_cache = {} self.clock = builder.time.clock() - + # Register force modifiers - for axis in ['x', 'y', 'z']: + for axis in ["x", "y", "z"]: builder.value.register_value_modifier( f"particle.force.{axis}", modifier=getattr(self, f"force_{axis}"), - requires_columns=self.columns_required + requires_columns=self.columns_required, ) def setup_force_calculator(self, config: Dict) -> None: # Set up force calculator if config.force_type == "magnetic": self.force_calculator = MagneticForce( - float(config.magnetic_strength), - float(config.min_distance) + float(config.magnetic_strength), float(config.min_distance) ) else: self.force_calculator = HookeanForce(float(config.spring_constant)) - - + def get_cached_forces(self, index: pd.Index) -> np.ndarray: """Get cached forces or calculate them if needed""" current_time = self.clock() cache_key = (current_time, tuple(index)) - + if cache_key not in self.force_cache: pop = self.population_view.get(index) active_particles = pop.query(self.filter_str) - + if active_particles.empty: self.force_cache[cache_key] = np.zeros((len(index), 3)) else: forces = np.zeros((len(index), 3)) active_forces = self.calculate_forces_vectorized(active_particles) - forces[active_particles.index.get_indexer(active_particles.index)] = active_forces + forces[ + active_particles.index.get_indexer(active_particles.index) + ] = active_forces self.force_cache[cache_key] = forces - + # Clear old cache entries - self.force_cache = {k: v for k, v in self.force_cache.items() if k[0] == current_time} - + self.force_cache = { + k: v for k, v in self.force_cache.items() if k[0] == current_time + } + return self.force_cache[cache_key] - + def force_x(self, index: pd.Index, forces: pd.Series) -> pd.Series: forces += pd.Series(self.get_cached_forces(index)[:, 0], index=index) return forces - + def force_y(self, index: pd.Index, forces: pd.Series) -> pd.Series: forces += pd.Series(self.get_cached_forces(index)[:, 1], index=index) return forces - + def force_z(self, index: pd.Index, forces: pd.Series) -> pd.Series: forces += pd.Series(self.get_cached_forces(index)[:, 2], index=index) return forces + class EllipsoidContainment(BaseForceComponent): """Component that keeps particles within an ellipsoid boundary""" - + CONFIGURATION_DEFAULTS = { "ellipsoid_containment": { "a": 1.0, @@ -123,7 +134,7 @@ def setup(self, builder: Builder) -> None: self.a2 = self.a * self.a self.b2 = self.b * self.b self.c2 = self.c * self.c - + def calculate_forces_vectorized(self, particles: pd.DataFrame) -> np.ndarray: positions = particles[["x", "y", "z"]].to_numpy() @@ -131,40 +142,39 @@ def calculate_forces_vectorized(self, particles: pd.DataFrame) -> np.ndarray: x_norm = positions[:, 0] / self.a y_norm = positions[:, 1] / self.b z_norm = positions[:, 2] / self.c - + # Calculate ellipsoid equation value ellipsoid_val = x_norm**2 + y_norm**2 + z_norm**2 - + # Calculate gradient components - grad = np.column_stack([ - 2 * x_norm / self.a, - 2 * y_norm / self.b, - 2 * z_norm / self.c - ]) - + grad = np.column_stack( + [2 * x_norm / self.a, 2 * y_norm / self.b, 2 * z_norm / self.c] + ) + # Initialize forces array forces = np.zeros_like(positions) outside_mask = ellipsoid_val > 1 - + if np.any(outside_mask): grad_outside = grad[outside_mask] grad_norms = np.linalg.norm(grad_outside, axis=1, keepdims=True) normalized_grads = grad_outside / grad_norms - + # Calculate distances from surface distances = np.sqrt(ellipsoid_val[outside_mask]) - 1 - + # Calculate force magnitudes using the selected force calculator force_magnitudes = self.force_calculator.calculate_force_magnitude(distances) - + # Calculate final forces forces[outside_mask] = -normalized_grads * force_magnitudes[:, np.newaxis] - + return forces + class CylinderExclusion(BaseForceComponent): """Component that repels particles from inside a cylindrical exclusion zone""" - + CONFIGURATION_DEFAULTS = { "cylinder_exclusion": { "radius": 1.0, @@ -176,25 +186,26 @@ class CylinderExclusion(BaseForceComponent): "spring_constant": 0.1, } } - + def setup(self, builder: Builder) -> None: super().setup(builder) - + config = builder.configuration.cylinder_exclusion self.setup_force_calculator(config) - + # Set up geometry parameters self.radius = float(config.radius) self.center = np.array(config.center, dtype=float) self.direction = np.array(config.direction, dtype=float) self.direction /= np.linalg.norm(self.direction) - + # Pre-compute random perpendicular vector - random_perpendicular = np.array([1, 0, 0]) if abs(self.direction[0]) < 0.9 else np.array([0, 1, 0]) + random_perpendicular = ( + np.array([1, 0, 0]) if abs(self.direction[0]) < 0.9 else np.array([0, 1, 0]) + ) self.default_outward = np.cross(self.direction, random_perpendicular) self.default_outward /= np.linalg.norm(self.default_outward) - - + def calculate_forces_vectorized(self, particles: pd.DataFrame) -> np.ndarray: positions = particles[["x", "y", "z"]].to_numpy() @@ -204,62 +215,56 @@ def calculate_forces_vectorized(self, particles: pd.DataFrame) -> np.ndarray: axial_components = axial_dots[:, np.newaxis] * self.direction radial_vectors = rel_positions - axial_components radial_distances = np.linalg.norm(radial_vectors, axis=1) - + # Calculate penetration depths penetrations = self.radius - radial_distances - + # Handle points on axis mask_on_axis = radial_distances < 1e-10 outward_directions = np.zeros_like(positions) outward_directions[mask_on_axis] = self.default_outward - + # Calculate outward directions for off-axis points mask_off_axis = ~mask_on_axis outward_directions[mask_off_axis] = ( - radial_vectors[mask_off_axis] / - radial_distances[mask_off_axis, np.newaxis] + radial_vectors[mask_off_axis] / radial_distances[mask_off_axis, np.newaxis] ) - + # Apply forces only inside cylinder mask_inside = penetrations > 0 force_magnitudes = np.zeros_like(radial_distances) force_magnitudes[mask_inside] = self.force_calculator.calculate_force_magnitude( penetrations[mask_inside] ) - + return outward_directions * force_magnitudes[:, np.newaxis] + class PointRepulsion(BaseForceComponent): """Component that creates a point-based repulsion force""" - + CONFIGURATION_DEFAULTS = { "point_repulsion": { - "position": { - "x": 0.0, - "y": 0.0, - "z": 0.0 - }, - "force_type": "magnetic", # or "hookean" + "position": {"x": 0.0, "y": 0.0, "z": 0.0}, + "force_type": "magnetic", # or "hookean" "magnetic_strength": 0.05, "min_distance": 0.1, "spring_constant": 0.1, "radius": 0.05, # Interaction radius } } - + def setup(self, builder: Builder) -> None: super().setup(builder) - + config = builder.configuration.point_repulsion self.setup_force_calculator(config) - self.position = np.array([ - float(config.position.x), - float(config.position.y), - float(config.position.z) - ]) + self.position = np.array( + [float(config.position.x), float(config.position.y), float(config.position.z)] + ) self.radius = float(config.radius) - + def calculate_forces_vectorized(self, particles: pd.DataFrame) -> np.ndarray: positions = particles[["x", "y", "z"]].to_numpy() @@ -267,21 +272,22 @@ def calculate_forces_vectorized(self, particles: pd.DataFrame) -> np.ndarray: displacements = self.position - positions distances = np.sqrt(np.sum(displacements**2, axis=1)) distances = np.where(distances > self.radius, 0, distances) - + # Calculate normalized directions - with np.errstate(invalid='ignore', divide='ignore'): + with np.errstate(invalid="ignore", divide="ignore"): direction_vectors = displacements / distances[:, np.newaxis] direction_vectors = np.nan_to_num(direction_vectors) - + # Calculate force magnitudes using selected force calculator force_magnitudes = self.force_calculator.calculate_force_magnitude(distances) - + # Return repulsive forces return -direction_vectors * force_magnitudes[:, np.newaxis] + class FrozenRepulsion(BaseForceComponent): """Component that repels active particles from frozen particles using spatial indexing""" - + CONFIGURATION_DEFAULTS = { "frozen_repulsion": { "interaction_radius": 0.2, @@ -325,13 +331,13 @@ def on_time_step(self, event) -> None: for i, frozen_neighbors in enumerate(neighbor_lists): # Calculate displacement vectors from frozen particles frozen = self.freezer.get_population(frozen_neighbors) - frozen = (((frozen.path_id == particles.iloc[i].path_id) - & ((self.clock() - frozen.freeze_time)/pd.Timedelta(days=1) > self.delay)) - | (frozen.path_id != particles.iloc[i].path_id) - ) + frozen = ( + (frozen.path_id == particles.iloc[i].path_id) + & ((self.clock() - frozen.freeze_time) / pd.Timedelta(days=1) > self.delay) + ) | (frozen.path_id != particles.iloc[i].path_id) if frozen.sum() > 0: to_freeze.append(particles.index[i]) - + if len(to_freeze) > 0: to_freeze = particles.loc[to_freeze] to_freeze.loc[:, "frozen"] = True @@ -339,40 +345,44 @@ def on_time_step(self, event) -> None: to_freeze.loc[:, "path_id"] = -1 # Mark as end of path self.population_view.update(to_freeze) - def calculate_forces_vectorized(self, particles: pd.DataFrame) -> np.ndarray: """Calculate repulsion forces from frozen particles""" positions = particles[["x", "y", "z"]].to_numpy() forces = np.zeros_like(positions) neighbor_lists = self.freezer.query_radius(positions, self.interaction_radius) - + if neighbor_lists is None: return forces - + for i, frozen_neighbors in enumerate(neighbor_lists): # Calculate displacement vectors from frozen particles frozen = self.freezer.get_population(frozen_neighbors) - frozen = frozen[((frozen.path_id == particles.iloc[i].path_id) - & ((self.clock() - frozen.freeze_time)/pd.Timedelta(days=1) > self.delay)) - | (frozen.path_id != particles.iloc[i].path_id)] + frozen = frozen[ + ( + (frozen.path_id == particles.iloc[i].path_id) + & ( + (self.clock() - frozen.freeze_time) / pd.Timedelta(days=1) + > self.delay + ) + ) + | (frozen.path_id != particles.iloc[i].path_id) + ] frozen_neighbor_positions = frozen[["x", "y", "z"]].to_numpy() displacements = positions[i] - frozen_neighbor_positions - + # Calculate distances distances = np.sqrt(np.sum(displacements**2, axis=1)) - + # Calculate normalized direction vectors - with np.errstate(invalid='ignore', divide='ignore'): + with np.errstate(invalid="ignore", divide="ignore"): directions = displacements / distances[:, np.newaxis] directions = np.nan_to_num(directions) - + # Calculate and sum forces from all frozen neighbors force_magnitudes = self.force_calculator.calculate_force_magnitude( self.interaction_radius - distances ) - forces[i] = np.sum( - directions * force_magnitudes[:, np.newaxis], axis=0 - ) - + forces[i] = np.sum(directions * force_magnitudes[:, np.newaxis], axis=0) + return forces diff --git a/src/vivarium_eye_vessels/components/particles.py b/src/vivarium_eye_vessels/components/particles.py index 801f1fc..54fe8f7 100644 --- a/src/vivarium_eye_vessels/components/particles.py +++ b/src/vivarium_eye_vessels/components/particles.py @@ -2,9 +2,8 @@ import numpy as np import pandas as pd -from scipy.stats import norm from scipy.spatial import cKDTree - +from scipy.stats import norm from vivarium import Component from vivarium.framework.engine import Builder from vivarium.framework.event import Event @@ -17,9 +16,14 @@ class Particle3D(Component): @property def columns_created(self) -> List[str]: return [ - "x", "y", "z", - "vx", "vy", "vz", - "frozen", "freeze_time", + "x", + "y", + "z", + "vx", + "vy", + "vz", + "frozen", + "freeze_time", "depth", "parent_id", "path_id", @@ -29,7 +33,7 @@ def columns_created(self) -> List[str]: "particles": { "overall_max_velocity_change": 0.1, "initial_velocity_range": (-0.05, 0.05), - "terminal_velocity": 0.2, # Maximum allowed velocity magnitude + "terminal_velocity": 0.2, # Maximum allowed velocity magnitude "initial_circle": {"center": [1.5, 0.0, 0.5], "radius": 0.1, "n_vessels": 5}, } } @@ -63,7 +67,7 @@ def setup_scale(self, builder): a = float(config.a) b = float(config.b) c = float(config.c) - self.scale = np.array([a,b,c]) + self.scale = np.array([a, b, c]) else: self.scale = np.ones(3) @@ -71,23 +75,20 @@ def register_force_pipelines(self, builder: Builder) -> None: """Register pipelines for force components and total magnitude.""" # Register individual force component pipelines self.force_x = builder.value.register_value_producer( - "particle.force.x", - source=lambda index: pd.Series(0.0, index=index) + "particle.force.x", source=lambda index: pd.Series(0.0, index=index) ) self.force_y = builder.value.register_value_producer( - "particle.force.y", - source=lambda index: pd.Series(0.0, index=index) + "particle.force.y", source=lambda index: pd.Series(0.0, index=index) ) self.force_z = builder.value.register_value_producer( - "particle.force.z", - source=lambda index: pd.Series(0.0, index=index) + "particle.force.z", source=lambda index: pd.Series(0.0, index=index) ) # Register total force magnitude pipeline self.force_magnitude = builder.value.register_value_producer( "particle.force.magnitude", source=self.get_force_magnitude, - requires_values=['particle.force.x', 'particle.force.y', 'particle.force.z'] + requires_values=["particle.force.x", "particle.force.y", "particle.force.z"], ) def get_force_magnitude(self, index: pd.Index) -> pd.Series: @@ -102,14 +103,18 @@ def on_initialize_simulants(self, simulant_data: SimulantData) -> None: pop = pd.DataFrame(index=simulant_data.index) # Generate 3D normal points using ppf (inverse CDF) - points = np.column_stack([ - norm.ppf(self.randomness.get_draw(pop.index, additional_key=f'xyz_{i}')) - for i in range(3) - ]) - + points = np.column_stack( + [ + norm.ppf(self.randomness.get_draw(pop.index, additional_key=f"xyz_{i}")) + for i in range(3) + ] + ) + # Normalize and scale by random radius points /= np.linalg.norm(points, axis=1)[:, np.newaxis] - radii = np.array(self.randomness.get_draw(pop.index, additional_key='radius'))**(1/3) + radii = np.array(self.randomness.get_draw(pop.index, additional_key="radius")) ** ( + 1 / 3 + ) points *= radii[:, np.newaxis] pop[["x", "y", "z"]] = points * self.scale @@ -150,7 +155,9 @@ def initialize_circle_positions(self, pop: pd.DataFrame) -> None: center[2], ] pop.loc[i, "path_id"] = i - pop.loc[i, ["vz"]] = [0,] + pop.loc[i, ["vz"]] = [ + 0, + ] def on_time_step(self, event: Event) -> None: """Update positions and velocities of non-frozen particles and track blocking forces.""" @@ -177,8 +184,13 @@ def update_positions(self, particles: pd.DataFrame) -> None: # Update velocities with random changes and forces for i, (v, f) in enumerate(zip(["vx", "vy", "vz"], [fx, fy, fz])): # Random velocity change - dv = (self.randomness.get_draw(particles.index, additional_key=f"d{v}") - 0.5) * 2 * max_velocity_change * self.scale[i] - + dv = ( + (self.randomness.get_draw(particles.index, additional_key=f"d{v}") - 0.5) + * 2 + * max_velocity_change + * self.scale[i] + ) + # Add force contribution to velocity particles.loc[:, v] += (dv + f) * self.step_size @@ -186,7 +198,7 @@ def update_positions(self, particles: pd.DataFrame) -> None: velocity_vectors = particles[["vx", "vy", "vz"]].to_numpy() / self.scale velocities_magnitude = np.linalg.norm(velocity_vectors, axis=1) over_limit = velocities_magnitude > self.terminal_velocity - + if np.any(over_limit): # Scale down velocity components to satisfy terminal velocity scale_factors = self.terminal_velocity / velocities_magnitude[over_limit] @@ -208,10 +220,17 @@ class PathFreezer(Component): @property def columns_required(self) -> List[str]: return [ - "x", "y", "z", - "vx", "vy", "vz", - "frozen", "freeze_time", "depth", - "parent_id", "path_id", + "x", + "y", + "z", + "vx", + "vy", + "vz", + "frozen", + "freeze_time", + "depth", + "parent_id", + "path_id", ] def setup(self, builder: Builder) -> None: @@ -254,14 +273,16 @@ def query_radius(self, pop, radius: float): positions = pop return self._current_tree.query_ball_point(positions, radius) - + def nearest_index(self, near_list): - nearest_frozen_indices = [self._current_frozen.index[indices[0]] for indices in near_list] + nearest_frozen_indices = [ + self._current_frozen.index[indices[0]] for indices in near_list + ] return nearest_frozen_indices def get_population(self, indices: List[int]) -> pd.DataFrame: pos = self._current_frozen.reindex(indices) - pos = pos.dropna(how='all') + pos = pos.dropna(how="all") return pos def freeze_particles(self, pop: pd.DataFrame) -> None: @@ -272,7 +293,7 @@ def freeze_particles(self, pop: pd.DataFrame) -> None: available = pop[~pop.frozen & (pop.path_id < 0)] if len(available) >= len(active): - to_freeze = available.iloc[:len(active)] + to_freeze = available.iloc[: len(active)] to_freeze = to_freeze.assign( x=active.x.values, @@ -299,12 +320,12 @@ class PathExtinction(Component): CONFIGURATION_DEFAULTS = { "path_extinction": { - "extinction_start_time": "2020-01-01", + "extinction_start_time": "2020-01-01", "extinction_end_time": "2020-12-31", "initial_freeze_probability": 0.0, "final_freeze_probability": 0.3, "check_interval": 5, - "force_threshold": 10.0 # Force magnitude threshold for extinction + "force_threshold": 10.0, # Force magnitude threshold for extinction } } @@ -319,18 +340,18 @@ def setup(self, builder: Builder) -> None: self.p_start = self.config.initial_freeze_probability self.p_end = self.config.final_freeze_probability self.force_threshold = self.config.force_threshold - + self.clock = builder.time.clock() self.step_count = 0 self.randomness = builder.randomness.get_stream("path_extinction") - + # Get the force magnitude pipeline self.force_magnitude = builder.value.get_value("particle.force.magnitude") def get_current_freeze_probability(self) -> float: """Calculate current freeze probability based on time.""" current_time = self.clock() - + if current_time < self.start_time: return self.p_start elif current_time > self.end_time: @@ -342,13 +363,13 @@ def get_current_freeze_probability(self) -> float: def on_time_step(self, event: Event) -> None: """Check for path freezing based on time and force magnitude.""" self.step_count += 1 - + if self.step_count % self.config.check_interval != 0: return pop = self.population_view.get(event.index) active = pop[~pop.frozen & (pop.path_id >= 0)] - + if active.empty: return @@ -357,13 +378,14 @@ def on_time_step(self, event: Event) -> None: p_freeze = p_freeze_mean * forces / forces.sum() to_freeze = active[self.randomness.get_draw(active.index) < p_freeze] - + if not to_freeze.empty: to_freeze.loc[:, "frozen"] = True to_freeze.loc[:, "freeze_time"] = self.clock() to_freeze.loc[:, "path_id"] = -1 # Mark as end of path self.population_view.update(to_freeze) + class PathSplitter(Component): """Component for splitting particle paths into two branches.""" @@ -378,16 +400,23 @@ class PathSplitter(Component): @property def columns_required(self) -> List[str]: return [ - "x", "y", "z", - "vx", "vy", "vz", - "frozen", "freeze_time", "depth", - "parent_id", "path_id", + "x", + "y", + "z", + "vx", + "vy", + "vz", + "frozen", + "freeze_time", + "depth", + "parent_id", + "path_id", ] def setup(self, builder: Builder) -> None: self.config = builder.configuration.path_splitter self.step_count = 80 - self.next_path_id = builder.configuration.particles.initial_circle.n_vessels+1 + self.next_path_id = builder.configuration.particles.initial_circle.n_vessels + 1 self.step_size = builder.configuration.time.step_size self.randomness = builder.randomness.get_stream("path_splitter") self.clock = builder.time.clock() @@ -406,7 +435,9 @@ def split_paths(self, pop: pd.DataFrame) -> None: return # Determine which paths will split - to_split = self.randomness.filter_for_probability(active.index, self.config.split_probability) + to_split = self.randomness.filter_for_probability( + active.index, self.config.split_probability + ) if to_split.empty: return @@ -416,7 +447,7 @@ def split_paths(self, pop: pd.DataFrame) -> None: return # Sample particles for new branches - two per split point - new_branches = available.iloc[:(2 * len(to_split))] + new_branches = available.iloc[: (2 * len(to_split))] angle_rad = np.radians(self.config.split_angle / 2) # Track updates for frozen originals and new branches @@ -455,51 +486,66 @@ def split_paths(self, pop: pd.DataFrame) -> None: # Freeze original particle at split point original_update = pd.DataFrame( { - 'x': [original.x], 'y': [original.y], 'z': [original.z], - 'vx': [original.vx], 'vy': [original.vy], 'vz': [original.vz], - 'frozen': [True], - 'freeze_time': [self.clock()], - 'depth': [original.depth], - 'path_id': [original.path_id], - 'parent_id': [original.parent_id], - }, index=[orig_idx] + "x": [original.x], + "y": [original.y], + "z": [original.z], + "vx": [original.vx], + "vy": [original.vy], + "vz": [original.vz], + "frozen": [True], + "freeze_time": [self.clock()], + "depth": [original.depth], + "path_id": [original.path_id], + "parent_id": [original.parent_id], + }, + index=[orig_idx], ) updates.append(original_update) - + # Create first new branch new_branch_1 = pd.DataFrame( { - 'x': [pos_1[0]], 'y': [pos_1[1]], 'z': [pos_1[2]], - 'vx': [new_vel_1[0]], 'vy': [new_vel_1[1]], 'vz': [new_vel_1[2]], - 'frozen': [False], - 'freeze_time': [pd.NaT], - 'depth': [original.depth + 1], - 'path_id': [self.next_path_id], - 'parent_id': [orig_idx], - }, index=[new_branches.iloc[2*idx].name] + "x": [pos_1[0]], + "y": [pos_1[1]], + "z": [pos_1[2]], + "vx": [new_vel_1[0]], + "vy": [new_vel_1[1]], + "vz": [new_vel_1[2]], + "frozen": [False], + "freeze_time": [pd.NaT], + "depth": [original.depth + 1], + "path_id": [self.next_path_id], + "parent_id": [orig_idx], + }, + index=[new_branches.iloc[2 * idx].name], ) updates.append(new_branch_1) - + # Create second new branch new_branch_2 = pd.DataFrame( { - 'x': [pos_2[0]], 'y': [pos_2[1]], 'z': [pos_2[2]], - 'vx': [new_vel_2[0]], 'vy': [new_vel_2[1]], 'vz': [new_vel_2[2]], - 'frozen': [False], - 'freeze_time': [pd.NaT], - 'depth': [original.depth + 1], - 'path_id': [self.next_path_id + 1], - 'parent_id': [orig_idx], - }, index=[new_branches.iloc[2*idx + 1].name] + "x": [pos_2[0]], + "y": [pos_2[1]], + "z": [pos_2[2]], + "vx": [new_vel_2[0]], + "vy": [new_vel_2[1]], + "vz": [new_vel_2[2]], + "frozen": [False], + "freeze_time": [pd.NaT], + "depth": [original.depth + 1], + "path_id": [self.next_path_id + 1], + "parent_id": [orig_idx], + }, + index=[new_branches.iloc[2 * idx + 1].name], ) updates.append(new_branch_2) - + self.next_path_id += 2 if updates: # Combine all updates with consistent dtypes all_updates = pd.concat(updates, axis=0) - + self.population_view.update(all_updates) @staticmethod @@ -510,16 +556,18 @@ def _rotation_matrix(axis: np.ndarray, theta: float) -> np.ndarray: b, c, d = -axis * np.sin(theta / 2.0) aa, bb, cc, dd = a * a, b * b, c * c, d * d bc, ad, ac, ab, bd, cd = b * c, a * d, a * c, a * b, b * d, c * d - return np.array([ - [aa + bb - cc - dd, 2 * (bc + ad), 2 * (bd - ac)], - [2 * (bc - ad), aa + cc - bb - dd, 2 * (cd + ab)], - [2 * (bd + ac), 2 * (cd - ab), aa + dd - bb - cc] - ]) + return np.array( + [ + [aa + bb - cc - dd, 2 * (bc + ad), 2 * (bd - ac)], + [2 * (bc - ad), aa + cc - bb - dd, 2 * (cd + ab)], + [2 * (bd + ac), 2 * (cd - ab), aa + dd - bb - cc], + ] + ) class PathDLA(Component): """Component for freezing particles at the end of a path using DLA. - + The near radius scales exponentially from initial_near_radius to final_near_radius between dla_start_time and dla_end_time. """ @@ -530,16 +578,21 @@ class PathDLA(Component): "initial_near_radius": 0.1, "final_near_radius": 0.01, "dla_start_time": "2000-01-01", # Start time for DLA freezing - "dla_end_time": "2001-01-01", # End time for radius scaling + "dla_end_time": "2001-01-01", # End time for radius scaling } } @property def columns_required(self) -> List[str]: return [ - "x", "y", "z", - "frozen", "freeze_time", "depth", - "path_id", "parent_id", + "x", + "y", + "z", + "frozen", + "freeze_time", + "depth", + "path_id", + "parent_id", ] def setup(self, builder: Builder) -> None: @@ -552,35 +605,40 @@ def setup(self, builder: Builder) -> None: # Convert times to pandas Timestamps self.dla_start_time = pd.Timestamp(self.config.dla_start_time) self.dla_end_time = pd.Timestamp(self.config.dla_end_time) - + # Validate configuration if self.dla_end_time <= self.dla_start_time: raise ValueError("dla_end_time must be after dla_start_time") - + if self.config.initial_near_radius <= 0 or self.config.final_near_radius <= 0: raise ValueError("near radius values must be positive") - + if self.config.final_near_radius > self.config.initial_near_radius: raise ValueError("final_near_radius must be smaller than initial_near_radius") - + # Calculate decay rate for exponential scaling total_time = (self.dla_end_time - self.dla_start_time).total_seconds() - self.decay_rate = -np.log(self.config.final_near_radius / self.config.initial_near_radius) / total_time + self.decay_rate = ( + -np.log(self.config.final_near_radius / self.config.initial_near_radius) + / total_time + ) def get_current_near_radius(self) -> float: """Calculate the current near radius based on exponential decay.""" current_time = self.clock() - + if current_time < self.dla_start_time: return self.config.initial_near_radius elif current_time > self.dla_end_time: return self.config.final_near_radius - + # Calculate time since start time_elapsed = (current_time - self.dla_start_time).total_seconds() - + # Calculate exponentially decayed radius - current_radius = self.config.initial_near_radius * np.exp(-self.decay_rate * time_elapsed) + current_radius = self.config.initial_near_radius * np.exp( + -self.decay_rate * time_elapsed + ) return current_radius def on_time_step(self, event: Event) -> None: @@ -602,24 +660,24 @@ def dla_freeze(self, pop: pd.DataFrame) -> None: if not_frozen.empty: return - near_frozen_indices = self.freezer.query_radius( - not_frozen, self.near_radius - ) + near_frozen_indices = self.freezer.query_radius(not_frozen, self.near_radius) # FIXME: should only use particles with path_id < 0 (i.e. in frozen DataFrame, not all in freezer object ) near_particles = np.array([len(indices) > 0 for indices in near_frozen_indices]) stickiness_probabilities = self.randomness.get_draw( not_frozen.index, additional_key="stickiness" ) - + freeze_condition = stickiness_probabilities < self.config.stickiness freeze_mask = near_particles & freeze_condition to_freeze = not_frozen[freeze_mask].copy() if not to_freeze.empty: - to_freeze["parent_id"] = self.freezer.nearest_index(near_frozen_indices[freeze_mask]) + to_freeze["parent_id"] = self.freezer.nearest_index( + near_frozen_indices[freeze_mask] + ) to_freeze["path_id"] = -1 to_freeze["depth"] = 1000 to_freeze["frozen"] = True to_freeze["freeze_time"] = self.clock() - - self.population_view.update(to_freeze) \ No newline at end of file + + self.population_view.update(to_freeze) diff --git a/src/vivarium_eye_vessels/components/visualizer.py b/src/vivarium_eye_vessels/components/visualizer.py index 169d06d..a0628dd 100644 --- a/src/vivarium_eye_vessels/components/visualizer.py +++ b/src/vivarium_eye_vessels/components/visualizer.py @@ -36,7 +36,19 @@ class ParticleVisualizer3D(Component): @property def columns_required(self) -> List[str]: - return ["x", "y", "z", "vx", "vy", "vz", "frozen", "freeze_time", "depth", "parent_id", "path_id"] + return [ + "x", + "y", + "z", + "vx", + "vy", + "vz", + "frozen", + "freeze_time", + "depth", + "parent_id", + "path_id", + ] def setup(self, builder: Builder): pygame.init() @@ -90,68 +102,68 @@ def setup(self, builder: Builder): self.particle_surface = pygame.Surface((self.width, self.height), pygame.SRCALPHA) self.connection_surface = pygame.Surface((self.width, self.height), pygame.SRCALPHA) - self.force_x = builder.value.get_value('particle.force.x') - self.force_y = builder.value.get_value('particle.force.y') - self.force_z = builder.value.get_value('particle.force.z') + self.force_x = builder.value.get_value("particle.force.x") + self.force_y = builder.value.get_value("particle.force.y") + self.force_z = builder.value.get_value("particle.force.z") def on_simulation_end(self, event: Event) -> None: - """Keep the visualization window open until user exits. - - Parameters - ---------- - event : Event - The event that triggered the function call. - """ - population = self.population_view.get(event.index) - - # Draw one final frame - self.screen.fill(self.config["background_color"]) - - # Calculate rotation matrix for final frame - cy, sy = np.cos(self.y_rotation), np.sin(self.y_rotation) - cx, sx = np.cos(self.x_rotation), np.sin(self.x_rotation) - cz, sz = np.cos(self.z_rotation), np.sin(self.z_rotation) - - y_rotation_matrix = np.array([[cy, 0, sy], [0, 1, 0], [-sy, 0, cy]]) - x_rotation_matrix = np.array([[1, 0, 0], [0, cx, -sx], [0, sx, cx]]) - z_rotation_matrix = np.array([[cz, -sz, 0], [sz, cz, 0], [0, 0, 1]]) - - rotation_matrix = z_rotation_matrix @ x_rotation_matrix @ y_rotation_matrix - - # Project points and prepare colors - points = population[["x", "y", "z"]].values - screen_points, mask = self._project_points(points, rotation_matrix) - - colors = np.where( - population["frozen"].values[:, np.newaxis], - self.config["frozen_color"], - self.config["particle_color"] - ) - - # Clear surfaces - self.connection_surface.fill((0, 0, 0, 0)) - self.particle_surface.fill((0, 0, 0, 0)) - - # Draw final state - self._draw_connections(population, screen_points, mask, self.connection_surface) - self._draw_particles(screen_points, colors, mask, self.particle_surface) - self.screen.blit(self.connection_surface, (0, 0)) - self.screen.blit(self.particle_surface, (0, 0)) - - # Draw additional elements - if self.has_ellipsoid: - self._draw_ellipsoid(rotation_matrix) - if self.has_cylinder: - self._draw_cylinder(rotation_matrix) - - self._draw_axes(rotation_matrix) - self._draw_progress_bar() - self._draw_fps() - self._draw_controls_help() - - pygame.display.flip() - - self._wait_for_exit() + """Keep the visualization window open until user exits. + + Parameters + ---------- + event : Event + The event that triggered the function call. + """ + population = self.population_view.get(event.index) + + # Draw one final frame + self.screen.fill(self.config["background_color"]) + + # Calculate rotation matrix for final frame + cy, sy = np.cos(self.y_rotation), np.sin(self.y_rotation) + cx, sx = np.cos(self.x_rotation), np.sin(self.x_rotation) + cz, sz = np.cos(self.z_rotation), np.sin(self.z_rotation) + + y_rotation_matrix = np.array([[cy, 0, sy], [0, 1, 0], [-sy, 0, cy]]) + x_rotation_matrix = np.array([[1, 0, 0], [0, cx, -sx], [0, sx, cx]]) + z_rotation_matrix = np.array([[cz, -sz, 0], [sz, cz, 0], [0, 0, 1]]) + + rotation_matrix = z_rotation_matrix @ x_rotation_matrix @ y_rotation_matrix + + # Project points and prepare colors + points = population[["x", "y", "z"]].values + screen_points, mask = self._project_points(points, rotation_matrix) + + colors = np.where( + population["frozen"].values[:, np.newaxis], + self.config["frozen_color"], + self.config["particle_color"], + ) + + # Clear surfaces + self.connection_surface.fill((0, 0, 0, 0)) + self.particle_surface.fill((0, 0, 0, 0)) + + # Draw final state + self._draw_connections(population, screen_points, mask, self.connection_surface) + self._draw_particles(screen_points, colors, mask, self.particle_surface) + self.screen.blit(self.connection_surface, (0, 0)) + self.screen.blit(self.particle_surface, (0, 0)) + + # Draw additional elements + if self.has_ellipsoid: + self._draw_ellipsoid(rotation_matrix) + if self.has_cylinder: + self._draw_cylinder(rotation_matrix) + + self._draw_axes(rotation_matrix) + self._draw_progress_bar() + self._draw_fps() + self._draw_controls_help() + + pygame.display.flip() + + self._wait_for_exit() def _wait_for_exit(self) -> None: """Run an event loop until the user exits.""" @@ -159,7 +171,8 @@ def _wait_for_exit(self) -> None: while running: for event in pygame.event.get(): if event.type == pygame.QUIT or ( - event.type == pygame.KEYDOWN and event.key in [pygame.K_ESCAPE, pygame.K_q] + event.type == pygame.KEYDOWN + and event.key in [pygame.K_ESCAPE, pygame.K_q] ): running = False pygame.quit() @@ -418,23 +431,23 @@ def on_time_step(self, event: Event) -> None: screen_points, mask = self._project_points(points, rotation_matrix) current_time = self.clock() - frozen_time = ((current_time - population["freeze_time"]) / pd.Timedelta(days=1)).values + frozen_time = ( + (current_time - population["freeze_time"]) / pd.Timedelta(days=1) + ).values frozen = population["frozen"].values delay = float(self.delay) - + # Initialize with default particle color colors = np.full((len(population), 3), self.config["particle_color"]) - + # Set frozen particle colors frozen_mask = frozen colors[frozen_mask] = self.config["frozen_color"] - + # Set repulsive particle colors (frozen longer than delay) repulsive_mask = frozen & (frozen_time > delay) colors[repulsive_mask] = (200, 150, 150) # TODO: Make this a config option - - # Override colors for points in cylinder if self.has_cylinder: points = population[["x", "y", "z"]].values @@ -614,71 +627,86 @@ def _draw_controls_help(self) -> None: def _calculate_path_widths(self, population: pd.DataFrame) -> None: """Calculate branching counts and path widths for all paths using optimized methods.""" - path_widths = np.where(population["parent_id"] >= 0, - np.maximum(5-population["depth"], 2), - 0) + path_widths = np.where( + population["parent_id"] >= 0, np.maximum(5 - population["depth"], 2), 0 + ) return path_widths - def _draw_vectors(self, population: pd.DataFrame, screen_points: np.ndarray, mask: np.ndarray, rotation_matrix: np.ndarray) -> None: + def _draw_vectors( + self, + population: pd.DataFrame, + screen_points: np.ndarray, + mask: np.ndarray, + rotation_matrix: np.ndarray, + ) -> None: """Draw force and velocity vectors for active non-frozen particles.""" # Get only active non-frozen particles - active_mask = (population['path_id'] >= 0) & (~population['frozen']) + active_mask = (population["path_id"] >= 0) & (~population["frozen"]) active_particles = population[active_mask] - + if active_particles.empty: return - + # Get force components for active particles force_x = self.force_x(active_particles.index) force_y = self.force_y(active_particles.index) force_z = self.force_z(active_particles.index) - + # Get velocity components directly from population - velocities = active_particles[['vx', 'vy', 'vz']].values + velocities = active_particles[["vx", "vy", "vz"]].values forces = np.column_stack([force_x, force_y, force_z]) # Calculate magnitudes force_magnitudes = np.linalg.norm(forces, axis=1) velocity_magnitudes = np.linalg.norm(velocities, axis=1) - + # Normalize vectors by their max magnitudes and apply scaling - scaled_forces = 10*forces #* (force_scale / max_force_mag) if max_force_mag > 0 else forces - scaled_velocities = 5*velocities #* (vel_scale / max_vel_mag) if max_vel_mag > 0 else velocities - + scaled_forces = ( + 10 * forces + ) # * (force_scale / max_force_mag) if max_force_mag > 0 else forces + scaled_velocities = ( + 5 * velocities + ) # * (vel_scale / max_vel_mag) if max_vel_mag > 0 else velocities + # Get start points for vectors (current particle positions) start_points = screen_points[active_mask] start_mask = mask[active_mask] - + # Calculate end points for both force and velocity vectors - force_end_points_3d = active_particles[['x', 'y', 'z']].values + scaled_forces - velocity_end_points_3d = active_particles[['x', 'y', 'z']].values + scaled_velocities - - force_end_points, force_end_mask = self._project_points(force_end_points_3d, rotation_matrix) - velocity_end_points, velocity_end_mask = self._project_points(velocity_end_points_3d, rotation_matrix) - + force_end_points_3d = active_particles[["x", "y", "z"]].values + scaled_forces + velocity_end_points_3d = active_particles[["x", "y", "z"]].values + scaled_velocities + + force_end_points, force_end_mask = self._project_points( + force_end_points_3d, rotation_matrix + ) + velocity_end_points, velocity_end_mask = self._project_points( + velocity_end_points_3d, rotation_matrix + ) + # Draw vectors def draw_arrow(start, end, color, width=2): """Helper to draw an arrow with proportional head size""" if not (np.all(np.isfinite(start)) and np.all(np.isfinite(end))): return - + # Draw main line pygame.draw.line(self.screen, color, start, end, width) - - + # Draw all vectors - force_color = self.config.get('force_color', (255, 255, 0)) # Yellow for forces - velocity_color = self.config.get('velocity_color', (0, 255, 255)) # Cyan for velocities - + force_color = self.config.get("force_color", (255, 255, 0)) # Yellow for forces + velocity_color = self.config.get( + "velocity_color", (0, 255, 255) + ) # Cyan for velocities + visible_mask = start_mask & velocity_end_mask - + for i in range(len(start_points)): if visible_mask[i]: # Draw force vector (if significant) if force_magnitudes[i] > 1e-6: draw_arrow(start_points[i], force_end_points[i], force_color) - + # Draw velocity vector (if significant) if velocity_magnitudes[i] > 1e-6: draw_arrow(start_points[i], velocity_end_points[i], velocity_color)