From 674017dbb2882145a77391b4c7c5dfa9155c26bf Mon Sep 17 00:00:00 2001 From: Kevin Tian Date: Wed, 17 Jul 2024 12:28:33 -0400 Subject: [PATCH] Wait for job if maximum pending jobs limit is exceeded (#1794) * Add usage endpoint * warn users if maxPendingJobs exceeded * Wait for job if max pending jobs exceeded * Release note * Refactor * Update warning message --------- Co-authored-by: ptristan3 <44805021+ptristan3@users.noreply.github.com> --- qiskit_ibm_runtime/api/clients/runtime.py | 8 ++++ qiskit_ibm_runtime/api/rest/runtime.py | 10 +++++ qiskit_ibm_runtime/qiskit_runtime_service.py | 47 ++++++++++++++++++++ release-notes/unreleased/1794.feat.rst | 3 ++ 4 files changed, 68 insertions(+) create mode 100644 release-notes/unreleased/1794.feat.rst diff --git a/qiskit_ibm_runtime/api/clients/runtime.py b/qiskit_ibm_runtime/api/clients/runtime.py index 4bad22c5f..979ea9a80 100644 --- a/qiskit_ibm_runtime/api/clients/runtime.py +++ b/qiskit_ibm_runtime/api/clients/runtime.py @@ -358,3 +358,11 @@ def update_tags(self, job_id: str, tags: list) -> Response: API Response. """ return self._api.program_job(job_id).update_tags(tags) + + def usage(self) -> Dict[str, Any]: + """Return monthly open plan usage information. + + Returns: + API Response. + """ + return self._api.usage() diff --git a/qiskit_ibm_runtime/api/rest/runtime.py b/qiskit_ibm_runtime/api/rest/runtime.py index 86ee72c20..70f1af1f1 100644 --- a/qiskit_ibm_runtime/api/rest/runtime.py +++ b/qiskit_ibm_runtime/api/rest/runtime.py @@ -36,6 +36,7 @@ class Runtime(RestAdapterBase): "jobs": "/jobs", "backends": "/backends", "cloud_instance": "/instance", + "usage": "/usage", } def program_job(self, job_id: str) -> "ProgramJob": @@ -239,3 +240,12 @@ def is_qctrl_enabled(self) -> bool: """ url = self.get_url("cloud_instance") return self.session.get(url).json().get("qctrl_enabled") + + def usage(self) -> Dict[str, Any]: + """Return monthly open plan usage information. + + Returns: + JSON response. + """ + url = self.get_url("usage") + return self.session.get(url).json() diff --git a/qiskit_ibm_runtime/qiskit_runtime_service.py b/qiskit_ibm_runtime/qiskit_runtime_service.py index 5884d24ad..f132045ea 100644 --- a/qiskit_ibm_runtime/qiskit_runtime_service.py +++ b/qiskit_ibm_runtime/qiskit_runtime_service.py @@ -863,6 +863,9 @@ def run( f"The backend {backend.name} currently has a status of {status.status_msg}." ) + if hgp_name == "ibm-q/open/main": + self.check_pending_jobs() + version = inputs.get("version", 1) if inputs else 1 try: response = self._api_client.program_run( @@ -927,6 +930,35 @@ def _run(self, *args: Any, **kwargs: Any) -> Union[RuntimeJob, RuntimeJobV2]: """Private run method""" return self.run(*args, **kwargs) + def check_pending_jobs(self) -> None: + """Check the number of pending jobs and wait for the oldest pending job if + the maximum number of pending jobs has been reached. + """ + try: + usage = self.usage().get("byInstance")[0] + pending_jobs = usage.get("pendingJobs") + max_pending_jobs = usage.get("maxPendingJobs") + if pending_jobs >= max_pending_jobs: + oldest_running = self.jobs(limit=1, descending=False, pending=True) + if oldest_running: + logger.warning( + "The pending jobs limit has been reached. " + "Waiting for job %s to finish before submitting the next one.", + oldest_running[0], + ) + try: + oldest_running[0].wait_for_final_state(timeout=300) + + except Exception as ex: # pylint: disable=broad-except + logger.debug( + "An error occurred while waiting for job %s to finish: %s", + oldest_running[0].job_id(), + ex, + ) + + except Exception as ex: # pylint: disable=broad-except + logger.warning("Unable to retrieve open plan pending jobs details. %s", ex) + def job(self, job_id: str) -> Union[RuntimeJob, RuntimeJobV2]: """Retrieve a runtime job. @@ -1063,6 +1095,21 @@ def delete_job(self, job_id: str) -> None: raise RuntimeJobNotFound(f"Job not found: {ex.message}") from None raise IBMRuntimeError(f"Failed to delete job: {ex}") from None + def usage(self) -> Dict[str, Any]: + """Return monthly open plan usage information. + + Returns: + Dict with usage details. + + Raises: + IBMInputValueError: If method is called when using the ibm_cloud channel + """ + if self._channel == "ibm_cloud": + raise IBMInputValueError( + "Usage is only available for the ``ibm_quantum`` channel open plan." + ) + return self._api_client.usage() + def _decode_job(self, raw_data: Dict) -> Union[RuntimeJob, RuntimeJobV2]: """Decode job data received from the server. diff --git a/release-notes/unreleased/1794.feat.rst b/release-notes/unreleased/1794.feat.rst new file mode 100644 index 000000000..5090811ab --- /dev/null +++ b/release-notes/unreleased/1794.feat.rst @@ -0,0 +1,3 @@ +When running jobs on the open plan, there will now be a warning if the limit for the +maximum number of pending jobs has been reached. The service will also attempt to wait +for the oldest pending jobs to finish running before submitting a new job. \ No newline at end of file