diff --git a/README.md b/README.md index 1b17eb5..42fc4a2 100644 --- a/README.md +++ b/README.md @@ -4,10 +4,98 @@ Zoo runner using Argo Workflows ## Environment variables -- `STORAGE_CLASS`: standard -- `DEFAULT_VOLUME_SIZE` 12Gi -- `DEFAULT_MAX_CORES`: 4 -- `DEFAULT_MAX_RAM`: 4Gi -- `ARGO_WF_ENDPOINT`: "http://localhost:2746" -- `ARGO_WF_TOKEN`: `kubectl get -n ns1 secret argo.service-account-token -o=jsonpath='{.data.token}' | base64 --decode` -- `ARGO_WF_SYNCHRONIZATION_CM`: "semaphore-water-bodies" \ No newline at end of file +- `STORAGE_CLASS`: k8s cluster RWX storage class, defaults to `standard`. +- `DEFAULT_VOLUME_SIZE`: Calrissian default RWX volume size, defaults to `12Gi`. +- `DEFAULT_MAX_CORES`: Calrissian default max cores, defaults to `4`. +- `DEFAULT_MAX_RAM`: Calrissian default max RAM, defaults to `4Gi`. +- `ARGO_WF_ENDPOINT`: this is the Argo Workflows API endpoint, defaults to `"http://localhost:2746"`. +- `ARGO_WF_TOKEN`: this is the Argo Workflows API token that can be retrieved with: `kubectl get -n ns1 secret argo.service-account-token -o=jsonpath='{.data.token}' | base64 --decode` +- `ARGO_WF_SYNCHRONIZATION_CM`: this is the Argo Workflows synchronizaion configmap (with key "workflow"). For tests, we use "semaphore-argo-cwl-runner" +- `ARGO_CWL_RUNNER_TEMPLATE`: this is the Argo Workflows WorkflowTemplate that runs the CWL, defaults to: "argo-cwl-runner" +- `ARGO_CWL_RUNNER_ENTRYPOINT`: this is the Argo Workflows WorkflowTemplate entrypoint, defaults to: "calrissian-runner" + +## Requirements + +The Argo Workflows deployment has a Argo Workflows `WorkflowTemplate` or `ClusterWorkflowTemplate` impllementing the execution of a Calrissian Job and exposing the interface: + +**Input parameters:** + +```yaml + templates: + - name: calrissian-runner + inputs: + parameters: + - name: parameters + description: Parameters in JSON format + - name: cwl + description: CWL document in JSON format + - name: max_ram + default: 8G + description: Max RAM (e.g. 8G) + - name: max_cores + default: '4' + description: Max cores (e.g. 4) + - name: entry_point + description: CWL document entry_point +``` + +**Outputs:** + +```yaml + outputs: + parameters: + - name: results + valueFrom: + parameter: '{{steps.get-results.outputs.parameters.calrissian-output}}' + - name: log + valueFrom: + parameter: '{{steps.get-results.outputs.parameters.calrissian-stderr}}' + - name: usage-report + valueFrom: + parameter: '{{steps.get-results.outputs.parameters.calrissian-report}}' + - name: stac-catalog + valueFrom: + parameter: '{{steps.stage-out.outputs.parameters.stac-catalog}}' + - name: feature-collection + valueFrom: + parameter: >- + {{steps.feature-collection.outputs.parameters.feature-collection}} + artifacts: + - name: tool-logs + from: '{{steps.get-results.outputs.artifacts.tool-logs}}' + - name: calrissian-output + from: '{{steps.get-results.outputs.artifacts.calrissian-output}}' + - name: calrissian-stderr + from: '{{steps.get-results.outputs.artifacts.calrissian-stderr}}' + - name: calrissian-report + from: '{{steps.get-results.outputs.artifacts.calrissian-report}}' +``` + +Where: + +- `results` is the Calrissian job stdout +- `log` is the Calrissian job stderr +- `usage-report` is the Calrissian usage report +- `stac-catalog` is the s3 path to the published STAC Catalog +- `feature-collection` is the Feature Collection with the STAC Items produced + +And the artifacts: + +- `tool-logs` is the Calrissian CWL step logs defined as: + + +```yaml + artifacts: + - name: tool-logs + path: /calrissian/logs + s3: + key: '{{workflow.name}}-{{workflow.uid}}-artifacts/tool-logs' + archive: + none: {} +``` + +- `calrissian-output` is the Calrissian stdout +- `calrissian-stderr` is the Calrissian job stderr +- `calrissian-report` is the Calrissian usage report + +See the example provided in folder `example` \ No newline at end of file diff --git a/example/argo-cwl-runner.yaml b/example/argo-cwl-runner.yaml new file mode 100644 index 0000000..b77e6c9 --- /dev/null +++ b/example/argo-cwl-runner.yaml @@ -0,0 +1,459 @@ +apiVersion: argoproj.io/v1alpha1 +kind: WorkflowTemplate +metadata: + name: argo-cwl-runner + annotations: + workflows.argoproj.io/version: ">= v3.3.0" + workflows.argoproj.io/description: | + This workflow template is a CWL runner for Argo Workflows. +spec: + entrypoint: calrissian-runner + parameters: + - name: parameters + description: "Parameters in JSON format" + value: "" + - name: cwl + description: "CWL document in JSON format" + value: "" + - name: max_ram + description: "Max RAM" + value: "8G" + - name: max_cores + description: "Max cores" + value: "8" + # this is the Workflow throttling to avoid overloading the cluster + # the configMap contains the number of concurrent Workflows allowed + synchronization: + semaphore: + configMapKeyRef: + name: semaphore-argo-cwl-runner + key: workflow + + # volumes + volumes: + - name: usersettings-vol + secret: + secretName: user-settings + - name: calrissian-wdir + persistentVolumeClaim: + claimName: calrissian-wdir + + # Workflow templates are used to define the Workflow steps + templates: + - name: calrissian-runner + # this is the entry point of the Workflow + inputs: + parameters: + - name: parameters + description: "Parameters in JSON format" + - name: cwl + description: "CWL document in JSON format" + - name: max_ram + description: "Max RAM (e.g. 8G)" + default: "8G" + - name: max_cores + description: "Max cores (e.g. 4)" + default: "4" + - name: entry_point + description: "CWL document entry_point" + outputs: + parameters: + - name: results + valueFrom: + parameter: "{{steps.get-results.outputs.parameters.calrissian-output}}" + - name: log + valueFrom: + parameter: "{{steps.get-results.outputs.parameters.calrissian-stderr}}" + - name: usage-report + valueFrom: + parameter: "{{steps.get-results.outputs.parameters.calrissian-report}}" + - name: stac-catalog + valueFrom: + parameter: "{{steps.stage-out.outputs.parameters.stac-catalog}}" + - name: feature-collection + valueFrom: + parameter: "{{steps.feature-collection.outputs.parameters.feature-collection}}" + artifacts: + - name: tool-logs + from: "{{steps.get-results.outputs.artifacts.tool-logs}}" + - name: calrissian-output + from: "{{steps.get-results.outputs.artifacts.calrissian-output}}" + - name: calrissian-stderr + from: "{{steps.get-results.outputs.artifacts.calrissian-stderr}}" + - name: calrissian-report + from: "{{steps.get-results.outputs.artifacts.calrissian-report}}" + + steps: + # Workflow steps are defined here + - - name: cwl-prepare + template: cwl-prepare + arguments: + parameters: + - name: cwl + value: "{{inputs.parameters.cwl}}" + - name: parameters + value: "{{inputs.parameters.parameters}}" + + - - name: cwl-runner + template: calrissian-tmpl + arguments: + parameters: + - name: entry_point + value: "{{inputs.parameters.entry_point}}" + - name: max_ram + value: "{{inputs.parameters.max_ram}}" + - name: max_cores + value: "{{inputs.parameters.max_cores}}" + + - - name: get-results + # these can run in parallel + # the same template is used for all the steps + template: get-results + arguments: + parameters: + - name: calrissian-output + value: "/calrissian/output.json" + - name: calrissian-stderr + value: "/calrissian/stderr.log" + - name: calrissian-report + value: "/calrissian/report.json" + + - name: stage-out + template: stage-out + arguments: + parameters: + - name: file_path + value: "/calrissian/output.json" + - name: bucket + value: "results" + - name: folder + value: "{{workflow.name}}-{{workflow.uid}}" + + - - name: feature-collection + template: feature-collection + arguments: + parameters: + - name: stac-catalog + value: "{{steps.stage-out.outputs.parameters.stac-catalog}}" + + + - name: cwl-prepare + # this steps prepares the CWL inputs + # needed by Calrissian + inputs: + parameters: + - name: cwl + - name: parameters + + script: + image: busybox:1.35.0 + resources: + requests: + memory: 1Gi + cpu: 1 + volumeMounts: + - name: calrissian-wdir + mountPath: /calrissian + env: [] + command: [ash] + source: | + #!/bin/ash + + echo '{{inputs.parameters.cwl}}' >> /calrissian/cwl.json + echo '{{inputs.parameters.parameters}}' >> /calrissian/input.json + echo "CWL and input files created" + cat /calrissian/cwl.json + echo "CWL parameters" + cat /calrissian/input.json + + sleep 1 + + - name: calrissian-tmpl + # this step creates the Calrissian Job, Argo creates it as a Kubernetes Job + metrics: + prometheus: + - name: duration_gauge_calrissian + labels: + - key: name + value: steps + help: "Duration gauge by name" + gauge: + realtime: true + value: "{{duration}}" + resource: + action: create + setOwnerReference: true + successCondition: status.succeeded > 0 + failureCondition: status.failed > 3 + manifest: | + apiVersion: batch/v1 + kind: Job + metadata: + generateName: calrissian-water-bodies-detection- + spec: + backoffLimit: 1 + activeDeadlineSeconds: 86400 + ttlSecondsAfterFinished: 120 + template: + metadata: + name: calrissian_pod + spec: + serviceAccountName: argo + containers: + - name: calrissian + image: ghcr.io/duke-gcb/calrissian/calrissian:0.16.0 + imagePullPolicy: IfNotPresent + command: + - calrissian + args: + - --debug + - --pod-serviceaccount + - argo + - --stdout + - /calrissian/output.json + - --stderr + - /calrissian/stderr.log + - --usage-report + - /calrissian/report.json + - --max-ram + - '{{inputs.parameters.max_ram}}' + - --max-cores + - '{{inputs.parameters.max_cores}}' + - --tmp-outdir-prefix + - /calrissian/tmp/ + - --outdir + - /calrissian/results/ + - --tool-logs-basepath + - /calrissian/logs + - "/calrissian/cwl.json#{{inputs.parameters.entry_point}}" + - "/calrissian/input.json" + env: + - name: CALRISSIAN_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: CALRISSIAN_DELETE_PODS + value: "true" + resources: + limits: + cpu: 2000m + memory: 2G + requests: + cpu: 1000m + memory: 1G + volumeMounts: + - mountPath: /calrissian + name: calrissian-wdir + readOnly: false + restartPolicy: Never + securityContext: + fsGroup: 0 + runAsGroup: 0 + runAsUser: 0 + terminationGracePeriodSeconds: 120 + volumes: + - name: calrissian-wdir + persistentVolumeClaim: + claimName: {{workflow.name}}-calrissian-wdir + readOnly: false + + inputs: + parameters: + - name: max_ram + - name: max_cores + - name: entry_point + outputs: + parameters: [] + artifacts: [] + + - name: get-results + # reads the files generated by Calrissian + inputs: + parameters: + - name: calrissian-output + - name: calrissian-stderr + - name: calrissian-report + outputs: + parameters: + - name: calrissian-output + valueFrom: + path: /tmp/calrissian-output.json + - name: calrissian-stderr + valueFrom: + path: /tmp/calrissian-stderr.txt + - name: calrissian-report + valueFrom: + path: /tmp/calrissian-report.json + artifacts: + - name: tool-logs + path: /calrissian/logs + archive: + none: {} + s3: + key: "{{workflow.name}}-{{workflow.uid}}-artifacts/tool-logs" + - name: calrissian-output + path: /tmp/calrissian-output.json + s3: + key: "{{workflow.name}}-{{workflow.uid}}-artifacts/calrissian-output.tgz" + - name: calrissian-stderr + path: /tmp/calrissian-stderr.txt + s3: + key: "{{workflow.name}}-{{workflow.uid}}-artifacts/calrissian-stderr.tgz" + - name: calrissian-report + path: /tmp/calrissian-report.json + s3: + key: "{{workflow.name}}-{{workflow.uid}}-artifacts/calrissian-report.tgz" + script: + image: busybox:1.35.0 + resources: + requests: + memory: 1Gi + cpu: 1 + volumeMounts: + - name: calrissian-wdir + mountPath: /calrissian + command: [ash] + source: | + #!/bin/ash + cat "{{inputs.parameters.calrissian-output}}" > /tmp/calrissian-output.json + cat "{{inputs.parameters.calrissian-stderr}}" > /tmp/calrissian-stderr.txt + cat "{{inputs.parameters.calrissian-report}}" > /tmp/calrissian-report.json + + - name: stage-out + inputs: + parameters: + - name: file_path + - name: bucket + - name: folder + outputs: + parameters: + - name: stac-catalog + valueFrom: + path: /tmp/output + script: + image: stageout + resources: + requests: + memory: 1Gi + cpu: 1 + volumeMounts: + - name: calrissian-wdir + mountPath: /calrissian + - name: usersettings-vol + readOnly: true + mountPath: "/etc/secret" + command: [bash] + source: + #!/bin/bash + set -x + + cat /etc/secret/usersettings.json + + stage-out --stac-catalog $( cat {{inputs.parameters.file_path}} | jq -r .stac_catalog.path - ) --user-settings /etc/secret/usersettings.json --bucket "{{inputs.parameters.bucket}}" --subfolder "{{inputs.parameters.folder}}" + + res=$? + + echo "s3://{{inputs.parameters.bucket}}/{{inputs.parameters.folder}}/catalog.json" > /tmp/output + + exit $res + + - name: feature-collection + inputs: + parameters: + - name: stac-catalog + outputs: + parameters: + - name: feature-collection + valueFrom: + path: /tmp/output + script: + image: stageout + resources: + requests: + memory: 1Gi + cpu: 1 + volumeMounts: [] + command: [python] + source: | + + import os + import sys + import traceback + import yaml + import json + import boto3 # noqa: F401 + import botocore + from loguru import logger + from urllib.parse import urlparse + from botocore.exceptions import ClientError + from botocore.client import Config + from pystac import read_file + from pystac.stac_io import DefaultStacIO, StacIO + from pystac.item_collection import ItemCollection + + logger.remove() + logger.add(sys.stderr, level="INFO") + + + class CustomStacIO(DefaultStacIO): + """Custom STAC IO class that uses boto3 to read from S3.""" + + def __init__(self): + self.session = botocore.session.Session() + self.s3_client = self.session.create_client( + service_name="s3", + region_name="it-rom", + endpoint_url="http://minio.ns1.svc.cluster.local:9000", + aws_access_key_id="minio-admin", + aws_secret_access_key="minio-admin", + ) + + def read_text(self, source, *args, **kwargs): + parsed = urlparse(source) + if parsed.scheme == "s3": + return ( + self.s3_client.get_object(Bucket=parsed.netloc, Key=parsed.path[1:])[ + "Body" + ] + .read() + .decode("utf-8") + ) + else: + return super().read_text(source, *args, **kwargs) + + def write_text(self, dest, txt, *args, **kwargs): + parsed = urlparse(dest) + if parsed.scheme == "s3": + self.s3_client.put_object( + Body=txt.encode("UTF-8"), + Bucket=parsed.netloc, + Key=parsed.path[1:], + ContentType="application/geo+json", + ) + else: + super().write_text(dest, txt, *args, **kwargs) + + + StacIO.set_default(CustomStacIO) + + s3_catalog_output = "{{inputs.parameters.stac-catalog}}" + logger.info("Post execution hook") + + StacIO.set_default(CustomStacIO) + + logger.info(f"Read catalog from STAC Catalog URI: {s3_catalog_output}") + + cat = read_file(s3_catalog_output) + + collection = next(cat.get_all_collections()) + + logger.info("Got collection {collection.id} from processing outputs") + + item_collection = ItemCollection(items=collection.get_all_items()) + + logger.info("Created feature collection from items") + + # save the feature collection to a file /tmp/output + with open("/tmp/output", "w") as f: + f.write(json.dumps(item_collection.to_dict(), indent=2)) + logger.info("Saved feature collection to /tmp/output") \ No newline at end of file diff --git a/tests/tests_water_bodies.py b/tests/tests_water_bodies.py index 3993bc5..770c3c2 100644 --- a/tests/tests_water_bodies.py +++ b/tests/tests_water_bodies.py @@ -29,7 +29,7 @@ def _(self, message): os.environ["ARGO_WF_TOKEN"] = ( "eyJhbGciOiJSUzI1NiIsImtpZCI6ImZSUUYycDZNbnI4MTZLeXVFUnpyZk9FcUJGTHdQQzI4SURGdHhQc0pzRXMifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJuczEiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlY3JldC5uYW1lIjoiYXJnby5zZXJ2aWNlLWFjY291bnQtdG9rZW4iLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoiYXJnbyIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6IjExZjI1NDUxLWE4OGEtNDFkZC1hNGIxLTJlNTM3ZGUyOGU3NiIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpuczE6YXJnbyJ9.dUVo3WCJgoVN5NVh3VVeKw7uF6hE6moCj8Mu5W7ioJ4M_lG_bJ4BZ7XRaJVrQMF--vMQAiNDr-_GSC0R7ItKmRqhafkONy61MeEDz_6u-j0ay4fj8qofgBatnF7lWcVVvNklVZxZ4IGb62SJepAvzerxsz5HhSG8icn9U0cUsyg4Vw_wsHntic-yFY4Eyp6kYSEXO5G2v_0KXLKcpLB41YLfMQaE8cu0ghmE5mFmKkKOf0oDHhLlch-j_K0IpGvXKcjZGlwjiukD1dCL3E6BSHYMutuz4n27Bg62nf3rZBErUkJB2GRjN-tKIoYMrEuwyEbv7TSuCeqHGyfWn3Tu0A" ) - os.environ["ARGO_WF_SYNCHRONIZATION_CM"] = "semaphore-water-bodies" + os.environ["ARGO_WF_SYNCHRONIZATION_CM"] = "semaphore-argo-cwl-runner" os.environ["DEFAULT_VOLUME_SIZE"] = "12Gi" cls.zoo = zoo diff --git a/zoo_argowf_runner/cwl2argo.py b/zoo_argowf_runner/cwl2argo.py index 61702e1..e5245c6 100644 --- a/zoo_argowf_runner/cwl2argo.py +++ b/zoo_argowf_runner/cwl2argo.py @@ -87,7 +87,10 @@ def cwl_to_argo( workflow_step( name="argo-cwl", template_ref=TemplateRef( - name="argo-cwl-runner", template="calrissian-runner" + name=os.environ.get("ARGO_CWL_RUNNER_TEMPLATE", "argo-cwl-runner"), + template=os.environ.get( + "ARGO_CWL_RUNNER_ENTRYPOINT", "calrissian-runner" + ), ), parameters=[ Parameter(name="entry_point", value=entrypoint),