diff --git a/examples/example_pipeline.json b/examples/example_pipeline.json index 6f2e58eb..93f00cb0 100644 --- a/examples/example_pipeline.json +++ b/examples/example_pipeline.json @@ -21,28 +21,6 @@ "sex_id" ], "stages": { - "preprocessing": { - "name": "preprocessing", - "config": { - "id_columns": [ - "year_id", - "sex_id", - "age_group_id", - "location_id" - ], - "model_type": "binomial", - "observation_column": "obs", - "prediction_column": "pred", - "weight_column": "weights", - "test_column": "test", - "holdout_columns": [], - "coef_bounds": {} - }, - "input": { - "data": "/path/to/data.parquet" - }, - "type": "PreprocessingStage" - }, "covariate_selection": { "name": "covariate_selection", "config": { @@ -80,10 +58,7 @@ ], "crossby": [], "input": { - "data": { - "stage": "preprocessing", - "path": "/path/to/experiment/directory/preprocessing/data.parquet" - } + "data": "/path/to/data.parquet" }, "type": "RoverStage" }, @@ -179,10 +154,7 @@ ], "crossby": [], "input": { - "data": { - "stage": "preprocessing", - "path": "/path/to/experiment/directory/preprocessing/data.parquet" - }, + "data": "/path/to/data.parquet", "offset": { "stage": "global_model", "path": "/path/to/experiment/directory/global_model/predictions.parquet" @@ -233,10 +205,7 @@ ], "crossby": [], "input": { - "data": { - "stage": "preprocessing", - "path": "/path/to/experiment/directory/preprocessing/data.parquet" - }, + "data": "/path/to/data.parquet", "offset": { "stage": "location_model", "path": "/path/to/experiment/directory/location_model/predictions.parquet" @@ -274,10 +243,7 @@ ], "module": "/path/to/custom_stage.py", "input": { - "observations": { - "stage": "preprocessing", - "path": "/path/to/experiment/directory/preprocessing/data.parquet" - }, + "observations": "/path/to/data.parquet", "predictions": { "stage": "smoothing", "path": "/path/to/experiment/directory/smoothing/predictions.parquet" @@ -287,25 +253,18 @@ } }, "dependencies": { - "preprocessing": [], - "covariate_selection": [ - "preprocessing" - ], + "covariate_selection": [], "global_model": [ - "covariate_selection", - "preprocessing" + "covariate_selection" ], "location_model": [ - "global_model", - "preprocessing" + "global_model" ], "smoothing": [ - "location_model", - "preprocessing" + "location_model" ], "custom_stage": [ - "smoothing", - "preprocessing" + "smoothing" ] } } \ No newline at end of file diff --git a/examples/pipeline_example.py b/examples/pipeline_example.py index 75e1a709..d32421b8 100644 --- a/examples/pipeline_example.py +++ b/examples/pipeline_example.py @@ -4,14 +4,13 @@ from custom_stage import CustomStage from onemod import Pipeline -from onemod.stage import KregStage, PreprocessingStage, RoverStage, SpxmodStage +from onemod.stage import KregStage, RoverStage, SpxmodStage def create_pipeline(directory: str, data: str): # Create stages # Stage-specific validation specifications go here. # Stage classes may also implement default validation specifications. 
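Note: with the preprocessing stage removed, each stage's "input" entry in the pipeline JSON is serialized either as a plain file path or as a reference to an upstream stage's output. A minimal sketch of the equivalent Python wiring (stage names follow this example pipeline; the paths are placeholders):

```python
# Raw file input: stored in the JSON config as a plain path string.
covariate_selection(data="/path/to/data.parquet")

# Upstream output input: stored as {"stage": ..., "path": ...} and
# reflected in the pipeline's "dependencies" section.
global_model(
    data="/path/to/data.parquet",
    selected_covs=covariate_selection.output["selected_covs"],
)
```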
- preprocessing = PreprocessingStage(name="preprocessing", config={}) covariate_selection = RoverStage( name="covariate_selection", config={"cov_exploring": ["cov1", "cov2", "cov3"]}, @@ -68,7 +67,6 @@ def create_pipeline(directory: str, data: str): # Add stages example_pipeline.add_stages( [ - preprocessing, covariate_selection, global_model, location_model, @@ -78,24 +76,13 @@ def create_pipeline(directory: str, data: str): ) # Define dependencies - preprocessing(data=example_pipeline.data) - covariate_selection(data=preprocessing.output["data"]) + covariate_selection(data=data) global_model( - data=preprocessing.output["data"], - selected_covs=covariate_selection.output["selected_covs"], - ) - location_model( - data=preprocessing.output["data"], - offset=global_model.output["predictions"], - ) - smoothing( - data=preprocessing.output["data"], - offset=location_model.output["predictions"], - ) - custom_stage( - observations=preprocessing.output["data"], - predictions=smoothing.output["predictions"], + data=data, selected_covs=covariate_selection.output["selected_covs"] ) + location_model(data=data, offset=global_model.output["predictions"]) + smoothing(data=data, offset=location_model.output["predictions"]) + custom_stage(observations=data, predictions=smoothing.output["predictions"]) # Serialize pipeline example_pipeline.to_json() @@ -111,7 +98,7 @@ def create_pipeline(directory: str, data: str): # Fit specific stages example_pipeline.evaluate( - method="fit", stages=["preprocessing", "covariate_selection"] + method="fit", stages=["covariate_selection", "global_model"] ) # Predict for specific locations diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index 411c99bb..6afdd1ab 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -31,7 +31,7 @@ import sys from pathlib import Path -from typing import Any, Literal, cast +from typing import Literal import yaml from jobmon.client.api import Tool @@ -96,7 +96,7 @@ def get_tasks( name=f"{stage.name}_{method}", upstream_tasks=upstream_tasks, max_attempts=1, - **cast(dict[str, Any], task_args), + **task_args, ) ] @@ -141,8 +141,8 @@ def get_command_template( "{python}" f" {Path(__file__).parents[1] / 'main.py'}" " --config {config}" - f" --stage_name {stage_name}" f" --method {method}" + f" --stages {stage_name}" ) for node_arg in node_args: @@ -167,21 +167,38 @@ def get_task_resources( def get_upstream_tasks( stage: Stage, method: Literal["run", "fit", "predict"], - stages: dict[str, Stage], + stage_dict: dict[str, Stage], task_dict: dict[str, list[Task]], - specified_stages: set[str] | None = None, + stages: set[str] | None = None, ) -> list[Task]: - """Get upstream stage tasks.""" + """Get upstream stage tasks. + + Parameters + ---------- + stage : Stage + Current stage. + method : str + Name of method to evaluate. + stage_dict : dict[str, Stage] + Dictionary of all upstream pipeline stages. + task_dict : dict[str, list[Task]] + Dictionary of all tasks being evaluated. + stages : set[str] or None, optional + Name of all pipeline stages being evaluated. + + Returns + ------- + list of Task + Upstream tasks for current stage. 
+ + """ upstream_tasks = [] for upstream_name in stage.dependencies: - if ( - specified_stages is not None - and upstream_name not in specified_stages - ): + if stages is not None and upstream_name not in stages: continue - upstream = stages[upstream_name] + upstream = stage_dict[upstream_name] if method not in upstream.skip: if ( isinstance(upstream, ModelStage) @@ -200,8 +217,8 @@ def evaluate_with_jobmon( cluster: str, resources: Path | str, python: Path | str | None = None, - method: Literal["run", "fit", "predict", "collect"] = "run", - stages: list[str] | None = None, + method: Literal["run", "fit", "predict"] = "run", + stages: set[str] | None = None, ) -> None: """Evaluate pipeline or stage method with Jobmon. @@ -219,7 +236,8 @@ def evaluate_with_jobmon( method : str, optional Name of method to evalaute. Default is 'run'. stages : set of str or None, optional - Set of stage names to evaluate. Default is None. + Names of stages to evaluate if `model` is a pipeline instance. + If None, evaluate entire pipeline. Default is None. TODO: Optional stage-specific Python environments TODO: User-defined max_attempts @@ -231,13 +249,13 @@ def evaluate_with_jobmon( # Set config if isinstance(model, Stage): - model_config = model.dataif.load(key="config") + config_path = model.dataif.get_path("config") elif isinstance(model, Pipeline): - model_config = model.config + config_path = model.directory / f"{model.name}.json" task_args: dict[str, str] = { "python": str(python or sys.executable), - "config": str(model_config), + "config": str(config_path), } # Create tasks @@ -245,20 +263,11 @@ def evaluate_with_jobmon( tasks = [] task_dict: dict[str, list[Task]] = {} - if stages is None: - stages = model.get_execution_order() - - for stage_name in stages: + for stage_name in model.get_execution_order(stages): stage = model.stages[stage_name] - if ( - method not in stage.skip and method != "collect" - ): # TODO: handle collect + if method not in stage.skip: upstream_tasks = get_upstream_tasks( - stage, - method, - model.stages, - task_dict, - specified_stages=set(stages), + stage, method, model.stages, task_dict, stages ) task_dict[stage_name] = get_tasks( tool, resources, stage, method, task_args, upstream_tasks diff --git a/src/onemod/backend/local_backend.py b/src/onemod/backend/local_backend.py index 08e2323f..0733dc30 100644 --- a/src/onemod/backend/local_backend.py +++ b/src/onemod/backend/local_backend.py @@ -13,7 +13,7 @@ def evaluate_local( model: Pipeline | Stage, method: Literal["run", "fit", "predict"] = "run", - stages: list[str] | None = None, + stages: set[str] | None = None, **kwargs, ) -> None: """Evaluate pipeline or stage method locally. @@ -24,6 +24,9 @@ def evaluate_local( Pipeline or stage instance. method : str, optional Name of method to evaluate. Default is 'run'. + stages : set of str or None, optional + Names of stages to evaluate if `model` is a pipeline instance. + If None, evaluate entire pipeline. Default is None. Other Parameters ---------------- @@ -31,15 +34,10 @@ def evaluate_local( Submodel data subset ID. Only used for model stages. param_id : int, optional Submodel parameter set ID. Only used for model stages. - stages : list of str or None, optional - List of stage names to evaluate. Default is None. 
""" if isinstance(model, Pipeline): - if stages is None: - stages = model.get_execution_order() - - for stage_name in stages: + for stage_name in model.get_execution_order(stages): stage = model.stages[stage_name] if method not in stage.skip: _evaluate_stage(stage, method) diff --git a/src/onemod/config/__init__.py b/src/onemod/config/__init__.py index 1f7a934e..ab1299c3 100644 --- a/src/onemod/config/__init__.py +++ b/src/onemod/config/__init__.py @@ -1,5 +1,4 @@ from onemod.config.base import Config, ModelConfig, PipelineConfig, StageConfig -from onemod.config.data_config import PreprocessingConfig from onemod.config.model_config import KregConfig, RoverConfig, SpxmodConfig __all__ = [ @@ -7,7 +6,6 @@ "PipelineConfig", "StageConfig", "ModelConfig", - "PreprocessingConfig", "RoverConfig", "SpxmodConfig", "KregConfig", diff --git a/src/onemod/config/data_config/__init__.py b/src/onemod/config/data_config/__init__.py index 7a6deb77..e69de29b 100644 --- a/src/onemod/config/data_config/__init__.py +++ b/src/onemod/config/data_config/__init__.py @@ -1,3 +0,0 @@ -from onemod.config.data_config.preprocessing_config import PreprocessingConfig - -__all__ = ["PreprocessingConfig"] diff --git a/src/onemod/config/data_config/preprocessing_config.py b/src/onemod/config/data_config/preprocessing_config.py deleted file mode 100644 index 5705dae8..00000000 --- a/src/onemod/config/data_config/preprocessing_config.py +++ /dev/null @@ -1,9 +0,0 @@ -"""Preprocessing stage settings.""" - -from onemod.config import StageConfig - - -class PreprocessingConfig(StageConfig): - """Preprocessing stage settings.""" - - pass diff --git a/src/onemod/dtypes/data.py b/src/onemod/dtypes/data.py index 09e5cdc8..189bc708 100644 --- a/src/onemod/dtypes/data.py +++ b/src/onemod/dtypes/data.py @@ -12,6 +12,8 @@ handle_error, ) +# FIXME: Update for new DataInterface class + class Data(BaseModel): stage: str @@ -42,24 +44,25 @@ def validate_metadata( "File path is required.", collector, ) - else: - # FIXME: Path won't exist until stage has been run - # if kind == "input" and not self.path.exists(): - # handle_error( - # self.stage, - # "Data validation", - # FileNotFoundError, - # f"File {self.path} does not exist.", - # collector, - # ) - if self.format not in DataIOHandler.supported_formats: - handle_error( - self.stage, - "Data validation", - ValueError, - f"Unsupported file format {self.format}.", - collector, - ) + # else: + # FIXME: Path won't exist until stage has been run + # if kind == "input" and not self.path.exists(): + # handle_error( + # self.stage, + # "Data validation", + # FileNotFoundError, + # f"File {self.path} does not exist.", + # collector, + # ) + # FIXME: Update for new DataInterface class + # if self.format not in DataIOHandler.supported_formats: + # handle_error( + # self.stage, + # "Data validation", + # ValueError, + # f"Unsupported file format {self.format}.", + # collector, + # ) if self.shape: if not isinstance(self.shape, tuple) or len(self.shape) != 2: diff --git a/src/onemod/fsutils/interface.py b/src/onemod/fsutils/interface.py index c1aa876a..ed22c8a7 100644 --- a/src/onemod/fsutils/interface.py +++ b/src/onemod/fsutils/interface.py @@ -1,6 +1,7 @@ from pathlib import Path from typing import Any, Literal +import pandas as pd import polars as pl from onemod.fsutils.config_loader import ConfigLoader @@ -62,7 +63,7 @@ def load( def dump(self, obj: Any, *fparts: str, key: str, **options) -> None: """Dump data or config files based on object type and key.""" path = self.get_full_path(*fparts, 
key=key) - if isinstance(obj, (pl.DataFrame, pl.LazyFrame)): + if isinstance(obj, (pl.DataFrame, pl.LazyFrame, pd.DataFrame)): return self.data_loader.dump(obj, path, **options) else: return self.config_loader.dump(obj, path, **options) diff --git a/src/onemod/fsutils/io.py b/src/onemod/fsutils/io.py index bffb4ee2..06d4c1f5 100644 --- a/src/onemod/fsutils/io.py +++ b/src/onemod/fsutils/io.py @@ -45,10 +45,13 @@ def dump( self, obj: pl.DataFrame | pl.LazyFrame | pd.DataFrame, fpath: Path | str, + mkdir: bool = True, **options, ): """Dump data to given path.""" fpath = Path(fpath) + if mkdir: + fpath.parent.mkdir(parents=True, exist_ok=True) if isinstance(obj, pl.LazyFrame): obj = obj.collect() diff --git a/src/onemod/io/base.py b/src/onemod/io/base.py index 15e08918..888d8738 100644 --- a/src/onemod/io/base.py +++ b/src/onemod/io/base.py @@ -7,6 +7,11 @@ * Provides validation * No need to create stage-specific subclasses +TODO: Could simplify Input class functions if all items were Data (as +opposed to Data or Path). Would have to update Data class to allow stage +attribute to be None, and would have to convert Paths to Data when +adding to Input.items. + """ from abc import ABC @@ -80,7 +85,9 @@ def model_post_init(self, *args, **kwargs) -> None: } self._expected_types = {} for item in {*self.required, *self.optional}: - item_name, item_type = item.split(".") + item_specs = item.split(".") + item_name = item_specs[0] + item_type = "directory" if len(item_specs) == 1 else item_specs[1] self._expected_types[item_name] = item_type if self.items: for item_name in list(self.items.keys()): @@ -107,6 +114,20 @@ def clear(self) -> None: def check_missing( self, items: dict[str, Data | Path] | None = None ) -> None: + """Check stage input items have been defined. + + Parameters + ---------- + items : dict of str: Data or Path, optional + Input items to check. If None, check all stage input items. + Default is None. + + Raises + ------ + KeyError + If any stage input items have not been defined. + + """ items = items or self.items missing_items = [ item_name @@ -115,7 +136,52 @@ def check_missing( ] if missing_items: raise KeyError( - f"{self.stage} missing required input: {missing_items}" + f"Stage '{self.stage}' missing required input: {missing_items}" + ) + + def check_exists( + self, + item_names: set[str] | None = None, + upstream_stages: set[str] | None = None, + ) -> None: + """Check stage input items exist. + + Parameters + ---------- + item_names : set of str, optional + Names of input items to check. If None, check all input + path items and all input data items in `upstream_stages`. + Default is None. + upstream_stages : set of str, optional + Names of upstream stages to check input items from. If None, + check input items from all upstream stages. + + Raises + ------ + FileNotFoundError + If any stage input items do not exist. 
+ + FIXME: Assumes pipeline has been built (so paths are absolute) + + """ + item_names = item_names or set(self.items.keys()) + upstream_stages = upstream_stages or self.dependencies + + missing_items = {} + for item_name in item_names: + if isinstance(item_value := self.__getitem__(item_name), Path): + item_path = item_value + else: # item_value: Data + if item_value.stage in upstream_stages: + item_path = item_value.path + else: + continue + if not item_path.exists(): + missing_items[item_name] = str(item_path) + + if missing_items: + raise FileNotFoundError( + f"Stage {self.stage} input items do not exist: {missing_items}" ) def _check_cycles( @@ -157,7 +223,8 @@ def _check_type(self, item_name: str, item_value: Data | Path) -> None: if item_name in self._expected_types: if isinstance(item_value, Data): item_value = item_value.path - if item_value.suffix[1:] != self._expected_types[item_name]: + suffix = item_value.suffix[1:] or "directory" + if suffix != self._expected_types[item_name]: raise TypeError( f"Invalid type for {self.stage} input: {item_name}" ) diff --git a/src/onemod/main.py b/src/onemod/main.py index 9b318267..c60c111a 100644 --- a/src/onemod/main.py +++ b/src/onemod/main.py @@ -53,7 +53,8 @@ def load_stage(config: Path | str, stage_name: str) -> Stage: """ stage_class = _get_stage(config, stage_name) - return stage_class.from_json(config, stage_name) + stage = stage_class.from_json(config, stage_name) + return stage def _get_stage(config: Path | str, stage_name: str) -> Stage: @@ -71,17 +72,26 @@ def _get_stage(config: Path | str, stage_name: str) -> Stage: Stage Stage class. + Notes + ----- + When a custom stage class has the same name as a built-in OneMod + stage class, this function returns the custom stage class. + """ with open(config, "r") as f: config_dict = json.load(f) if stage_name not in config_dict["stages"]: raise KeyError(f"Config does not contain a stage named '{stage_name}'") config_dict = config_dict["stages"][stage_name] - if hasattr(onemod_stages, stage_type := config_dict["type"]): + stage_type = config_dict["type"] + + if "module" in config_dict: + return _get_custom_stage(stage_type, config_dict["module"]) + if hasattr(onemod_stages, stage_type): return getattr(onemod_stages, stage_type) - if "module" not in config_dict: - raise KeyError(f"Config does not contain a module for {stage_name}") - return _get_custom_stage(stage_type, config_dict["module"]) + raise KeyError( + f"Config does not contain a module for custom stage '{stage_name}'" + ) def _get_custom_stage(stage_type: str, module: str) -> Stage: @@ -122,8 +132,8 @@ def _get_custom_stage(stage_type: str, module: str) -> Stage: @validate_call def evaluate( config: Path | str, - stage_name: str | None = None, method: Literal["run", "fit", "predict", "collect"] = "run", + stages: str | set[str] | None = None, backend: Literal["local", "jobmon"] = "local", **kwargs, ) -> None: @@ -133,11 +143,11 @@ def evaluate( ---------- config : Path or str Path to config file. - stage_name : str or None, optional - Name of stage to evaluate. If None, evaluate pipeline. - Default is None. method : str, optional Name of method to evaluate. Default is 'run'. + stages : str, set of str, or None, optional + Names of stages to evaluate. If None, evaluate entire pipeline. + Default is None. backend : str, optional Whether to evaluate the method locally or with Jobmon. Default is 'local'. 
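For reference, a hedged sketch of calling the reworked `evaluate` entry point above (the config path is a placeholder; upstream stages omitted from `stages` are assumed to already have their outputs on disk, per `Input.check_exists` above):

```python
from onemod.main import evaluate

config = "/path/to/experiment/directory/example_pipeline.json"  # placeholder

# Evaluate the entire pipeline (stages=None).
evaluate(config)

# Fit a subset of pipeline stages.
evaluate(config, method="fit", stages={"covariate_selection", "global_model"})

# A single stage name passed as a string loads and evaluates only that stage.
evaluate(config, method="predict", stages="location_model")
```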
@@ -156,14 +166,12 @@ def evaluate( """ model: Pipeline | Stage - if stage_name is None: - model = load_pipeline(config) - if method == "collect": - raise ValueError(f"{method} is not a valid method for a pipeline") + if isinstance(stages, str): + model = load_stage(config, stages) model.evaluate(method, backend, **kwargs) else: - model = load_stage(config, stage_name) - model.evaluate(method, backend, **kwargs) + model = load_pipeline(config) + model.evaluate(method, stages, backend, **kwargs) def call_function( diff --git a/src/onemod/pipeline.py b/src/onemod/pipeline.py index af434df0..c1f70d2a 100644 --- a/src/onemod/pipeline.py +++ b/src/onemod/pipeline.py @@ -137,23 +137,33 @@ def add_stage(self, stage: Stage) -> None: stage.config.inherit(self.config) self._stages[stage.name] = stage - def check_upstream_outputs_exist( - self, stage_name: str, upstream_name: str - ) -> bool: - """Check if outputs from the specified upstream dependency exist for the inputs of a given stage.""" - stage = self.stages[stage_name] - - for input_name, input_data in stage.input.items.items(): - if input_data.stage == upstream_name: - upstream_output_path = input_data.path - if not upstream_output_path.exists(): - return False - return True - - def get_execution_order(self) -> list[str]: - """ - Return topologically sorted order of stages, ensuring no cycles. - Uses Kahn's algorithm to find the topological order of the stages. + def get_execution_order(self, stages: set[str] | None = None) -> list[str]: + """Get stages sorted in execution order. + + Use Kahn's algorithm to find the topoligical order of the + stages, ensuring no cycles. + + Parameters + ---------- + stages: set of str, optional + Name of stages to sort. If None, sort all pipeline stages. + Default is None. + + Returns + ------- + list of str + Stages sorted in execution order. + + Raises + ------ + ValueError + If cycle detected in DAG. + + TODO: What if stages have a gap? For example, pipeline has + Rover -> SPxMod -> KReg, but `stages` only includes Rover and + Kreg. KReg will be run on outdated SPxMod results (if they + exist). + """ reverse_graph: dict[str, list[str]] = { stage: [] for stage in self.dependencies @@ -186,6 +196,8 @@ def get_execution_order(self) -> list[str]: f"Cycle detected! Unable to process the following stages: {unvisited}" ) + if stages: + return [stage for stage in topological_order if stage in stages] return topological_order def validate_dag(self, collector: ValidationErrorCollector) -> None: @@ -282,10 +294,10 @@ def save_validation_report( @validate_call def evaluate( self, - method: Literal["run", "fit", "predict"] = "run", + method: Literal["run", "fit", "predict", "collect"] = "run", + stages: set[str] | None = None, backend: Literal["local", "jobmon"] = "local", build: bool = True, - stages: set[str] | None = None, id_subsets: dict[str, list[Any]] | None = None, **kwargs, ) -> None: @@ -295,12 +307,14 @@ def evaluate( ---------- method : str, optional Name of method to evaluate. Default is 'run'. + stages : set of str, optional + Names of stages to evaluate. Default is None. + If None, evaluate entire pipeline. backend : str, optional How to evaluate the method. Default is 'local'. build : bool, optional - Whether to build the pipeline before evaluation. Default is True. - stages : set of str, optional - Stages to evaluate. Default is None. + Whether to build the pipeline before evaluation. + Default is True. 
id_subsets : dict of str: list of Any, optional Other Parameters @@ -310,44 +324,39 @@ def evaluate( resources : Path or str, optional Path to resources yaml file. Required if `backend` is 'jobmon'. + """ + if method == "collect": + raise ValueError( + "Method 'collect' can only be called on a 'ModelStage' object" + ) + if build: self.build(id_subsets=id_subsets) - if stages is not None: - for stage_name in stages: - if stage_name not in self.stages: - raise ValueError( - f"Stage '{stage_name}' not found in pipeline." - ) - - for stage_name in stages: - stage: Stage = self.stages.get(stage_name) - for dep in stage.dependencies: - if dep not in stages: - if not self.check_upstream_outputs_exist( - stage_name, dep - ): - raise ValueError( - f"Required input to stage '{stage_name}' is missing. Missing output from upstream dependency '{dep}'." - ) - - ordered_stages = ( - [stage for stage in self.get_execution_order() if stage in stages] - if stages is not None - else self.get_execution_order() - ) + stages = stages or self.stages.keys() + for stage_name in stages: + if stage_name not in self.stages: + raise ValueError(f"Stage '{stage_name}' not found in pipeline.") + else: + # Check input from upstream stages not being run already exists + stage = self.stages[stage_name] + stage.input.check_exists( + upstream_stages=[ + dep for dep in stage.dependencies if dep not in stages + ] + ) if backend == "jobmon": from onemod.backend.jobmon_backend import evaluate_with_jobmon evaluate_with_jobmon( - model=self, method=method, stages=ordered_stages, **kwargs + model=self, method=method, stages=stages, **kwargs ) else: from onemod.backend.local_backend import evaluate_local - evaluate_local(model=self, method=method, stages=ordered_stages) + evaluate_local(model=self, method=method, stages=stages) def run( self, @@ -382,6 +391,6 @@ def resume(self) -> None: def __repr__(self) -> str: return ( - f"{type(self).__name__}({self.name}," + f"{type(self).__name__}(name={self.name}," f" stages={list(self.stages.values())})" ) diff --git a/src/onemod/stage/__init__.py b/src/onemod/stage/__init__.py index 1dd5b828..4e517de8 100644 --- a/src/onemod/stage/__init__.py +++ b/src/onemod/stage/__init__.py @@ -1,12 +1,4 @@ from onemod.stage.base import ModelStage, Stage -from onemod.stage.data_stages import PreprocessingStage from onemod.stage.model_stages import KregStage, RoverStage, SpxmodStage -__all__ = [ - "Stage", - "ModelStage", - "PreprocessingStage", - "RoverStage", - "SpxmodStage", - "KregStage", -] +__all__ = ["Stage", "ModelStage", "RoverStage", "SpxmodStage", "KregStage"] diff --git a/src/onemod/stage/base.py b/src/onemod/stage/base.py index 06a411c9..73bbc614 100644 --- a/src/onemod/stage/base.py +++ b/src/onemod/stage/base.py @@ -38,6 +38,24 @@ class Stage(BaseModel, ABC): output_validation : dict, optional Optional specification of output data validation. + Notes + ----- + * Private attributes that are defined automatically: + * `_dataif : `DataInterface` object for loading/dumping input, + created in `Pipeline.build()` or `Stage.from_json()`. + * `_module`: Path to custom stage definition, created in + `Stage.module` or `Stage.from_json()`. + * `_input`: `Input` object that organizes `Stage` input, created + in `Stage.input` or `Stage.from_json()`, modified by + `Stage.__call__()`. + * Private attributes that must be defined by class: + * `_required_input`, `_optional_input`, `_output`: Strings with + syntax "f{name}.{extension}". For example, "data.parquet". 
If + input/output is a directory instead of a file, exclude the + extension. For example, "submodels". + * `_skip`: Methods that the stage does not implement (e.g., 'fit' + or 'predict'). + """ model_config = ConfigDict(validate_assignment=True) @@ -46,13 +64,13 @@ class Stage(BaseModel, ABC): config: StageConfig input_validation: dict[str, Data] = Field(default_factory=dict) output_validation: dict[str, Data] = Field(default_factory=dict) - _dataif: DataInterface | None = None # set by Pipeline.build, from_json - _module: Path | None = None # set by from_json - _skip: set[str] = set() # defined by class - _input: Input | None = None # set by __call__, from_json - _required_input: set[str] = set() # name.extension, defined by class - _optional_input: set[str] = set() # name.extension, defined by class - _output: set[str] = set() # name.extension, defined by class + _dataif: DataInterface | None = None + _module: Path | None = None + _input: Input | None = None + _required_input: set[str] = set() + _optional_input: set[str] = set() + _output: set[str] = set() + _skip: set[str] = set() @property def dataif(self) -> DataInterface: @@ -60,12 +78,19 @@ def dataif(self) -> DataInterface: Examples -------- - * Load config: stage.dataif.load(key="config") - * Load input: stage.dataif.load(key=f"{input_name}") - * Load output: - stage.dataif.load(f"{output_name}.{output_extension}", key="output") - * Dump output: - stage.dataif.dump(output, f"{output_name}.{output_extension}", key="output") + Load input file: + * _requred_input: {"data.parquet"} + * data = self.dataif.load(key="data") + + Load file from input directory: + * _required_input: {"submodels"} + * model = self.dataif.load(f"model_{subset_id}.pkl", key="submodels") + + Load output file: + * subsets = self.dataif.load("subsets.csv", key="output") + + Dump output file: + * self.dataif.dump(f"submodels/model_{subset_id}.pkl", key="output") """ if self._dataif is None: @@ -73,9 +98,21 @@ def dataif(self) -> DataInterface: return self._dataif def set_dataif(self, config_path: Path | str) -> None: - if self.input is None: - return + """Set stage data interface. + Parameters + ---------- + config_path : Path or str + Path to config file. + + Notes + ----- + * This method is called in Pipeline.build. + * This method assumes the pipeline's data flow has already been + defined (i.e., if the stage's input is changed after pipeline + is built, the data interface will not contain the new input). 
+ + """ directory = Path(config_path).parent self._dataif = DataInterface( directory=directory, @@ -86,9 +123,10 @@ def set_dataif(self, config_path: Path | str) -> None: if isinstance(item_value, Path): self._dataif.add_path(item_name, item_value) elif isinstance(item_value, Data): - self._dataif.add_path( - item_name, directory / item_value.stage / item_value.path - ) + item_value.path = directory / item_value.stage / item_value.path + self._dataif.add_path(item_name, item_value.path) + for item_name, item_value in self.output.items.items(): + item_value.path = directory / self.name / item_value.path if not (directory / self.name).exists(): (directory / self.name).mkdir() @@ -121,8 +159,12 @@ def input(self) -> Input | None: def output(self) -> Output: output_items = {} for item in self._output: - item_name = item.split(".")[0] # remove extension - output_items[item_name] = Data(stage=self.name, path=Path(item)) + item_specs = item.split(".") + item_name = item_specs[0] + item_type = "directory" if len(item_specs) == 1 else item_specs[1] + output_items[item_name] = Data( + stage=self.name, path=Path(item), format=item_type + ) return Output(stage=self.name, items=output_items) @property @@ -166,8 +208,8 @@ def from_json(cls, config_path: Path | str, stage_name: str) -> Stage: stage._module = stage_config["module"] if hasattr(stage, "apply_stage_specific_config"): stage.apply_stage_specific_config(stage_config) - if "input" in stage_config: - stage(**stage_config["input"]) + if (input := stage_config.get("input")) is not None: + stage(**input) stage.set_dataif(config_path) return stage @@ -206,10 +248,19 @@ def evaluate( `predict` is called. """ + if method == "collect": + raise ValueError( + "Method 'collect' can only be called for 'ModelStage' objects" + ) + if method in self.skip: - warnings.warn(f"{self.name} skips the '{method}' method") + warnings.warn(f"'{self.name}' stage skips the '{method}' method") return + method = method if hasattr(self, method) else "run" + + self.input.check_exists() + if backend == "jobmon": from onemod.backend.jobmon_backend import evaluate_with_jobmon @@ -295,12 +346,13 @@ def run(self, *args, **kwargs) -> None: @validate_call def __call__(self, **input: Data | Path) -> Output: """Define stage dependencies.""" + # FIXME: Update data interface if it exists? self.input.check_missing({**self.input.items, **input}) self.input.update(input) return self.output def __repr__(self) -> str: - return f"{self.type}({self.name})" + return f"{self.type}(name={self.name})" class ModelStage(Stage, ABC): @@ -313,8 +365,9 @@ class ModelStage(Stage, ABC): Model stages can also be run for different parameter combinations using the `crossby` attribute. For example, a single stage could be run for various hyperparameter values, and then the results could be - combined into an ensemble. Any parameter in config.crossable_params - can be specified as either a single value or a list of values. + combined into an ensemble. Any parameter in + `config.crossable_params` can be specified as either a single value + or a list of values. When a model stage method is evaluated, all submodels (identified by their `subset_id` and `param_id`) are evaluated, and then, if @@ -335,15 +388,45 @@ class ModelStage(Stage, ABC): output_validation : dict, optional Description. + Notes + ----- + * Private attributes that are defined automatically: + * `_dataif : `DataInterface` object for loading/dumping input, + created `Stage.set_dataif()` and called by `Pipeline.build()` + or `Stage.from_json()`. 
+ * `_module`: Path to custom stage definition, created in + `Stage.module` or `Stage.from_json()`. + * `_input`: `Input` object that organizes `Stage` input, created + in `Stage.input` or `Stage.from_json()`, modified by + `Stage.__call__()`. + * `_crossby`: Names of parameters using multiple values. Created + in `ModelStage.create_stage_params()` AND CALLED BY + * `_subset_ids`: Data subset ID values. Created in + `ModelStage.create_stage_subsets()` and CALLED BY + * `_param_ids`: Parameter set ID values. Created in + `ModelStage.create_stage_params()` and CALLED BY + * Private attributes that must be defined by class: + * `_required_input`, `_optional_input`, `_output`: Strings with + syntax "f{name}.{extension}". For example, "data.parquet" + (required to use `groupby` attribute). If input/output is a + directory instead of a file, exclude the extension. For example, + "submodels". + * `_skip`: Methods that the stage does not implement (e.g., 'fit' + or 'predict'). + * `_collect_after`: Methods that create submodel results (e.g., + data subsets or parameter sets) that must be collected. For + example, collect submodel results for parameter sets to select + best parameter values after the 'fit' method, or collect + submodel results for data subsets after the 'predict' method. + """ config: ModelConfig groupby: set[str] | None = None - _crossby: set[str] | None = None # set by create_stage_params - _subset_ids: set[int] = set() # set by create_stage_subsets - _param_ids: set[int] = set() # set by create_stage_params - _required_input: set[str] = set() # data required for groupby - _collect_after: set[str] = set() # defined by class + _crossby: set[str] | None = None + _subset_ids: set[int] = set() + _param_ids: set[int] = set() + _collect_after: set[str] = set() @computed_property def crossby(self) -> set[str] | None: @@ -398,15 +481,17 @@ def create_stage_subsets( return_type="polars_lazyframe", ) - subsets_df = create_subsets(self.groupby, lf) + subsets_df = create_subsets(self.groupby, lf.collect().to_pandas()) self._subset_ids = set(subsets_df["subset_id"].to_list()) self.dataif.dump(subsets_df, "subsets.csv", key="output") - def get_stage_subset(self, subset_id: int) -> DataFrame: - """Get stage data subset.""" + def get_stage_subset( + self, subset_id: int, *fparts: str, key: str = "data", **options + ) -> DataFrame: + """Filter data by stage subset_id.""" return get_subset( - self.dataif.load(key="data"), + self.dataif.load(*fparts, key=key, **options), self.dataif.load("subsets.csv", key="output"), subset_id, ) @@ -447,11 +532,11 @@ def evaluate( Other Parameters ---------------- subset_id : int, optional - Submodel data subset ID. Ignored if `backend` is 'jobmon'. + Submodel data subset ID. Ignored if `backend` is 'jobmon' or + method is `collect`. param_id : int, optional - Submodel parameter set ID. Ignored if `backend` is 'jobmon'. - config : Path or str, optional - Path to config file. Required if `backend` is 'jobmon'. + Submodel parameter set ID. Ignored if `backend` is 'jobmon' + or method is `collect`. cluster : str, optional Cluster name. Required if `backend` is 'jobmon'. 
resources : Path or str, optional @@ -472,7 +557,15 @@ def evaluate( if method in self.skip: warnings.warn(f"{self.name} skips the '{method}' method") return + + self.input.check_exists() + if backend == "jobmon": + if method == "collect": + raise ValueError( + "Method 'collect' cannot be used with 'jobmon' backend" + ) + from onemod.backend.jobmon_backend import evaluate_with_jobmon evaluate_with_jobmon(model=self, method=method, **kwargs) @@ -512,7 +605,7 @@ def collect(self) -> None: raise NotImplementedError("Subclasses must implement this method.") def __repr__(self) -> str: - stage_str = f"{self.type}({self.name}" + stage_str = f"{self.type}(name={self.name}" if self.groupby is not None: stage_str += f", groupby={self.groupby}" if self.crossby is not None: diff --git a/src/onemod/stage/data_stages/__init__.py b/src/onemod/stage/data_stages/__init__.py index 94c2b0f7..e69de29b 100644 --- a/src/onemod/stage/data_stages/__init__.py +++ b/src/onemod/stage/data_stages/__init__.py @@ -1,3 +0,0 @@ -from onemod.stage.data_stages.preprocessing_stage import PreprocessingStage - -__all__ = ["PreprocessingStage"] diff --git a/src/onemod/stage/data_stages/preprocessing_stage.py b/src/onemod/stage/data_stages/preprocessing_stage.py deleted file mode 100644 index d594695d..00000000 --- a/src/onemod/stage/data_stages/preprocessing_stage.py +++ /dev/null @@ -1,21 +0,0 @@ -"""Preprocessing stage.""" - -from onemod.config import PreprocessingConfig -from onemod.stage import Stage - - -class PreprocessingStage(Stage): - """Preprocessing stage.""" - - config: PreprocessingConfig - _skip: set[str] = {"predict"} - _required_input: set[str] = {"data.parquet"} - _optional_input: set[str] = { - "age_metadata.parquet", - "location_metadata.parquet", - } - _output: set[str] = {"data.parquet"} - - def run(self) -> None: - """Run preprocessing stage.""" - print(f"running {self.name}") diff --git a/src/onemod/stage/model_stages/rover_stage.py b/src/onemod/stage/model_stages/rover_stage.py index c644fd2d..e41ef606 100644 --- a/src/onemod/stage/model_stages/rover_stage.py +++ b/src/onemod/stage/model_stages/rover_stage.py @@ -8,6 +8,8 @@ 'age_group_id', submodels will be fit separately for each age/sex pair, and covariates will be selected separately for each sex. 
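To make the groupby behavior described above concrete, here is a small sketch of the subset table produced by `create_subsets` (reworked in `src/onemod/utils/subsets.py` later in this diff); the column values are invented for illustration:

```python
import pandas as pd

from onemod.utils.subsets import create_subsets

# Toy data; values are illustrative only.
data = pd.DataFrame(
    {
        "sex_id": [1, 1, 2, 2],
        "age_group_id": [10, 11, 10, 11],
        "obs": [0.1, 0.2, 0.3, 0.4],
    }
)

print(create_subsets({"sex_id", "age_group_id"}, data))
#    subset_id  age_group_id  sex_id
# 0          0            10       1
# 1          1            10       2
# 2          2            11       1
# 3          3            11       2
```

Each subset_id identifies one submodel, so a rover stage grouped by age and sex fits one covariate-selection model per row of this table.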
+TODO: Update pandas commands to polars + """ import warnings @@ -68,7 +70,7 @@ def fit(self, subset_id: int, *args, **kwargs) -> None: # Fit submodel submodel.fit( - data=data, + data=data.to_pandas(), strategies=list(self.config.strategies), top_pct_score=self.config.top_pct_score, top_pct_learner=self.config.top_pct_learner, @@ -121,14 +123,18 @@ def collect(self) -> None: def _get_rover_summaries(self) -> pd.DataFrame: """Concatenate rover coefficient summaries.""" - subsets = self.dataif.load("subsets.csv", key="output") + subsets = self.dataif.load( + "subsets.csv", key="output", return_type="pandas_dataframe" + ) # Collect coefficient summaries summaries = [] for subset_id in self.subset_ids: try: summary = self.dataif.load( - f"submodels/{subset_id}/summary.csv", key="output" + f"submodels/{subset_id}/summary.csv", + key="output", + return_type="pandas_dataframe", ) summary["subset_id"] = subset_id summaries.append(summary) diff --git a/src/onemod/stage/model_stages/spxmod_stage.py b/src/onemod/stage/model_stages/spxmod_stage.py index 2a02cf2c..9cf69459 100644 --- a/src/onemod/stage/model_stages/spxmod_stage.py +++ b/src/onemod/stage/model_stages/spxmod_stage.py @@ -12,12 +12,12 @@ TODO: Update for new spxmod version with splines TODO: Make selected_covs more flexible, not hard-coded to age_mid TODO: Implement priors input +TODO: Update pandas commands to polars """ import numpy as np import pandas as pd -import polars as pl from loguru import logger from spxmod.model import XModel from xspline import XSpline @@ -193,6 +193,7 @@ def collect(self) -> None: self.dataif.load( f"submodels/{subset_id}/predictions.parquet", key="output", + return_type="pandas_dataframe", ) for subset_id in self.subset_ids ] @@ -209,11 +210,7 @@ def _get_submodel_data( """Load submodel data.""" # Load data and filter by subset logger.info(f"Loading {self.name} data subset {subset_id}") - data_subset = self.get_stage_subset( - subset_id - ) # TODO: Potentially add pandas as backend to internal methods if users will be using them in their custom stages - if isinstance(data_subset, pl.DataFrame): - data = data_subset.to_pandas() + data = self.get_stage_subset(subset_id).to_pandas() # Add spline basis to data spline_vars = [] @@ -234,6 +231,7 @@ def _get_submodel_data( columns=list(self.config["id_columns"]) + [self.config["prediction_column"]], key="offset", + return_type="pandas_dataframe", ).rename(columns={self.config["prediction_column"]: "offset"}), on=list(self.config["id_columns"]), how="left", @@ -274,7 +272,7 @@ def _get_covs(self, subset_id: int) -> list[str]: subset_id, id_names=pipeline_groupby, ) - selected_covs = selected_covs["cov"].tolist() + selected_covs = selected_covs["cov"].to_list() # Get fixed covariates fixed_covs = self.get_field( diff --git a/src/onemod/utils/subsets.py b/src/onemod/utils/subsets.py index 3849f337..38037123 100644 --- a/src/onemod/utils/subsets.py +++ b/src/onemod/utils/subsets.py @@ -1,18 +1,19 @@ """Functions for working with groupby and subsets.""" +import pandas as pd import polars as pl -def create_subsets(groupby: set[str], lf: pl.LazyFrame) -> pl.DataFrame: +def create_subsets(groupby: set[str], data: pd.DataFrame) -> pd.DataFrame: """Create subsets from groupby.""" - subsets = ( - lf.select(list(groupby)) - .unique() - .with_row_index(name="subset_id") - .collect() + sorted_groupby = sorted(groupby) + groups = data.groupby(sorted_groupby) + subsets = pd.DataFrame( + [subset for subset in groups.groups.keys()], columns=sorted_groupby ) - - return subsets + 
subsets.sort_values(by=sorted_groupby) + subsets["subset_id"] = subsets.index + return subsets[["subset_id", *sorted_groupby]] def get_subset( diff --git a/tests/e2e/test_e2e_onemod_example1_sequential.py b/tests/e2e/test_e2e_onemod_example1_sequential.py index 06c43195..c81e3044 100644 --- a/tests/e2e/test_e2e_onemod_example1_sequential.py +++ b/tests/e2e/test_e2e_onemod_example1_sequential.py @@ -3,9 +3,8 @@ import pytest from onemod import Pipeline -from onemod.config import PreprocessingConfig from onemod.dtypes import Data -from onemod.stage import KregStage, PreprocessingStage, RoverStage, SpxmodStage +from onemod.stage import KregStage, RoverStage, SpxmodStage @pytest.mark.skip( @@ -13,25 +12,11 @@ ) @pytest.mark.e2e @pytest.mark.requires_data -def test_e2e_onemod_example1_sequential(small_input_data, test_base_dir): +def test_e2e_onemod_example1_sequential(test_base_dir): """ End-to-end test for a the OneMod example1 pipeline. """ - # Define Stages - preprocessing = PreprocessingStage( - name="1_preprocessing", - config=PreprocessingConfig(data=small_input_data), - input_validation=dict(data=small_input_data), - output_validation=dict( - data=Data( - stage="1_preprocessing", - path=Path(test_base_dir, "data", "preprocessed_data.parquet"), - format="parquet", - ) - ), - ) - covariate_selection = RoverStage( name="2_covariate_selection", config=dict(cov_exploring=["cov1", "cov2", "cov3"]), @@ -147,30 +132,17 @@ def test_e2e_onemod_example1_sequential(small_input_data, test_base_dir): # Add stages dummy_pipeline.add_stages( - [ - preprocessing, - covariate_selection, - global_model, - location_model, - smoothing, - ] + [covariate_selection, global_model, location_model, smoothing] ) # Define dependencies - preprocessing(data=Path(test_base_dir, "data", "input_data.parquet")) - covariate_selection(data=preprocessing.output["data"]) + data = Path(test_base_dir, "data", "input_data.parquet") + covariate_selection(data=data) global_model( - data=preprocessing.output["data"], - selected_covs=covariate_selection.output["selected_covs"], - ) - location_model( - data=preprocessing.output["data"], - offset=global_model.output["predictions"], - ) - smoothing( - data=preprocessing.output["data"], - offset=location_model.output["predictions"], + data=data, selected_covs=covariate_selection.output["selected_covs"] ) + location_model(data=data, offset=global_model.output["predictions"]) + smoothing(data=data, offset=location_model.output["predictions"]) # Execute stages in sequence dummy_pipeline.evaluate(backend="local") diff --git a/tests/helpers/dummy_pipeline.py b/tests/helpers/dummy_pipeline.py index f3b67a6d..d3def417 100644 --- a/tests/helpers/dummy_pipeline.py +++ b/tests/helpers/dummy_pipeline.py @@ -11,9 +11,9 @@ from onemod.config import ( KregConfig, PipelineConfig, - PreprocessingConfig, RoverConfig, SpxmodConfig, + StageConfig, ) from onemod.constraints import Constraint from onemod.dtypes import ColumnSpec, Data @@ -23,7 +23,7 @@ def setup_dummy_pipeline(test_input_data, test_base_dir): """Set up a dummy pipeline, including specific dummy stages.""" preprocessing = DummyPreprocessingStage( name="preprocessing", - config=PreprocessingConfig(), + config=StageConfig(), input_validation={ "data": Data( stage="input_data", diff --git a/tests/helpers/dummy_stages.py b/tests/helpers/dummy_stages.py index 28639ef8..557856ad 100644 --- a/tests/helpers/dummy_stages.py +++ b/tests/helpers/dummy_stages.py @@ -4,9 +4,9 @@ from onemod.config import ( KregConfig, ModelConfig, - PreprocessingConfig, 
RoverConfig, SpxmodConfig, + StageConfig, ) from onemod.stage import ModelStage, Stage @@ -115,7 +115,7 @@ def get_log(self) -> list[str]: class DummyPreprocessingStage(Stage): """Preprocessing stage.""" - config: PreprocessingConfig + config: StageConfig _skip: set[str] = {"predict"} _required_input: set[str] = {"data.parquet"} _optional_input: set[str] = { diff --git a/tests/unit/io/test_input.py b/tests/unit/io/test_input.py index cb74550a..eb405458 100644 --- a/tests/unit/io/test_input.py +++ b/tests/unit/io/test_input.py @@ -238,7 +238,7 @@ def test_missing_self(): with pytest.raises(KeyError) as error: test_input.check_missing() observed = str(error.value).strip('"') - expected = f"{test_input.stage} missing required input: " + expected = f"Stage '{test_input.stage}' missing required input: " assert ( observed == expected + "['data', 'covariates']" or observed == expected + "['covariates', 'data']" @@ -257,13 +257,21 @@ def test_missing_items(): } ) observed = str(error.value).strip('"') - expected = f"{test_input.stage} missing required input: " + expected = f"Stage '{test_input.stage}' missing required input: " assert ( observed == expected + "['data', 'covariates']" or observed == expected + "['covariates', 'data']" ) +# TODO: Write tests for input.check_exists() +# - input with empty items +# - input with nonemtpy items that don't exist +# - ignore missing/nonexisting item if not in item_names +# - ignore missing/nonexisting item if dependency not in upstream_stages +# - upstream_stages arg not used if item_names passed + + @pytest.mark.unit def test_serialize(): test_input = get_input(VALID_ITEMS)
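One possible shape for the first of the TODO'd `check_exists` tests, sketched as an assumption (the exact `get_input` helper signature and item layout mirror the existing tests but are not confirmed by this diff):

```python
import pytest


@pytest.mark.unit
def test_check_exists_missing_items(tmp_path):
    # Hypothetical: input items pointing at paths that were never written
    # should surface as FileNotFoundError when check_exists() is called.
    test_input = get_input(
        {
            "data": tmp_path / "does_not_exist.parquet",
            "covariates": tmp_path / "does_not_exist.csv",
        }
    )
    with pytest.raises(FileNotFoundError):
        test_input.check_exists()
```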