diff --git a/charts/quantum-serverless/charts/gateway/templates/deployment.yaml b/charts/quantum-serverless/charts/gateway/templates/deployment.yaml index af6c248cf..c5d315372 100644 --- a/charts/quantum-serverless/charts/gateway/templates/deployment.yaml +++ b/charts/quantum-serverless/charts/gateway/templates/deployment.yaml @@ -59,6 +59,9 @@ spec: - name: gateway-pv-storage persistentVolumeClaim: claimName: {{ .Values.cos.claimName }} + - name: ray-cluster-template + configMap: + name: rayclustertemplate containers: - name: {{ .Chart.Name }} securityContext: @@ -81,6 +84,8 @@ spec: volumeMounts: - mountPath: "/usr/src/app/media/" name: gateway-pv-storage + - mountPath: "/tmp/templates/" + name: ray-cluster-template resources: {{- toYaml .Values.resources | nindent 12 }} env: @@ -93,8 +98,6 @@ spec: key: {{ .Values.secrets.secretKey.key }} - name: SITE_HOST value: {{ .Values.application.siteHost | quote }} - - name: RAY_HOST - value: {{ .Values.application.rayHost | quote }} - name: SETTINGS_AUTH_MECHANISM value: {{ .Values.application.auth.mechanism | quote }} - name: SETTINGS_TOKEN_AUTH_URL @@ -175,119 +178,6 @@ spec: secretKeyRef: name: {{ .Values.secrets.servicePsql.name }} key: {{ .Values.secrets.servicePsql.key.databasePassword }} - {{- with .Values.nodeSelector }} - nodeSelector: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.affinity }} - affinity: - {{- toYaml . | nindent 8 }} - {{- end }} - {{- with .Values.tolerations }} - tolerations: - {{- toYaml . | nindent 8 }} - {{- end }} ---- -apiVersion: apps/v1 -kind: Deployment -metadata: - name: scheduler - labels: - {{- include "scheduler.labels" . | nindent 4 }} -spec: - {{- if not .Values.autoscaling.enabled }} - replicas: 1 - {{- end }} - selector: - matchLabels: - {{- include "scheduler.selectorLabels" . | nindent 6 }} - template: - metadata: - {{- with .Values.podAnnotations }} - annotations: - {{- toYaml . | nindent 8 }} - {{- end }} - labels: - {{- include "scheduler.selectorLabels" . | nindent 8 }} - spec: - volumes: - - name: gateway-pv-storage - persistentVolumeClaim: - claimName: {{ .Values.cos.claimName }} - - name: ray-cluster-template - configMap: - name: rayclustertemplate - serviceAccountName: {{ include "gateway.serviceAccountName" . }} - securityContext: - {{- toYaml .Values.podSecurityContext | nindent 8 }} - initContainers: - - name: waitpostresql - image: actions/pg_isready - command: ['sh', '-c', 'until pg_isready -U ${DATABASE_USER} -d "dbname=${DATABASE_NAME}" -h ${DATABASE_HOST} -p ${DATABASE_PORT}; do echo waiting for myservice; sleep 2; done'] - env: - - name: DATABASE_HOST - valueFrom: - secretKeyRef: - name: {{ .Values.secrets.servicePsql.name }} - key: {{ .Values.secrets.servicePsql.key.host }} - - name: DATABASE_PORT - valueFrom: - secretKeyRef: - name: {{ .Values.secrets.servicePsql.name }} - key: {{ .Values.secrets.servicePsql.key.port }} - - name: DATABASE_NAME - valueFrom: - secretKeyRef: - name: {{ .Values.secrets.servicePsql.name }} - key: {{ .Values.secrets.servicePsql.key.databaseName }} - - name: DATABASE_USER - valueFrom: - secretKeyRef: - name: {{ .Values.secrets.servicePsql.name }} - key: {{ .Values.secrets.servicePsql.key.userName }} - containers: - - name: gateway-scheduler - image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" - imagePullPolicy: {{ .Values.image.pullPolicy }} - command: ["./scripts/scheduler.sh"] - volumeMounts: - - mountPath: "/usr/src/app/media/" - name: gateway-pv-storage - - mountPath: "/tmp/templates/" - name: ray-cluster-template - env: - - name: DEBUG - value: {{ .Values.application.debug | quote }} - - name: DJANGO_SECRET_KEY - valueFrom: - secretKeyRef: - name: {{ .Values.secrets.secretKey.name }} - key: {{ .Values.secrets.secretKey.key }} - - name: DATABASE_HOST - valueFrom: - secretKeyRef: - name: {{ .Values.secrets.servicePsql.name }} - key: {{ .Values.secrets.servicePsql.key.host }} - - name: DATABASE_PORT - valueFrom: - secretKeyRef: - name: {{ .Values.secrets.servicePsql.name }} - key: {{ .Values.secrets.servicePsql.key.port }} - - name: DATABASE_NAME - valueFrom: - secretKeyRef: - name: {{ .Values.secrets.servicePsql.name }} - key: {{ .Values.secrets.servicePsql.key.databaseName }} - - name: DATABASE_USER - valueFrom: - secretKeyRef: - name: {{ .Values.secrets.servicePsql.name }} - key: {{ .Values.secrets.servicePsql.key.userName }} - - name: DATABASE_PASSWORD - valueFrom: - secretKeyRef: - name: {{ .Values.secrets.servicePsql.name }} - key: {{ .Values.secrets.servicePsql.key.databasePassword }} - name: RAY_KUBERAY_NAMESPACE value: {{ .Release.Namespace }} - name: RAY_NODE_IMAGE @@ -306,18 +196,6 @@ spec: - name: RAY_CLUSTER_NO_DELETE_ON_COMPLETE value: "True" {{- end }} - - name: OTEL_ENABLED - value: {{ .Values.application.ray.openTelemetryCollector.enabled | quote }} - - name: OTEL_SERVICE_NAME - value: "Gateway" - - name: OTEL_TRACES_EXPORTER - value: console,otlp - - name: OTEL_METRICS_EXPORTER - value: console - - name: OTEL_EXPORTER_OTLP_TRACES_ENDPOINT - value: {{ .Values.application.ray.openTelemetryCollector.host }}:{{ .Values.application.ray.openTelemetryCollector.port }} - - name: OTEL_EXPORTER_OTLP_TRACES_INSECURE - value: {{ .Values.application.ray.openTelemetryCollector.insecure | quote }} - name: PROGRAM_TIMEOUT value: {{ .Values.application.limits.programTimeoutDays | quote }} {{- with .Values.nodeSelector }} diff --git a/client/quantum_serverless/__init__.py b/client/quantum_serverless/__init__.py index 36a491c14..794560d34 100644 --- a/client/quantum_serverless/__init__.py +++ b/client/quantum_serverless/__init__.py @@ -36,6 +36,7 @@ RayProvider, LocalProvider, save_result, + set_status, Configuration, ) from .quantum_serverless import ( diff --git a/client/quantum_serverless/core/__init__.py b/client/quantum_serverless/core/__init__.py index 2f0511922..8ef1e7458 100644 --- a/client/quantum_serverless/core/__init__.py +++ b/client/quantum_serverless/core/__init__.py @@ -71,6 +71,7 @@ LocalJobClient, Job, save_result, + set_status, Configuration, ) from .pattern import ( diff --git a/client/quantum_serverless/core/job.py b/client/quantum_serverless/core/job.py index 31fa52870..78ab7c01f 100644 --- a/client/quantum_serverless/core/job.py +++ b/client/quantum_serverless/core/job.py @@ -777,3 +777,35 @@ def save_result(result: Dict[str, Any]): logging.warning("Something went wrong: %s", response.text) return response.ok + + +def set_status(status: str): + """set job status.""" + + version = os.environ.get(ENV_GATEWAY_PROVIDER_VERSION) + if version is None: + version = GATEWAY_PROVIDER_VERSION_DEFAULT + + token = os.environ.get(ENV_JOB_GATEWAY_TOKEN) + if token is None: + logging.warning( + "Results will be saved as logs since" + "there is no information about the" + "authorization token in the environment." + ) + return False + + url = ( + f"{os.environ.get(ENV_JOB_GATEWAY_HOST)}/" + f"api/{version}/jobs/{os.environ.get(ENV_JOB_ID_GATEWAY)}/status/" + ) + response = requests.post( + url, + data={"status": status}, + headers={"Authorization": f"Bearer {token}"}, + timeout=REQUESTS_TIMEOUT, + ) + if not response.ok: + logging.warning("Something went wrong: %s", response.text) + + return response.ok diff --git a/gateway/api/ray.py b/gateway/api/ray.py index f60631fc6..d37467b7c 100644 --- a/gateway/api/ray.py +++ b/gateway/api/ray.py @@ -78,7 +78,8 @@ def submit(self, job: Job) -> Optional[str]: ) file.extractall(extract_folder) - entrypoint = f"python {program.entrypoint}" + #entrypoint = f"python {program.entrypoint}" + entrypoint = f"python launcher.py {program.entrypoint}" carrier = {} TraceContextTextMapPropagator().inject(carrier) env_w_span = json.loads(job.env_vars) @@ -87,6 +88,31 @@ def submit(self, job: Job) -> Optional[str]: except KeyError: pass + f = open(extract_folder + "/launcher.py", "w") + f.write( + ''' +import subprocess +from subprocess import Popen +import sys +from quantum_serverless import set_status + +with Popen( + ["python", sys.argv[1]], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, +) as pipe: + status = "SUCCEEDED" + if pipe.wait(): + status = "FAILED" + output, _ = pipe.communicate() + print(output) + set_status(status) + ''' + ) + f.close() + ff = open(extract_folder + "/launcher.py" , "r") + ray_job_id = retry_function( callback=lambda: self.client.submit_job( entrypoint=entrypoint, @@ -99,7 +125,6 @@ def submit(self, job: Job) -> Optional[str]: num_retries=settings.RAY_SETUP_MAX_RETRIES, error_message=f"Ray job [{job.id}] submission failed.", ) - if os.path.exists(extract_folder): shutil.rmtree(extract_folder) span.set_attribute("job.rayjobid", job.ray_job_id) diff --git a/gateway/api/views.py b/gateway/api/views.py index 6fc312769..b5ac3fc92 100644 --- a/gateway/api/views.py +++ b/gateway/api/views.py @@ -133,7 +133,7 @@ def run_existing(self, request): status=Job.QUEUED, config=jobconfig, ) - job.save() + #job.save() !!!!!!!!!!!! carrier = {} TraceContextTextMapPropagator().inject(carrier) @@ -254,6 +254,48 @@ def result(self, request, pk=None): # pylint: disable=invalid-name,unused-argum serializer = self.get_serializer(job) return Response(serializer.data) + @action(methods=["POST"], detail=True) + def status(self, request, pk=None): # pylint: disable=invalid-name,unused-argument + """set status of a job.""" + tracer = trace.get_tracer("gateway.tracer") + ctx = TraceContextTextMapPropagator().extract(carrier=request.headers) + with tracer.start_as_current_span("gateway.job.status", context=ctx): + org = self.get_object() + job_handler = get_job_handler(org.compute_resource.host) + if job_handler: + logs = job_handler.logs(org.ray_job_id) + + saved = False + attempts_left = 10 + while not saved: + if attempts_left <= 0: + return Response( + {"error": "All attempts to save results failed."}, status=500 + ) + + attempts_left -= 1 + + try: + job = self.get_object() + job.status = request.data.get("status") + job.logs = logs + job.save() + saved = True + print("Setting status") + print(request.data.get("status")) + except RecordModifiedError: + logger.warning( + "Job[%s] record has not been updated due to lock. " + "Retrying. Attempts left %s", + job.id, + attempts_left, + ) + continue + time.sleep(1) + + serializer = self.get_serializer(job) + return Response(serializer.data) + @action(methods=["GET"], detail=True) def logs(self, request, pk=None): # pylint: disable=invalid-name,unused-argument """Returns logs from job.""" diff --git a/gateway/main/signals.py b/gateway/main/signals.py new file mode 100644 index 000000000..47a11e8af --- /dev/null +++ b/gateway/main/signals.py @@ -0,0 +1,17 @@ +from django.db.models.signals import post_save +from django.dispatch import receiver +from api.models import Job +from api.management.commands.schedule_queued_jobs import Command as Schedule +from api.management.commands.free_resources import Command as Free + + +@receiver(post_save, sender=Job) +def save_job(sender, instance, created, **kwargs): + print("save_job") + print(instance) + print("Free") + free = Free() + free.handle() + print("Schedule") + schedule = Schedule() + schedule.handle() diff --git a/gateway/main/urls.py b/gateway/main/urls.py index e4b0c6a46..ad358e29e 100644 --- a/gateway/main/urls.py +++ b/gateway/main/urls.py @@ -19,6 +19,7 @@ from django.urls import path, include, re_path from django.views.generic import TemplateView from rest_framework import routers +from .signals import save_job from api.views import KeycloakLogin, KeycloakUsersView import probes.views diff --git a/gateway/scripts/scheduler.sh b/gateway/scripts/scheduler.sh deleted file mode 100755 index 16c99f222..000000000 --- a/gateway/scripts/scheduler.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/sh - -while : -do - python manage.py update_jobs_statuses - python manage.py free_resources - python manage.py schedule_queued_jobs - sleep 1 -done