Skip to content

Commit

Permalink
refactor(snakemake): convert old snakemake api to new api (reanahub#471)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Oct 15, 2024
1 parent 21b44b9 commit 46bd3b6
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 28 deletions.
96 changes: 72 additions & 24 deletions reana_commons/snakemake.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2021, 2022 CERN.
# Copyright (C) 2021, 2022, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
Expand All @@ -11,15 +11,25 @@
import os
from itertools import filterfalse, chain
from typing import Any, Dict, List, Optional
from pathlib import Path

from snakemake import snakemake
from snakemake.api import SnakemakeApi
from snakemake.dag import DAG
from snakemake.io import load_configfile
from snakemake.settings.types import (
ResourceSettings,
WorkflowSettings,
ConfigSettings,
OutputSettings,
StorageSettings,
DAGSettings,
DeploymentSettings,
)

# from snakemake.io import load_configfile
from snakemake.jobs import Job
from snakemake.persistence import Persistence
from snakemake.rules import Rule
from snakemake.workflow import Workflow

from reana_commons.errors import REANAValidationError


Expand All @@ -36,15 +46,33 @@ def snakemake_validate(
:param workdir: Path to working directory.
:type workdir: string or None
"""
valid = snakemake(
snakefile=workflow_file,
configfiles=configfiles,
workdir=workdir,
dryrun=True,
quiet=True,
)
if not valid:
raise REANAValidationError("Snakemake specification is invalid.")

with SnakemakeApi(
OutputSettings(
quiet=True,
)
) as snakemake_api:
try:
workflow_api = snakemake_api.workflow(
resource_settings=ResourceSettings(nodes=300),
config_settings=ConfigSettings(),
storage_settings=StorageSettings(),
storage_provider_settings=dict(),
workflow_settings=WorkflowSettings(),
deployment_settings=DeploymentSettings(),
snakefile=workflow_file,
workdir=workdir,
)
# Execute the workflow with the dryrun executor
dag_api = (
workflow_api.dag()
) # If snakefile has syntax error, throws an error
# print(dag_api, "hey")
# dag_api.execute_workflow(executor="dryrun")

except Exception as e:
snakemake_api.print_exception(e)
raise REANAValidationError("Snakemake specification is invalid.")


def snakemake_load(workflow_file: str, **kwargs: Any) -> Dict:
Expand All @@ -58,7 +86,7 @@ def snakemake_load(workflow_file: str, **kwargs: Any) -> Dict:
"""

def _create_snakemake_dag(
snakefile: str, configfiles: Optional[List[str]] = None, **kwargs: Any
snakefile: Path, configfiles: Optional[List[Path]] = None, **kwargs: Any
) -> DAG:
"""Create ``snakemake.dag.DAG`` instance.
Expand All @@ -79,14 +107,23 @@ def _create_snakemake_dag(
if configfiles is None:
configfiles = []
for f in configfiles:
pass
# get values to override. Later configfiles override earlier ones.
overwrite_config.update(load_configfile(f))

# @TODO load_configfile is deprecated. Find another way to implement it.
# overwrite_config.update(load_configfile(f))

# convert provided paths to absolute paths
configfiles = list(map(os.path.abspath, configfiles))
configfiles = [path.resolve() for path in configfiles]
workflow = Workflow(
snakefile=snakefile,
overwrite_configfiles=configfiles,
overwrite_config=overwrite_config,
resource_settings=ResourceSettings(),
workflow_settings=WorkflowSettings(),
config_settings=ConfigSettings(
configfiles=configfiles,
),
storage_settings=StorageSettings(),
storage_provider_settings=dict(),
dag_settings=DAGSettings(),
)

workdir = kwargs.get("workdir")
Expand Down Expand Up @@ -166,16 +203,18 @@ def relpath(f):
else:
# for backwards compatibility (Snakemake < 7 for Python 3.6)
workflow.persistence = Persistence(dag=dag)
dag.init()
dag.update_checkpoint_dependencies()
dag.check_dynamic()

# dag.init() # This function is async
# dag.update_checkpoint_dependencies() # This function is async
# dag.check_dynamic() # Dag has no longer this attribute
return dag

workdir = kwargs.get("workdir")
if workdir:
workflow_file = os.path.join(workdir, workflow_file)

configfiles = [kwargs.get("input")] if kwargs.get("input") else []
workflow_file = Path(workflow_file) # convert str to Path
configfiles = [Path(kwargs.get("input"))] if kwargs.get("input") else []

snakemake_validate(
workflow_file=workflow_file, configfiles=configfiles, workdir=workdir
Expand All @@ -186,8 +225,17 @@ def relpath(f):
prev_cwd = os.getcwd()
try:
snakemake_dag = _create_snakemake_dag(
workflow_file, configfiles=configfiles, **kwargs
Path(workflow_file), configfiles=configfiles, **kwargs
)
except Exception as e:
with SnakemakeApi(
OutputSettings(
quiet=True,
)
) as snakemake_api:
snakemake_api.print_exception(e)

raise REANAValidationError("Snakemake specification is invalid.")
finally:
os.chdir(prev_cwd)

Expand Down
12 changes: 8 additions & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@
],
"cwl": ["cwltool==3.1.20210628163208"],
"snakemake": [
"snakemake==8.20.5",
"snakemake-interface-common==1.17.3",
"snakemake-interface-executor-plugins==9.2.0",
"snakemake-interface-report-plugins==1.0.0",
"snakemake==8.23.0",
"snakemake-interface-common==1.17.4",
"snakemake-interface-executor-plugins==9.3.2",
"snakemake-interface-storage-plugins==3.3.0",
"snakemake-interface-report-plugins==1.1.0",
],
"snakemake-xrootd": [
"snakemake-storage-plugin-xrootd==0.1.4",
],
}

Expand Down

0 comments on commit 46bd3b6

Please sign in to comment.