From f09a21e1cbd120f57eb810ffacfce313a82c2c94 Mon Sep 17 00:00:00 2001 From: vsoch Date: Wed, 3 Jan 2024 15:40:27 -0700 Subject: [PATCH 01/10] feat: preemption Signed-off-by: vsoch --- README.md | 21 -------- docs/further.md | 4 ++ example/hello-world-preemption/README.md | 9 ++++ example/hello-world-preemption/Snakefile | 19 +++++++ .../__init__.py | 32 +++++++++--- .../command.py | 2 +- .../executor.py | 51 +++++++++++++++++++ 7 files changed, 110 insertions(+), 28 deletions(-) create mode 100644 example/hello-world-preemption/README.md create mode 100644 example/hello-world-preemption/Snakefile diff --git a/README.md b/README.md index 3446c6a..d4dc85e 100644 --- a/README.md +++ b/README.md @@ -3,34 +3,13 @@ This is the [Google Batch](https://cloud.google.com/batch/docs/get-started) external executor plugin for snakemake. For documentation, see the [Snakemake plugin catalog](https://snakemake.github.io/snakemake-plugin-catalog/plugins/executor/googlebatch.html). -### TODO - -- Add bash strict mode (should default to true) -- Integrate snakemake MPI support (needs to work with snippet) - - -### Questions - -- What leads to STATE_UNSPECIFIED? -- For Google: what is the source of truth (listing) for batch? I see different answers in different places. -- For Johannes: Why can't we use debug logging for executor plugins? I instead need to use info and make it very verbose. -- For All: How do we want to use [COS](https://cloud.google.com/container-optimized-os/docs/concepts/features-and-benefits)? It would allow a container base to be used instead I think? - ### Notes - Conda is used to install Snakemake and dependencies. - The COS (container OS) uses the default Snakemake container, unless you specify differently. -### Feedback - -- Debugging batch is impossible (and slow). A "hello world" workflow takes 10 minutes to run and debug once. -- The jobs table is slow to load and sometimes does not load / shows old jobs at the top (without me touching anything) -- The logs directly in batch are so much better! Having the stream option there would still be nice (vs. having to refresh.) -- The batch UI (jobs table) is very slow to load and often just doesn't even after button or page refresh. - For examples, look into the [examples](examples) directory. - ## Developer The instructions for creating and scaffolding this plugin are [here](https://github.com/snakemake/poetry-snakemake-plugin#scaffolding-an-executor-plugin). diff --git a/docs/further.md b/docs/further.md index 96a3a55..6e93efc 100644 --- a/docs/further.md +++ b/docs/further.md @@ -88,6 +88,10 @@ And custom arguments can be any of the following, either on the command line or | container | Container to use (only when image_family is batch-cos*) [see here](https://cloud.google.com/batch/docs/vm-os-environment-overview#supported_vm_os_images) for families/projects | `--googlebatch-container` | str | | False | unset| | keep_source_cache | Cache workflows in your Google Cloud Storage Bucket | `--googlebatch-keep-source-cache` | bool | | False | False | | snippet | A comma separated list of one or more snippets to add to your setup | `--googelbatch-snippets` | str | | False | unset | +| preemption-default | Set a default number of preemptible instance retries | `--preemptible-default` | int | False | unset | +nset | +| preemption-rules | Define custom preemptible instance retries for specific rules | `--preemption-rules` | list | False | unset | + For machine type, note that for MPI workloads, mpitune configurations are validated on c2 and c2d instances only. Also note that you can customize the machine type on the level of the step (see [Step Options](#step-options) below). diff --git a/example/hello-world-preemption/README.md b/example/hello-world-preemption/README.md new file mode 100644 index 0000000..6dd0f6b --- /dev/null +++ b/example/hello-world-preemption/README.md @@ -0,0 +1,9 @@ +# Hello World + +> with preemtible instances + +Here is an example specifying to use an s3 bucket and asking for preemptible instances. + +```bash +snakemake --jobs 1 --executor googlebatch --googlebatch-region us-central1 --googlebatch-project llnl-flux --default-storage-provider s3 --default-storage-prefix s3://snakemake-testing-llnl --googlebatch-preemption-default 2 +``` \ No newline at end of file diff --git a/example/hello-world-preemption/Snakefile b/example/hello-world-preemption/Snakefile new file mode 100644 index 0000000..12f03bc --- /dev/null +++ b/example/hello-world-preemption/Snakefile @@ -0,0 +1,19 @@ +# By convention, the first pseudorule should be called "all" +# We're using the expand() function to create multiple targets +rule all: + input: + expand( + "{greeting}/world.txt", + greeting = ['hello', 'hola'], + ), + +# First real rule, this is using a wildcard called "greeting" +rule multilingual_hello_world: + output: + "{greeting}/world.txt", + shell: + """ + mkdir -p "{wildcards.greeting}" + sleep 5 + echo "{wildcards.greeting}, World!" > {output} + """ diff --git a/snakemake_executor_plugin_googlebatch/__init__.py b/snakemake_executor_plugin_googlebatch/__init__.py index 3573b20..5991511 100644 --- a/snakemake_executor_plugin_googlebatch/__init__.py +++ b/snakemake_executor_plugin_googlebatch/__init__.py @@ -82,7 +82,7 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) - work_tasks: Optional[str] = field( + work_tasks: Optional[int] = field( default=1, metadata={ "help": "The default number of work tasks (these are NOT MPI ranks)", @@ -91,7 +91,7 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) - cpu_milli: Optional[str] = field( + cpu_milli: Optional[int] = field( default=1000, metadata={ "help": "Milliseconds per cpu-second", @@ -100,7 +100,7 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) - cpu_milli: Optional[str] = field( + cpu_milli: Optional[int] = field( default=1000, metadata={ "help": "Milliseconds per cpu-second", @@ -109,7 +109,7 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) - work_tasks_per_node: Optional[str] = field( + work_tasks_per_node: Optional[int] = field( default=1, metadata={ "help": "The default number of work tasks per node (NOT MPI ranks)", @@ -118,7 +118,7 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) - memory: Optional[str] = field( + memory: Optional[int] = field( default=1000, metadata={ "help": "Memory in MiB", @@ -136,7 +136,7 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) - retry_count: Optional[str] = field( + retry_count: Optional[int] = field( default=1, metadata={ "help": "Retry count (default to 1)", @@ -163,6 +163,26 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) + # Preemptible (also sometimes called spot) options + # https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#preemptible-jobs + preemption_default: Optional[int] = field( + default=None, + metadata={ + "help": "Set a default number of preemptible instance retries", + "env_var": False, + "required": False, + }, + ) + + preemption_rules: Optional[list] = field( + default=None, + metadata={ + "help": "Define custom preemptible instance retries for specific rules", + "env_var": False, + "required": False, + }, + ) + # Required: # Common settings shared by various executors. diff --git a/snakemake_executor_plugin_googlebatch/command.py b/snakemake_executor_plugin_googlebatch/command.py index 000afa9..4f5bf92 100644 --- a/snakemake_executor_plugin_googlebatch/command.py +++ b/snakemake_executor_plugin_googlebatch/command.py @@ -62,7 +62,7 @@ check_for_snakemake = ( snakemake_base_environment + """ -$(pwd) +echo $(pwd) ls which snakemake || whereis snakemake """ diff --git a/snakemake_executor_plugin_googlebatch/executor.py b/snakemake_executor_plugin_googlebatch/executor.py index 921d5bc..de466bf 100644 --- a/snakemake_executor_plugin_googlebatch/executor.py +++ b/snakemake_executor_plugin_googlebatch/executor.py @@ -25,6 +25,7 @@ def __post_init__(self): self.batch = batch_v1.BatchServiceClient() except Exception as e: raise WorkflowError("Unable to connect to Google Batch.", e) + self._set_preemptible_rules() def get_param(self, job, param): """ @@ -110,6 +111,47 @@ def validate(self, job): f"Job {job} has container without image_family batch-cos*." ) + def _set_preemptible_rules(self): + """ + Define a lookup dictionary for preemptible instance retries. + """ + self.preemptible_rules = {} + + # If a default is defined, we apply it to all the rules + if self.executor_settings.preemption_default is not None: + self.preemptible_rules = { + rule.name: self.executor_settings.preemption_default + for rule in self.workflow.rules + } + + # Now update custom set rules + if self.executor_settings.preemption_rules is not None: + for rule in self.executor_settings.preemption_rules: + rule_name, restart_times = rule.strip().split("=") + self.preemption_rules[rule_name] = int(restart_times) + + # Ensure we set the number of restart times for each rule + for rule_name, restart_times in self.preemptible_rules.items(): + rule = self.workflow.get_rule(rule_name) + rule.restart_times = restart_times + + def is_preemptible(self, job): + """ + Determine if a job is preemptible. + """ + if job.is_group(): + preemptible = all(rule in self.preemptible_rules for rule in job.rules) + if not preemptible and any( + rule in self.preemptible_rules for rule in job.rules + ): + raise WorkflowError( + "All grouped rules should be homogenously set as preemptible rules" + "(see Defining groups for execution in snakemake documentation)" + ) + else: + preemptible = job.rule.name in self.preemptible_rules + return preemptible + def get_command_writer(self, job): """ Get a command writer for a job. @@ -178,6 +220,9 @@ def run_job(self, job: JobExecutorInterface): setup.script = batch_v1.Runnable.Script() setup.script.text = setup_command + # Placement policy + # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.AllocationPolicy.PlacementPolicy + # This will ensure all nodes finish first barrier = batch_v1.Runnable() barrier.barrier = batch_v1.Runnable.Barrier() @@ -262,6 +307,12 @@ def get_allocation_policy(self, job): policy.machine_type = machine_type policy.boot_disk = boot_disk + # Do we want preemptible? + # https://github.com/googleapis/googleapis/blob/master/google/cloud/batch/v1/job.proto#L479 and # noqa + # https://github.com/googleapis/google-cloud-python/blob/main/packages/google-cloud-batch/google/cloud/batch_v1/types/job.py#L672 # noqa + if self.is_preemptible(job): + policy.provisioning_model = 3 + instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate() instances.policy = policy allocation_policy.instances = [instances] From b874811633de0925267fad36ad2f24b85eccc51b Mon Sep 17 00:00:00 2001 From: vsoch Date: Tue, 9 Jan 2024 13:58:51 -0700 Subject: [PATCH 02/10] updates still having an issue with not finding the gs flags Signed-off-by: vsoch --- .../command.py | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/snakemake_executor_plugin_googlebatch/command.py b/snakemake_executor_plugin_googlebatch/command.py index 4f5bf92..957fdcb 100644 --- a/snakemake_executor_plugin_googlebatch/command.py +++ b/snakemake_executor_plugin_googlebatch/command.py @@ -34,28 +34,28 @@ # Only the main job should install conda (rest can use it) echo "I am batch index ${BATCH_TASK_INDEX}" export PATH=/opt/conda/bin:${PATH} + +# TODO update to be from snakemake googlebatch repository +wget https://gist.githubusercontent.com/vsoch/60838b0b0fc848ca812e21f7d37ebac9/raw/621570759e7b851e7f8a9c6cb9c4889d898ae31e/install-snek.sh +chmod +x ./install-snek.sh + if [ $BATCH_TASK_INDEX = 0 ] && [ ! -d "/opt/conda" ] ; then - workdir=$(pwd) - url=https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh - wget ${url} -O ./miniconda.sh - chmod +x ./miniconda.sh - bash ./miniconda.sh -b -u -p /opt/conda - rm -rf ./miniconda.sh - conda config --system --set channel_priority strict - which python - /opt/conda/bin/python --version - url=https://github.com/snakemake/snakemake-interface-common - git clone --depth 1 ${url} /tmp/snakemake-common - cd /tmp/snakemake-common - /opt/conda/bin/python -m pip install . - url=https://github.com/snakemake/snakemake-interface-executor-plugins - git clone --depth 1 ${url} /tmp/snakemake-plugin - cd /tmp/snakemake-plugin - /opt/conda/bin/python -m pip install . - git clone --depth 1 https://github.com/snakemake/snakemake /tmp/snakemake - cd /tmp/snakemake - /opt/conda/bin/python -m pip install . - cd ${workdir} + workdir=$(pwd) + url=https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh + wget ${url} -O ./miniconda.sh + chmod +x ./miniconda.sh + bash ./miniconda.sh -b -u -p /opt/conda + rm -rf ./miniconda.sh + conda config --system --set channel_priority strict + which python + /opt/conda/bin/python --version + ./install-snek.sh https://github.com/snakemake/snakemake-interface-common + ./install-snek.sh https://github.com/snakemake/snakemake-interface-executor-plugins + ./install-snek.sh https://github.com/snakemake/snakemake-interface-storage-plugins + ./install-snek.sh https://github.com/snakemake/snakemake-storage-plugin-s3 + ./install-snek.sh https://github.com/snakemake/snakemake-storage-plugin-gcs + ./install-snek.sh https://github.com/snakemake/snakemake + cd ${workdir} fi """ From 4a0eb48960ffb0315ba4fa0aaf1b3733ec45b3a6 Mon Sep 17 00:00:00 2001 From: vsoch Date: Tue, 9 Jan 2024 14:19:04 -0700 Subject: [PATCH 03/10] clean up install of snakemake assets Signed-off-by: vsoch --- scripts/install-snek.sh | 12 ++++++++++++ snakemake_executor_plugin_googlebatch/command.py | 7 ++++--- 2 files changed, 16 insertions(+), 3 deletions(-) create mode 100755 scripts/install-snek.sh diff --git a/scripts/install-snek.sh b/scripts/install-snek.sh new file mode 100755 index 0000000..4226233 --- /dev/null +++ b/scripts/install-snek.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +url=$1 + +tmp=$(mktemp -d -t snek-install-XXXX) +rm -rf ${tmp} + +git clone --depth 1 ${url} ${tmp} +cd ${tmp} +/opt/conda/bin/python -m pip install . +-rm rf ${tmp} + diff --git a/snakemake_executor_plugin_googlebatch/command.py b/snakemake_executor_plugin_googlebatch/command.py index 957fdcb..0619385 100644 --- a/snakemake_executor_plugin_googlebatch/command.py +++ b/snakemake_executor_plugin_googlebatch/command.py @@ -35,8 +35,9 @@ echo "I am batch index ${BATCH_TASK_INDEX}" export PATH=/opt/conda/bin:${PATH} -# TODO update to be from snakemake googlebatch repository -wget https://gist.githubusercontent.com/vsoch/60838b0b0fc848ca812e21f7d37ebac9/raw/621570759e7b851e7f8a9c6cb9c4889d898ae31e/install-snek.sh +repo=https://github.com/snakemake/snakemake-executor-plugin-googlebatch +path=blob/add-preemtible/snakemake_executor_plugin_googlebatch/scripts/install-shek.sh +wget ${repo}/${path} chmod +x ./install-snek.sh if [ $BATCH_TASK_INDEX = 0 ] && [ ! -d "/opt/conda" ] ; then @@ -55,7 +56,7 @@ ./install-snek.sh https://github.com/snakemake/snakemake-storage-plugin-s3 ./install-snek.sh https://github.com/snakemake/snakemake-storage-plugin-gcs ./install-snek.sh https://github.com/snakemake/snakemake - cd ${workdir} + cd ${workdir} fi """ From 985fa736c0d3763588a240e4b2d108851b37df25 Mon Sep 17 00:00:00 2001 From: Vanessasaurus <814322+vsoch@users.noreply.github.com> Date: Sun, 14 Jan 2024 18:01:21 -0700 Subject: [PATCH 04/10] Update install-snek.sh --- scripts/install-snek.sh | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/scripts/install-snek.sh b/scripts/install-snek.sh index 4226233..854fd1e 100755 --- a/scripts/install-snek.sh +++ b/scripts/install-snek.sh @@ -8,5 +8,4 @@ rm -rf ${tmp} git clone --depth 1 ${url} ${tmp} cd ${tmp} /opt/conda/bin/python -m pip install . --rm rf ${tmp} - +rm -rf ${tmp} From f5471ead9cee1520feb77f3d82c57f9c175b2009 Mon Sep 17 00:00:00 2001 From: Vanessasaurus <814322+vsoch@users.noreply.github.com> Date: Sun, 14 Jan 2024 18:01:46 -0700 Subject: [PATCH 05/10] Update install-snek.sh --- scripts/install-snek.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/install-snek.sh b/scripts/install-snek.sh index 854fd1e..a051654 100755 --- a/scripts/install-snek.sh +++ b/scripts/install-snek.sh @@ -8,4 +8,5 @@ rm -rf ${tmp} git clone --depth 1 ${url} ${tmp} cd ${tmp} /opt/conda/bin/python -m pip install . +cd - rm -rf ${tmp} From b6258f9557e4226e48b12776522843a6ccb71541 Mon Sep 17 00:00:00 2001 From: vsoch Date: Sun, 14 Jan 2024 18:36:22 -0700 Subject: [PATCH 06/10] save state Signed-off-by: vsoch --- .../command.py | 43 +++++++------------ 1 file changed, 16 insertions(+), 27 deletions(-) diff --git a/snakemake_executor_plugin_googlebatch/command.py b/snakemake_executor_plugin_googlebatch/command.py index 0619385..82eae75 100644 --- a/snakemake_executor_plugin_googlebatch/command.py +++ b/snakemake_executor_plugin_googlebatch/command.py @@ -31,33 +31,22 @@ ) install_snakemake = """ -# Only the main job should install conda (rest can use it) -echo "I am batch index ${BATCH_TASK_INDEX}" -export PATH=/opt/conda/bin:${PATH} - -repo=https://github.com/snakemake/snakemake-executor-plugin-googlebatch -path=blob/add-preemtible/snakemake_executor_plugin_googlebatch/scripts/install-shek.sh -wget ${repo}/${path} -chmod +x ./install-snek.sh - -if [ $BATCH_TASK_INDEX = 0 ] && [ ! -d "/opt/conda" ] ; then - workdir=$(pwd) - url=https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh - wget ${url} -O ./miniconda.sh - chmod +x ./miniconda.sh - bash ./miniconda.sh -b -u -p /opt/conda - rm -rf ./miniconda.sh - conda config --system --set channel_priority strict - which python - /opt/conda/bin/python --version - ./install-snek.sh https://github.com/snakemake/snakemake-interface-common - ./install-snek.sh https://github.com/snakemake/snakemake-interface-executor-plugins - ./install-snek.sh https://github.com/snakemake/snakemake-interface-storage-plugins - ./install-snek.sh https://github.com/snakemake/snakemake-storage-plugin-s3 - ./install-snek.sh https://github.com/snakemake/snakemake-storage-plugin-gcs - ./install-snek.sh https://github.com/snakemake/snakemake - cd ${workdir} -fi +echo "I am batch index ${BATCH_TASK_INDEX}" && +export PATH=/opt/conda/bin:${PATH} && +repo=https://raw.githubusercontent.com/snakemake/snakemake-executor-plugin-googlebatch && +path=add-preemtible/scripts/install-snek.sh && +wget ${repo}/${path} && +chmod +x ./install-snek.sh && +workdir=$(pwd) && +url=https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh && +wget ${url} -O ./miniconda.sh && +chmod +x ./miniconda.sh && +bash ./miniconda.sh -b -u -p /opt/conda && +rm -rf ./miniconda.sh && +which python && +/opt/conda/bin/python --version && +./install-snek.sh https://github.com/snakemake/snakemake && +cd ${workdir} """ check_for_snakemake = ( From 02f2ca3dbb00512c7fd5039113de073d3e5bd675 Mon Sep 17 00:00:00 2001 From: vsoch Date: Sun, 14 Jan 2024 19:54:04 -0700 Subject: [PATCH 07/10] bugfixes and updates to preemptible we need to use the snakemake provided preemtible args and not write custom ones here. Signed-off-by: vsoch --- docs/further.md | 6 +-- example/README.md | 6 +++ example/hello-world-preemption/README.md | 4 +- .../__init__.py | 20 ---------- .../command.py | 33 ++++++++-------- .../executor.py | 38 ++++++------------- 6 files changed, 38 insertions(+), 69 deletions(-) create mode 100644 example/README.md diff --git a/docs/further.md b/docs/further.md index 6e93efc..e031c7c 100644 --- a/docs/further.md +++ b/docs/further.md @@ -88,10 +88,6 @@ And custom arguments can be any of the following, either on the command line or | container | Container to use (only when image_family is batch-cos*) [see here](https://cloud.google.com/batch/docs/vm-os-environment-overview#supported_vm_os_images) for families/projects | `--googlebatch-container` | str | | False | unset| | keep_source_cache | Cache workflows in your Google Cloud Storage Bucket | `--googlebatch-keep-source-cache` | bool | | False | False | | snippet | A comma separated list of one or more snippets to add to your setup | `--googelbatch-snippets` | str | | False | unset | -| preemption-default | Set a default number of preemptible instance retries | `--preemptible-default` | int | False | unset | -nset | -| preemption-rules | Define custom preemptible instance retries for specific rules | `--preemption-rules` | list | False | unset | - For machine type, note that for MPI workloads, mpitune configurations are validated on c2 and c2d instances only. Also note that you can customize the machine type on the level of the step (see [Step Options](#step-options) below). @@ -335,4 +331,4 @@ rule hello_world: googlebatch_snippets="mpi,myscript.sh" shell: "..." -``` \ No newline at end of file +``` diff --git a/example/README.md b/example/README.md new file mode 100644 index 0000000..a9ae8cf --- /dev/null +++ b/example/README.md @@ -0,0 +1,6 @@ +# Snakemake Google Batch Examples + + - [hello-world](hello-world): The most basic hello world example with multiple langauges + - [hello-world-intel-mpi](hello-world-intel-mpi): Run an example MPI job to calculate Pi with intel MPI + - [hello-world-cos](hello-world-cos): The same using the container operating system (COS) + - [hello-world-preemption](hello-world-preemption): Example asking for preemptible jobs for all rules. diff --git a/example/hello-world-preemption/README.md b/example/hello-world-preemption/README.md index 6dd0f6b..ba03c98 100644 --- a/example/hello-world-preemption/README.md +++ b/example/hello-world-preemption/README.md @@ -5,5 +5,5 @@ Here is an example specifying to use an s3 bucket and asking for preemptible instances. ```bash -snakemake --jobs 1 --executor googlebatch --googlebatch-region us-central1 --googlebatch-project llnl-flux --default-storage-provider s3 --default-storage-prefix s3://snakemake-testing-llnl --googlebatch-preemption-default 2 -``` \ No newline at end of file +snakemake --jobs 1 --executor googlebatch --googlebatch-region us-central1 --googlebatch-project llnl-flux --default-storage-provider s3 --default-storage-prefix s3://snakemake-testing-llnl --preemptible-rules +``` diff --git a/snakemake_executor_plugin_googlebatch/__init__.py b/snakemake_executor_plugin_googlebatch/__init__.py index 5991511..daed653 100644 --- a/snakemake_executor_plugin_googlebatch/__init__.py +++ b/snakemake_executor_plugin_googlebatch/__init__.py @@ -163,26 +163,6 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) - # Preemptible (also sometimes called spot) options - # https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#preemptible-jobs - preemption_default: Optional[int] = field( - default=None, - metadata={ - "help": "Set a default number of preemptible instance retries", - "env_var": False, - "required": False, - }, - ) - - preemption_rules: Optional[list] = field( - default=None, - metadata={ - "help": "Define custom preemptible instance retries for specific rules", - "env_var": False, - "required": False, - }, - ) - # Required: # Common settings shared by various executors. diff --git a/snakemake_executor_plugin_googlebatch/command.py b/snakemake_executor_plugin_googlebatch/command.py index 82eae75..b85e092 100644 --- a/snakemake_executor_plugin_googlebatch/command.py +++ b/snakemake_executor_plugin_googlebatch/command.py @@ -31,21 +31,24 @@ ) install_snakemake = """ -echo "I am batch index ${BATCH_TASK_INDEX}" && -export PATH=/opt/conda/bin:${PATH} && -repo=https://raw.githubusercontent.com/snakemake/snakemake-executor-plugin-googlebatch && -path=add-preemtible/scripts/install-snek.sh && -wget ${repo}/${path} && -chmod +x ./install-snek.sh && -workdir=$(pwd) && -url=https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh && -wget ${url} -O ./miniconda.sh && -chmod +x ./miniconda.sh && -bash ./miniconda.sh -b -u -p /opt/conda && -rm -rf ./miniconda.sh && -which python && -/opt/conda/bin/python --version && -./install-snek.sh https://github.com/snakemake/snakemake && +echo "I am batch index ${BATCH_TASK_INDEX}" +export PATH=/opt/conda/bin:${PATH} +repo=https://raw.githubusercontent.com/snakemake/snakemake-executor-plugin-googlebatch +path=add-preemtible/scripts/install-snek.sh +wget ${repo}/${path} +chmod +x ./install-snek.sh +workdir=$(pwd) +url=https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh +wget ${url} -O ./miniconda.sh +chmod +x ./miniconda.sh +bash ./miniconda.sh -b -u -p /opt/conda +rm -rf ./miniconda.sh + +which python +/opt/conda/bin/python --version +./install-snek.sh https://github.com/snakemake/snakemake-storage-plugin-gcs +./install-snek.sh https://github.com/snakemake/snakemake + cd ${workdir} """ diff --git a/snakemake_executor_plugin_googlebatch/executor.py b/snakemake_executor_plugin_googlebatch/executor.py index de466bf..776bccb 100644 --- a/snakemake_executor_plugin_googlebatch/executor.py +++ b/snakemake_executor_plugin_googlebatch/executor.py @@ -25,7 +25,6 @@ def __post_init__(self): self.batch = batch_v1.BatchServiceClient() except Exception as e: raise WorkflowError("Unable to connect to Google Batch.", e) - self._set_preemptible_rules() def get_param(self, job, param): """ @@ -111,36 +110,13 @@ def validate(self, job): f"Job {job} has container without image_family batch-cos*." ) - def _set_preemptible_rules(self): - """ - Define a lookup dictionary for preemptible instance retries. - """ - self.preemptible_rules = {} - - # If a default is defined, we apply it to all the rules - if self.executor_settings.preemption_default is not None: - self.preemptible_rules = { - rule.name: self.executor_settings.preemption_default - for rule in self.workflow.rules - } - - # Now update custom set rules - if self.executor_settings.preemption_rules is not None: - for rule in self.executor_settings.preemption_rules: - rule_name, restart_times = rule.strip().split("=") - self.preemption_rules[rule_name] = int(restart_times) - - # Ensure we set the number of restart times for each rule - for rule_name, restart_times in self.preemptible_rules.items(): - rule = self.workflow.get_rule(rule_name) - rule.restart_times = restart_times - def is_preemptible(self, job): """ Determine if a job is preemptible. """ + is_p = self.workflow.remote_execution_settings.preemptible_rules.is_preemptible if job.is_group(): - preemptible = all(rule in self.preemptible_rules for rule in job.rules) + preemptible = all(is_p(rule.name) for rule in job.rules) if not preemptible and any( rule in self.preemptible_rules for rule in job.rules ): @@ -149,7 +125,7 @@ def is_preemptible(self, job): "(see Defining groups for execution in snakemake documentation)" ) else: - preemptible = job.rule.name in self.preemptible_rules + preemptible = is_p(job.rule.name) return preemptible def get_command_writer(self, job): @@ -247,8 +223,16 @@ def run_job(self, job: JobExecutorInterface): group.permissive_ssh = True # This includes instances (machine type) boot disk and policy + # Also preemtion policy = self.get_allocation_policy(job) + # If we have preemption for the job and retry, update task retries with it + retries = self.workflow.remote_execution_settings.preemptible_retries + if self.is_preemptible(job) and retries: + self.logger.debug(f"Updating preemptible retries to {retries}") + task.max_retry_count = retries + self.workflow.remote_execution_settings.preemptible_retries + batchjob = batch_v1.Job() batchjob.task_groups = [group] batchjob.allocation_policy = policy From 2fbb0b1aa56a18f8c5d1721dfbf913d586266441 Mon Sep 17 00:00:00 2001 From: vsoch Date: Sun, 14 Jan 2024 19:59:08 -0700 Subject: [PATCH 08/10] assume job.rules produces strings of the job name Signed-off-by: vsoch --- snakemake_executor_plugin_googlebatch/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snakemake_executor_plugin_googlebatch/executor.py b/snakemake_executor_plugin_googlebatch/executor.py index 776bccb..bf692f1 100644 --- a/snakemake_executor_plugin_googlebatch/executor.py +++ b/snakemake_executor_plugin_googlebatch/executor.py @@ -116,7 +116,7 @@ def is_preemptible(self, job): """ is_p = self.workflow.remote_execution_settings.preemptible_rules.is_preemptible if job.is_group(): - preemptible = all(is_p(rule.name) for rule in job.rules) + preemptible = all(is_p(rule) for rule in job.rules) if not preemptible and any( rule in self.preemptible_rules for rule in job.rules ): From f68e66ae3d7d6ed7ec078afd376de08098b3994f Mon Sep 17 00:00:00 2001 From: vsoch Date: Sun, 14 Jan 2024 20:28:54 -0700 Subject: [PATCH 09/10] clean up is preemptible function the logic for checking the accuracy of the preemptible rules (that if one is set but not the others, and raising a workflow error if this is off) should belong upstream as this is no longer a parameter owned by googlebatch. Signed-off-by: vsoch --- snakemake_executor_plugin_googlebatch/executor.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/snakemake_executor_plugin_googlebatch/executor.py b/snakemake_executor_plugin_googlebatch/executor.py index bf692f1..d8f82ce 100644 --- a/snakemake_executor_plugin_googlebatch/executor.py +++ b/snakemake_executor_plugin_googlebatch/executor.py @@ -113,17 +113,12 @@ def validate(self, job): def is_preemptible(self, job): """ Determine if a job is preemptible. + + The logic for determining if the set is valid should belong upstream. """ is_p = self.workflow.remote_execution_settings.preemptible_rules.is_preemptible if job.is_group(): preemptible = all(is_p(rule) for rule in job.rules) - if not preemptible and any( - rule in self.preemptible_rules for rule in job.rules - ): - raise WorkflowError( - "All grouped rules should be homogenously set as preemptible rules" - "(see Defining groups for execution in snakemake documentation)" - ) else: preemptible = is_p(job.rule.name) return preemptible From e4c5a23c135b8e2f0ea82e81b6594322d2a4124d Mon Sep 17 00:00:00 2001 From: Vanessasaurus <814322+vsoch@users.noreply.github.com> Date: Fri, 19 Jan 2024 01:29:06 -0700 Subject: [PATCH 10/10] Update snakemake_executor_plugin_googlebatch/executor.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Johannes Köster --- snakemake_executor_plugin_googlebatch/executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/snakemake_executor_plugin_googlebatch/executor.py b/snakemake_executor_plugin_googlebatch/executor.py index d8f82ce..d0153d4 100644 --- a/snakemake_executor_plugin_googlebatch/executor.py +++ b/snakemake_executor_plugin_googlebatch/executor.py @@ -226,7 +226,6 @@ def run_job(self, job: JobExecutorInterface): if self.is_preemptible(job) and retries: self.logger.debug(f"Updating preemptible retries to {retries}") task.max_retry_count = retries - self.workflow.remote_execution_settings.preemptible_retries batchjob = batch_v1.Job() batchjob.task_groups = [group]