Skip to content

Commit

Permalink
Wait for job if maximum pending jobs limit is exceeded (#1794)
Browse files Browse the repository at this point in the history
* Add usage endpoint

* warn users if maxPendingJobs exceeded

* Wait for job if max pending jobs exceeded

* Release note

* Refactor

* Update warning message

---------

Co-authored-by: ptristan3 <[email protected]>
  • Loading branch information
kt474 and ptristan3 authored Jul 17, 2024
1 parent 86a1560 commit 674017d
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 0 deletions.
8 changes: 8 additions & 0 deletions qiskit_ibm_runtime/api/clients/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,3 +358,11 @@ def update_tags(self, job_id: str, tags: list) -> Response:
API Response.
"""
return self._api.program_job(job_id).update_tags(tags)

def usage(self) -> Dict[str, Any]:
"""Return monthly open plan usage information.
Returns:
API Response.
"""
return self._api.usage()
10 changes: 10 additions & 0 deletions qiskit_ibm_runtime/api/rest/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Runtime(RestAdapterBase):
"jobs": "/jobs",
"backends": "/backends",
"cloud_instance": "/instance",
"usage": "/usage",
}

def program_job(self, job_id: str) -> "ProgramJob":
Expand Down Expand Up @@ -239,3 +240,12 @@ def is_qctrl_enabled(self) -> bool:
"""
url = self.get_url("cloud_instance")
return self.session.get(url).json().get("qctrl_enabled")

def usage(self) -> Dict[str, Any]:
"""Return monthly open plan usage information.
Returns:
JSON response.
"""
url = self.get_url("usage")
return self.session.get(url).json()
47 changes: 47 additions & 0 deletions qiskit_ibm_runtime/qiskit_runtime_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,9 @@ def run(
f"The backend {backend.name} currently has a status of {status.status_msg}."
)

if hgp_name == "ibm-q/open/main":
self.check_pending_jobs()

version = inputs.get("version", 1) if inputs else 1
try:
response = self._api_client.program_run(
Expand Down Expand Up @@ -927,6 +930,35 @@ def _run(self, *args: Any, **kwargs: Any) -> Union[RuntimeJob, RuntimeJobV2]:
"""Private run method"""
return self.run(*args, **kwargs)

def check_pending_jobs(self) -> None:
"""Check the number of pending jobs and wait for the oldest pending job if
the maximum number of pending jobs has been reached.
"""
try:
usage = self.usage().get("byInstance")[0]
pending_jobs = usage.get("pendingJobs")
max_pending_jobs = usage.get("maxPendingJobs")
if pending_jobs >= max_pending_jobs:
oldest_running = self.jobs(limit=1, descending=False, pending=True)
if oldest_running:
logger.warning(
"The pending jobs limit has been reached. "
"Waiting for job %s to finish before submitting the next one.",
oldest_running[0],
)
try:
oldest_running[0].wait_for_final_state(timeout=300)

except Exception as ex: # pylint: disable=broad-except
logger.debug(
"An error occurred while waiting for job %s to finish: %s",
oldest_running[0].job_id(),
ex,
)

except Exception as ex: # pylint: disable=broad-except
logger.warning("Unable to retrieve open plan pending jobs details. %s", ex)

def job(self, job_id: str) -> Union[RuntimeJob, RuntimeJobV2]:
"""Retrieve a runtime job.
Expand Down Expand Up @@ -1063,6 +1095,21 @@ def delete_job(self, job_id: str) -> None:
raise RuntimeJobNotFound(f"Job not found: {ex.message}") from None
raise IBMRuntimeError(f"Failed to delete job: {ex}") from None

def usage(self) -> Dict[str, Any]:
"""Return monthly open plan usage information.
Returns:
Dict with usage details.
Raises:
IBMInputValueError: If method is called when using the ibm_cloud channel
"""
if self._channel == "ibm_cloud":
raise IBMInputValueError(
"Usage is only available for the ``ibm_quantum`` channel open plan."
)
return self._api_client.usage()

def _decode_job(self, raw_data: Dict) -> Union[RuntimeJob, RuntimeJobV2]:
"""Decode job data received from the server.
Expand Down
3 changes: 3 additions & 0 deletions release-notes/unreleased/1794.feat.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
When running jobs on the open plan, there will now be a warning if the limit for the
maximum number of pending jobs has been reached. The service will also attempt to wait
for the oldest pending jobs to finish running before submitting a new job.

0 comments on commit 674017d

Please sign in to comment.