From 5914f33c24907836f18aa44441425cf4f42db7b4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johannes=20K=C3=B6ster?=
Date: Fri, 27 Oct 2023 11:14:31 +0200
Subject: [PATCH] fix: update dependencies (#5)

---
 .github/workflows/ci.yml |   2 +-
 pyproject.toml           |   6 +-
 .../__init__.py          | 109 +-----------------
 tests/tests.py           |   2 +-
 4 files changed, 14 insertions(+), 105 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 568479e..d24ff72 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -90,7 +90,7 @@ jobs:
         uses: medyagh/setup-minikube@v0.0.14
 
       - name: Run pytest
-        run: poetry run coverage run -m pytest tests/tests.py
+        run: poetry run coverage run -m pytest tests/tests.py -sv
 
       - name: Run Coverage
         run: poetry run coverage report -m
diff --git a/pyproject.toml b/pyproject.toml
index 9c4bc0b..5b20012 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,8 +7,8 @@ readme = "README.md"
 
 [tool.poetry.dependencies]
 python = "^3.11"
-snakemake-interface-common = "^1.13.0"
-snakemake-interface-executor-plugins = "^7.0.0"
+snakemake-interface-common = "^1.14.1"
+snakemake-interface-executor-plugins = "^7.0.3"
 kubernetes = "^27.2.0"
 
 [tool.poetry.group.dev.dependencies]
@@ -17,7 +17,7 @@ flake8 = "^6.1.0"
 coverage = "^7.3.1"
 pytest = "^7.4.1"
 snakemake = {git = "https://github.com/snakemake/snakemake.git"}
-snakemake-storage-plugin-s3 = "^0.2.0"
+snakemake-storage-plugin-s3 = "^0.2.4"
 
 [tool.coverage.run]
 omit = [".*", "*/site-packages/*", "Snakefile"]
diff --git a/snakemake_executor_plugin_kubernetes/__init__.py b/snakemake_executor_plugin_kubernetes/__init__.py
index 8c93e0d..9d7b80e 100644
--- a/snakemake_executor_plugin_kubernetes/__init__.py
+++ b/snakemake_executor_plugin_kubernetes/__init__.py
@@ -1,6 +1,5 @@
 import base64
 from dataclasses import dataclass, field
-import os
 import shlex
 import subprocess
 import time
@@ -93,7 +92,7 @@ def __post_init__(self):
         self.kubeapi = kubernetes.client.CoreV1Api()
         self.batchapi = kubernetes.client.BatchV1Api()
         self.namespace = self.workflow.executor_settings.namespace
-        self.envvars = self.workflow.envvars
+        self.envvars = self.workflow.spawned_job_args_factory.envvars()
         self.secret_files = {}
         self.run_namespace = str(uuid.uuid4())
         self.secret_envvars = {}
@@ -133,7 +132,6 @@ def run_job(self, job: JobExecutorInterface):
         container.working_dir = "/workdir"
         container.volume_mounts = [
             kubernetes.client.V1VolumeMount(name="workdir", mount_path="/workdir"),
-            kubernetes.client.V1VolumeMount(name="source", mount_path="/source"),
         ]
 
         node_selector = {}
@@ -153,33 +151,9 @@ def run_job(self, job: JobExecutorInterface):
         # fail on first error
         body.spec.restart_policy = "Never"
-        # source files as a secret volume
-        # we copy these files to the workdir before executing Snakemake
-        too_large = [
-            path
-            for path in self.secret_files.values()
-            if os.path.getsize(path) > 1000000
-        ]
-        if too_large:
-            raise WorkflowError(
-                "The following source files exceed the maximum "
-                "file size (1MB) that can be passed from host to "
-                "kubernetes. These are likely not source code "
-                "files. Consider adding them to your "
-                "remote storage instead or (if software) use "
-                "Conda packages or container images:\n{}".format("\n".join(too_large))
-            )
-        secret_volume = kubernetes.client.V1Volume(name="source")
-        secret_volume.secret = kubernetes.client.V1SecretVolumeSource()
-        secret_volume.secret.secret_name = self.run_namespace
-        secret_volume.secret.items = [
-            kubernetes.client.V1KeyToPath(key=key, path=path)
-            for key, path in self.secret_files.items()
-        ]
-
         # workdir as an emptyDir volume of undefined size
         workdir_volume = kubernetes.client.V1Volume(name="workdir")
         workdir_volume.empty_dir = kubernetes.client.V1EmptyDirVolumeSource()
-        body.spec.volumes = [secret_volume, workdir_volume]
+        body.spec.volumes = [workdir_volume]
 
         # env vars
         container.env = []
@@ -262,8 +236,8 @@ async def check_active_jobs(
 
         # # async with self.status_rate_limiter:
         # # query remote middleware here
-        import kubernetes
 
+        self.logger.debug(f"Checking status of {len(active_jobs)} jobs")
         for j in active_jobs:
             async with self.status_rate_limiter:
                 try:
@@ -328,9 +302,6 @@ def shutdown(self):
         self.unregister_secret()
         super().shutdown()
 
-    def get_job_exec_prefix(self, job: JobExecutorInterface):
-        return "cp -rf /source/. ."
-
     def register_secret(self):
         import kubernetes.client
 
@@ -340,84 +311,22 @@ def register_secret(self):
         secret.metadata.name = self.run_namespace
         secret.type = "Opaque"
         secret.data = {}
-        for i, f in enumerate(self.dag.get_sources()):
-            if f.startswith(".."):
-                self.logger.warning(
-                    "Ignoring source file {}. Only files relative "
-                    "to the working directory are allowed.".format(f)
-                )
-                continue
-
-            # The kubernetes API can't create secret files larger than 1MB.
-            source_file_size = os.path.getsize(f)
-            max_file_size = 1048576
-            if source_file_size > max_file_size:
-                self.logger.warning(
-                    "Skipping the source file {f}. Its size {source_file_size} exceeds "
-                    "the maximum file size (1MB) that can be passed "
-                    "from host to kubernetes.".format(
-                        f=f, source_file_size=source_file_size
-                    )
-                )
-                continue
-
-            with open(f, "br") as content:
-                key = f"f{i}"
-
-                # Some files are smaller than 1MB, but grows larger after being
-                # base64 encoded.
-                # We should exclude them as well, otherwise Kubernetes APIs will
-                # complain.
-                encoded_contents = base64.b64encode(content.read()).decode()
-                encoded_size = len(encoded_contents)
-                if encoded_size > 1048576:
-                    self.logger.warning(
-                        "Skipping the source file {f} for secret key {key}. "
-                        "Its base64 encoded size {encoded_size} exceeds "
-                        "the maximum file size (1MB) that can be passed "
-                        "from host to kubernetes.".format(
-                            f=f,
-                            key=key,
-                            encoded_size=encoded_size,
-                        )
-                    )
-                    continue
-
-                self.secret_files[key] = f
-                secret.data[key] = encoded_contents
-
-        for e in self.envvars:
-            try:
-                key = e.lower()
-                secret.data[key] = base64.b64encode(os.environ[e].encode()).decode()
-                self.secret_envvars[key] = e
-            except KeyError:
-                continue
+        for name, value in self.envvars.items():
+            key = name.lower()
+            secret.data[key] = base64.b64encode(value.encode()).decode()
+            self.secret_envvars[key] = name
 
         # Test if the total size of the configMap exceeds 1MB
         config_map_size = sum(
             [len(base64.b64decode(v)) for k, v in secret.data.items()]
         )
         if config_map_size > 1048576:
-            self.logger.warning(
+            raise WorkflowError(
                 "The total size of the included files and other Kubernetes secrets "
-                "is {}, exceeding the 1MB limit.\n".format(config_map_size)
-            )
-            self.logger.warning(
-                "The following are the largest files. Consider removing some of them "
-                "(you need remove at least {} bytes):".format(config_map_size - 1048576)
+                f"is {config_map_size}, exceeding the 1MB limit.\n"
             )
-
-            entry_sizes = {
-                self.secret_files[k]: len(base64.b64decode(v))
-                for k, v in secret.data.items()
-                if k in self.secret_files
-            }
-            for k, v in sorted(entry_sizes.items(), key=lambda item: item[1])[:-6:-1]:
-                self.logger.warning(f" * File: {k}, original size: {v}")
-
-            raise WorkflowError("ConfigMap too large")
-
         self.kubeapi.create_namespaced_secret(self.namespace, secret)
 
     def unregister_secret(self):
diff --git a/tests/tests.py b/tests/tests.py
index 3909823..3b9438b 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -28,7 +28,7 @@ def get_remote_execution_settings(
         self,
     ) -> snakemake.settings.RemoteExecutionSettings:
         return snakemake.settings.RemoteExecutionSettings(
-            seconds_between_status_checks=1,
+            seconds_between_status_checks=10,
             envvars=self.get_envvars(),
             # TODO remove once we have switched to stable snakemake for dev
             container_image="snakemake/snakemake:latest",
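For context on the register_secret change above: environment variables now come from workflow.spawned_job_args_factory.envvars(), which yields already-resolved name/value pairs, instead of being looked up in os.environ, and they reach the pod via an Opaque Kubernetes Secret. Below is a minimal standalone sketch of that mechanism; the envvars dict and secret name are hypothetical stand-ins, not part of the patch.

    import base64

    import kubernetes.client

    # Hypothetical stand-ins for self.envvars and self.run_namespace.
    envvars = {"MY_TOKEN": "s3cr3t"}  # name -> already-resolved value
    secret_name = "example-run-namespace"

    secret = kubernetes.client.V1Secret()
    secret.metadata = kubernetes.client.V1ObjectMeta(name=secret_name)
    secret.type = "Opaque"
    # Secret.data values must be base64-encoded strings.
    secret.data = {
        name.lower(): base64.b64encode(value.encode()).decode()
        for name, value in envvars.items()
    }

    # The Kubernetes API rejects secrets whose payload exceeds 1MB, hence
    # the size check (and WorkflowError) in the patched register_secret.
    payload = sum(len(base64.b64decode(v)) for v in secret.data.values())
    assert payload <= 1048576, "secret payload exceeds the 1MB Kubernetes limit"

    # Against a live cluster, the secret would then be created with:
    # kubernetes.client.CoreV1Api().create_namespaced_secret("default", secret)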