Skip to content

Commit

Permalink
ENH: Set question UUIDs in advance in Child.ask_multiple
Browse files Browse the repository at this point in the history
skipci
  • Loading branch information
cortadocodes committed Jun 7, 2024
1 parent e708c19 commit ea51c97
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
2 changes: 1 addition & 1 deletion octue/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ServiceConfiguration:
:param str app_source_path: the path to the directory containing the app's source code
:param str twine_path: the path to the twine file defining the schema for input, output, and configuration data for the service
:param str|None app_configuration_path: the path to the app configuration file containing configuration data for the service; if this is `None`, the default application configuration is used
:param str|None diagnostics_cloud_path: the path to a cloud directory to store diagnostics (this includes the configuration, input values and manifest, and logs)
:param str|None diagnostics_cloud_path: the path to a cloud directory to store diagnostics (this includes the configuration, input values and manifest, and logs for each question)
:param iter(dict)|None service_registries: the names and endpoints of the registries used to resolve service revisions when asking questions; these should be in priority order (highest priority first)
:param str|None directory: if provided, find the app source, twine, and app configuration relative to this directory
:return None:
Expand Down
20 changes: 16 additions & 4 deletions octue/resources/child.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import copy
import logging
import os
import uuid

from octue.cloud.pub_sub.service import Service
from octue.resources import service_backends
Expand Down Expand Up @@ -162,9 +163,14 @@ def ask_multiple(
logger.info("Asking %d questions.", n_questions)

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_question_index_mapping = {
executor.submit(self.ask, **question): i for i, question in enumerate(questions)
}
future_to_question_index_mapping = {}

for i, question in enumerate(questions):
if "question_uuid" not in question:
question["question_uuid"] = str(uuid.uuid4())

future = executor.submit(self.ask, **question)
future_to_question_index_mapping[future] = i

for i, future in enumerate(concurrent.futures.as_completed(future_to_question_index_mapping)):
logger.info("%d of %d answers received.", i + 1, n_questions)
Expand All @@ -177,7 +183,13 @@ def ask_multiple(
raise e

answers[question_index] = e
logger.exception("Question %d failed.", question_index)

logger.exception(
"Question %d failed. Run 'octue get-diagnostics gs://<diagnostics-cloud-path>/%s "
"--download-datasets' to get the crash diagnostics.",
question_index,
questions[question_index]["question_uuid"],
)

for retry in range(max_retries):
failed_questions = self._get_failed_questions(
Expand Down
2 changes: 1 addition & 1 deletion octue/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class Runner:
:param str|dict|None configuration_manifest: The strand data. Can be expressed as a string path of a *.json file (relative or absolute), as an open file-like object (containing json data), as a string of json data or as an already-parsed dict.
:param str|list(dict)|None children: The children strand data. Can be expressed as a string path of a *.json file (relative or absolute), as an open file-like object (containing json data), as a string of json data or as an already-parsed dict.
:param str|None output_location: the path to a cloud directory to save output datasets at
:param str|None diagnostics_cloud_path: the path to a cloud directory to store diagnostics in the event that the service fails while processing a question (this includes the configuration, input values and manifest, and logs)
:param str|None diagnostics_cloud_path: the path to a cloud directory to store diagnostics (this includes the configuration, input values and manifest, and logs for each question)
:param str|None project_name: name of Google Cloud project to get credentials from
:param str|None service_id: the ID of the service being run
:param bool delete_local_files: if `True`, delete any files downloaded during the call to `Runner.run` once the analysis has finished
Expand Down

0 comments on commit ea51c97

Please sign in to comment.