diff --git a/io_util.py b/io_util.py
index efb8f38..67d9049 100644
--- a/io_util.py
+++ b/io_util.py
@@ -217,10 +217,8 @@ def delete_input_file(input_file: str, source_id: str, actually_delete: bool) ->
     return True  # return True even if empty dirs were not removed
 
 
-def obtain_input_file(handler, doc: Document) -> VisXPFeatureExtractionInput:
-    # first fetch and validate the obtained S3 URI
-    # TODO make sure this is a valid S3 URI
-    s3_uri = _fetch_visxp_prep_s3_uri(handler, doc)
+def obtain_input_file(s3_uri: str) -> VisXPFeatureExtractionInput:
+    if not validate_s3_uri(s3_uri):
         return VisXPFeatureExtractionInput(500, f"Invalid S3 URI: {s3_uri}")
@@ -231,9 +229,11 @@ def obtain_input_file(handler, doc: Document) -> VisXPFeatureExtractionInput:
     # TODO download the content into get_download_dir()
     s3 = S3Store(cfg.OUTPUT.S3_ENDPOINT_URL)
     bucket, object_name = parse_s3_uri(s3_uri)
+    logger.info(f"OBJECT NAME: {object_name}")
     input_file_path = os.path.join(
         get_download_dir(),
-        os.path.basename(object_name),  # source_id is part of the object_name
+        source_id,
+        os.path.basename(object_name),  # i.e. visxp_prep__<source_id>.tar.gz
     )
     success = s3.download_file(bucket, object_name, output_folder)
     if success:
@@ -258,7 +258,7 @@ def obtain_input_file(handler, doc: Document) -> VisXPFeatureExtractionInput:
         return VisXPFeatureExtractionInput(500, f"Failed to download: {s3_uri}")
 
 
-def _fetch_visxp_prep_s3_uri(handler, doc: Document) -> str:
+def fetch_visxp_prep_s3_uri(handler, doc: Document) -> str:
     logger.info("checking download worker output")
     possibles = handler.searchResult(doc._id, DANE_VISXP_PREP_TASK_KEY)
     logger.info(possibles)
diff --git a/worker.py b/worker.py
index ebc29e9..10a8def 100644
--- a/worker.py
+++ b/worker.py
@@ -7,10 +7,12 @@
 from dane import Document, Task, Result
 from dane.base_classes import base_worker
 from dane.config import cfg
+from dane.s3_util import validate_s3_uri
 from models import CallbackResponse, OutputType, Provenance, VisXPFeatureExtractionInput
 from io_util import (
     generate_output_dirs,
     get_source_id_from_tar,
+    fetch_visxp_prep_s3_uri,
     obtain_input_file,
     get_base_output_dir,
     get_download_dir,
@@ -35,13 +37,18 @@
 # triggered by running: python worker.py --run-test-file
 def process_configured_input_file() -> bool:
     logger.info("Triggered processing of configured VISXP_EXTRACT.TEST_INPUT_PATH")
-    feature_extraction_input = VisXPFeatureExtractionInput(
-        200,
-        f"Thank you for running us: let's test {cfg.VISXP_EXTRACT.TEST_INPUT_PATH}",
-        get_source_id_from_tar(cfg.VISXP_EXTRACT.TEST_INPUT_PATH),
-        cfg.VISXP_EXTRACT.TEST_INPUT_PATH,
-        None,  # no provenance needed in test
-    )
+
+    # S3 URI is also allowed for VISXP_EXTRACT.TEST_INPUT_PATH
+    if validate_s3_uri(cfg.VISXP_EXTRACT.TEST_INPUT_PATH):
+        feature_extraction_input = obtain_input_file(cfg.VISXP_EXTRACT.TEST_INPUT_PATH)
+    else:
+        feature_extraction_input = VisXPFeatureExtractionInput(
+            200,
+            f"Thank you for running us: let's test {cfg.VISXP_EXTRACT.TEST_INPUT_PATH}",
+            get_source_id_from_tar(cfg.VISXP_EXTRACT.TEST_INPUT_PATH),
+            cfg.VISXP_EXTRACT.TEST_INPUT_PATH,
+            None,  # no provenance needed in test
+        )
 
     # first generate the output dirs
     generate_output_dirs(feature_extraction_input.source_id)
@@ -178,9 +185,16 @@ def callback(self, task: Task, doc: Document) -> CallbackResponse:
             output_data={},
         )
 
-        # obtain the input file
-        # TODO make sure to download the output from S3
-        feature_extraction_input = obtain_input_file(self.handler, doc)
+        # fetch s3 uri of visxp_prep data:
+        s3_uri = fetch_visxp_prep_s3_uri(self.handler, doc)
+        if not s3_uri:
+            return {
+                "state": 404,
+                "message": "Could not find VISXP_PREP data",
+            }
+
+        # download the VISXP_PREP data via the S3 URI
+        feature_extraction_input = obtain_input_file(s3_uri)
         if not feature_extraction_input.state == 200:
             return {
                 "state": feature_extraction_input.state,
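For context, a minimal sketch (not part of the diff) of how the two reworked helpers are meant to be combined, mirroring the new `callback` flow above. The `handler` and `doc` objects come from the DANE base worker; the wrapper name `resolve_and_download_input` is made up for illustration.

```python
# Illustrative sketch only: combines the renamed fetch_visxp_prep_s3_uri with the
# simplified obtain_input_file(s3_uri), as the reworked callback now does.
from io_util import fetch_visxp_prep_s3_uri, obtain_input_file


def resolve_and_download_input(handler, doc):
    # 1) Ask the DANE handler for the S3 URI written by the VISXP_PREP task
    s3_uri = fetch_visxp_prep_s3_uri(handler, doc)
    if not s3_uri:
        return {"state": 404, "message": "Could not find VISXP_PREP data"}

    # 2) Validate and download the tarball; obtain_input_file now only needs the URI
    feature_extraction_input = obtain_input_file(s3_uri)
    if feature_extraction_input.state != 200:
        # e.g. 500 for an invalid S3 URI or a failed download
        return {"state": feature_extraction_input.state, "message": "download failed"}
    return feature_extraction_input
```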