bugfix/minor_updates #115

Merged 35 commits on Dec 9, 2024
Commits
32fa376
change model config object to path
kels271828 Nov 22, 2024
a1f53c0
add pandas.DataFrame support for dump
kels271828 Nov 25, 2024
3b6fc97
comment out some things that need to be fixed
kels271828 Nov 25, 2024
59c06ec
add support for directory input vs. path
kels271828 Nov 25, 2024
51f814c
rename stage vars for more clarity, remove option to run collect via …
kels271828 Nov 25, 2024
cdc1d69
change stages list to set, update docstring
kels271828 Nov 25, 2024
5db45ec
remove stage_name arg, add stages arg, remove error message
kels271828 Nov 25, 2024
a10e9da
update docstrings, add support for input directories
kels271828 Nov 26, 2024
fd781ac
add error message about collect method
kels271828 Dec 2, 2024
50db1ce
update rover and spxmod stages for polars
kels271828 Dec 2, 2024
6def890
remove config from other pars, remove unused import
kels271828 Dec 2, 2024
d51d0c3
pass stages to model.evaluate
kels271828 Dec 2, 2024
8972c56
add description of attributes that are created automatically or must …
kels271828 Dec 3, 2024
ff7edad
add name= to pipeline and stage repr func
kels271828 Dec 5, 2024
a0cfbb4
add name= to Stage.repr
kels271828 Dec 5, 2024
c979ccd
add docstring for Input.check_missing
kels271828 Dec 5, 2024
a87bacf
add check_exists function to Input class
kels271828 Dec 5, 2024
f037657
replace check_upstream_output_exists with stage.input.check_exists
kels271828 Dec 5, 2024
3c8e713
Add stages arg to pipeline.get_execution_order
kels271828 Dec 5, 2024
a9f0e38
add check that input exists to stage evaluate methods
kels271828 Dec 5, 2024
542f91b
add mkdir arg to DataIO
kels271828 Dec 5, 2024
282a606
add absolute paths to input/output in set_dataif
kels271828 Dec 5, 2024
df21b01
sort data subsets
kels271828 Dec 5, 2024
321df47
bugfix: fix typo and add logic when name is not in the upstream stages
zhengp0 Dec 5, 2024
ee6c26f
sort import
zhengp0 Dec 5, 2024
48fbc16
bugfix: iherit global config when load_stage
zhengp0 Dec 5, 2024
e4250f1
revert+bugfix: revert back to inherit config in the function, and gi…
zhengp0 Dec 6, 2024
fcd40a5
formatting: ruff + slurm cluster is messed up
zhengp0 Dec 6, 2024
ba04096
reorganize imports
kels271828 Dec 6, 2024
476391c
remove preprocessing stage
kels271828 Dec 6, 2024
8981a31
add arguments to get_stage_subset
kels271828 Dec 6, 2024
0e9cc37
remove PreprocessingConfig from test helpers
kels271828 Dec 6, 2024
ca29d90
fix expected error message
kels271828 Dec 6, 2024
dad8af2
fixing some linting and mypy errors
kels271828 Dec 6, 2024
a81b0c5
remove more preprocessing, order imports, add todo
kels271828 Dec 6, 2024
59 changes: 9 additions & 50 deletions examples/example_pipeline.json
@@ -21,28 +21,6 @@
"sex_id"
],
"stages": {
"preprocessing": {
"name": "preprocessing",
"config": {
"id_columns": [
"year_id",
"sex_id",
"age_group_id",
"location_id"
],
"model_type": "binomial",
"observation_column": "obs",
"prediction_column": "pred",
"weight_column": "weights",
"test_column": "test",
"holdout_columns": [],
"coef_bounds": {}
},
"input": {
"data": "/path/to/data.parquet"
},
"type": "PreprocessingStage"
},
"covariate_selection": {
"name": "covariate_selection",
"config": {
@@ -80,10 +58,7 @@
],
"crossby": [],
"input": {
"data": {
"stage": "preprocessing",
"path": "/path/to/experiment/directory/preprocessing/data.parquet"
}
"data": "/path/to/data.parquet"
},
"type": "RoverStage"
},
@@ -179,10 +154,7 @@
],
"crossby": [],
"input": {
"data": {
"stage": "preprocessing",
"path": "/path/to/experiment/directory/preprocessing/data.parquet"
},
"data": "/path/to/data.parquet",
"offset": {
"stage": "global_model",
"path": "/path/to/experiment/directory/global_model/predictions.parquet"
@@ -233,10 +205,7 @@
],
"crossby": [],
"input": {
"data": {
"stage": "preprocessing",
"path": "/path/to/experiment/directory/preprocessing/data.parquet"
},
"data": "/path/to/data.parquet",
"offset": {
"stage": "location_model",
"path": "/path/to/experiment/directory/location_model/predictions.parquet"
@@ -274,10 +243,7 @@
],
"module": "/path/to/custom_stage.py",
"input": {
"observations": {
"stage": "preprocessing",
"path": "/path/to/experiment/directory/preprocessing/data.parquet"
},
"observations": "/path/to/data.parquet",
"predictions": {
"stage": "smoothing",
"path": "/path/to/experiment/directory/smoothing/predictions.parquet"
@@ -287,25 +253,18 @@
}
},
"dependencies": {
"preprocessing": [],
"covariate_selection": [
"preprocessing"
],
"covariate_selection": [],
"global_model": [
"covariate_selection",
"preprocessing"
"covariate_selection"
],
"location_model": [
"global_model",
"preprocessing"
"global_model"
],
"smoothing": [
"location_model",
"preprocessing"
"location_model"
],
"custom_stage": [
"smoothing",
"preprocessing"
"smoothing"
]
}
}
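
The net effect of this file's diff is that stage inputs may now reference raw data with a plain path string instead of routing everything through a preprocessing stage; only outputs of upstream stages keep the nested form, and preprocessing disappears from every stage's dependency list. A short sketch of the two input shapes, written as Python dicts that mirror the JSON above (values copied from the example; this is illustrative, not the full onemod input schema):

# Sketch of the two input shapes in the updated example_pipeline.json.
# Values are copied from the example above; not the full input schema.

# Raw data is now a plain path string:
rover_input = {"data": "/path/to/data.parquet"}

# Upstream stage outputs keep the {"stage": ..., "path": ...} form:
spxmod_input = {
    "data": "/path/to/data.parquet",
    "offset": {
        "stage": "global_model",
        "path": "/path/to/experiment/directory/global_model/predictions.parquet",
    },
}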
27 changes: 7 additions & 20 deletions examples/pipeline_example.py
@@ -4,14 +4,13 @@
from custom_stage import CustomStage

from onemod import Pipeline
from onemod.stage import KregStage, PreprocessingStage, RoverStage, SpxmodStage
from onemod.stage import KregStage, RoverStage, SpxmodStage


def create_pipeline(directory: str, data: str):
# Create stages
# Stage-specific validation specifications go here.
# Stage classes may also implement default validation specifications.
preprocessing = PreprocessingStage(name="preprocessing", config={})
covariate_selection = RoverStage(
name="covariate_selection",
config={"cov_exploring": ["cov1", "cov2", "cov3"]},
@@ -68,7 +67,6 @@ def create_pipeline(directory: str, data: str):
# Add stages
example_pipeline.add_stages(
[
preprocessing,
covariate_selection,
global_model,
location_model,
@@ -78,24 +76,13 @@
)

# Define dependencies
preprocessing(data=example_pipeline.data)
covariate_selection(data=preprocessing.output["data"])
covariate_selection(data=data)
global_model(
data=preprocessing.output["data"],
selected_covs=covariate_selection.output["selected_covs"],
)
location_model(
data=preprocessing.output["data"],
offset=global_model.output["predictions"],
)
smoothing(
data=preprocessing.output["data"],
offset=location_model.output["predictions"],
)
custom_stage(
observations=preprocessing.output["data"],
predictions=smoothing.output["predictions"],
data=data, selected_covs=covariate_selection.output["selected_covs"]
)
location_model(data=data, offset=global_model.output["predictions"])
smoothing(data=data, offset=location_model.output["predictions"])
custom_stage(observations=data, predictions=smoothing.output["predictions"])

# Serialize pipeline
example_pipeline.to_json()
@@ -111,7 +98,7 @@ def create_pipeline(directory: str, data: str):

# Fit specific stages
example_pipeline.evaluate(
method="fit", stages=["preprocessing", "covariate_selection"]
method="fit", stages=["covariate_selection", "global_model"]
)

# Predict for specific locations
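With the preprocessing stage removed, the example wires the raw data path straight into each stage and the script reads much shorter. A hedged usage sketch of the updated entry point; create_pipeline is defined in examples/pipeline_example.py above, and both paths are placeholders:

# Hedged usage sketch; create_pipeline comes from the example above and the
# two paths are placeholders, not real locations.
from pipeline_example import create_pipeline

if __name__ == "__main__":
    create_pipeline(
        directory="/path/to/experiment/directory",
        data="/path/to/data.parquet",
    )
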
67 changes: 38 additions & 29 deletions src/onemod/backend/jobmon_backend.py
@@ -31,7 +31,7 @@

import sys
from pathlib import Path
from typing import Any, Literal, cast
from typing import Literal

import yaml
from jobmon.client.api import Tool
@@ -96,7 +96,7 @@ def get_tasks(
name=f"{stage.name}_{method}",
upstream_tasks=upstream_tasks,
max_attempts=1,
**cast(dict[str, Any], task_args),
**task_args,
)
]

@@ -141,8 +141,8 @@ def get_command_template(
"{python}"
f" {Path(__file__).parents[1] / 'main.py'}"
" --config {config}"
f" --stage_name {stage_name}"
f" --method {method}"
f" --stages {stage_name}"
)

for node_arg in node_args:
@@ -167,21 +167,38 @@
def get_upstream_tasks(
stage: Stage,
method: Literal["run", "fit", "predict"],
stages: dict[str, Stage],
stage_dict: dict[str, Stage],
task_dict: dict[str, list[Task]],
specified_stages: set[str] | None = None,
stages: set[str] | None = None,
) -> list[Task]:
"""Get upstream stage tasks."""
"""Get upstream stage tasks.

Parameters
----------
stage : Stage
Current stage.
method : str
Name of method to evaluate.
stage_dict : dict[str, Stage]
Dictionary of all upstream pipeline stages.
task_dict : dict[str, list[Task]]
Dictionary of all tasks being evaluated.
stages : set[str] or None, optional
Name of all pipeline stages being evaluated.

Returns
-------
list of Task
Upstream tasks for current stage.

"""
upstream_tasks = []

for upstream_name in stage.dependencies:
if (
specified_stages is not None
and upstream_name not in specified_stages
):
if stages is not None and upstream_name not in stages:
continue

upstream = stages[upstream_name]
upstream = stage_dict[upstream_name]
if method not in upstream.skip:
if (
isinstance(upstream, ModelStage)
@@ -200,8 +217,8 @@ def evaluate_with_jobmon(
cluster: str,
resources: Path | str,
python: Path | str | None = None,
method: Literal["run", "fit", "predict", "collect"] = "run",
stages: list[str] | None = None,
method: Literal["run", "fit", "predict"] = "run",
stages: set[str] | None = None,
) -> None:
"""Evaluate pipeline or stage method with Jobmon.

@@ -219,7 +236,8 @@
method : str, optional
Name of method to evalaute. Default is 'run'.
stages : set of str or None, optional
Set of stage names to evaluate. Default is None.
Names of stages to evaluate if `model` is a pipeline instance.
If None, evaluate entire pipeline. Default is None.

TODO: Optional stage-specific Python environments
TODO: User-defined max_attempts
@@ -231,34 +249,25 @@

# Set config
if isinstance(model, Stage):
model_config = model.dataif.load(key="config")
config_path = model.dataif.get_path("config")
elif isinstance(model, Pipeline):
model_config = model.config
config_path = model.directory / f"{model.name}.json"

task_args: dict[str, str] = {
"python": str(python or sys.executable),
"config": str(model_config),
"config": str(config_path),
}

# Create tasks
if isinstance(model, Pipeline):
tasks = []
task_dict: dict[str, list[Task]] = {}

if stages is None:
stages = model.get_execution_order()

for stage_name in stages:
for stage_name in model.get_execution_order(stages):
stage = model.stages[stage_name]
if (
method not in stage.skip and method != "collect"
): # TODO: handle collect
if method not in stage.skip:
upstream_tasks = get_upstream_tasks(
stage,
method,
model.stages,
task_dict,
specified_stages=set(stages),
stage, method, model.stages, task_dict, stages
)
task_dict[stage_name] = get_tasks(
tool, resources, stage, method, task_args, upstream_tasks
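The filtering rule in get_upstream_tasks is the heart of the new stages argument: a dependency contributes upstream tasks only when it is itself part of the run. A minimal sketch of that rule, using plain strings as stand-ins for onemod Stage and Jobmon Task objects:

# Minimal sketch of the upstream-task filtering rule in get_upstream_tasks,
# with plain strings standing in for Stage and Task objects.
def upstream_task_names(
    dependencies: list[str],
    task_dict: dict[str, list[str]],
    stages: set[str] | None = None,
) -> list[str]:
    upstream_tasks: list[str] = []
    for upstream_name in dependencies:
        # Skip dependencies that are not being evaluated in this run.
        if stages is not None and upstream_name not in stages:
            continue
        upstream_tasks.extend(task_dict.get(upstream_name, []))
    return upstream_tasks

# global_model depends on covariate_selection, which is part of the run,
# so its fit task becomes an upstream task.
print(upstream_task_names(
    ["covariate_selection"],
    {"covariate_selection": ["covariate_selection_fit"]},
    stages={"covariate_selection", "global_model"},
))  # ['covariate_selection_fit']
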
12 changes: 5 additions & 7 deletions src/onemod/backend/local_backend.py
@@ -13,7 +13,7 @@
def evaluate_local(
model: Pipeline | Stage,
method: Literal["run", "fit", "predict"] = "run",
stages: list[str] | None = None,
stages: set[str] | None = None,
**kwargs,
) -> None:
"""Evaluate pipeline or stage method locally.
@@ -24,22 +24,20 @@ def evaluate_local(
Pipeline or stage instance.
method : str, optional
Name of method to evaluate. Default is 'run'.
stages : set of str or None, optional
Names of stages to evaluate if `model` is a pipeline instance.
If None, evaluate entire pipeline. Default is None.

Other Parameters
----------------
subset_id : int, optional
Submodel data subset ID. Only used for model stages.
param_id : int, optional
Submodel parameter set ID. Only used for model stages.
stages : list of str or None, optional
List of stage names to evaluate. Default is None.

"""
if isinstance(model, Pipeline):
if stages is None:
stages = model.get_execution_order()

for stage_name in stages:
for stage_name in model.get_execution_order(stages):
stage = model.stages[stage_name]
if method not in stage.skip:
_evaluate_stage(stage, method)
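The local backend exposes the same interface: the pipeline resolves its own execution order from the requested stages, and stages whose skip set contains the method are passed over. A minimal sketch of that loop, with a plain dict of skip sets standing in for onemod Stage objects (the skip sets in the example call are hypothetical):

# Minimal sketch of the pipeline loop in evaluate_local; the skip sets are
# hypothetical stand-ins for the Stage.skip attribute.
def run_stages(
    order: list[str],
    skip_sets: dict[str, set[str]],
    method: str = "run",
) -> list[str]:
    evaluated: list[str] = []
    for stage_name in order:
        if method in skip_sets.get(stage_name, set()):
            continue  # stage skips this method
        evaluated.append(stage_name)
    return evaluated

# If covariate_selection skipped "predict", only the model stages would run it.
print(run_stages(
    ["covariate_selection", "global_model", "location_model"],
    {"covariate_selection": {"predict"}},
    method="predict",
))  # ['global_model', 'location_model']
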
2 changes: 0 additions & 2 deletions src/onemod/config/__init__.py
@@ -1,13 +1,11 @@
from onemod.config.base import Config, ModelConfig, PipelineConfig, StageConfig
from onemod.config.data_config import PreprocessingConfig
from onemod.config.model_config import KregConfig, RoverConfig, SpxmodConfig

__all__ = [
"Config",
"PipelineConfig",
"StageConfig",
"ModelConfig",
"PreprocessingConfig",
"RoverConfig",
"SpxmodConfig",
"KregConfig",
3 changes: 0 additions & 3 deletions src/onemod/config/data_config/__init__.py
@@ -1,3 +0,0 @@
from onemod.config.data_config.preprocessing_config import PreprocessingConfig

__all__ = ["PreprocessingConfig"]
9 changes: 0 additions & 9 deletions src/onemod/config/data_config/preprocessing_config.py

This file was deleted.
