Skip to content

Commit

Permalink
Changes from review
Browse files Browse the repository at this point in the history
  • Loading branch information
ramo-j committed Jan 6, 2025
1 parent c4dc463 commit a0d38bb
Showing 1 changed file with 41 additions and 11 deletions.
52 changes: 41 additions & 11 deletions dftimewolf/lib/containers/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,21 @@

import dataclasses
import threading
import typing
from typing import cast, Sequence, Type
from typing import Any, cast, Sequence, Type

from dftimewolf.lib.containers import interface


@dataclasses.dataclass
class _MODULE():
"""A helper class for tracking module storage and dependency info.
Attributes:
name: The module name.
dependencies: A list of modules that this module depends on.
storage: A list of containers generated by the associated module.
completed: True if the assiciated module has finished running.
"""
name: str
dependencies: list[str] = dataclasses.field(default_factory=list)
storage: list[interface.AttributeContainer] = dataclasses.field(
Expand All @@ -36,8 +43,15 @@ def __init__(self) -> None:
self._mutex = threading.Lock()
self._modules: dict[str, _MODULE] = {}

def ParseRecipe(self, recipe: dict[str, typing.Any]) -> None:
"""Parses a recipe to build the dependency graph."""
def ParseRecipe(self, recipe: dict[str, Any]) -> None:
"""Parses a recipe to build the dependency graph.
Args:
recipe: The recipe dict, that comes from the recipe manager class.
Raises:
RuntimeError: If there is an error in the recipe definition.
"""
with self._mutex:
self._modules = {}

Expand All @@ -57,19 +71,21 @@ def StoreContainer(self,
Args:
source_module: The module that generated the container.
container: The container to store.
Raises:
RuntimeError: If the manager has not been configured with a recipe yet.
"""
if len(self._modules) == 0:
if not self._modules:
raise RuntimeError("Container manager has not parsed a recipe yet")

with self._mutex:
self._modules[source_module].storage.append(container)


def GetContainers(self,
requesting_module: str,
container_class: Type[interface.AttributeContainer],
metadata_filter_key: str | None = None,
metadata_filter_value: typing.Any = None
metadata_filter_value: Any = None
) -> Sequence[interface.AttributeContainer]:
"""Retrieves stored containers.
Expand All @@ -84,8 +100,12 @@ def GetContainers(self,
Returns:
A sequence of containers that match the various filters.
Raises:
RuntimeError: If the manager has not been configured with a recipe yet; or
if only one of metadata_filter_(key|value) is specified.
"""
if len(self._modules) == 0:
if not self._modules:
raise RuntimeError("Container manager has not parsed a recipe yet")
if bool(metadata_filter_key) ^ bool(metadata_filter_value):
raise RuntimeError('Must specify both key and value for attribute filter')
Expand All @@ -103,7 +123,6 @@ def GetContainers(self,

return cast(Sequence[interface.AttributeContainer], ret_val)


def CompleteModule(self, module_name: str) -> None:
"""Mark a module as completed in storage.
Expand All @@ -113,8 +132,11 @@ def CompleteModule(self, module_name: str) -> None:
Args:
module_name: The module that has completed running.
Raises:
RuntimeError: If the manager has not been configured with a recipe yet.
"""
if len(self._modules) == 0:
if not self._modules:
raise RuntimeError("Container manager has not parsed a recipe yet")

with self._mutex:
Expand All @@ -129,7 +151,15 @@ def CompleteModule(self, module_name: str) -> None:
module.storage = []

def _CheckDependenciesCompletion(self, module_name: str) -> bool:
"""For a module, checks if other modules that depend on are complete."""
"""For a module, checks if other modules that depend on are complete.
Args:
module_name: The module name to check for.
Returns:
True if all modules dependant on the named module have complete; False
otherwise.
"""
for _, module in self._modules.items():
if module_name in module.dependencies:
if not module.completed:
Expand Down

0 comments on commit a0d38bb

Please sign in to comment.