diff --git a/docs/further.md b/docs/further.md index 4980e20..daad1e9 100644 --- a/docs/further.md +++ b/docs/further.md @@ -100,6 +100,8 @@ $ snakemake --jobs 1 --executor googlebatch --googlebatch-bucket snakemake-cache The following environment variables are available within any Google batch run: - `BATCH_TASK_INDEX`: The index of the workflow step (Google Batch calls a "task") + - `GOOGLEBATCH_DOCKER_PASSWORD`: your docker registry password if using the container operating system (COS) and your container requires credentials + - `GOOGLEBATCH_DOCKER_USERNAME`: the same, but the username ### GPU @@ -142,6 +144,15 @@ rule hello_world: "..." ``` +Note that the way to get updated names is to run: + +```bash +gcloud compute images list \ + --project=batch-custom-image \ + --no-standard-images +``` + +And see [this page](https://cloud.google.com/batch/docs/view-os-images) for more details. #### googlebatch_image_project @@ -373,6 +384,7 @@ rule hello_world: "..." ``` + #### googlebatch_snippets One or more named (or file-derived) snippets to add to setup. 
diff --git a/example/hello-world-cos/README.md b/example/hello-world-cos/README.md index f798006..318c39d 100644 --- a/example/hello-world-cos/README.md +++ b/example/hello-world-cos/README.md @@ -9,19 +9,12 @@ gcloud compute images list \ --no-standard-images ``` -Here is how to use the debian base: +Here is an example command: ```bash GOOGLE_PROJECT=myproject -snakemake --jobs 1 --executor googlebatch --googlebatch-image-family batch-debian-11-official --googlebatch-region us-central1 --googlebatch-image-project batch-custom-image --googlebatch-project ${GOOGLE_PROJECT} --default-storage-provider s3 --default-storage-prefix s3://my-snakemake-testing +snakemake --jobs 1 --executor googlebatch --googlebatch-image-family batch-cos-stable-official --googlebatch-region us-central1 --googlebatch-image-project batch-custom-image --googlebatch-project ${GOOGLE_PROJECT} --default-storage-provider s3 --default-storage-prefix s3://my-snakemake-testing ``` -And a centos. - -```bash -snakemake --jobs 1 --executor googlebatch --googlebatch-image-family batch-centos-7-official --googlebatch-region us-central1 --googlebatch-image-project batch-custom-image --googlebatch-project ${GOOGLE_PROJECT} --default-storage-provider s3 --default-storage-prefix s3://my-snakemake-testing -``` - - See [this link](https://cloud.google.com/batch/docs/vm-os-environment-overview#supported_vm_os_images) for how to find a compatible COS image project and family. 
You can also see information [here](https://cloud.google.com/batch/docs/view-os-images), \ No newline at end of file diff --git a/snakemake_executor_plugin_googlebatch/__init__.py b/snakemake_executor_plugin_googlebatch/__init__.py index 2186c94..1cb1f31 100644 --- a/snakemake_executor_plugin_googlebatch/__init__.py +++ b/snakemake_executor_plugin_googlebatch/__init__.py @@ -40,6 +40,33 @@ class ExecutorSettings(ExecutorSettingsBase): }, ) + container: Optional[str] = field( + default=None, + metadata={ + "help": "A custom container for use with Google Batch COS", + "env_var": False, + "required": False, + }, + ) + + docker_password: Optional[str] = field( + default=None, + metadata={ + "help": "A docker registry password for COS if credentials are required", + "env_var": True, + "required": False, + }, + ) + + docker_username: Optional[str] = field( + default=None, + metadata={ + "help": "A docker registry username for COS if credentials are required", + "env_var": True, + "required": False, + }, + ) + # mpitune configurations are validated on c2 and c2d instances only. 
machine_type: Optional[str] = field( default="c2-standard-4", diff --git a/snakemake_executor_plugin_googlebatch/command.py b/snakemake_executor_plugin_googlebatch/command.py index 602eff2..923e701 100644 --- a/snakemake_executor_plugin_googlebatch/command.py +++ b/snakemake_executor_plugin_googlebatch/command.py @@ -2,10 +2,31 @@ import snakemake_executor_plugin_googlebatch.snippet as sniputil -write_snakefile = """cat < ./Snakefile +write_snakefile = """ +#!/bin/bash +snakefile_path=$(realpath %s) +snakefile_dir=$(dirname $snakefile_path) +mkdir -p $snakefile_dir || true +cat < $snakefile_path %s EOF -cat ./Snakefile +echo "Snakefile is at $snakefile_path" +cat $snakefile_path +""" + +write_entrypoint = """ +#!/bin/bash + +mkdir -p /tmp/workdir +cat < /tmp/workdir/entrypoint.sh +%s + +# https://github.com/boto/botocore/issues/3111 +python3 -m pip install boto3==1.33.11 +python3 -m pip install urllib3==1.26.17 +EOF +chmod +x /tmp/workdir/entrypoint.sh +cat /tmp/workdir/entrypoint.sh """ snakemake_base_environment = """export HOME=/root @@ -32,6 +53,7 @@ install_snakemake = """ echo "I am batch index ${BATCH_TASK_INDEX}" + export PATH=/opt/conda/bin:${PATH} repo=https://raw.githubusercontent.com/snakemake/snakemake-executor-plugin-googlebatch path=main/scripts/install-snek.sh @@ -73,19 +95,19 @@ class CommandWriter: def __init__( self, command=None, - container=None, snakefile=None, snippets=None, settings=None, resources=None, + snakefile_path=None, ): self.command = command - self.container = container - # This is the contents of the snakefile and not the path + # This is the contents of the snakefile self.snakefile = snakefile self.resources = resources self.settings = settings + self.snakefile_path = snakefile_path # Prepare (and validate) any provided snippets for the job self.load_snippets(snippets) @@ -100,23 +122,20 @@ def load_snippets(self, spec): self.snippets = sniputil.SnippetGroup(spec, self.settings, self.resources) self.snippets.validate() - def 
run(self, pre_commands=None): + def run(self): """ Write the command script. This is likely shared. We allow one or more pre-commands (e.g., to download artifacts) """ - pre_commands = pre_commands or [] - command = "" - for pre_command in pre_commands: - command += pre_command + "\n" + command = "\n" # Ensure we check for snakemake - command += "\n" + check_for_snakemake + command += check_for_snakemake # If we have a snippet group, add snippets before installing snakemake if self.snippets: - command += self.snippets.render_run(self.command, self.container) + command += self.snippets.render_run(self.command) # Don't include the main command twice if self.snippets.has_run_command_snippet: @@ -127,18 +146,24 @@ def setup(self): """ Derive the correct setup command based on the family. """ - raise NotImplementedError(f"Setup is not implemented for {self}.") + pass + + def write_snakefile(self): + """ + Return templated snakefile. We do this in a separate step so + a later container step can use it. + """ + return write_snakefile % (self.snakefile_path, self.snakefile) def _template_setup(self, template, use_container=False): """ Shared logic to template the setup command. """ command = template - command += write_snakefile % self.snakefile # If we have a snippet group, add snippets before installing snakemake if self.snippets: - command += self.snippets.render_setup(self.command, self.container) + command += self.snippets.render_setup(self.command) # If we don't use container, install snakemkae to VM if not use_container: @@ -153,26 +178,12 @@ class COSWriter(CommandWriter): def setup(self): """ - We pre-pull the container so they start at the same time. - """ - command = f"docker pull {self.container}" - return self._template_setup(command, use_container=True) - - def run(self, pre_commands=None): + Setup for the container operating system means writing + the entrypoint. 
We do not use any snippets here, using + a container assumes what the user needs is in the + container. """ - Write the run command script for cos. - - For this command we assume the container has python as python3 - """ - pre_commands = pre_commands or [] - command = "" - for pre_command in pre_commands: - command += pre_command + "\n" - command += write_snakefile % self.snakefile - volume = "$PWD/Snakefile:./Snakefile" - docker = f"docker run -it -v {volume} {self.container} {self.command}" - command += docker - return command + return write_entrypoint % self.command class DebianWriter(CommandWriter): diff --git a/snakemake_executor_plugin_googlebatch/executor.py b/snakemake_executor_plugin_googlebatch/executor.py index 9c8ee97..509b1d7 100644 --- a/snakemake_executor_plugin_googlebatch/executor.py +++ b/snakemake_executor_plugin_googlebatch/executor.py @@ -1,7 +1,7 @@ import os import uuid -from typing import List, Generator +from typing import List from snakemake_interface_common.exceptions import WorkflowError from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor @@ -95,21 +95,55 @@ def generate_jobid(self, job): uid = str(uuid.uuid4()) return job.name.replace("_", "-") + "-" + uid[0:6] - def validate(self, job): + def get_container(self, job, entrypoint=None, commands=None): """ - Ensure the choices of arguments make sense. + Get a container, if batch-cos is defined. 
""" - container = self.get_container(job) family = self.get_param(job, "image_family") + if "batch-cos" not in family: + return + + # Default entrypoint assumes the setup wrote our command here + if entrypoint is None: + entrypoint = "/bin/bash" + if commands is None: + commands = ["/tmp/workdir/entrypoint.sh"] + + # We use the default snakemake image or the container, but also + # honor a googlebatch_container in case it is distinct + image = self.workflow.remote_execution_settings.container_image + image = self.get_param(job, "container") or image + container = batch_v1.Runnable.Container() + container.image_uri = image + + # This is written by writer.setup() for COS + container.entrypoint = entrypoint + container.commands = commands + + # This will ensure the Snakefile is in the PWD of the COS container + container.volumes = ["/tmp/workdir:/tmp/workdir"] + container.options = ( + "--network host --workdir /tmp/workdir -e PYTHONUNBUFFERED=1" + ) - # We can't take a custom container if batch-cos is not set - # We only give a warning here in case it's a one-off setting - # https://cloud.google.com/batch/docs/vm-os-environment-overview - if "batch-cos" not in family and container: - self.logger.warning( - f"Job {job} has container without image_family batch-cos*." + username = self.get_param(job, "docker_username") + password = self.get_param(job, "docker_password") + + # Both are required + if (username and not password) or (password and not username): + raise WorkflowError( + "docker username and password are required if one is provided" ) + if username and password: + container.username = username + container.password = password + + # Not sure if we want to add this + # https://github.com/googleapis/googleapis/blob/master/google/cloud/batch/v1/task.proto#L230-L234 + # enable_image_streaming true + return container + def is_preemptible(self, job): """ Determine if a job is preemptible. 
@@ -128,18 +162,20 @@ def get_command_writer(self, job): Get a command writer for a job. """ family = self.get_param(job, "image_family") - container = self.workflow.remote_execution_settings.container_image command = self.format_job_exec(job) snakefile = self.read_snakefile() # Any custom snippets snippets = self.get_param(job, "snippets") + snakefile_path = "./Snakefile" + if "batch-cos" in family: + snakefile_path = "/tmp/workdir/Snakefile" return cmdutil.get_writer(family)( command=command, - container=container, snakefile=snakefile, snippets=snippets, + snakefile_path=snakefile_path, settings=self.workflow.executor_settings, resources=job.resources, ) @@ -167,20 +203,31 @@ def run_job(self, job: JobExecutorInterface): self.logger.info("\n🌟️ Setup Command:") print(setup_command) - pre_commands = [] - - # Run command - run_command = writer.run(pre_commands) - self.logger.info("\n🐍️ Snakemake Command:") - print(run_command) - # Add environment variables to the task envars = self.workflow.spawned_job_args_factory.envvars() # A single runnable to execute snakemake runnable = batch_v1.Runnable() - runnable.script = batch_v1.Runnable.Script() - runnable.script.text = run_command + + # If we have a container, add it - the script isn't used + container = self.get_container(job) + if container is not None: + runnable.container = container + snakefile_text = writer.write_snakefile() + else: + # Run command (not used for COS) + run_command = writer.run() + self.logger.info("\n🐍️ Snakemake Command:") + print(run_command) + + runnable.script = batch_v1.Runnable.Script() + runnable.script.text = run_command + snakefile_text = writer.write_snakefile() + + # Runnable to write Snakefile on the host + snakefile_step = batch_v1.Runnable() + snakefile_step.script = batch_v1.Runnable.Script() + snakefile_step.script.text = snakefile_text # Note that secret variables seem to require some # extra secret API enabled @@ -198,7 +245,7 @@ def run_job(self, job: JobExecutorInterface): 
barrier = batch_v1.Runnable() barrier.barrier = batch_v1.Runnable.Barrier() barrier.barrier.name = "wait-for-setup" - task.runnables = [setup, barrier, runnable] + task.runnables = [setup, snakefile_step, barrier, runnable] # Are we adding storage? self.add_storage(job, task) @@ -243,6 +290,7 @@ def run_job(self, job: JobExecutorInterface): # The job's parent is the region in which the job will run create_request.parent = self.project_parent(job) createdjob = self.batch.create_job(create_request) + print(createdjob) # Save aux metadata # Last seen will hold the timestamp of last recorded status @@ -400,9 +448,7 @@ def get_snakefile(self): assert os.path.exists(self.workflow.main_snakefile) return os.path.relpath(self.workflow.main_snakefile, os.getcwd()) - async def check_active_jobs( - self, active_jobs: List[SubmittedJobInfo] - ) -> Generator[SubmittedJobInfo, None, None]: + async def check_active_jobs(self, active_jobs: List[SubmittedJobInfo]): """ Check the status of active jobs. """ @@ -420,6 +466,7 @@ async def check_active_jobs( except DeadlineExceeded: msg = f"Google Batch job '{j.external_jobid}' exceeded deadline. " self.report_job_error(j, msg=msg, aux_logs=aux_logs) + yield j self.logger.info(f"Job {jobid} has state {response.status.state.name}") for event in response.status.status_events: diff --git a/snakemake_executor_plugin_googlebatch/snippet.py b/snakemake_executor_plugin_googlebatch/snippet.py index 9803660..3966a92 100644 --- a/snakemake_executor_plugin_googlebatch/snippet.py +++ b/snakemake_executor_plugin_googlebatch/snippet.py @@ -37,7 +37,7 @@ def __init__(self, spec, settings, resources): self.load(spec) self.parse() - def render_setup(self, command, container): + def render_setup(self, command): """ Render snippets into a chunk for the setup. 
""" @@ -47,11 +47,10 @@ def render_setup(self, command, container): settings=self.settings, resources=self.resources, command=command, - container=container, ) return render - def render_run(self, command, container): + def render_run(self, command): """ Render snippets into a chunk for the run. """ @@ -61,7 +60,6 @@ def render_run(self, command, container): settings=self.settings, resources=self.resources, command=command, - container=container, ) return render