diff --git a/AUTHORS.md b/AUTHORS.md index e88ac830..8fe2f148 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -5,6 +5,7 @@ The list of contributors in alphabetical order: - [Adelina Lintuluoto](https://orcid.org/0000-0002-0726-1452) - [Agisilaos Kounelis](https://orcid.org/0000-0001-9312-3189) - [Alastair Lyall](https://orcid.org/0009-0000-4955-8935) +- [Alp Tuna](https://orcid.org/0009-0001-1915-3993) - [Audrius Mecionis](https://orcid.org/0000-0002-3759-1663) - [Bruno Rosendo](https://orcid.org/0000-0002-0923-3148) - [Burt Holzman](https://orcid.org/0000-0001-5235-6314) diff --git a/reana_commons/snakemake.py b/reana_commons/snakemake.py index e8733352..e8e6aa0b 100644 --- a/reana_commons/snakemake.py +++ b/reana_commons/snakemake.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # # This file is part of REANA. -# Copyright (C) 2021, 2022 CERN. +# Copyright (C) 2021, 2022, 2024 CERN. # # REANA is free software; you can redistribute it and/or modify it # under the terms of the MIT License; see LICENSE file for more details. @@ -9,16 +9,29 @@ """REANA Snakemake Workflow utils.""" import os +import sys from itertools import filterfalse, chain from typing import Any, Dict, List, Optional - -from snakemake import snakemake -from snakemake.dag import DAG -from snakemake.io import load_configfile -from snakemake.jobs import Job -from snakemake.persistence import Persistence -from snakemake.rules import Rule -from snakemake.workflow import Workflow +from pathlib import Path + +if sys.version_info >= (3, 11): + from snakemake.api import SnakemakeApi + from snakemake.settings.types import ( + ResourceSettings, + WorkflowSettings, + ConfigSettings, + OutputSettings, + StorageSettings, + DeploymentSettings, + ) +else: + from snakemake import snakemake + from snakemake.dag import DAG + from snakemake.io import load_configfile + from snakemake.jobs import Job + from snakemake.persistence import Persistence + from snakemake.rules import Rule + from snakemake.workflow import Workflow from reana_commons.errors import REANAValidationError @@ -26,7 +39,7 @@ def snakemake_validate( workflow_file: str, configfiles: List[str], workdir: Optional[str] = None ): - """Validate Snakemake workflow specification. + """Seems to be enough for the first validation. We may move to snakemake --dry-run when the validation process will be fully moved to the server-side. :param workflow_file: A specification file compliant with `snakemake` workflow specification. @@ -36,15 +49,41 @@ def snakemake_validate( :param workdir: Path to working directory. :type workdir: string or None """ - valid = snakemake( - snakefile=workflow_file, - configfiles=configfiles, - workdir=workdir, - dryrun=True, - quiet=True, - ) - if not valid: - raise REANAValidationError("Snakemake specification is invalid.") + if sys.version_info >= (3, 11): + with SnakemakeApi( + OutputSettings( + quiet=True, + ) + ) as snakemake_api: + try: + workflow_api = snakemake_api.workflow( + resource_settings=ResourceSettings(nodes=300), + config_settings=ConfigSettings(configfiles=configfiles), + storage_settings=StorageSettings(), + storage_provider_settings=dict(), + workflow_settings=WorkflowSettings(), + deployment_settings=DeploymentSettings(), + snakefile=workflow_file, + workdir=workdir, + ) + + workflow_api.dag() + + except Exception as e: + snakemake_api.print_exception(e) + raise REANAValidationError("Snakemake specification is invalid.") + + # Keep the old implementation for python < 3.11 + else: + valid = snakemake( + snakefile=workflow_file, + configfiles=configfiles, + workdir=workdir, + dryrun=True, + quiet=True, + ) + if not valid: + raise REANAValidationError("Snakemake specification is invalid.") def snakemake_load(workflow_file: str, **kwargs: Any) -> Dict: @@ -56,163 +95,184 @@ def snakemake_load(workflow_file: str, **kwargs: Any) -> Dict: :returns: Dictonary containing relevant workflow metadata. """ - - def _create_snakemake_dag( - snakefile: str, configfiles: Optional[List[str]] = None, **kwargs: Any - ) -> DAG: - """Create ``snakemake.dag.DAG`` instance. - - The code of this function comes from the Snakemake codebase and is adapted - to fullfil REANA purposes of getting the needed metadata. - - If `workdir` is passed as a keyword argument, then this function will change the - CWD to `workdir`. - - :param snakefile: Path to Snakefile. - :type snakefile: string - :param configfiles: List of config files paths. - :type configfiles: List - :param kwargs: Snakemake args. - :type kwargs: Any - """ - overwrite_config = dict() - if configfiles is None: - configfiles = [] - for f in configfiles: - # get values to override. Later configfiles override earlier ones. - overwrite_config.update(load_configfile(f)) - # convert provided paths to absolute paths - configfiles = list(map(os.path.abspath, configfiles)) - workflow = Workflow( - snakefile=snakefile, - overwrite_configfiles=configfiles, - overwrite_config=overwrite_config, - ) - + if sys.version_info >= (3, 11): workdir = kwargs.get("workdir") if workdir: - workflow.workdir(workdir) - - workflow.include(snakefile=snakefile, overwrite_default_target=True) - workflow.check() - - # code copied and adapted from `snakemake.workflow.Workflow.execute()` - # in order to build the DAG and calculate the job dependencies. - # https://github.com/snakemake/snakemake/blob/75a544ba528b30b43b861abc0ad464db4d6ae16f/snakemake/workflow.py#L525 - def rules(items): - return map( - workflow._rules.__getitem__, - filter(workflow.is_rule, items), - ) - - if kwargs.get("keep_target_files"): - - def files(items): - return filterfalse(workflow.is_rule, items) - - else: + workflow_file = os.path.join(workdir, workflow_file) - def files(items): - def relpath(f): - return ( - f - if os.path.isabs(f) or f.startswith("root://") - else os.path.relpath(f) - ) + workflow_file = Path(workflow_file) # convert str to Path + configfiles = [Path(kwargs.get("input"))] if kwargs.get("input") else [] - return map(relpath, filterfalse(workflow.is_rule, items)) + snakemake_validate( + workflow_file=workflow_file, configfiles=configfiles, workdir=workdir + ) - if not kwargs.get("targets"): - targets = ( - [workflow.default_target] - if workflow.default_target is not None - else list() + return { + "job_dependencies": {}, + "steps": [], + } + + # Keep the old implementation for python < 3.11 + else: + + def _create_snakemake_dag( + snakefile: str, configfiles: Optional[List[str]] = None, **kwargs: Any + ) -> DAG: + """Create ``snakemake.dag.DAG`` instance. + + The code of this function comes from the Snakemake codebase and is adapted + to fullfil REANA purposes of getting the needed metadata. + + If `workdir` is passed as a keyword argument, then this function will change the + CWD to `workdir`. + + :param snakefile: Path to Snakefile. + :type snakefile: string + :param configfiles: List of config files paths. + :type configfiles: List + :param kwargs: Snakemake args. + :type kwargs: Any + """ + overwrite_config = dict() + if configfiles is None: + configfiles = [] + for f in configfiles: + # get values to override. Later configfiles override earlier ones. + overwrite_config.update(load_configfile(f)) + # convert provided paths to absolute paths + configfiles = list(map(os.path.abspath, configfiles)) + workflow = Workflow( + snakefile=snakefile, + overwrite_configfiles=configfiles, + overwrite_config=overwrite_config, ) - prioritytargets = kwargs.get("prioritytargets", []) - forcerun = kwargs.get("forcerun", []) - until = kwargs.get("until", []) - omit_from = kwargs.get("omit_from", []) - - priorityrules = set(rules(prioritytargets)) - priorityfiles = set(files(prioritytargets)) - forcerules = set(rules(forcerun)) - forcefiles = set(files(forcerun)) - untilrules = set(rules(until)) - untilfiles = set(files(until)) - omitrules = set(rules(omit_from)) - omitfiles = set(files(omit_from)) - - targetrules = set( - chain( - rules(targets), - filterfalse(Rule.has_wildcards, priorityrules), - filterfalse(Rule.has_wildcards, forcerules), - filterfalse(Rule.has_wildcards, untilrules), + workdir = kwargs.get("workdir") + if workdir: + workflow.workdir(workdir) + + workflow.include(snakefile=snakefile, overwrite_default_target=True) + workflow.check() + + # code copied and adapted from `snakemake.workflow.Workflow.execute()` + # in order to build the DAG and calculate the job dependencies. + # https://github.com/snakemake/snakemake/blob/75a544ba528b30b43b861abc0ad464db4d6ae16f/snakemake/workflow.py#L525 + def rules(items): + return map( + workflow._rules.__getitem__, + filter(workflow.is_rule, items), + ) + + if kwargs.get("keep_target_files"): + + def files(items): + return filterfalse(workflow.is_rule, items) + + else: + + def files(items): + def relpath(f): + return ( + f + if os.path.isabs(f) or f.startswith("root://") + else os.path.relpath(f) + ) + + return map(relpath, filterfalse(workflow.is_rule, items)) + + if not kwargs.get("targets"): + targets = ( + [workflow.default_target] + if workflow.default_target is not None + else list() + ) + + prioritytargets = kwargs.get("prioritytargets", []) + forcerun = kwargs.get("forcerun", []) + until = kwargs.get("until", []) + omit_from = kwargs.get("omit_from", []) + + priorityrules = set(rules(prioritytargets)) + priorityfiles = set(files(prioritytargets)) + forcerules = set(rules(forcerun)) + forcefiles = set(files(forcerun)) + untilrules = set(rules(until)) + untilfiles = set(files(until)) + omitrules = set(rules(omit_from)) + omitfiles = set(files(omit_from)) + + targetrules = set( + chain( + rules(targets), + filterfalse(Rule.has_wildcards, priorityrules), + filterfalse(Rule.has_wildcards, forcerules), + filterfalse(Rule.has_wildcards, untilrules), + ) + ) + targetfiles = set( + chain(files(targets), priorityfiles, forcefiles, untilfiles) + ) + dag = DAG( + workflow, + workflow.rules, + targetrules=targetrules, + targetfiles=targetfiles, + omitfiles=omitfiles, + omitrules=omitrules, ) - ) - targetfiles = set(chain(files(targets), priorityfiles, forcefiles, untilfiles)) - dag = DAG( - workflow, - workflow.rules, - targetrules=targetrules, - targetfiles=targetfiles, - omitfiles=omitfiles, - omitrules=omitrules, - ) - - if hasattr(workflow, "_persistence"): - workflow._persistence = Persistence(dag=dag) - else: - # for backwards compatibility (Snakemake < 7 for Python 3.6) - workflow.persistence = Persistence(dag=dag) - dag.init() - dag.update_checkpoint_dependencies() - dag.check_dynamic() - return dag - workdir = kwargs.get("workdir") - if workdir: - workflow_file = os.path.join(workdir, workflow_file) + if hasattr(workflow, "_persistence"): + workflow._persistence = Persistence(dag=dag) + else: + # for backwards compatibility (Snakemake < 7 for Python 3.6) + workflow.persistence = Persistence(dag=dag) + dag.init() + dag.update_checkpoint_dependencies() + dag.check_dynamic() + return dag - configfiles = [kwargs.get("input")] if kwargs.get("input") else [] + workdir = kwargs.get("workdir") + if workdir: + workflow_file = os.path.join(workdir, workflow_file) - snakemake_validate( - workflow_file=workflow_file, configfiles=configfiles, workdir=workdir - ) + configfiles = [kwargs.get("input")] if kwargs.get("input") else [] - # save the cwd to restore it after _create_snakemake_dag, because this function - # changes the cwd if `workdir` is in `kwargs` - prev_cwd = os.getcwd() - try: - snakemake_dag = _create_snakemake_dag( - workflow_file, configfiles=configfiles, **kwargs + snakemake_validate( + workflow_file=workflow_file, configfiles=configfiles, workdir=workdir ) - finally: - os.chdir(prev_cwd) - - job_dependencies = { - str(job): list(map(str, deps.keys())) - for job, deps in snakemake_dag.dependencies.items() - } - - return { - "job_dependencies": job_dependencies, - "steps": [ - { - "name": rule.name, - "environment": (rule._container_img or "").replace("docker://", ""), - "inputs": dict(rule._input), - "params": dict(rule._params), - "outputs": dict(rule._output), - "commands": [rule.shellcmd], - "compute_backend": rule.resources.get("compute_backend"), - "kubernetes_memory_limit": rule.resources.get( - "kubernetes_memory_limit" - ), - "kubernetes_uid": rule.resources.get("kubernetes_uid"), - } - for rule in snakemake_dag.rules - if not rule.norun - ], - } + + # save the cwd to restore it after _create_snakemake_dag, because this function + # changes the cwd if `workdir` is in `kwargs` + prev_cwd = os.getcwd() + try: + snakemake_dag = _create_snakemake_dag( + workflow_file, configfiles=configfiles, **kwargs + ) + finally: + os.chdir(prev_cwd) + + job_dependencies = { + str(job): list(map(str, deps.keys())) + for job, deps in snakemake_dag.dependencies.items() + } + + return { + "job_dependencies": job_dependencies, + "steps": [ + { + "name": rule.name, + "environment": (rule._container_img or "").replace("docker://", ""), + "inputs": dict(rule._input), + "params": dict(rule._params), + "outputs": dict(rule._output), + "commands": [rule.shellcmd], + "compute_backend": rule.resources.get("compute_backend"), + "kubernetes_memory_limit": rule.resources.get( + "kubernetes_memory_limit" + ), + "kubernetes_uid": rule.resources.get("kubernetes_uid"), + } + for rule in snakemake_dag.rules + if not rule.norun + ], + } diff --git a/setup.py b/setup.py index 129f906d..735c0469 100755 --- a/setup.py +++ b/setup.py @@ -38,22 +38,19 @@ ], "cwl": ["cwltool==3.1.20210628163208"], "snakemake": [ - # install patched version of snakemake v7 that works with Python 3.12 - # see https://github.com/snakemake/snakemake/issues/2480 - # see https://github.com/snakemake/snakemake/issues/2648 - # see https://github.com/snakemake/snakemake/issues/2657 - "snakemake @ git+https://github.com/mdonadoni/snakemake.git@cea31624976989ad0645eb2e1751260d32259506", # branch `7.32.4-python3.12` - "pulp>=2.7.0,<2.8.0", + "snakemake==7.32.4 ; python_version<'3.11'", + "pulp>=2.7.0,<2.8.0 ; python_version<'3.11'", + "snakemake==8.24.1 ; python_version>='3.11'", + "snakemake-interface-common==1.17.4 ; python_version>='3.11'", + "snakemake-interface-executor-plugins==9.3.2 ; python_version>='3.11'", + "snakemake-interface-storage-plugins==3.3.0 ; python_version>='3.11'", + "snakemake-interface-report-plugins==1.1.0 ; python_version>='3.11'", ], - "snakemake-reports": [ - "snakemake[reports] @ git+https://github.com/mdonadoni/snakemake.git@cea31624976989ad0645eb2e1751260d32259506", # branch `7.32.4-python3.12` - "pulp>=2.7.0,<2.8.0", + "snakemake-xrootd": [ + "snakemake-storage-plugin-xrootd==0.1.4", ], } -# backwards compatibility with extras before PEP 685 -extras_require["snakemake_reports"] = extras_require["snakemake-reports"] - extras_require["all"] = [] for key, reqs in extras_require.items(): if ":" == key[0]: