feat: preemption #25

Merged: 10 commits, Jan 19, 2024
21 changes: 0 additions & 21 deletions README.md
@@ -3,34 +3,13 @@
This is the [Google Batch](https://cloud.google.com/batch/docs/get-started) external executor plugin for snakemake.
For documentation, see the [Snakemake plugin catalog](https://snakemake.github.io/snakemake-plugin-catalog/plugins/executor/googlebatch.html).

### TODO

- Add bash strict mode (should default to true)
- Integrate snakemake MPI support (needs to work with snippet)


### Questions

- What leads to STATE_UNSPECIFIED?
- For Google: what is the source of truth (listing) for batch? I see different answers in different places.
- For Johannes: Why can't we use debug logging for executor plugins? I instead need to use info and make it very verbose.
- For All: How do we want to use [COS](https://cloud.google.com/container-optimized-os/docs/concepts/features-and-benefits)? I think it would allow a container base to be used instead.

### Notes

- Conda is used to install Snakemake and dependencies.
- The COS (container OS) uses the default Snakemake container, unless you specify differently.

### Feedback

- Debugging batch is impossible (and slow). A "hello world" workflow takes 10 minutes to run and debug once.
- The jobs table is slow to load and sometimes does not load / shows old jobs at the top (without me touching anything)
- The logs directly in batch are so much better! Having the stream option there would still be nice (vs. having to refresh.)
- The batch UI (jobs table) is very slow to load and often just doesn't even after button or page refresh.

For examples, look into the [examples](examples) directory.


## Developer

The instructions for creating and scaffolding this plugin are [here](https://github.com/snakemake/poetry-snakemake-plugin#scaffolding-an-executor-plugin).
2 changes: 1 addition & 1 deletion docs/further.md
@@ -331,4 +331,4 @@ rule hello_world:
googlebatch_snippets="mpi,myscript.sh"
shell:
"..."
```
```
6 changes: 6 additions & 0 deletions example/README.md
@@ -0,0 +1,6 @@
# Snakemake Google Batch Examples

- [hello-world](hello-world): The most basic hello world example, in multiple languages
- [hello-world-intel-mpi](hello-world-intel-mpi): Run an example MPI job to calculate Pi with Intel MPI
- [hello-world-cos](hello-world-cos): The same hello world example using the container-optimized OS (COS)
- [hello-world-preemption](hello-world-preemption): Example asking for preemptible jobs for all rules
9 changes: 9 additions & 0 deletions example/hello-world-preemption/README.md
@@ -0,0 +1,9 @@
# Hello World

> with preemptible instances

Here is an example specifying an s3 bucket for storage and asking for preemptible instances for all rules.

```bash
snakemake --jobs 1 --executor googlebatch --googlebatch-region us-central1 --googlebatch-project llnl-flux --default-storage-provider s3 --default-storage-prefix s3://snakemake-testing-llnl --preemptible-rules
```
19 changes: 19 additions & 0 deletions example/hello-world-preemption/Snakefile
@@ -0,0 +1,19 @@
# By convention, the first pseudorule should be called "all"
# We're using the expand() function to create multiple targets
rule all:
input:
expand(
"{greeting}/world.txt",
greeting = ['hello', 'hola'],
),

# First real rule, this is using a wildcard called "greeting"
rule multilingual_hello_world:
output:
"{greeting}/world.txt",
shell:
"""
mkdir -p "{wildcards.greeting}"
sleep 5
echo "{wildcards.greeting}, World!" > {output}
"""
12 changes: 12 additions & 0 deletions scripts/install-snek.sh
@@ -0,0 +1,12 @@
#!/bin/bash

url=$1

tmp=$(mktemp -d -t snek-install-XXXX)
rm -rf ${tmp}

git clone --depth 1 ${url} ${tmp}
cd ${tmp}
/opt/conda/bin/python -m pip install .
cd -
rm -rf ${tmp}
12 changes: 6 additions & 6 deletions snakemake_executor_plugin_googlebatch/__init__.py
@@ -82,7 +82,7 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

work_tasks: Optional[str] = field(
work_tasks: Optional[int] = field(
default=1,
metadata={
"help": "The default number of work tasks (these are NOT MPI ranks)",
@@ -91,7 +91,7 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

cpu_milli: Optional[str] = field(
cpu_milli: Optional[int] = field(
default=1000,
metadata={
"help": "Milliseconds per cpu-second",
@@ -100,7 +100,7 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

cpu_milli: Optional[str] = field(
cpu_milli: Optional[int] = field(
default=1000,
metadata={
"help": "Milliseconds per cpu-second",
@@ -109,7 +109,7 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

work_tasks_per_node: Optional[str] = field(
work_tasks_per_node: Optional[int] = field(
default=1,
metadata={
"help": "The default number of work tasks per node (NOT MPI ranks)",
@@ -118,7 +118,7 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

memory: Optional[str] = field(
memory: Optional[int] = field(
default=1000,
metadata={
"help": "Memory in MiB",
@@ -136,7 +136,7 @@ class ExecutorSettings(ExecutorSettingsBase):
},
)

retry_count: Optional[str] = field(
retry_count: Optional[int] = field(
default=1,
metadata={
"help": "Retry count (default to 1)",
43 changes: 18 additions & 25 deletions snakemake_executor_plugin_googlebatch/command.py
@@ -31,38 +31,31 @@
)

install_snakemake = """
# Only the main job should install conda (rest can use it)
echo "I am batch index ${BATCH_TASK_INDEX}"
export PATH=/opt/conda/bin:${PATH}
if [ $BATCH_TASK_INDEX = 0 ] && [ ! -d "/opt/conda" ] ; then
Review comment (collaborator, author): I'm not entirely sure about best practice here - whether we should install snakemake just to the main executor (running the workflow) or, as I've done, allow installing to all nodes. For the other jobs (that ran OK) I was just installing to index 0. We can run a test after this and see if the previous examples still work. Intuitively, unless there is an obvious shared filesystem (there isn't at /opt), I would want snakemake installed across nodes.

workdir=$(pwd)
url=https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
wget ${url} -O ./miniconda.sh
chmod +x ./miniconda.sh
bash ./miniconda.sh -b -u -p /opt/conda
rm -rf ./miniconda.sh
conda config --system --set channel_priority strict
which python
/opt/conda/bin/python --version
url=https://github.com/snakemake/snakemake-interface-common
git clone --depth 1 ${url} /tmp/snakemake-common
cd /tmp/snakemake-common
/opt/conda/bin/python -m pip install .
url=https://github.com/snakemake/snakemake-interface-executor-plugins
git clone --depth 1 ${url} /tmp/snakemake-plugin
cd /tmp/snakemake-plugin
/opt/conda/bin/python -m pip install .
git clone --depth 1 https://github.com/snakemake/snakemake /tmp/snakemake
cd /tmp/snakemake
/opt/conda/bin/python -m pip install .
cd ${workdir}
fi
repo=https://raw.githubusercontent.com/snakemake/snakemake-executor-plugin-googlebatch
path=add-preemtible/scripts/install-snek.sh
wget ${repo}/${path}
chmod +x ./install-snek.sh
workdir=$(pwd)
url=https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
wget ${url} -O ./miniconda.sh
chmod +x ./miniconda.sh
bash ./miniconda.sh -b -u -p /opt/conda
rm -rf ./miniconda.sh

which python
/opt/conda/bin/python --version
./install-snek.sh https://github.com/snakemake/snakemake-storage-plugin-gcs
./install-snek.sh https://github.com/snakemake/snakemake

cd ${workdir}
"""

check_for_snakemake = (
snakemake_base_environment
+ """
pwd
echo $(pwd)
ls
which snakemake || whereis snakemake
"""
Expand Down
29 changes: 29 additions & 0 deletions snakemake_executor_plugin_googlebatch/executor.py
@@ -110,6 +110,19 @@ def validate(self, job):
f"Job {job} has container without image_family batch-cos*."
)

def is_preemptible(self, job):
"""
Determine if a job is preemptible.

The logic for determining if the set is valid should belong upstream.
"""
is_p = self.workflow.remote_execution_settings.preemptible_rules.is_preemptible
if job.is_group():
preemptible = all(is_p(rule) for rule in job.rules)
else:
preemptible = is_p(job.rule.name)
return preemptible
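
The group-vs-single dispatch above can be sketched with stand-in objects (the `Rule`/`Job` classes and the preemptible set here are hypothetical illustrations, not the snakemake API):

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Rule:
    name: str

@dataclass
class Job:
    # Group jobs carry member rule names in `rules`; single jobs carry one `rule`.
    rules: list = field(default_factory=list)
    rule: Optional[Rule] = None

    def is_group(self) -> bool:
        return bool(self.rules)

def is_preemptible(job: Job, is_p) -> bool:
    # A group job is preemptible only if every member rule is.
    if job.is_group():
        return all(is_p(rule) for rule in job.rules)
    return is_p(job.rule.name)

is_p = {"multilingual_hello_world"}.__contains__
print(is_preemptible(Job(rule=Rule("multilingual_hello_world")), is_p))  # True
print(is_preemptible(Job(rules=["multilingual_hello_world", "all"]), is_p))  # False
```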

def get_command_writer(self, job):
"""
Get a command writer for a job.
@@ -178,6 +191,9 @@ def run_job(self, job: JobExecutorInterface):
setup.script = batch_v1.Runnable.Script()
setup.script.text = setup_command

# Placement policy
# https://cloud.google.com/python/docs/reference/batch/latest/google.cloud.batch_v1.types.AllocationPolicy.PlacementPolicy

# This will ensure all nodes finish first
barrier = batch_v1.Runnable()
barrier.barrier = batch_v1.Runnable.Barrier()
@@ -202,8 +218,15 @@ def run_job(self, job: JobExecutorInterface):
group.permissive_ssh = True

# This includes instances (machine type) boot disk and policy
# Also preemption
policy = self.get_allocation_policy(job)

# If we have preemption for the job and retry, update task retries with it
retries = self.workflow.remote_execution_settings.preemptible_retries
Review comment (collaborator, author): The task has retries, and I figure that parameter corresponds to "preemptible retries" when that is set. Note that without it, it's just the number of retries regardless of preemptible status.

if self.is_preemptible(job) and retries:
self.logger.debug(f"Updating preemptible retries to {retries}")
task.max_retry_count = retries
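
The override above reduces to a small conditional; here is a sketch using a hypothetical helper name (not part of the plugin):

```python
def effective_retries(default_retries: int, preemptible_retries: int,
                      preemptible: bool) -> int:
    # preemptible_retries only overrides the task's max_retry_count
    # when the job is actually preemptible and the setting is non-zero.
    if preemptible and preemptible_retries:
        return preemptible_retries
    return default_retries

print(effective_retries(1, 3, True))   # 3
print(effective_retries(1, 3, False))  # 1
print(effective_retries(1, 0, True))   # 1
```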

batchjob = batch_v1.Job()
batchjob.task_groups = [group]
batchjob.allocation_policy = policy
@@ -262,6 +285,12 @@ def get_allocation_policy(self, job):
policy.machine_type = machine_type
policy.boot_disk = boot_disk

# Do we want preemptible?
# https://github.com/googleapis/googleapis/blob/master/google/cloud/batch/v1/job.proto#L479 and # noqa
# https://github.com/googleapis/google-cloud-python/blob/main/packages/google-cloud-batch/google/cloud/batch_v1/types/job.py#L672 # noqa
if self.is_preemptible(job):
policy.provisioning_model = 3

instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
instances.policy = policy
allocation_policy.instances = [instances]
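
The magic number 3 above maps to PREEMPTIBLE in the Batch v1 ProvisioningModel enum; a stand-in mirror of that enum is sketched below (values taken from the public proto - verify against the installed google-cloud-batch client, where `batch_v1.AllocationPolicy.ProvisioningModel.PREEMPTIBLE` could be used instead of the literal):

```python
from enum import IntEnum

class ProvisioningModel(IntEnum):
    # Mirrors google.cloud.batch_v1 AllocationPolicy.ProvisioningModel
    # (values per the public job.proto; check your client version).
    PROVISIONING_MODEL_UNSPECIFIED = 0
    STANDARD = 1
    SPOT = 2
    PREEMPTIBLE = 3

print(ProvisioningModel(3).name)  # PREEMPTIBLE
```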