diff --git a/reana_commons/snakemake.py b/reana_commons/snakemake.py index e8733352..78556cb9 100644 --- a/reana_commons/snakemake.py +++ b/reana_commons/snakemake.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of REANA. -# Copyright (C) 2021, 2022 CERN. +# Copyright (C) 2021, 2022, 2024 CERN. # # REANA is free software; you can redistribute it and/or modify it # under the terms of the MIT License; see LICENSE file for more details. @@ -11,15 +11,25 @@ import os from itertools import filterfalse, chain from typing import Any, Dict, List, Optional +from pathlib import Path -from snakemake import snakemake +from snakemake.api import SnakemakeApi from snakemake.dag import DAG -from snakemake.io import load_configfile +from snakemake.settings.types import ( + ResourceSettings, + WorkflowSettings, + ConfigSettings, + OutputSettings, + StorageSettings, + DAGSettings, + DeploymentSettings, +) + +# from snakemake.io import load_configfile from snakemake.jobs import Job from snakemake.persistence import Persistence from snakemake.rules import Rule from snakemake.workflow import Workflow - from reana_commons.errors import REANAValidationError @@ -36,15 +46,33 @@ def snakemake_validate( :param workdir: Path to working directory. :type workdir: string or None """ - valid = snakemake( - snakefile=workflow_file, - configfiles=configfiles, - workdir=workdir, - dryrun=True, - quiet=True, - ) - if not valid: - raise REANAValidationError("Snakemake specification is invalid.") + + with SnakemakeApi( + OutputSettings( + quiet=True, + ) + ) as snakemake_api: + try: + workflow_api = snakemake_api.workflow( + resource_settings=ResourceSettings(nodes=300), + config_settings=ConfigSettings(), + storage_settings=StorageSettings(), + storage_provider_settings=dict(), + workflow_settings=WorkflowSettings(), + deployment_settings=DeploymentSettings(), + snakefile=workflow_file, + workdir=workdir, + ) + # Execute the workflow with the dryrun executor + dag_api = ( + workflow_api.dag() + ) # If snakefile has syntax error, throws an error + # print(dag_api, "hey") + # dag_api.execute_workflow(executor="dryrun") + + except Exception as e: + snakemake_api.print_exception(e) + raise REANAValidationError("Snakemake specification is invalid.") def snakemake_load(workflow_file: str, **kwargs: Any) -> Dict: @@ -58,7 +86,7 @@ def snakemake_load(workflow_file: str, **kwargs: Any) -> Dict: """ def _create_snakemake_dag( - snakefile: str, configfiles: Optional[List[str]] = None, **kwargs: Any + snakefile: Path, configfiles: Optional[List[Path]] = None, **kwargs: Any ) -> DAG: """Create ``snakemake.dag.DAG`` instance. @@ -79,14 +107,23 @@ def _create_snakemake_dag( if configfiles is None: configfiles = [] for f in configfiles: + pass # get values to override. Later configfiles override earlier ones. - overwrite_config.update(load_configfile(f)) + + # @TODO load_configfile is deprecated. Find another way to implement it. + # overwrite_config.update(load_configfile(f)) + # convert provided paths to absolute paths - configfiles = list(map(os.path.abspath, configfiles)) + configfiles = [path.resolve() for path in configfiles] workflow = Workflow( - snakefile=snakefile, - overwrite_configfiles=configfiles, - overwrite_config=overwrite_config, + resource_settings=ResourceSettings(), + workflow_settings=WorkflowSettings(), + config_settings=ConfigSettings( + configfiles=configfiles, + ), + storage_settings=StorageSettings(), + storage_provider_settings=dict(), + dag_settings=DAGSettings(), ) workdir = kwargs.get("workdir") @@ -166,16 +203,18 @@ def relpath(f): else: # for backwards compatibility (Snakemake < 7 for Python 3.6) workflow.persistence = Persistence(dag=dag) - dag.init() - dag.update_checkpoint_dependencies() - dag.check_dynamic() + + # dag.init() # This function is async + # dag.update_checkpoint_dependencies() # This function is async + # dag.check_dynamic() # Dag has no longer this attribute return dag workdir = kwargs.get("workdir") if workdir: workflow_file = os.path.join(workdir, workflow_file) - configfiles = [kwargs.get("input")] if kwargs.get("input") else [] + workflow_file = Path(workflow_file) # convert str to Path + configfiles = [Path(kwargs.get("input"))] if kwargs.get("input") else [] snakemake_validate( workflow_file=workflow_file, configfiles=configfiles, workdir=workdir @@ -186,8 +225,17 @@ def relpath(f): prev_cwd = os.getcwd() try: snakemake_dag = _create_snakemake_dag( - workflow_file, configfiles=configfiles, **kwargs + Path(workflow_file), configfiles=configfiles, **kwargs ) + except Exception as e: + with SnakemakeApi( + OutputSettings( + quiet=True, + ) + ) as snakemake_api: + snakemake_api.print_exception(e) + + raise REANAValidationError("Snakemake specification is invalid.") finally: os.chdir(prev_cwd) diff --git a/setup.py b/setup.py index 4d7e2567..456b0767 100755 --- a/setup.py +++ b/setup.py @@ -38,10 +38,14 @@ ], "cwl": ["cwltool==3.1.20210628163208"], "snakemake": [ - "snakemake==8.20.5", - "snakemake-interface-common==1.17.3", - "snakemake-interface-executor-plugins==9.2.0", - "snakemake-interface-report-plugins==1.0.0", + "snakemake==8.23.0", + "snakemake-interface-common==1.17.4", + "snakemake-interface-executor-plugins==9.3.2", + "snakemake-interface-storage-plugins==3.3.0", + "snakemake-interface-report-plugins==1.1.0", + ], + "snakemake-xrootd": [ + "snakemake-storage-plugin-xrootd==0.1.4", ], }