Skip to content

Commit

Permalink
now TEST_INPUT_FILE can be an S3 URI as well; tested this; also found…
Browse files Browse the repository at this point in the history
… a bug, so worker mode should be fine now
  • Loading branch information
jblom committed Nov 6, 2023
1 parent c671819 commit 5cce9f1
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 16 deletions.
12 changes: 6 additions & 6 deletions io_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,8 @@ def delete_input_file(input_file: str, source_id: str, actually_delete: bool) ->
return True # return True even if empty dirs were not removed


def obtain_input_file(handler, doc: Document) -> VisXPFeatureExtractionInput:
# first fetch and validate the obtained S3 URI
# TODO make sure this is a valid S3 URI
s3_uri = _fetch_visxp_prep_s3_uri(handler, doc)
def obtain_input_file(s3_uri: str) -> VisXPFeatureExtractionInput:

if not validate_s3_uri(s3_uri):
return VisXPFeatureExtractionInput(500, f"Invalid S3 URI: {s3_uri}")

Expand All @@ -231,9 +229,11 @@ def obtain_input_file(handler, doc: Document) -> VisXPFeatureExtractionInput:
# TODO download the content into get_download_dir()
s3 = S3Store(cfg.OUTPUT.S3_ENDPOINT_URL)
bucket, object_name = parse_s3_uri(s3_uri)
logger.info(f"OBJECT NAME: {object_name}")
input_file_path = os.path.join(
get_download_dir(),
os.path.basename(object_name), # source_id is part of the object_name
source_id,
os.path.basename(object_name), # i.e. visxp_prep__<source_id>.tar.gz
)
success = s3.download_file(bucket, object_name, output_folder)
if success:
Expand All @@ -258,7 +258,7 @@ def obtain_input_file(handler, doc: Document) -> VisXPFeatureExtractionInput:
return VisXPFeatureExtractionInput(500, f"Failed to download: {s3_uri}")


def _fetch_visxp_prep_s3_uri(handler, doc: Document) -> str:
def fetch_visxp_prep_s3_uri(handler, doc: Document) -> str:
logger.info("checking download worker output")
possibles = handler.searchResult(doc._id, DANE_VISXP_PREP_TASK_KEY)
logger.info(possibles)
Expand Down
34 changes: 24 additions & 10 deletions worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
from dane import Document, Task, Result
from dane.base_classes import base_worker
from dane.config import cfg
from dane.s3_util import validate_s3_uri
from models import CallbackResponse, OutputType, Provenance, VisXPFeatureExtractionInput
from io_util import (
generate_output_dirs,
get_source_id_from_tar,
fetch_visxp_prep_s3_uri,
obtain_input_file,
get_base_output_dir,
get_download_dir,
Expand All @@ -35,13 +37,18 @@
# triggered by running: python worker.py --run-test-file
def process_configured_input_file() -> bool:
logger.info("Triggered processing of configured VISXP_EXTRACT.TEST_INPUT_PATH")
feature_extraction_input = VisXPFeatureExtractionInput(
200,
f"Thank you for running us: let's test {cfg.VISXP_EXTRACT.TEST_INPUT_PATH}",
get_source_id_from_tar(cfg.VISXP_EXTRACT.TEST_INPUT_PATH),
cfg.VISXP_EXTRACT.TEST_INPUT_PATH,
None, # no provenance needed in test
)

# S3 URI is also allowed for VISXP_EXTRACT.TEST_INPUT_PATH
if validate_s3_uri(cfg.VISXP_EXTRACT.TEST_INPUT_PATH):
feature_extraction_input = obtain_input_file(cfg.VISXP_EXTRACT.TEST_INPUT_PATH)
else:
feature_extraction_input = VisXPFeatureExtractionInput(
200,
f"Thank you for running us: let's test {cfg.VISXP_EXTRACT.TEST_INPUT_PATH}",
get_source_id_from_tar(cfg.VISXP_EXTRACT.TEST_INPUT_PATH),
cfg.VISXP_EXTRACT.TEST_INPUT_PATH,
None, # no provenance needed in test
)

# first generate the output dirs
generate_output_dirs(feature_extraction_input.source_id)
Expand Down Expand Up @@ -178,9 +185,16 @@ def callback(self, task: Task, doc: Document) -> CallbackResponse:
output_data={},
)

# obtain the input file
# TODO make sure to download the output from S3
feature_extraction_input = obtain_input_file(self.handler, doc)
# fetch s3 uri of visxp_prep data:
s3_uri = fetch_visxp_prep_s3_uri(self.handler, doc)
if not s3_uri:
return {
"state": 404,
"message": "Could not find VISXP_PREP data",
}

# download the VISXP_PREP data via the S3 URI
feature_extraction_input = obtain_input_file(s3_uri)
if not feature_extraction_input.state == 200:
return {
"state": feature_extraction_input.state,
Expand Down

0 comments on commit 5cce9f1

Please sign in to comment.