Skip to content

Commit

Permalink
Change recipe format to just use steps, and update infra for running them
Browse files Browse the repository at this point in the history

I've updated one recipe for now, but they will all have to be converted.
  • Loading branch information
jfrost-mo committed Aug 6, 2024
1 parent 6993791 commit 9567afb
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 119 deletions.
2 changes: 2 additions & 0 deletions cset-workflow/includes/plot_spatial_surface_model_field.cylc
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
{% if PLOT_SPATIAL_SURFACE_MODEL_FIELD %}
{% for model in models %}
{% for model_field in SURFACE_MODEL_FIELDS %}
[runtime]
[[generic_spatial_plot_time_series_{{model_field}}]]
inherit = PROCESS
[[[environment]]]
CSET_RECIPE_NAME = "generic_surface_spatial_plot_sequence.yaml"
CSET_ADDOPTS = "--VARNAME={{model_field}}"
MODEL_NUMBER = {{model["number"]}}
{% endfor %}
{% endif %}
32 changes: 9 additions & 23 deletions src/CSET/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def main():
"-i",
"--input-dir",
type=Path,
required=True,
help="directory containing input data",
)
parser_bake.add_argument(
Expand All @@ -66,13 +67,6 @@ def main():
required=True,
help="recipe file to read",
)
bake_step_control = parser_bake.add_mutually_exclusive_group()
bake_step_control.add_argument(
"--parallel-only", action="store_true", help="only run parallel steps"
)
bake_step_control.add_argument(
"--collate-only", action="store_true", help="only run collation steps"
)
parser_bake.add_argument(
"-s", "--style-file", type=Path, help="colour bar definition to use"
)
Expand Down Expand Up @@ -194,24 +188,16 @@ def calculate_loglevel(args) -> int:

def _bake_command(args, unparsed_args):
from CSET._common import parse_variable_options
from CSET.operators import execute_recipe_collate, execute_recipe_parallel
from CSET.operators import execute_recipe

recipe_variables = parse_variable_options(unparsed_args)
if not args.collate_only:
# Input dir is needed for parallel steps, but not collate steps.
if not args.input_dir:
raise ArgumentError("the following arguments are required: -i/--input-dir")
execute_recipe_parallel(
args.recipe,
args.input_dir,
args.output_dir,
recipe_variables,
args.style_file,
)
if not args.parallel_only:
execute_recipe_collate(
args.recipe, args.output_dir, recipe_variables, args.style_file
)
execute_recipe(
args.recipe,
args.input_dir,
args.output_dir,
recipe_variables,
args.style_file,
)


def _graph_command(args, unparsed_args):
Expand Down
14 changes: 7 additions & 7 deletions src/CSET/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,14 @@ def parse_recipe(recipe_yaml: Union[Path, str], variables: dict = None):
except ruamel.yaml.parser.ParserError as err:
raise ValueError("ParserError: Invalid YAML") from err

logging.debug(recipe)
logging.debug("Recipe before templating:\n%s", recipe)
check_recipe_has_steps(recipe)

if variables is not None:
logging.debug("Recipe variables: %s", variables)
recipe = template_variables(recipe, variables)

logging.debug("Recipe after templating:\n%s", recipe)
return recipe


Expand All @@ -103,16 +104,15 @@ def check_recipe_has_steps(recipe: dict):
KeyError
If needed recipe variables are not supplied.
"""
parallel_steps_key = "parallel"
if not isinstance(recipe, dict):
raise TypeError("Recipe must contain a mapping.")
if "parallel" not in recipe:
raise ValueError("Recipe must contain a 'parallel' key.")
if "steps" not in recipe:
raise ValueError("Recipe must contain a 'steps' key.")
try:
if len(recipe[parallel_steps_key]) < 1:
raise ValueError("Recipe must have at least 1 parallel step.")
if len(recipe["steps"]) < 1:
raise ValueError("Recipe must have at least 1 step.")
except TypeError as err:
raise ValueError("'parallel' key must contain a sequence of steps.") from err
raise ValueError("'steps' key must contain a sequence of steps.") from err


def slugify(s: str) -> str:
Expand Down
42 changes: 8 additions & 34 deletions src/CSET/_workflow_utils/run_cset_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ def recipe_id():
)
sys.exit(1)
id = p.stdout.decode(sys.stdout.encoding).strip()
return id
model_number = os.environ["MODEL_NUMBER"]
return f"m{model_number}_{id}"


def output_directory():
Expand All @@ -118,7 +119,8 @@ def data_directory():
"""Get the input data directory for the cycle."""
share_directory = os.environ["CYLC_WORKFLOW_SHARE_DIR"]
cycle_point = os.environ["CYLC_TASK_CYCLE_POINT"]
return f"{share_directory}/cycle/{cycle_point}/data"
model_number = os.environ["MODEL_NUMBER"]
return f"{share_directory}/cycle/{cycle_point}/data/{model_number}"


def create_diagnostic_archive(output_directory):
Expand Down Expand Up @@ -146,30 +148,7 @@ def add_to_diagnostic_index(output_directory, recipe_id):
append_to_index({category: {recipe_id: title}})


def parallel():
"""Process raw data in parallel."""
logging.info("Pre-processing data into intermediate form.")
try:
subprocess.run(
(
"cset",
"-v",
"bake",
f"--recipe={recipe_file()}",
f"--input-dir={data_directory()}",
f"--output-dir={output_directory()}",
f"--style-file={os.getenv('COLORBAR_FILE', '')}",
"--parallel-only",
),
check=True,
env=subprocess_env(),
)
except subprocess.CalledProcessError:
logging.error("cset bake exited non-zero while processing.")
sys.exit(1)


def collate():
def run_recipe_steps():
"""Run the recipe steps and produce output plot.
If the intermediate directory doesn't exist then we are running a simple
Expand All @@ -186,25 +165,20 @@ def collate():
"-v",
"bake",
f"--recipe={recipe_file()}",
f"--input-dir={data_directory()}",
f"--output-dir={output_directory()}",
f"--style-file={os.getenv('COLORBAR_FILE', '')}",
"--collate-only",
),
check=True,
env=subprocess_env(),
)
except subprocess.CalledProcessError:
logging.error("cset bake exited non-zero while collating.")
sys.exit(1)
raise
create_diagnostic_archive(output_directory())
add_to_diagnostic_index(output_directory(), recipe_id())


def run():
"""Run workflow script."""
# Check if we are running in parallel or collate mode.
bake_mode = os.getenv("CSET_BAKE_MODE")
if bake_mode == "parallel":
parallel()
elif bake_mode == "collate":
collate()
run_recipe_steps()
50 changes: 6 additions & 44 deletions src/CSET/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ def _step_parser(step: dict, step_input: any) -> str:
def _run_steps(recipe, steps, step_input, output_directory: Path, style_file: Path):
"""Execute the steps in a recipe."""
original_working_directory = Path.cwd()
os.chdir(output_directory)
try:
os.chdir(output_directory)
logger = logging.getLogger()
diagnostic_log = logging.FileHandler(
filename="CSET.log", mode="w", encoding="UTF-8"
Expand All @@ -168,14 +168,14 @@ def _run_steps(recipe, steps, step_input, output_directory: Path, style_file: Pa
os.chdir(original_working_directory)


def execute_recipe_parallel(
def execute_recipe(
recipe_yaml: Union[Path, str],
input_directory: Path,
output_directory: Path,
recipe_variables: dict = None,
style_file: Path = None,
) -> None:
"""Parse and execute the parallel steps from a recipe file.
"""Parse and execute the steps from a recipe file.
Parameters
----------
Expand All @@ -202,51 +202,13 @@ def execute_recipe_parallel(
TypeError
The provided recipe is not a stream or Path.
"""
if recipe_variables is None:
recipe_variables = {}
recipe = parse_recipe(recipe_yaml, recipe_variables)
step_input = Path(input_directory).absolute()
# Create output directory, and an inter-cycle intermediate directory.
# Create output directory.
try:
(output_directory / "intermediate").mkdir(parents=True, exist_ok=True)
output_directory.mkdir(parents=True, exist_ok=True)
except (FileExistsError, NotADirectoryError) as err:
logging.error("Output directory is a file. %s", output_directory)
raise err
steps = recipe["parallel"]
steps = recipe["steps"]
_run_steps(recipe, steps, step_input, output_directory, style_file)


def execute_recipe_collate(
recipe_yaml: Union[Path, str],
output_directory: Path,
recipe_variables: dict = None,
style_file: Path = None,
) -> None:
"""Parse and execute the collation steps from a recipe file.
Parameters
----------
recipe_yaml: Path or str
Path to a file containing, or string of, a recipe's YAML describing the
operators that need running. If a Path is provided it is opened and
read.
output_directory: Path
Pathlike indicating desired location of output. Must already exist.
recipe_variables: dict
Dictionary of variables for the recipe.
Raises
------
ValueError
The recipe is not well formed.
TypeError
The provided recipe is not a stream or Path.
"""
if recipe_variables is None:
recipe_variables = {}
output_directory = Path(output_directory).resolve()
assert output_directory.is_dir()
recipe = parse_recipe(recipe_yaml, recipe_variables)
# If collate doesn't exist treat it as having no steps.
steps = recipe.get("collate", [])
_run_steps(recipe, steps, output_directory, output_directory, style_file)
12 changes: 1 addition & 11 deletions src/CSET/recipes/generic_surface_spatial_plot_sequence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ category: Quick Look
title: Surface $VARNAME
description: Extracts and plots the surface $VARNAME from a file.

parallel:
steps:
- operator: read.read_cube
constraint:
operator: constraints.combine_constraints
Expand All @@ -16,16 +16,6 @@ parallel:
operator: constraints.generate_level_constraint
coordinate: "pressure"
levels: []
validity_time_constraint:
operator: constraints.generate_time_constraint
time_start: $VALIDITY_TIME

- operator: write.write_cube_to_nc
filename: intermediate/surface_field

collate:
- operator: read.read_cube
filename_pattern: intermediate/*.nc

- operator: plot.spatial_contour_plot
sequence_coordinate: time
Expand Down

0 comments on commit 9567afb

Please sign in to comment.