Release/0.8 #1086

Merged 4 commits on Nov 14, 2023
3 changes: 3 additions & 0 deletions Makefile
@@ -29,6 +29,9 @@ build-notebook:

build-ray-node:
docker build -t $(rayNodeImageName):$(version) --build-arg TARGETARCH=$(arch) -f Dockerfile-ray-node .
docker build -t $(rayNodeImageName):$(version)-py310 --build-arg TARGETARCH=$(arch) --build-arg IMAGE_PY_VERSION=py310 -f Dockerfile-ray-node .
docker build -t $(rayNodeImageName):$(version)-py39 --build-arg TARGETARCH=$(arch) --build-arg IMAGE_PY_VERSION=py39 -f Dockerfile-ray-node .
docker build -t $(rayNodeImageName):$(version)-py38 --build-arg TARGETARCH=$(arch) --build-arg IMAGE_PY_VERSION=py38 -f Dockerfile-ray-node .

build-gateway:
docker build -t $(gatewayImageName):$(version) -f ./gateway/Dockerfile .
12 changes: 12 additions & 0 deletions charts/quantum-serverless/charts/gateway/templates/deployment.yaml
@@ -111,6 +111,12 @@ spec:
value: {{ .Values.application.auth.keycloak.realm | quote }}
- name: SETTINGS_KEYCLOAK_CLIENT_SECRET
value: {{ .Values.application.auth.keycloak.clientSecret | quote }}
- name: RAY_CLUSTER_WORKER_REPLICAS
value: {{ .Values.application.ray.replicas | quote }}
- name: RAY_CLUSTER_WORKER_MIN_REPLICAS
value: {{ .Values.application.ray.minReplicas | quote }}
- name: RAY_CLUSTER_WORKER_MAX_REPLICAS
value: {{ .Values.application.ray.maxReplicas | quote }}
{{- if .Values.application.superuser.enable }}
- name: DJANGO_SUPERUSER_USERNAME
valueFrom:
@@ -286,6 +292,12 @@ spec:
value: {{ .Release.Namespace }}
- name: RAY_NODE_IMAGE
value: {{ .Values.application.ray.nodeImage | quote }}
- name: RAY_NODE_IMAGE_PY38
value: {{ .Values.application.ray.nodeImage_py38 | quote }}
- name: RAY_NODE_IMAGE_PY39
value: {{ .Values.application.ray.nodeImage_py39 | quote }}
- name: RAY_NODE_IMAGE_PY310
value: {{ .Values.application.ray.nodeImage_py310 | quote }}
- name: LIMITS_JOBS_PER_USER
value: {{ .Values.application.limits.maxJobsPerUser | quote }}
- name: LIMITS_MAX_CLUSTERS
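The three RAY_CLUSTER_WORKER_* variables hand the chart's worker defaults to the gateway, and the RAY_NODE_IMAGE_PY* variables map the supported Python versions to node images. A minimal sketch of how a consumer could read them, assuming plain environment lookups with placeholder fallback values (illustrative only, not the gateway's actual settings code):

import os

# Worker-count defaults injected by the chart; the fallbacks here are placeholders.
worker_defaults = {
    "replicas": int(os.environ.get("RAY_CLUSTER_WORKER_REPLICAS", "1")),
    "min_replicas": int(os.environ.get("RAY_CLUSTER_WORKER_MIN_REPLICAS", "1")),
    "max_replicas": int(os.environ.get("RAY_CLUSTER_WORKER_MAX_REPLICAS", "4")),
}

# Per-Python-version node images, keyed the same way as Configuration's
# "py38"/"py39"/"py310" values.
node_images = {
    "py38": os.environ.get("RAY_NODE_IMAGE_PY38"),
    "py39": os.environ.get("RAY_NODE_IMAGE_PY39"),
    "py310": os.environ.get("RAY_NODE_IMAGE_PY310"),
}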
@@ -17,7 +17,8 @@ data:
{{- if .Values.application.ray.scrapeWithPrometheus }}
headServiceAnnotations:
prometheus.io/scrape: "true"
{{- end }}
{{- end }}
enableInTreeAutoscaling: {{`{{ auto_scaling }}`}}
headGroupSpec:
rayStartParams:
dashboard-host: 0.0.0.0
@@ -68,7 +69,7 @@ data:
{{- end }}
affinity:
containers:
- image: {{ .Values.application.ray.nodeImage | quote }}
- image: {{`{{ node_image }}`| quote }}
imagePullPolicy: IfNotPresent
name: ray-head
ports:
@@ -193,11 +194,11 @@ data:
claimName: {{ .Values.cos.claimName }}
workerGroupSpecs:
- groupName: g
maxReplicas: {{ .Values.application.ray.maxReplicas }}
minReplicas: {{ .Values.application.ray.minReplicas }}
maxReplicas: {{`{{ max_workers }}`}}
minReplicas: {{`{{ min_workers }}`}}
rayStartParams:
block: 'true'
replicas: {{ .Values.application.ray.replicas }}
replicas: {{`{{ workers }}`}}
template:
{{- if .Values.application.ray.scrapeWithPrometheus }}
metadata:
@@ -299,7 +300,7 @@ data:
- name: OT_ENABLED
value: {{ .Values.application.ray.openTelemetryCollector.enabled | quote }}
{{- end }}
image: {{ .Values.application.ray.nodeImage | quote}}
image: {{`{{ node_image }}`| quote}}
imagePullPolicy: IfNotPresent
name: ray-worker
resources:
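The template now leaves Jinja-style placeholders ({{ node_image }}, {{ workers }}, {{ min_workers }}, {{ max_workers }}, {{ auto_scaling }}) in the Helm output, so those values are filled per job instead of being fixed at chart-install time. A hedged sketch of that second rendering step, assuming Jinja2 is used for the substitution and using a placeholder file name (the gateway's actual rendering code is not part of this diff):

from jinja2 import Template

# Fill the per-job placeholders in the RayCluster spec emitted by Helm.
with open("rayclustertemplate.yaml") as template_file:  # placeholder path
    cluster_spec = Template(template_file.read()).render(
        node_image="icr.io/quantum-public/quantum-serverless-ray-node:0.7.1-py310",
        workers=1,
        min_workers=1,
        max_workers=4,
        auto_scaling=False,
    )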
3 changes: 3 additions & 0 deletions charts/quantum-serverless/charts/gateway/values.yaml
@@ -17,6 +17,9 @@ application:
enable: true
ray:
nodeImage: "icr.io/quantum-public/quantum-serverless-ray-node:0.7.1-py39"
nodeImage_py38: "icr.io/quantum-public/quantum-serverless-ray-node:0.7.1-py38"
nodeImage_py39: "icr.io/quantum-public/quantum-serverless-ray-node:0.7.1-py39"
nodeImage_py310: "icr.io/quantum-public/quantum-serverless-ray-node:0.7.1-py310"
cpu: 2
memory: 2
replicas: 1
3 changes: 2 additions & 1 deletion client/quantum_serverless/__init__.py
@@ -20,7 +20,7 @@
QuantumServerlessException
get_auto_discovered_provider
"""

# pylint: disable=W0404
from importlib_metadata import version as metadata_version, PackageNotFoundError

from .core import (
@@ -36,6 +36,7 @@
RayProvider,
LocalProvider,
save_result,
Configuration,
)
from .quantum_serverless import (
QuantumServerless,
2 changes: 1 addition & 1 deletion client/quantum_serverless/core/__init__.py
@@ -71,10 +71,10 @@
LocalJobClient,
Job,
save_result,
Configuration,
)
from .pattern import (
QiskitPattern,
Program,
ProgramStorage,
ProgramRepository,
download_and_unpack_artifact,
132 changes: 111 additions & 21 deletions client/quantum_serverless/core/job.py
@@ -27,19 +27,21 @@
RuntimeEnv
Job
"""
# pylint: disable=duplicate-code
import json
import logging
import os
import re
import tarfile
import time
import sys
from pathlib import Path
from typing import Dict, Any, Optional, List, Union
from uuid import uuid4
from dataclasses import asdict, dataclass

import subprocess
from subprocess import Popen
import re

import ray.runtime_env
import requests
@@ -58,6 +60,7 @@
MAX_ARTIFACT_FILE_SIZE_MB,
ENV_JOB_ARGUMENTS,
)

from quantum_serverless.core.pattern import QiskitPattern
from quantum_serverless.exception import QuantumServerlessException
from quantum_serverless.serializers.program_serializers import (
@@ -69,11 +72,36 @@
RuntimeEnv = ray.runtime_env.RuntimeEnv


@dataclass
class Configuration: # pylint: disable=too-many-instance-attributes
"""Program Configuration.

Args:
workers: number of worker pods when auto scaling is NOT enabled
auto_scaling: set True to enable auto scaling of the workers
min_workers: minimum number of workers when auto scaling is enabled
max_workers: maximum number of workers when auto scaling is enabled
python_version: Python version string of the program execution worker node
"""

workers: Optional[int] = None
min_workers: Optional[int] = None
max_workers: Optional[int] = None
auto_scaling: Optional[bool] = False
python_version: Optional[str] = ""
PYTHON_V3_8 = "py38"
PYTHON_V3_9 = "py39"
PYTHON_V3_10 = "py310"

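A minimal usage sketch for the new dataclass; the commented-out run call is an assumption about how a provider or job client would accept it, while the serialized form matches the json.dumps(asdict(config)) calls added further down in this file:

import json
from dataclasses import asdict

from quantum_serverless import Configuration

# Two fixed workers on the Python 3.10 node image; auto scaling stays off.
config = Configuration(
    workers=2,
    auto_scaling=False,
    python_version=Configuration.PYTHON_V3_10,
)

# Payload sent to the gateway as the "config" form field:
print(json.dumps(asdict(config)))
# {"workers": 2, "min_workers": null, "max_workers": null,
#  "auto_scaling": false, "python_version": "py310"}

# job = client.run(pattern, arguments={"shots": 1024}, config=config)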

class BaseJobClient:
"""Base class for Job clients."""

def run(
self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None
self,
program: QiskitPattern,
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> "Job":
"""Runs program."""
raise NotImplementedError
@@ -86,6 +114,7 @@ def run_existing(
self,
program: Union[str, QiskitPattern],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
"""Executes existing program."""
raise NotImplementedError
@@ -110,6 +139,10 @@ def logs(self, job_id: str):
"""Return logs."""
raise NotImplementedError

def filtered_logs(self, job_id: str, **kwargs):
"""Return filtered logs."""
raise NotImplementedError

def result(self, job_id: str):
"""Return results."""
raise NotImplementedError
@@ -140,6 +173,9 @@ def stop(self, job_id: str):
def logs(self, job_id: str):
return self._job_client.get_job_logs(job_id)

def filtered_logs(self, job_id: str, **kwargs):
raise NotImplementedError

def result(self, job_id: str):
return self.logs(job_id)

@@ -151,7 +187,12 @@ def list(self, **kwargs) -> List["Job"]:
Job(job.job_id, job_client=self) for job in self._job_client.list_jobs()
]

def run(self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None):
def run(
self,
program: QiskitPattern,
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
arguments = arguments or {}
entrypoint = f"python {program.entrypoint}"

@@ -180,6 +221,7 @@ def run_existing(
self,
program: Union[str, QiskitPattern],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
raise NotImplementedError("Run existing is not available for RayJobClient.")

@@ -214,7 +256,12 @@ def get(self, job_id) -> Optional["Job"]:
def list(self, **kwargs) -> List["Job"]:
return [job["job"] for job in list(self._jobs.values())]

def run(self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None):
def run(
self,
program: QiskitPattern,
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
if program.dependencies:
for dependency in program.dependencies:
subprocess.check_call(
@@ -266,10 +313,11 @@ def upload(self, program: QiskitPattern):
}
return program.title

def run_existing(
def run_existing( # pylint: disable=too-many-locals
self,
program: Union[str, QiskitPattern],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
if isinstance(program, QiskitPattern):
title = program.title
@@ -333,7 +381,10 @@ def __init__(self, host: str, token: str, version: str):
self._token = token

def run( # pylint: disable=too-many-locals
self, program: QiskitPattern, arguments: Optional[Dict[str, Any]] = None
self,
program: QiskitPattern,
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
) -> "Job":
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.run") as span:
@@ -367,21 +418,26 @@ def run( # pylint: disable=too-many-locals
)

with open(artifact_file_path, "rb") as file:
data = {
"title": program.title,
"entrypoint": program.entrypoint,
"arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder),
"dependencies": json.dumps(program.dependencies or []),
}
if config:
data["config"] = json.dumps(asdict(config))
else:
data["config"] = "{}"

response_data = safe_json_request(
request=lambda: requests.post(
url=url,
data={
"title": program.title,
"entrypoint": program.entrypoint,
"arguments": json.dumps(
arguments or {}, cls=QiskitObjectsEncoder
),
"dependencies": json.dumps(program.dependencies or []),
},
data=data,
files={"artifact": file},
headers={"Authorization": f"Bearer {self._token}"},
timeout=REQUESTS_TIMEOUT,
)
),
verbose=True,
)
job_id = response_data.get("id")
span.set_attribute("job.id", job_id)
@@ -449,6 +505,7 @@ def run_existing(
self,
program: Union[str, QiskitPattern],
arguments: Optional[Dict[str, Any]] = None,
config: Optional[Configuration] = None,
):
if isinstance(program, QiskitPattern):
title = program.title
@@ -462,15 +519,19 @@ def run_existing(

url = f"{self.host}/api/{self.version}/programs/run_existing/"

data = {
"title": title,
"arguments": json.dumps(arguments or {}, cls=QiskitObjectsEncoder),
}
if config:
data["config"] = json.dumps(asdict(config))
else:
data["config"] = "{}"

response_data = safe_json_request(
request=lambda: requests.post(
url=url,
data={
"title": title,
"arguments": json.dumps(
arguments or {}, cls=QiskitObjectsEncoder
),
},
data=data,
headers={"Authorization": f"Bearer {self._token}"},
timeout=REQUESTS_TIMEOUT,
)
@@ -519,6 +580,27 @@ def logs(self, job_id: str):
)
return response_data.get("logs")

def filtered_logs(self, job_id: str, **kwargs):
all_logs = self.logs(job_id=job_id)
included = ""
include = kwargs.get("include")
if include is not None:
for line in all_logs.split("\n"):
if re.search(include, line) is not None:
included = included + line + "\n"
else:
included = all_logs

excluded = ""
exclude = kwargs.get("exclude")
if exclude is not None:
for line in included.split("\n"):
if line != "" and re.search(exclude, line) is None:
excluded = excluded + line + "\n"
else:
excluded = included
return excluded

def result(self, job_id: str):
tracer = trace.get_tracer("client.tracer")
with tracer.start_as_current_span("job.result"):
@@ -621,6 +703,14 @@ def logs(self) -> str:
"""Returns logs of the job."""
return self._job_client.logs(self.job_id)

def filtered_logs(self, **kwargs) -> str:
"""Returns logs of the job.
Args:
include: rex expression finds match in the log line to be included
exclude: rex expression finds match in the log line to be excluded
"""
return self._job_client.filtered_logs(job_id=self.job_id, **kwargs)

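A short usage sketch: `job` is assumed to be a Job returned by an earlier run, and both filters are ordinary regular expressions (include is applied first, then exclude):

# Keep only error lines, drop heartbeat noise, or combine both filters.
error_lines = job.filtered_logs(include=r"ERROR")
quiet_logs = job.filtered_logs(exclude=r"heartbeat")
errors_without_retries = job.filtered_logs(include=r"ERROR", exclude=r"retrying")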
def result(self, wait=True, cadence=5, verbose=False):
"""Return results of the job.
Args: