From 1d5ea1602094d2ae7b3d2f4d5e83b3089c4fa747 Mon Sep 17 00:00:00 2001 From: Nate Shim Date: Fri, 22 Mar 2024 07:38:29 -0700 Subject: [PATCH] [DEV-11456] make request chains wait appropriately for sync/async calls (#298) * refactor wait logic * edits * formatting * check for debouncer instance instead of float or int; update typing; add kwarg max_wait_time for all calls using wait * fix random typo in createexport; add debouncer to rest of uqeries * update typing tuple to union; update docstrings * update timer logic * address comments * change max_wait_time to request_interval * correct job docstring * address comments * address comments --------- Co-authored-by: Nathanael Shim --- examples/aio_client.py | 2 +- indico/client/client.py | 22 ++++--- indico/client/request.py | 19 ++---- indico/queries/datasets.py | 54 ++++++++------- indico/queries/export.py | 49 +++++++++----- indico/queries/jobs.py | 26 +++++--- indico/queries/model_groups/model_groups.py | 73 +++++++++++---------- indico/queries/questionnaire.py | 5 +- indico/queries/submission.py | 11 ++-- indico/queries/workflow.py | 6 +- 10 files changed, 148 insertions(+), 119 deletions(-) diff --git a/examples/aio_client.py b/examples/aio_client.py index 844eca3b..efaa9db9 100644 --- a/examples/aio_client.py +++ b/examples/aio_client.py @@ -47,4 +47,4 @@ async def example_1(client): if __name__ == "__main__": # How to run a Python script using async - asyncio.run(example_with_client) + asyncio.run(example_with_client()) diff --git a/indico/client/client.py b/indico/client/client.py index 1ac05b15..8710a359 100644 --- a/indico/client/client.py +++ b/indico/client/client.py @@ -1,17 +1,21 @@ # -*- coding: utf-8 -*- -from typing import Union, Optional +import asyncio +import time +from typing import Optional, Union + import urllib3 -from indico.config import IndicoConfig -from indico.errors import IndicoError -from indico.http.client import HTTPClient, AIOHTTPClient from indico.client.request import ( + GraphQLRequest, HTTPRequest, - RequestChain, PagedRequest, - GraphQLRequest, + RequestChain, ) +from indico.config import IndicoConfig +from indico.errors import IndicoError +from indico.client.request import Delay +from indico.http.client import AIOHTTPClient, HTTPClient class IndicoClient: @@ -47,7 +51,8 @@ def _handle_request_chain(self, chain: RequestChain): elif isinstance(request, RequestChain): response = self._handle_request_chain(request) chain.previous = response - + elif isinstance(request, Delay): + time.sleep(request.seconds) if chain.result: return chain.result return response @@ -147,7 +152,8 @@ async def _handle_request_chain(self, chain: RequestChain): elif isinstance(request, RequestChain): response = await self._handle_request_chain(request) chain.previous = response - + elif isinstance(request, Delay): + await asyncio.sleep(request.seconds) if chain.result: return chain.result return response diff --git a/indico/client/request.py b/indico/client/request.py index dcb13c8d..1db5e11f 100644 --- a/indico/client/request.py +++ b/indico/client/request.py @@ -1,7 +1,7 @@ -from typing import Dict, Any from enum import Enum +from typing import Any, Dict, Union + from indico.errors import IndicoRequestError -import time class HTTPMethod(Enum): @@ -89,15 +89,6 @@ def requests(self): pass -class Debouncer: - def __init__(self, max_timeout: int = 5): - self.timeout = 0 - self.max_timeout = max_timeout or 5 # prevent None and 0 - - def backoff(self): - self.increment_timeout() - time.sleep(self.timeout) - - def 
increment_timeout(self): - if self.timeout < self.max_timeout: - self.timeout += 1 +class Delay: + def __init__(self, seconds: Union[int, float] = 2): + self.seconds = seconds diff --git a/indico/queries/datasets.py b/indico/queries/datasets.py index d7c8a562..0dc497e4 100644 --- a/indico/queries/datasets.py +++ b/indico/queries/datasets.py @@ -1,32 +1,32 @@ # -*- coding: utf-8 -*- import json -import jsons import tempfile from pathlib import Path -from typing import List, Union, Dict, Optional +from typing import Dict, List, Optional, Union -import pandas as pd import deprecation +import jsons +import pandas as pd from indico.client.request import ( - Debouncer, + Delay, GraphQLRequest, HTTPMethod, HTTPRequest, PagedRequest, RequestChain, ) -from indico.errors import IndicoNotFound, IndicoInputError +from indico.errors import IndicoInputError, IndicoNotFound +from indico.filters import DatasetFilter from indico.queries.storage import UploadBatched, UploadImages from indico.types.dataset import ( Dataset, OcrEngine, + OcrInputLanguage, OmnipageOcrOptionsInput, ReadApiOcrOptionsInput, - OcrInputLanguage, ) -from indico.filters import DatasetFilter class ListDatasets(PagedRequest): @@ -196,12 +196,17 @@ class CreateDataset(RequestChain): Create a dataset and upload the associated files. Args: - name (str): Name of the dataset - files (List[str]): List of pathnames to the dataset files - - Options: - dataset_type (str): Type of dataset to create [TEXT, DOCUMENT, IMAGE] - wait (bool, default=True): Wait for the dataset to upload and finish + name (str): Name of the dataset. + files (List[str]): List of path names to the dataset files. + wait (bool, optional): Wait for the dataset to upload and finish. Defaults to True. + dataset_type (str, optional): Type of dataset to create [TEXT, DOCUMENT, IMAGE]. Defaults to TEXT. + from_local_images (bool, optional): Flag whether files are local images or not. Defaults to False. + image_filename_col (str, optional): Image filename column. Defaults to 'filename'. + batch_size (int, optional): Size of file batch to upload at a time. Defaults to 20. + ocr_engine (OcrEngine, optional): Specify an OCR engine [OMNIPAGE, READAPI, READAPI_V2, READAPI_TABLES_V1]. Defaults to None. + omnipage_ocr_options (OmnipageOcrOptionsInput, optional): If using Omnipage, specify Omnipage OCR options. Defaults to None. + read_api_ocr_options: (ReadApiOcrOptionsInput, optional): If using ReadAPI, specify ReadAPI OCR options. Defaults to None. + request_interval (int or float, optional): The maximum time in between retry calls when waiting. Defaults to 5 seconds. Returns: Dataset object @@ -222,6 +227,7 @@ def __init__( ocr_engine: OcrEngine = None, omnipage_ocr_options: OmnipageOcrOptionsInput = None, read_api_ocr_options: ReadApiOcrOptionsInput = None, + request_interval: Union[int, float] = 5, ): self.files = files self.name = name @@ -233,6 +239,7 @@ def __init__( self.ocr_engine = ocr_engine self.omnipage_ocr_options = omnipage_ocr_options self.read_api_ocr_options = read_api_ocr_options + self.request_interval = request_interval if omnipage_ocr_options is not None and read_api_ocr_options is not None: raise IndicoInputError( "Must supply either omnipage or readapi options but not both." 
@@ -278,13 +285,12 @@ def requests(self): ) dataset_id = self.previous.id yield GetDatasetFileStatus(id=dataset_id) - debouncer = Debouncer() if self.wait is True: while not all( [f.status in ["PROCESSED", "FAILED"] for f in self.previous.files] ): yield GetDatasetFileStatus(id=dataset_id) - debouncer.backoff() + yield Delay(seconds=self.request_interval) yield GetDataset(id=dataset_id) @@ -475,12 +481,11 @@ def requests(self): ) yield GetDatasetFileStatus(id=self.dataset_id) if self.wait: - debouncer = Debouncer() while not all( f.status in self.expected_statuses for f in self.previous.files ): yield GetDatasetFileStatus(id=self.previous.id) - debouncer.backoff() + yield Delay() # Alias for backwards compatibility @@ -538,9 +543,10 @@ class ProcessFiles(RequestChain): Process files associated with a dataset and add corresponding data to the dataset Args: - dataset_id (int): ID of the dataset - datafile_ids (List[str]): IDs of the datafiles to process - wait (bool): Block while polling for status of files + dataset_id (int): ID of the dataset. + datafile_ids (List[str]): IDs of the datafiles to process. + wait (bool, optional): Block while polling for status of files. Defaults to True. + request_interval (int or float, optional): The maximum time in between retry calls when waiting. Defaults to 5 seconds. Returns: @@ -552,21 +558,22 @@ def __init__( dataset_id: int, datafile_ids: List[int], wait: bool = True, + request_interval: Union[int, float] = 5, ): self.dataset_id = dataset_id self.datafile_ids = datafile_ids self.wait = wait + self.request_interval = request_interval def requests(self): yield _ProcessFiles(self.dataset_id, self.datafile_ids) - debouncer = Debouncer() yield GetDatasetFileStatus(id=self.dataset_id) if self.wait: while not all( f.status in ["PROCESSED", "FAILED"] for f in self.previous.files ): yield GetDatasetFileStatus(id=self.dataset_id) - debouncer.backoff() + yield Delay(seconds=self.request_interval) @deprecation.deprecated( @@ -593,14 +600,13 @@ def __init__(self, dataset_id: int, datafile_ids: List[int], wait: bool = True): def requests(self): yield _ProcessCSV(self.dataset_id, self.datafile_ids) - debouncer = Debouncer() yield GetDatasetFileStatus(id=self.dataset_id) if self.wait: while not all( f.status in ["PROCESSED", "FAILED"] for f in self.previous.files ): yield GetDatasetFileStatus(id=self.dataset_id) - debouncer.backoff() + yield Delay() class GetAvailableOcrEngines(GraphQLRequest): diff --git a/indico/queries/export.py b/indico/queries/export.py index 435ac301..bd8cfdf7 100644 --- a/indico/queries/export.py +++ b/indico/queries/export.py @@ -1,11 +1,13 @@ -import pandas as pd import io +import warnings from typing import List, Union -from indico.client import GraphQLRequest, RequestChain, Debouncer +import pandas as pd + +from indico.client import Delay, GraphQLRequest, RequestChain from indico.errors import IndicoNotFound, IndicoRequestError -from indico.types.export import LabelResolutionStrategy, Export from indico.queries.storage import RetrieveStorageObject +from indico.types.export import Export, LabelResolutionStrategy class _CreateExport(GraphQLRequest): @@ -13,7 +15,7 @@ class _CreateExport(GraphQLRequest): mutation CreateExport( $datasetId: Int!, $labelsetId: Int!, - $columnIds: [Int], + $columnIds: [Int], $modelIds: [Int], $frozenLabelsetIds: [Int], $combineLabels: LabelResolutionStrategy, @@ -55,7 +57,16 @@ def __init__( combine_labels: LabelResolutionStrategy = LabelResolutionStrategy.ALL.name, file_info: bool = None, anonymoous: bool = 
None, + anonymous: bool = None, ): + if anonymoous: + warnings.warn( + "Argument anonymoous is deprecated and will be removed in future versions. Use argument anonymous instead." + ) + if anonymous: + raise IndicoRequestError("Cannot use both anonymoous and anonymous.") + else: + anonymous = anonymoous super().__init__( self.query, variables={ @@ -66,7 +77,7 @@ def __init__( "frozenLabelsetIds": frozen_labelset_ids, "combineLabels": combine_labels, "fileInfo": file_info, - "anonymous": anonymoous, + "anonymous": anonymous, }, ) @@ -93,7 +104,7 @@ class GetExport(GraphQLRequest): exports { id datasetId - name + name status columnIds labelsetId @@ -165,15 +176,16 @@ class CreateExport(RequestChain): Create an export job for a dataset. Args: - dataset_id (int): Dataset to create the export for - labelset_id (int): Labelset column id to export - column_ids (List(int)): Data column ids to export - model_ids (List(int)): Model ids to include predictions from - frozen_labelset_ids: (List(int)): frozen labelset ids to limit examples by - combine_labels (LabelResolutionStrategy): One row per example, combine labels from multiple labels into a single row - file_info (bool): Include datafile information - anonymous (bool): Anonymize user information - wait (bool): Wait for the export to complete. Default is True + dataset_id (int): Dataset to create the export for. + labelset_id (int): Labelset column id to export. + column_ids (List(int), optional): Data column ids to export. Defaults to None. + model_ids (List(int), optional): Model ids to include predictions from. Defaults to None. + frozen_labelset_ids: (List(int), optional): frozen labelset ids to limit examples by. Defaults to None. + combine_labels (LabelResolutionStrategy, optional): One row per example, combine labels from multiple labels into a single row. Defaults to 'all'. + file_info (bool, optional): Include datafile information. Defaults to False. + anonymous (bool, optional): Anonymize user information. Defaults to False. + wait (bool, optional): Wait for the export to complete. Defaults to True. + request_interval (int or float, optional): The maximum time in between retry calls when waiting. Defaults to 5 seconds. 
Returns: Export object @@ -193,6 +205,7 @@ def __init__( file_info: bool = False, anonymous: bool = False, wait: bool = True, + request_interval: Union[int, float] = 5, ): self.dataset_id = dataset_id self.labelset_id = labelset_id @@ -203,6 +216,7 @@ def __init__( self.file_info = file_info self.anonymous = anonymous self.wait = wait + self.request_interval = request_interval super().__init__() def requests(self): @@ -214,12 +228,11 @@ def requests(self): frozen_labelset_ids=self.frozen_labelset_ids, combine_labels=self.combine_labels, file_info=self.file_info, - anonymoous=self.anonymous, + anonymous=self.anonymous, ) - debouncer = Debouncer() if self.wait is True: while self.previous.status not in ["COMPLETE", "FAILED"]: yield GetExport(self.previous.id) - debouncer.backoff() + yield Delay(seconds=self.request_interval) yield GetExport(self.previous.id) diff --git a/indico/queries/jobs.py b/indico/queries/jobs.py index f24e4483..506c5e40 100644 --- a/indico/queries/jobs.py +++ b/indico/queries/jobs.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- -import time +from typing import Union -from indico.client.request import GraphQLRequest, RequestChain +from indico.client.request import Delay, GraphQLRequest, RequestChain from indico.types.jobs import Job from indico.types.utils import Timer + class _JobStatus(GraphQLRequest): query = """ query JobStatus($id: String) { @@ -51,9 +52,10 @@ class JobStatus(RequestChain): Args: id (int): ID of the job to query for status. - wait (bool, optional): Whether to ait for the job to complete. Default is True + wait (bool, optional): Whether to wait for the job to complete. Defaults to True. + request_interval (int or float, optional): The maximum time in between retry calls when waiting. Defaults to 0.2. timeout (float or int, optional): Timeout after this many seconds. - Ignored if not `wait`. Defaults to None + Ignored if not `wait`. Defaults to None. Returns: Job: With the job result available in a result attribute. 
Note that the result @@ -67,7 +69,13 @@ class JobStatus(RequestChain): previous: Job = None - def __init__(self, id: str, wait: bool = True, request_interval=0.2, timeout=None): + def __init__( + self, + id: str, + wait: bool = True, + request_interval: Union[int, float] = 0.2, + timeout: Union[int, float] = None, + ): self.id = id self.wait = wait self.request_interval = request_interval @@ -76,6 +84,9 @@ def __init__(self, id: str, wait: bool = True, request_interval=0.2, timeout=Non def requests(self): yield _JobStatus(id=self.id) if self.wait: + timer = None + if self.timeout is not None: + timer = Timer(self.timeout) # Check status of job until done if wait == True while not ( (self.previous.status in ["SUCCESS"] and self.previous.ready) @@ -88,9 +99,8 @@ def requests(self): "RETRY", ] ): - if self.timeout is not None: - timer = Timer(self.timeout) + if timer: timer.check() - time.sleep(self.request_interval) + yield Delay(seconds=self.request_interval) yield _JobStatus(id=self.id) yield _JobStatusWithResult(id=self.id) diff --git a/indico/queries/model_groups/model_groups.py b/indico/queries/model_groups/model_groups.py index 6198f1ff..2446a8ef 100644 --- a/indico/queries/model_groups/model_groups.py +++ b/indico/queries/model_groups/model_groups.py @@ -1,20 +1,16 @@ import json -from time import sleep -from typing import List, Dict +from typing import Dict, List, Union import deprecation -from indico.client.request import GraphQLRequest, RequestChain +from indico.client.request import Delay, GraphQLRequest, RequestChain +from indico.errors import IndicoNotFound from indico.queries.workflow_components import AddModelGroupComponent -from indico.types import Workflow -from indico.types.model_group import ModelGroup, NewLabelsetArguments, \ - NewQuestionnaireArguments -from indico.types.model import Model from indico.types.jobs import Job +from indico.types.model import Model +from indico.types.model_group import ModelGroup from indico.types.utils import cc_to_snake -from indico.errors import IndicoNotFound, IndicoError, IndicoInputError - class GetModelGroup(RequestChain): """ @@ -22,21 +18,26 @@ class GetModelGroup(RequestChain): Args: id (int): model group id to query + wait (bool, optional): Wait until the Model Group status is FAILED, COMPLETE, or NOT_ENOUGH_DATA. Defaults to False. + request_interval (int or float, optional): The maximum time in between retry calls when waiting. Defaults to 5 seconds. 
Returns: ModelGroup object """ - def __init__(self, id: int, wait: bool = False): + def __init__( + self, id: int, wait: bool = False, request_interval: Union[int, float] = 5 + ): self.id = id self.wait = wait + self.request_interval = request_interval def requests(self): if self.wait: req = GetModelGroupSelectedModelStatus(id=self.id) yield req while self.previous not in ["FAILED", "COMPLETE", "NOT_ENOUGH_DATA"]: - sleep(1) + yield Delay(seconds=self.request_interval) yield req yield _GetModelGroup(id=self.id) @@ -128,7 +129,6 @@ def process_response(self, response): return Model(**last) - class GetModelGroupSelectedModelStatus(GraphQLRequest): """ Get the status string of the selected model for the given model group id @@ -164,8 +164,9 @@ def process_response(self, response): return mg.selected_model.status -@deprecation.deprecated(deprecated_in="5.0", - details="Use AddModelGroupComponent instead") +@deprecation.deprecated( + deprecated_in="5.0", details="Use AddModelGroupComponent instead" +) class CreateModelGroup(RequestChain): """ Create a new model group and train a model @@ -186,16 +187,17 @@ class CreateModelGroup(RequestChain): """ def __init__( - self, - name: str, - dataset_id: int, - source_column_id: int, - labelset_id: int, - workflow_id: int, - after_component_id: int, - wait: bool = False, - model_training_options: dict = None, - model_type: str = None + self, + name: str, + dataset_id: int, + source_column_id: int, + labelset_id: int, + workflow_id: int, + after_component_id: int, + wait: bool = False, + model_training_options: dict = None, + model_type: str = None, + request_interval: Union[int, float] = 5, ): self.name = name self.dataset_id = dataset_id @@ -206,6 +208,7 @@ def __init__( self.model_type = model_type self.workflow_id = workflow_id self.after_component_id = after_component_id + self.request_interval = request_interval def requests(self): yield AddModelGroupComponent( @@ -216,8 +219,7 @@ def requests(self): model_training_options=self.model_training_options, model_type=self.model_type, workflow_id=self.workflow_id, - after_component_id=self.after_component_id - + after_component_id=self.after_component_id, ) mg = self.previous.model_group_by_name(self.name) @@ -226,13 +228,15 @@ def requests(self): req = GetModelGroupSelectedModelStatus(id=model_group_id) yield req while self.previous not in ["FAILED", "COMPLETE", "NOT_ENOUGH_DATA"]: - sleep(1) + yield Delay(seconds=self.request_interval) yield req yield _GetModelGroup(id=model_group_id) -@deprecation.deprecated(deprecated_in="6.0", - details="Removed from platform. This call is a no-op.") + +@deprecation.deprecated( + deprecated_in="6.0", details="Removed from platform. This call is a no-op." 
+) class LoadModel(GraphQLRequest): """ Load model into system cache (implicit in ModelGroupPredict unless load=False) @@ -318,11 +322,11 @@ class ModelGroupPredict(RequestChain): """ def __init__( - self, - model_id: int, - data: List[str], - load: bool = True, - predict_options: Dict = None, + self, + model_id: int, + data: List[str], + load: bool = True, + predict_options: Dict = None, ): self.model_id = model_id self.data = data @@ -333,4 +337,3 @@ def requests(self): yield _ModelGroupPredict( model_id=self.model_id, data=self.data, predict_options=self.predict_options ) - diff --git a/indico/queries/questionnaire.py b/indico/queries/questionnaire.py index 9fa9bee0..4e5b02c7 100644 --- a/indico/queries/questionnaire.py +++ b/indico/queries/questionnaire.py @@ -7,7 +7,7 @@ from indico.client.request import ( GraphQLRequest, RequestChain, - Debouncer, + Delay, ) from indico.queries import AddModelGroupComponent from indico.types import NewLabelsetArguments, NewQuestionnaireArguments, Workflow, ModelTaskType @@ -322,9 +322,8 @@ def requests(self): ).model_group.questionnaire_id yield GetQuestionnaire(questionaire_id) status = self.previous.questions_status - debouncer = Debouncer() while status == "STARTED": - debouncer.backoff() + yield Delay() yield GetQuestionnaire(questionaire_id) status = self.previous.questions_status diff --git a/indico/queries/submission.py b/indico/queries/submission.py index f2ef3172..dbc0aa15 100644 --- a/indico/queries/submission.py +++ b/indico/queries/submission.py @@ -1,10 +1,9 @@ import json -import time from functools import partial from operator import eq, ne from typing import Dict, List, Union -from indico.client.request import GraphQLRequest, PagedRequest, RequestChain +from indico.client.request import Delay, GraphQLRequest, PagedRequest, RequestChain from indico.errors import IndicoInputError, IndicoTimeoutError from indico.filters import SubmissionFilter from indico.queries import JobStatus @@ -358,6 +357,7 @@ class SubmissionResult(RequestChain): and wait for the result file to be generated. Defaults to False timeout (int or float, optional): Maximum number of seconds to wait before timing out. Ignored if not `wait`. Defaults to 30 + request_interval (int or float, optional): The maximum time in between retry calls when waiting. Defaults to 5 seconds. 
Returns: Job: A Job that can be watched for results @@ -378,12 +378,14 @@ def __init__( check_status: str = None, wait: bool = False, timeout: Union[int, float] = 30, + request_interval: Union[int, float] = 5, ): self.submission_id = ( submission if isinstance(submission, int) else submission.id ) self.wait = wait self.timeout = timeout + self.request_interval = request_interval if check_status and check_status.upper() not in VALID_SUBMISSION_STATUSES: raise IndicoInputError( f"{check_status} is not one of valid submission statuses: " @@ -403,7 +405,7 @@ def requests(self) -> Union[Job, str]: while not self.status_check(self.previous.status): timer.check() yield GetSubmission(self.submission_id) - time.sleep(1) + yield Delay(seconds=self.request_interval) if not self.status_check(self.previous.status): raise IndicoTimeoutError(timer.elapsed) elif not self.status_check(self.previous.status): @@ -526,7 +528,8 @@ def __init__(self, submission_id: int): def process_response(self, response) -> List[SubmissionReviewFull]: return [ - SubmissionReviewFull(**review) for review in super().process_response(response)["submission"]["reviews"] + SubmissionReviewFull(**review) + for review in super().process_response(response)["submission"]["reviews"] ] diff --git a/indico/queries/workflow.py b/indico/queries/workflow.py index 6e698c70..73c03dde 100644 --- a/indico/queries/workflow.py +++ b/indico/queries/workflow.py @@ -3,7 +3,7 @@ import tempfile from typing import Dict, List, Union -from indico.client.request import Debouncer, GraphQLRequest, RequestChain +from indico.client.request import Delay, GraphQLRequest, RequestChain from indico.errors import IndicoError, IndicoInputError from indico.queries.storage import UploadBatched, UploadDocument from indico.types import SUBMISSION_RESULT_VERSIONS, Job, Submission, Workflow @@ -486,11 +486,9 @@ def requests(self): yield _AddDataToWorkflow(self.workflow_id) if self.wait: - debouncer = Debouncer() - while self.previous.status != "COMPLETE": yield GetWorkflow(workflow_id=self.workflow_id) - debouncer.backoff() + yield Delay() class CreateWorkflow(GraphQLRequest):
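
Usage sketch (editorial note, not part of the patch; the host and job id below are placeholders): with this change, request chains yield Delay objects instead of sleeping internally — the synchronous client turns each Delay into time.sleep and the async client into asyncio.sleep — and callers can tune the polling cadence through the new request_interval keyword. A minimal example against the synchronous client:

    from indico import IndicoClient, IndicoConfig
    from indico.queries import JobStatus

    # Placeholder host; use your own Indico environment.
    client = IndicoClient(config=IndicoConfig(host="try.indico.io"))

    # Poll a job every second (instead of the 0.2s default), giving up after 60s.
    # The chain yields Delay(seconds=1) between status checks; the client sleeps.
    job = client.call(
        JobStatus(id="1234-abcd", wait=True, request_interval=1, timeout=60)
    )
    print(job.status, job.ready)

Because the chain itself never blocks, the same polling logic works unchanged for both the synchronous and async clients.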