diff --git a/README.md b/README.md index 3446c6a..d4dc85e 100644 --- a/README.md +++ b/README.md @@ -3,34 +3,13 @@ This is the [Google Batch](https://cloud.google.com/batch/docs/get-started) external executor plugin for snakemake. For documentation, see the [Snakemake plugin catalog](https://snakemake.github.io/snakemake-plugin-catalog/plugins/executor/googlebatch.html). -### TODO - -- Add bash strict mode (should default to true) -- Integrate snakemake MPI support (needs to work with snippet) - - -### Questions - -- What leads to STATE_UNSPECIFIED? -- For Google: what is the source of truth (listing) for batch? I see different answers in different places. -- For Johannes: Why can't we use debug logging for executor plugins? I instead need to use info and make it very verbose. -- For All: How do we want to use [COS](https://cloud.google.com/container-optimized-os/docs/concepts/features-and-benefits)? It would allow a container base to be used instead I think? - ### Notes - Conda is used to install Snakemake and dependencies. - The COS (container OS) uses the default Snakemake container, unless you specify differently. -### Feedback - -- Debugging batch is impossible (and slow). A "hello world" workflow takes 10 minutes to run and debug once. -- The jobs table is slow to load and sometimes does not load / shows old jobs at the top (without me touching anything) -- The logs directly in batch are so much better! Having the stream option there would still be nice (vs. having to refresh.) -- The batch UI (jobs table) is very slow to load and often just doesn't even after button or page refresh. - For examples, look into the [examples](examples) directory. - ## Developer The instructions for creating and scaffolding this plugin are [here](https://github.com/snakemake/poetry-snakemake-plugin#scaffolding-an-executor-plugin). 
diff --git a/docs/further.md b/docs/further.md index 96a3a55..e031c7c 100644 --- a/docs/further.md +++ b/docs/further.md @@ -331,4 +331,4 @@ rule hello_world: googlebatch_snippets="mpi,myscript.sh" shell: "..." -``` \ No newline at end of file +``` diff --git a/example/README.md b/example/README.md new file mode 100644 index 0000000..a9ae8cf --- /dev/null +++ b/example/README.md @@ -0,0 +1,6 @@ +# Snakemake Google Batch Examples + + - [hello-world](hello-world): The most basic hello world example with multiple languages + - [hello-world-intel-mpi](hello-world-intel-mpi): Run an example MPI job to calculate Pi with intel MPI + - [hello-world-cos](hello-world-cos): The same using the container operating system (COS) + - [hello-world-preemption](hello-world-preemption): Example asking for preemptible jobs for all rules. diff --git a/example/hello-world-preemption/README.md b/example/hello-world-preemption/README.md new file mode 100644 index 0000000..ba03c98 --- /dev/null +++ b/example/hello-world-preemption/README.md @@ -0,0 +1,9 @@ +# Hello World + +> with preemptible instances + +Here is an example specifying to use an s3 bucket and asking for preemptible instances. 
+ +```bash +snakemake --jobs 1 --executor googlebatch --googlebatch-region us-central1 --googlebatch-project llnl-flux --default-storage-provider s3 --default-storage-prefix s3://snakemake-testing-llnl --preemptible-rules +``` diff --git a/example/hello-world-preemption/Snakefile b/example/hello-world-preemption/Snakefile new file mode 100644 index 0000000..12f03bc --- /dev/null +++ b/example/hello-world-preemption/Snakefile @@ -0,0 +1,19 @@ +# By convention, the first pseudorule should be called "all" +# We're using the expand() function to create multiple targets +rule all: + input: + expand( + "{greeting}/world.txt", + greeting = ['hello', 'hola'], + ), + +# First real rule, this is using a wildcard called "greeting" +rule multilingual_hello_world: + output: + "{greeting}/world.txt", + shell: + """ + mkdir -p "{wildcards.greeting}" + sleep 5 + echo "{wildcards.greeting}, World!" > {output} + """ diff --git a/scripts/install-snek.sh b/scripts/install-snek.sh new file mode 100755 index 0000000..a051654 --- /dev/null +++ b/scripts/install-snek.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +url=$1 + +tmp=$(mktemp -d -t snek-install-XXXX) +rm -rf ${tmp} + +git clone --depth 1 ${url} ${tmp} +cd ${tmp} +/opt/conda/bin/python -m pip install . 
+cd - +rm -rf ${tmp} diff --git a/snakemake_executor_plugin_googlebatch/__init__.py b/snakemake_executor_plugin_googlebatch/__init__.py index 3573b20..daed653 100644 --- a/snakemake_executor_plugin_googlebatch/__init__.py +++ b/snakemake_executor_plugin_googlebatch/__init__.py @@ -82,7 +82,7 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) - work_tasks: Optional[str] = field( + work_tasks: Optional[int] = field( default=1, metadata={ "help": "The default number of work tasks (these are NOT MPI ranks)", @@ -91,7 +91,7 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) - cpu_milli: Optional[str] = field( + cpu_milli: Optional[int] = field( default=1000, metadata={ "help": "Milliseconds per cpu-second", @@ -100,7 +100,7 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) - cpu_milli: Optional[str] = field( + cpu_milli: Optional[int] = field( default=1000, metadata={ "help": "Milliseconds per cpu-second", @@ -109,7 +109,7 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) - work_tasks_per_node: Optional[str] = field( + work_tasks_per_node: Optional[int] = field( default=1, metadata={ "help": "The default number of work tasks per node (NOT MPI ranks)", @@ -118,7 +118,7 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) - memory: Optional[str] = field( + memory: Optional[int] = field( default=1000, metadata={ "help": "Memory in MiB", @@ -136,7 +136,7 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) - retry_count: Optional[str] = field( + retry_count: Optional[int] = field( default=1, metadata={ "help": "Retry count (default to 1)", diff --git a/snakemake_executor_plugin_googlebatch/command.py b/snakemake_executor_plugin_googlebatch/command.py index 000afa9..b85e092 100644 --- a/snakemake_executor_plugin_googlebatch/command.py +++ b/snakemake_executor_plugin_googlebatch/command.py @@ -31,38 +31,31 @@ ) install_snakemake = """ -# Only the main job should install conda (rest can use it) echo "I am batch index ${BATCH_TASK_INDEX}" export 
PATH=/opt/conda/bin:${PATH} -if [ $BATCH_TASK_INDEX = 0 ] && [ ! -d "/opt/conda" ] ; then - workdir=$(pwd) - url=https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh - wget ${url} -O ./miniconda.sh - chmod +x ./miniconda.sh - bash ./miniconda.sh -b -u -p /opt/conda - rm -rf ./miniconda.sh - conda config --system --set channel_priority strict - which python - /opt/conda/bin/python --version - url=https://github.com/snakemake/snakemake-interface-common - git clone --depth 1 ${url} /tmp/snakemake-common - cd /tmp/snakemake-common - /opt/conda/bin/python -m pip install . - url=https://github.com/snakemake/snakemake-interface-executor-plugins - git clone --depth 1 ${url} /tmp/snakemake-plugin - cd /tmp/snakemake-plugin - /opt/conda/bin/python -m pip install . - git clone --depth 1 https://github.com/snakemake/snakemake /tmp/snakemake - cd /tmp/snakemake - /opt/conda/bin/python -m pip install . - cd ${workdir} -fi +repo=https://raw.githubusercontent.com/snakemake/snakemake-executor-plugin-googlebatch +path=add-preemtible/scripts/install-snek.sh +wget ${repo}/${path} +chmod +x ./install-snek.sh +workdir=$(pwd) +url=https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh +wget ${url} -O ./miniconda.sh +chmod +x ./miniconda.sh +bash ./miniconda.sh -b -u -p /opt/conda +rm -rf ./miniconda.sh + +which python +/opt/conda/bin/python --version +./install-snek.sh https://github.com/snakemake/snakemake-storage-plugin-gcs +./install-snek.sh https://github.com/snakemake/snakemake + +cd ${workdir} """ check_for_snakemake = ( snakemake_base_environment + """ -$(pwd) +echo $(pwd) ls which snakemake || whereis snakemake """ diff --git a/snakemake_executor_plugin_googlebatch/executor.py b/snakemake_executor_plugin_googlebatch/executor.py index 921d5bc..d0153d4 100644 --- a/snakemake_executor_plugin_googlebatch/executor.py +++ b/snakemake_executor_plugin_googlebatch/executor.py @@ -110,6 +110,19 @@ def validate(self, job): f"Job {job} has container without 
image_family batch-cos*." ) + def is_preemptible(self, job): + """ + Determine if a job is preemptible. + + The logic for determining if the set is valid should belong upstream. + """ + is_p = self.workflow.remote_execution_settings.preemptible_rules.is_preemptible + if job.is_group(): + preemptible = all(is_p(rule) for rule in job.rules) + else: + preemptible = is_p(job.rule.name) + return preemptible + def get_command_writer(self, job): """ Get a command writer for a job. @@ -178,6 +191,9 @@ def run_job(self, job: JobExecutorInterface): setup.script = batch_v1.Runnable.Script() setup.script.text = setup_command + # Placement policy + # https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.AllocationPolicy.PlacementPolicy + # This will ensure all nodes finish first barrier = batch_v1.Runnable() barrier.barrier = batch_v1.Runnable.Barrier() @@ -202,8 +218,15 @@ def run_job(self, job: JobExecutorInterface): group.permissive_ssh = True # This includes instances (machine type) boot disk and policy + # Also preemption policy = self.get_allocation_policy(job) + # If we have preemption for the job and retry, update task retries with it + retries = self.workflow.remote_execution_settings.preemptible_retries + if self.is_preemptible(job) and retries: + self.logger.debug(f"Updating preemptible retries to {retries}") + task.max_retry_count = retries + batchjob = batch_v1.Job() batchjob.task_groups = [group] batchjob.allocation_policy = policy @@ -262,6 +285,12 @@ def get_allocation_policy(self, job): policy.machine_type = machine_type policy.boot_disk = boot_disk + # Do we want preemptible? 
+ # https://github.com/googleapis/googleapis/blob/master/google/cloud/batch/v1/job.proto#L479 and # noqa + # https://github.com/googleapis/google-cloud-python/blob/main/packages/google-cloud-batch/google/cloud/batch_v1/types/job.py#L672 # noqa + if self.is_preemptible(job): + policy.provisioning_model = 3 + instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate() instances.policy = policy allocation_policy.instances = [instances]