feat: preemption (#25)
This is a WIP because there is a bug (#24) that looks to be related to
having the Google storage plugin installed. Ping @johanneskoester: I need to get #24
fixed before I can proceed with more features here. Happy New Year!

---------

Signed-off-by: vsoch <[email protected]>
Co-authored-by: vsoch <[email protected]>
Co-authored-by: Johannes Köster <[email protected]>
3 people authored Jan 19, 2024
1 parent 2951454 commit d6913a1
Showing 9 changed files with 100 additions and 53 deletions.
21 changes: 0 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,13 @@
This is the [Google Batch](https://cloud.google.com/batch/docs/get-started) external executor plugin for snakemake.
For documentation, see the [Snakemake plugin catalog](https://snakemake.github.io/snakemake-plugin-catalog/plugins/executor/googlebatch.html).

### TODO

- Add bash strict mode (should default to true)
- Integrate snakemake MPI support (needs to work with snippet)


### Questions

- What leads to STATE_UNSPECIFIED?
- For Google: what is the source of truth (listing) for batch? I see different answers in different places.
- For Johannes: Why can't we use debug logging for executor plugins? I instead need to use info and make it very verbose.
- For All: How do we want to use [COS](https://cloud.google.com/container-optimized-os/docs/concepts/features-and-benefits)? It would allow a container base to be used instead I think?

### Notes

- Conda is used to install Snakemake and dependencies.
- The COS (container OS) uses the default Snakemake container, unless you specify differently.

### Feedback

- Debugging batch is impossible (and slow). A "hello world" workflow takes 10 minutes to run and debug once.
- The jobs table is slow to load and sometimes does not load / shows old jobs at the top (without me touching anything)
- The logs directly in batch are so much better! Having the stream option there would still be nice (vs. having to refresh.)
- The batch UI (jobs table) is very slow to load and often just doesn't even after button or page refresh.

For examples, look into the [examples](examples) directory.


## Developer

The instructions for creating and scaffolding this plugin are [here](https://github.com/snakemake/poetry-snakemake-plugin#scaffolding-an-executor-plugin).
Expand Down
2 changes: 1 addition & 1 deletion docs/further.md
Expand Up @@ -331,4 +331,4 @@ rule hello_world:
googlebatch_snippets="mpi,myscript.sh"
shell:
"..."
```
```
6 changes: 6 additions & 0 deletions example/README.md
@@ -0,0 +1,6 @@
# Snakemake Google Batch Examples

- [hello-world](hello-world): The most basic hello world example with multiple languages
- [hello-world-intel-mpi](hello-world-intel-mpi): Run an example MPI job to calculate Pi with Intel MPI
- [hello-world-cos](hello-world-cos): The hello world example using the container operating system (COS)
- [hello-world-preemption](hello-world-preemption): Example asking for preemptible jobs for all rules.
9 changes: 9 additions & 0 deletions example/hello-world-preemption/README.md
@@ -0,0 +1,9 @@
# Hello World

> with preemptible instances

Here is an example that uses an s3 bucket for storage and asks for preemptible instances.

```bash
snakemake --jobs 1 --executor googlebatch --googlebatch-region us-central1 --googlebatch-project llnl-flux --default-storage-provider s3 --default-storage-prefix s3://snakemake-testing-llnl --preemptible-rules
```
19 changes: 19 additions & 0 deletions example/hello-world-preemption/Snakefile
@@ -0,0 +1,19 @@
# By convention, the first pseudorule should be called "all"
# We're using the expand() function to create multiple targets
rule all:
input:
expand(
"{greeting}/world.txt",
greeting = ['hello', 'hola'],
),

# First real rule, this is using a wildcard called "greeting"
rule multilingual_hello_world:
output:
"{greeting}/world.txt",
shell:
"""
mkdir -p "{wildcards.greeting}"
sleep 5
echo "{wildcards.greeting}, World!" > {output}
"""
12 changes: 12 additions & 0 deletions scripts/install-snek.sh
@@ -0,0 +1,12 @@
#!/bin/bash

url=$1

tmp=$(mktemp -d -t snek-install-XXXX)
rm -rf ${tmp}

git clone --depth 1 ${url} ${tmp}
cd ${tmp}
/opt/conda/bin/python -m pip install .
cd -
rm -rf ${tmp}
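
The steps in `install-snek.sh` can also be sketched in Python. This is a hypothetical equivalent and not part of the commit: `install_commands` is an illustrative name, the temporary directory is supplied by the caller, and `pip install <dir>` stands in for the `cd` / `cd -` pair. Building each command as an argument list sidesteps the unquoted `${url}` and `${tmp}` expansions in the shell version.

```python
from typing import List


def install_commands(
    url: str, tmp: str, python: str = "/opt/conda/bin/python"
) -> List[List[str]]:
    """Return the commands that clone a repository and pip-install it.

    Hypothetical helper mirroring install-snek.sh; nothing is executed here.
    """
    return [
        # Shallow clone into the (not yet existing) temporary directory
        ["git", "clone", "--depth", "1", url, tmp],
        # Install from the checkout with the conda Python
        [python, "-m", "pip", "install", tmp],
        # Clean up the checkout afterwards
        ["rm", "-rf", tmp],
    ]


commands = install_commands("https://github.com/snakemake/snakemake", "/tmp/snek-install")
```

Each entry can be passed to `subprocess.run(cmd, check=True)` so that a failed clone stops the install rather than continuing to the next step.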
12 changes: 6 additions & 6 deletions snakemake_executor_plugin_googlebatch/__init__.py
Expand Up @@ -82,7 +82,7 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

work_tasks: Optional[str] = field(
work_tasks: Optional[int] = field(
default=1,
metadata={
"help": "The default number of work tasks (these are NOT MPI ranks)",
Expand All @@ -91,7 +91,7 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

cpu_milli: Optional[str] = field(
cpu_milli: Optional[int] = field(
default=1000,
metadata={
"help": "Milliseconds per cpu-second",
Expand All @@ -100,7 +100,7 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

cpu_milli: Optional[str] = field(
cpu_milli: Optional[int] = field(
default=1000,
metadata={
"help": "Milliseconds per cpu-second",
Expand All @@ -109,7 +109,7 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

work_tasks_per_node: Optional[str] = field(
work_tasks_per_node: Optional[int] = field(
default=1,
metadata={
"help": "The default number of work tasks per node (NOT MPI ranks)",
Expand All @@ -118,7 +118,7 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

memory: Optional[str] = field(
memory: Optional[int] = field(
default=1000,
metadata={
"help": "Memory in MiB",
Expand All @@ -136,7 +136,7 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

retry_count: Optional[str] = field(
retry_count: Optional[int] = field(
default=1,
metadata={
"help": "Retry count (default to 1)",
Expand Down
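
The switch from `Optional[str]` to `Optional[int]` matters because these settings are numeric quantities with integer defaults. A minimal sketch, assuming a cut-down stand-in for `ExecutorSettings`: `SketchSettings` and `coerce_ints` are hypothetical names, and the coercion step is only an illustration, not something the plugin is known to do.

```python
from dataclasses import dataclass, field, fields
from typing import Optional


@dataclass
class SketchSettings:
    """Hypothetical stand-in for ExecutorSettings with a few of its fields."""

    work_tasks: Optional[int] = field(
        default=1,
        metadata={"help": "The default number of work tasks (these are NOT MPI ranks)"},
    )
    # In Google Batch, cpu_milli is expressed in milliCPU units (1000 = one full CPU)
    cpu_milli: Optional[int] = field(default=1000, metadata={"help": "milliCPU units"})
    memory: Optional[int] = field(default=1000, metadata={"help": "Memory in MiB"})
    retry_count: Optional[int] = field(
        default=1, metadata={"help": "Retry count (default to 1)"}
    )


def coerce_ints(settings: SketchSettings) -> SketchSettings:
    """Convert string values (for example from a command line) to int."""
    for f in fields(settings):
        value = getattr(settings, f.name)
        if isinstance(value, str):
            setattr(settings, f.name, int(value))
    return settings


settings = coerce_ints(SketchSettings(memory="2048"))
total_memory = settings.memory * settings.work_tasks
```

With string-typed fields, arithmetic such as `settings.memory * settings.work_tasks` would repeat or reject the string instead of multiplying numbers, which is the kind of mistake the int annotations help surface.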
43 changes: 18 additions & 25 deletions snakemake_executor_plugin_googlebatch/command.py
Expand Up @@ -31,38 +31,31 @@
)

install_snakemake = """
# Only the main job should install conda (rest can use it)
echo "I am batch index ${BATCH_TASK_INDEX}"
export PATH=/opt/conda/bin:${PATH}
if [ $BATCH_TASK_INDEX = 0 ] && [ ! -d "/opt/conda" ] ; then
workdir=$(pwd)
url=https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
wget ${url} -O ./miniconda.sh
chmod +x ./miniconda.sh
bash ./miniconda.sh -b -u -p /opt/conda
rm -rf ./miniconda.sh
conda config --system --set channel_priority strict
which python
/opt/conda/bin/python --version
url=https://github.com/snakemake/snakemake-interface-common
git clone --depth 1 ${url} /tmp/snakemake-common
cd /tmp/snakemake-common
/opt/conda/bin/python -m pip install .
url=https://github.com/snakemake/snakemake-interface-executor-plugins
git clone --depth 1 ${url} /tmp/snakemake-plugin
cd /tmp/snakemake-plugin
/opt/conda/bin/python -m pip install .
git clone --depth 1 https://github.com/snakemake/snakemake /tmp/snakemake
cd /tmp/snakemake
/opt/conda/bin/python -m pip install .
cd ${workdir}
fi
repo=https://raw.githubusercontent.com/snakemake/snakemake-executor-plugin-googlebatch
path=add-preemtible/scripts/install-snek.sh
wget ${repo}/${path}
chmod +x ./install-snek.sh
workdir=$(pwd)
url=https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
wget ${url} -O ./miniconda.sh
chmod +x ./miniconda.sh
bash ./miniconda.sh -b -u -p /opt/conda
rm -rf ./miniconda.sh
which python
/opt/conda/bin/python --version
./install-snek.sh https://github.com/snakemake/snakemake-storage-plugin-gcs
./install-snek.sh https://github.com/snakemake/snakemake
cd ${workdir}
"""

check_for_snakemake = (
snakemake_base_environment
+ """
$(pwd)
echo $(pwd)
ls
which snakemake || whereis snakemake
"""
Expand Down
29 changes: 29 additions & 0 deletions snakemake_executor_plugin_googlebatch/executor.py
Expand Up @@ -110,6 +110,19 @@ def validate(self, job):
f"Job {job} has container without image_family batch-cos*."
)

def is_preemptible(self, job):
"""
Determine if a job is preemptible.
The logic for determining if the set is valid should belong upstream.
"""
is_p = self.workflow.remote_execution_settings.preemptible_rules.is_preemptible
if job.is_group():
preemptible = all(is_p(rule) for rule in job.rules)
else:
preemptible = is_p(job.rule.name)
return preemptible

def get_command_writer(self, job):
"""
Get a command writer for a job.
Expand Down Expand Up @@ -178,6 +191,9 @@ def run_job(self, job: JobExecutorInterface):
setup.script = batch_v1.Runnable.Script()
setup.script.text = setup_command

# Placement policy
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.AllocationPolicy.PlacementPolicy

# This will ensure all nodes finish first
barrier = batch_v1.Runnable()
barrier.barrier = batch_v1.Runnable.Barrier()
Expand All @@ -202,8 +218,15 @@ def run_job(self, job: JobExecutorInterface):
group.permissive_ssh = True

# This includes instances (machine type) boot disk and policy
# Also preemption
policy = self.get_allocation_policy(job)

# If we have preemption for the job and retry, update task retries with it
retries = self.workflow.remote_execution_settings.preemptible_retries
if self.is_preemptible(job) and retries:
self.logger.debug(f"Updating preemptible retries to {retries}")
task.max_retry_count = retries

batchjob = batch_v1.Job()
batchjob.task_groups = [group]
batchjob.allocation_policy = policy
Expand Down Expand Up @@ -262,6 +285,12 @@ def get_allocation_policy(self, job):
policy.machine_type = machine_type
policy.boot_disk = boot_disk

# Do we want preemptible?
# https://github.com/googleapis/googleapis/blob/master/google/cloud/batch/v1/job.proto#L479 and # noqa
# https://github.com/googleapis/google-cloud-python/blob/main/packages/google-cloud-batch/google/cloud/batch_v1/types/job.py#L672 # noqa
if self.is_preemptible(job):
policy.provisioning_model = 3

instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
instances.policy = policy
allocation_policy.instances = [instances]
Expand Down
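
The preemption logic added in `executor.py` can be sketched without the Google client. Everything prefixed `Fake` is a hypothetical stand-in, `plan` is an illustrative name, and the `ProvisioningModel` enum is a local mirror of the values assumed from the `job.proto` file linked in the diff (where `3` is the preemptible model). The sketch passes rule names to `is_preemptible` in both branches, which is an assumption about what the upstream method expects.

```python
from dataclasses import dataclass, field
from enum import IntEnum
from typing import List, Optional, Set, Tuple


class ProvisioningModel(IntEnum):
    """Local mirror of AllocationPolicy.ProvisioningModel (values assumed)."""

    PROVISIONING_MODEL_UNSPECIFIED = 0
    STANDARD = 1
    SPOT = 2
    PREEMPTIBLE = 3


@dataclass
class FakeRule:
    name: str


@dataclass
class FakeJob:
    """Hypothetical job: one rule, or several when it is a group job."""

    rules: List[FakeRule]

    def is_group(self) -> bool:
        return len(self.rules) > 1

    @property
    def rule(self) -> FakeRule:
        return self.rules[0]


@dataclass
class FakePreemptibleRules:
    """Hypothetical stand-in for the object behind --preemptible-rules."""

    rules: Set[str] = field(default_factory=set)
    all_rules: bool = False

    def is_preemptible(self, rulename: str) -> bool:
        return self.all_rules or rulename in self.rules


def is_preemptible(job: FakeJob, preemptible_rules: FakePreemptibleRules) -> bool:
    # A group job counts as preemptible only if every rule in it is
    is_p = preemptible_rules.is_preemptible
    if job.is_group():
        return all(is_p(rule.name) for rule in job.rules)
    return is_p(job.rule.name)


def plan(
    job: FakeJob,
    preemptible_rules: FakePreemptibleRules,
    default_retries: int,
    preemptible_retries: Optional[int],
) -> Tuple[Optional[ProvisioningModel], int]:
    """Return (provisioning model or None if left unset, max retry count)."""
    if is_preemptible(job, preemptible_rules):
        # Override the retry count only when a preemptible retry value is set
        retries = preemptible_retries or default_retries
        return ProvisioningModel.PREEMPTIBLE, retries
    return None, default_retries


single = FakeJob([FakeRule("a")])
group = FakeJob([FakeRule("a"), FakeRule("b")])
only_a = FakePreemptibleRules(rules={"a"})
everything = FakePreemptibleRules(all_rules=True)
```

Using a named enum member in place of the literal `3` keeps the intent readable at the call site, while `int(ProvisioningModel.PREEMPTIBLE)` still yields the same value.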
