From 96f1b0818d3d163f31d442f0e2009f7a27957b4e Mon Sep 17 00:00:00 2001 From: Phillip Weinberg Date: Tue, 26 Sep 2023 13:48:01 -0400 Subject: [PATCH] Virtual queue support quera internal api (#636) * adding virtual queue header. * fixing unit tests. * adding modification of base header. --- src/bloqade/submission/quera.py | 10 +- .../submission/quera_api_client/api.py | 37 ++++--- tests/test_quera_internal_api.py | 97 ++++++++----------- 3 files changed, 72 insertions(+), 72 deletions(-) diff --git a/src/bloqade/submission/quera.py b/src/bloqade/submission/quera.py index ca0c669ec..cfae53c0b 100644 --- a/src/bloqade/submission/quera.py +++ b/src/bloqade/submission/quera.py @@ -1,3 +1,4 @@ +from pydantic import PrivateAttr from bloqade.submission.base import SubmissionBackend, ValidationError from bloqade.submission.quera_api_client.api import QueueApi from bloqade.submission.ir.task_specification import ( @@ -15,6 +16,7 @@ class QuEraBackend(SubmissionBackend): api_hostname: str qpu_id: str api_stage: str = "v0" + virtual_queue: Optional[str] = None proxy: Optional[str] = None # Sigv4Request arguments region: Optional[str] = None @@ -25,11 +27,15 @@ class QuEraBackend(SubmissionBackend): role_arn: Optional[str] = None role_session_name: Optional[str] = None profile: Optional[str] = None + _queue_api: Optional[QueueApi] = PrivateAttr(None) @property def queue_api(self): - kwargs = {k: v for k, v in self.__dict__.items() if v is not None} - return QueueApi(**kwargs) + if self._queue_api is None: + kwargs = {k: v for k, v in self.__dict__.items() if v is not None} + self._queue_api = QueueApi(**kwargs) + + return self._queue_api def get_capabilities(self) -> QuEraCapabilities: try: diff --git a/src/bloqade/submission/quera_api_client/api.py b/src/bloqade/submission/quera_api_client/api.py index 30ddbee17..df9b15363 100644 --- a/src/bloqade/submission/quera_api_client/api.py +++ b/src/bloqade/submission/quera_api_client/api.py @@ -21,6 +21,7 @@ def __init__( api_hostname: str, qpu_id: str, api_stage="v0", + virtual_queue: Optional[str] = None, proxy: Optional[str] = None, ): """ @@ -28,6 +29,7 @@ def __init__( @param api_hostname: hostname of the API instance. @param qpu_id: The QPU ID, for example `qpu1-mock`. @param api_stage: Specify which version of the API to call from this object. + @param virtual_queue: Optional, the virtual queue to use for the API. @param proxy: Optional, the hostname for running the API via some proxy endpoint. """ @@ -43,6 +45,7 @@ def __init__( self.base_url = "https://" + uri_with_version self.qpu_id = qpu_id + self.virtual_queue = virtual_queue self.logger = logging.getLogger(self.__class__.__name__) @staticmethod @@ -60,17 +63,18 @@ def _result_as_json(result: requests.Response) -> Dict: def _generate_headers(self, base: Optional[dict] = None) -> Dict: if base is None: - if self.hostname is None: - return {"Content-Type": "application/json"} - else: - return {"Content-Type": "application/json", "Host": self.hostname} + header = {"Content-Type": "application/json"} else: - if self.hostname is None: - return base - else: - header = dict(base) - header["Host"] = self.hostname - return header # type: ignore + header = dict(base) + header["Content-Type"] = "application/json" + + if self.hostname is not None: + header["Host"] = self.hostname + + if self.virtual_queue is not None: + header["virtual_queue"] = self.virtual_queue + + return header def _get_path(self, *path_list: str): return "/".join((self.base_url, self.qpu_id) + path_list) @@ -124,6 +128,7 @@ def __init__( api_hostname: str, qpu_id: str, api_stage="v0", + virtual_queue: Optional[str] = None, proxy: Optional[str] = None, # Sigv4Request arguments region: str = "us-east-1", @@ -140,6 +145,7 @@ def __init__( @param api_hostname: hostname of the API instance. @param qpu_id: The QPU ID, for example `qpu1-mock`. @param api_stage: Specify which version of the API to call from this object. + @param virtual_queue: Optional, the virtual queue to use for the API. @param proxy: Optional, the hostname for running the API via some proxy endpoint. @param region: AWS region, default value: "us-east-1" @@ -152,7 +158,13 @@ def __init__( @param role_session_name: AWS role session name, defualy value: 'awsrequest', @param profile: Optional, AWS profile to use credentials for. """ - super().__init__(api_hostname, qpu_id, api_stage=api_stage, proxy=proxy) + super().__init__( + api_hostname, + qpu_id, + virtual_queue=virtual_queue, + api_stage=api_stage, + proxy=proxy, + ) self.request = Sigv4Request( region=region, @@ -262,6 +274,7 @@ def __init__( api_hostname: str, qpu_id: str, api_stage="v0", + virtual_queue: Optional[str] = None, proxy: Optional[str] = None, **request_sigv4_kwargs, ): @@ -270,6 +283,7 @@ def __init__( @param api_hostname: hostname of the API instance. @param qpu_id: The QPU ID, for example `qpu1-mock`. @param api_stage: Specify which version of the API to call from this object. + @param virtual_queue: Optional, the virtual queue to use for the API. @param proxy: Optional, the hostname for running the API via some proxy endpoint. @@ -289,6 +303,7 @@ def __init__( api_hostname, qpu_id, api_stage=api_stage, + virtual_queue=virtual_queue, proxy=proxy, **request_sigv4_kwargs, ) diff --git a/tests/test_quera_internal_api.py b/tests/test_quera_internal_api.py index e178a11ec..977f51e92 100644 --- a/tests/test_quera_internal_api.py +++ b/tests/test_quera_internal_api.py @@ -12,6 +12,25 @@ import os +API_HOSTNAME = "api.que-ee.com" +API_PROXY = "proxy-api.que-ee.com" +VIRTUAL_QUEUE = "virtual-queue" + +API_CONFIG = dict( + api_hostname=API_HOSTNAME, + qpu_id="qpu-1", + api_stage="v0", + proxy=API_PROXY, + virtual_queue=VIRTUAL_QUEUE, +) + +HEADERS = { + "Content-Type": "application/json", + "Host": API_HOSTNAME, + "virtual_queue": VIRTUAL_QUEUE, +} + + # Integraiton tests @pytest.mark.skip(reason="Depricating save_batch") @pytest.mark.vcr @@ -115,18 +134,18 @@ def test_happy_path_queue_api(*args): def mock_get(*args, **kwargs): print("get", args, kwargs) if (args, kwargs) == ( - ("https://https://api.que-ee.com/v0/qpu-1/capabilities",), - {"headers": {"Content-Type": "application/json"}}, + ("https://" + API_PROXY + "/v0/qpu-1/capabilities",), + {"headers": HEADERS}, ): return create_response(200, get_capabilities().dict()) elif (args, kwargs) == ( - ("https://https://api.que-ee.com/v0/qpu-1/queue/task/task_id",), - {"headers": {"Content-Type": "application/json"}}, + ("https://" + API_PROXY + "/v0/qpu-1/queue/task/task_id",), + {"headers": HEADERS}, ): return create_response(200, {"status": "Completed"}) elif (args, kwargs) == ( - ("https://https://api.que-ee.com/v0/qpu-1/task/task_id/results",), - {"headers": {"Content-Type": "application/json"}}, + ("https://" + API_PROXY + "/v0/qpu-1/task/task_id/results",), + {"headers": HEADERS}, ): return create_response(200, mock_results()) else: @@ -135,8 +154,8 @@ def mock_get(*args, **kwargs): def mock_put(*args, **kwargs): print("put", args, kwargs) if (args, kwargs) == ( - ("https://https://api.que-ee.com/v0/qpu-1/queue/task/task_id/cancel",), - {"headers": {"Content-Type": "application/json"}}, + ("https://" + API_PROXY + "/v0/qpu-1/queue/task/task_id/cancel",), + {"headers": HEADERS}, ): return create_response(200, {}) else: @@ -145,7 +164,7 @@ def mock_put(*args, **kwargs): def mock_post(*args, **kwargs): print("post", args, kwargs) if ( - args == ("https://https://api.que-ee.com/v0/qpu-1/queue/task",) + args == ("https://" + API_PROXY + "/v0/qpu-1/queue/task",) and kwargs["headers"]["Content-Type"] == "application/json" ): task_ir_string = kwargs["data"] @@ -159,10 +178,6 @@ def mock_post(*args, **kwargs): else: raise NotImplementedError - api_config = dict( - api_hostname="https://api.que-ee.com", qpu_id="qpu-1", api_stage="v0" - ) - sig_v4_config = dict( region="us-east-1", access_key=None, @@ -178,7 +193,7 @@ def mock_post(*args, **kwargs): request_api.get.side_effect = mock_get request_api.post.side_effect = mock_post request_api.put.side_effect = mock_put - queue = bloqade.submission.quera_api_client.api.QueueApi(**api_config) + queue = bloqade.submission.quera_api_client.api.QueueApi(**API_CONFIG) assert queue.get_capabilities() == get_capabilities().dict() assert queue.post_task(mock_task_ir()) == "task_id" @@ -190,10 +205,6 @@ def mock_post(*args, **kwargs): @patch("bloqade.submission.quera_api_client.api.Sigv4Request") def test_task_results_polling(*args): - api_config = dict( - api_hostname="https://api.que-ee.com", qpu_id="qpu-1", api_stage="v0" - ) - sig_v4_config = dict( region="us-east-1", access_key=None, @@ -206,7 +217,7 @@ def test_task_results_polling(*args): ) request_api = bloqade.submission.quera_api_client.api.Sigv4Request(**sig_v4_config) - queue = bloqade.submission.quera_api_client.api.QueueApi(**api_config) + queue = bloqade.submission.quera_api_client.api.QueueApi(**API_CONFIG) request_api.get.side_effect = [ create_response(200, {"status": "Enqueued"}), @@ -226,10 +237,6 @@ def test_task_results_polling(*args): @patch("bloqade.submission.quera_api_client.api.Sigv4Request") def test_post_task_error_paths(*args): - api_config = dict( - api_hostname="https://api.que-ee.com", qpu_id="qpu-1", api_stage="v0" - ) - sig_v4_config = dict( region="us-east-1", access_key=None, @@ -243,7 +250,7 @@ def test_post_task_error_paths(*args): QueueApi = bloqade.submission.quera_api_client.api.QueueApi request_api = bloqade.submission.quera_api_client.api.Sigv4Request(**sig_v4_config) - queue = bloqade.submission.quera_api_client.api.QueueApi(**api_config) + queue = bloqade.submission.quera_api_client.api.QueueApi(**API_CONFIG) request_api.post.side_effect = [ create_response(404, {}), @@ -271,10 +278,6 @@ def test_post_task_error_paths(*args): @patch("bloqade.submission.quera_api_client.api.Sigv4Request") def test_task_validation_error_paths(*args): - api_config = dict( - api_hostname="https://api.que-ee.com", qpu_id="qpu-1", api_stage="v0" - ) - sig_v4_config = dict( region="us-east-1", access_key=None, @@ -288,7 +291,7 @@ def test_task_validation_error_paths(*args): QueueApi = bloqade.submission.quera_api_client.api.QueueApi request_api = bloqade.submission.quera_api_client.api.Sigv4Request(**sig_v4_config) - queue = bloqade.submission.quera_api_client.api.QueueApi(**api_config) + queue = bloqade.submission.quera_api_client.api.QueueApi(**API_CONFIG) request_api.post.side_effect = [ create_response(404, {}), @@ -312,10 +315,6 @@ def test_task_validation_error_paths(*args): @patch("bloqade.submission.quera_api_client.api.Sigv4Request") def test_get_capabilities_error_paths(*args): - api_config = dict( - api_hostname="https://api.que-ee.com", qpu_id="qpu-1", api_stage="v0" - ) - sig_v4_config = dict( region="us-east-1", access_key=None, @@ -329,7 +328,7 @@ def test_get_capabilities_error_paths(*args): QueueApi = bloqade.submission.quera_api_client.api.QueueApi request_api = bloqade.submission.quera_api_client.api.Sigv4Request(**sig_v4_config) - queue = bloqade.submission.quera_api_client.api.QueueApi(**api_config) + queue = bloqade.submission.quera_api_client.api.QueueApi(**API_CONFIG) request_api.get.side_effect = [ create_response(404, {}), @@ -348,11 +347,7 @@ def test_get_capabilities_error_paths(*args): def test_api_requests_errors(): - api_config = dict( - api_hostname="https://api.que-ee.com", qpu_id="qpu-1", api_stage="v0" - ) - - api_requests = bloqade.submission.quera_api_client.api.ApiRequest(**api_config) + api_requests = bloqade.submission.quera_api_client.api.ApiRequest(**API_CONFIG) with pytest.raises(NotImplementedError): api_requests._post("https://api.que-ee.com/v0/qpu-1", {}, {}) @@ -366,10 +361,6 @@ def test_api_requests_errors(): @patch("bloqade.submission.quera_api_client.api.Sigv4Request") def test_get_task_status_in_queue_error_paths(*args): - api_config = dict( - api_hostname="https://api.que-ee.com", qpu_id="qpu-1", api_stage="v0" - ) - sig_v4_config = dict( region="us-east-1", access_key=None, @@ -383,7 +374,7 @@ def test_get_task_status_in_queue_error_paths(*args): QueueApi = bloqade.submission.quera_api_client.api.QueueApi request_api = bloqade.submission.quera_api_client.api.Sigv4Request(**sig_v4_config) - queue = bloqade.submission.quera_api_client.api.QueueApi(**api_config) + queue = bloqade.submission.quera_api_client.api.QueueApi(**API_CONFIG) request_api.get.side_effect = [ create_response(400, {}), @@ -403,10 +394,6 @@ def test_get_task_status_in_queue_error_paths(*args): @patch("bloqade.submission.quera_api_client.api.Sigv4Request") def test_cancel_task_in_queue_error_paths(*args): - api_config = dict( - api_hostname="https://api.que-ee.com", qpu_id="qpu-1", api_stage="v0" - ) - sig_v4_config = dict( region="us-east-1", access_key=None, @@ -420,7 +407,7 @@ def test_cancel_task_in_queue_error_paths(*args): QueueApi = bloqade.submission.quera_api_client.api.QueueApi request_api = bloqade.submission.quera_api_client.api.Sigv4Request(**sig_v4_config) - queue = bloqade.submission.quera_api_client.api.QueueApi(**api_config) + queue = bloqade.submission.quera_api_client.api.QueueApi(**API_CONFIG) request_api.put.side_effect = [ create_response(403, {}), @@ -442,10 +429,6 @@ def test_cancel_task_in_queue_error_paths(*args): def test_get_task_results_paths(*args): from bloqade.submission.quera_api_client.api import ApiRequest - api_config = dict( - api_hostname="https://api.que-ee.com", qpu_id="qpu-1", api_stage="v0" - ) - sig_v4_config = dict( region="us-east-1", access_key=None, @@ -459,7 +442,7 @@ def test_get_task_results_paths(*args): QueueApi = bloqade.submission.quera_api_client.api.QueueApi request_api = bloqade.submission.quera_api_client.api.Sigv4Request(**sig_v4_config) - queue = bloqade.submission.quera_api_client.api.QueueApi(**api_config) + queue = bloqade.submission.quera_api_client.api.QueueApi(**API_CONFIG) request_api.get.side_effect = [ create_response(200, {"status": "Created"}), @@ -540,10 +523,6 @@ def test_get_task_results_paths(*args): @patch("bloqade.submission.quera_api_client.api.Sigv4Request") def test_get_task_summary_paths(*args): - api_config = dict( - api_hostname="https://api.que-ee.com", qpu_id="qpu-1", api_stage="v0" - ) - sig_v4_config = dict( region="us-east-1", access_key=None, @@ -557,7 +536,7 @@ def test_get_task_summary_paths(*args): QueueApi = bloqade.submission.quera_api_client.api.QueueApi request_api = bloqade.submission.quera_api_client.api.Sigv4Request(**sig_v4_config) - queue = bloqade.submission.quera_api_client.api.QueueApi(**api_config) + queue = bloqade.submission.quera_api_client.api.QueueApi(**API_CONFIG) request_api.get.side_effect = [ create_response(200, {"status": "Running"}),