move pipeline specific I/O to io_util.apply_desired_io_on_output
jblom committed Nov 1, 2023
1 parent ef5fd8a commit 2e22cd1
Showing 2 changed files with 78 additions and 57 deletions.
59 changes: 55 additions & 4 deletions io_util.py
@@ -7,13 +7,61 @@
 from dane import Document
 from dane.config import cfg
 from dane.s3_util import S3Store, parse_s3_uri, validate_s3_uri
-from models import Provenance
+from models import CallbackResponse, Provenance, VisXPFeatureExtractionOutput


 logger = logging.getLogger(__name__)
 DANE_VISXP_PREP_TASK_KEY = "VISXP_PREP"


+# assesses the output and makes sure input & output are handled properly
+def apply_desired_io_on_output(
+    input_file: str,
+    proc_result: VisXPFeatureExtractionOutput,
+    delete_input_on_completion: bool,
+    delete_output_on_completion: bool,
+    transfer_output_on_completion: bool,
+) -> CallbackResponse:
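+    # returns a CallbackResponse dict with "state" (HTTP-style int) and
+    # "message" (str), as sent back to DANE by the worker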
+    # step 1: return an error response on failure
+    if proc_result.state != 200:
+        logger.error(f"Could not process the input properly: {proc_result.message}")
+        # something went wrong inside the VisXP work processor, return that response here
+        return {"state": proc_result.state, "message": proc_result.message}
+
+    # step 2: the process returned successfully, so generate the output
+    source_id = get_source_id(
+        input_file
+    )  # TODO: this worker does not necessarily work per source, so consider how to capture output group
+    output_path = get_base_output_dir(source_id)  # TODO actually make sure this works
+
+    # step 3: transfer the output to S3 (if configured so)
+    transfer_success = True
+    if transfer_output_on_completion:
+        transfer_success = transfer_output(source_id)
+
+    # a failed transfer impedes the workflow, so return an error
+    if not transfer_success:
+        return {
+            "state": 500,
+            "message": "Failed to transfer output to S3",
+        }
+
+    # step 4: clear the local output files (if configured so)
+    if delete_output_on_completion:
+        delete_success = delete_local_output(source_id)
+        if not delete_success:
+            # NOTE: just a warning for now, but one to keep an eye out for
+            logger.warning(f"Could not delete output files: {output_path}")
+
+    if delete_input_on_completion:
+        logger.warning("Deletion of input not supported yet")
+
+    return {
+        "state": 200,
+        "message": "Successfully generated VisXP features to be used for similarity search",
+    }
+
+
 # returns the basename of the input path
 # throughout processing this is then used as a unique ID
 # to keep track of the input/output
@@ -97,6 +145,7 @@ def obtain_input_file(
     handler, doc: Document
 ) -> Tuple[Optional[str], Optional[Provenance]]:
     # first fetch and validate the obtained S3 URI
+    # TODO make sure this is a valid S3 URI
     s3_uri = _fetch_visxp_prep_s3_uri(handler, doc)
     if not validate_s3_uri(s3_uri):
         return None, None
@@ -107,18 +156,20 @@ def obtain_input_file(
     # TODO download the content into get_download_dir()
     s3 = S3Store(cfg.OUTPUT.S3_ENDPOINT_URL)
     bucket, object_name = parse_s3_uri(s3_uri)
-    output_file = os.path.join(output_folder, os.path.basename(object_name))
+    input_file_path = os.path.join(output_folder, os.path.basename(object_name))
     success = s3.download_file(bucket, object_name, output_folder)
     if success:
+        # TODO uncompress the visxp_prep.tar.gz
+
         download_provenance = Provenance(
             activity_name="download",
             activity_description="Download VISXP_PREP data",
             start_time_unix=start_time,
             processing_time_ms=time() - start_time,
             input_data={},
-            output_data={"file_path": output_file},
+            output_data={"file_path": input_file_path},
         )
-        return output_file, download_provenance
+        return input_file_path, download_provenance
     logger.error("Failed to download VISXP_PREP data from S3")
     return None, None

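For reference, a minimal usage sketch of the relocated helper. The signature comes from the diff above; the file path, the flag values, and the pre-existing proc_result are assumptions made for this example, not part of the commit:

    # usage sketch (illustrative); assumes proc_result is the
    # VisXPFeatureExtractionOutput produced earlier by the worker
    from io_util import apply_desired_io_on_output

    callback_response = apply_desired_io_on_output(
        input_file="/data/input/visxp_prep__example.tar.gz",  # hypothetical path
        proc_result=proc_result,
        delete_input_on_completion=False,
        delete_output_on_completion=True,
        transfer_output_on_completion=True,
    )
    # callback_response is a CallbackResponse, e.g.
    # {"state": 200, "message": "Successfully generated VisXP features ..."}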
76 changes: 23 additions & 53 deletions worker.py
@@ -9,9 +9,8 @@
 from dane.config import cfg
 from models import CallbackResponse, Provenance
 from io_util import (
-    transfer_output,
+    apply_desired_io_on_output,
     obtain_input_file,
-    delete_local_output,
     get_base_output_dir,
     get_source_id,
     get_download_dir,
@@ -140,16 +139,15 @@ def callback(self, task: Task, doc: Document) -> CallbackResponse:

         # obtain the input file
         # TODO make sure to download the output from S3
-        output_file_path, download_provenance = obtain_input_file(self.handler, doc)
-        if not output_file_path:
+        input_file_path, download_provenance = obtain_input_file(self.handler, doc)
+        if not input_file_path:
             return {
                 "state": 500,
                 "message": "Could not download the input from S3",
             }
         if download_provenance and provenance.steps:
             provenance.steps.append(download_provenance)

-        input_file_path = output_file_path
         output_path = "TODO"  # TODO think of this

         # step 1: apply model to extract features
@@ -160,57 +158,29 @@
             output_path=output_path,
         )

-        # step 2: raise exception on failure
-        if proc_result.state != 200:
-            logger.error(f"Could not process the input properly: {proc_result.message}")
-            # something went wrong inside the VisXP work processor, return that response here
-            return {"state": proc_result.state, "message": proc_result.message}
-
-        if proc_result.provenance:
-            if not provenance.steps:
-                provenance.steps = []
+        if proc_result.provenance and provenance.steps:
             provenance.steps.append(proc_result.provenance)

-        # step 3: process returned successfully, generate the output
-        input_file = "*"
-        source_id = get_source_id(
-            input_file
-        )  # TODO: this worker does not necessarily work per source, so consider how to capture output group
-
-        # step 4: transfer the output to S3 (if configured so)
-        transfer_success = True
-        if self.TRANSFER_OUTPUT_ON_COMPLETION:
-            transfer_success = transfer_output(source_id)
-
-        if (
-            not transfer_success
-        ):  # failure of transfer, impedes the workflow, so return error
-            return {
-                "state": 500,
-                "message": "Failed to transfer output to S3",
-            }
-
-        # step 5: clear the output files (if configured so)
-        delete_success = True
-        if self.DELETE_OUTPUT_ON_COMPLETION:
-            delete_success = delete_local_output(source_id)
-
-        if (
-            not delete_success
-        ):  # NOTE: just a warning for now, but one to keep an EYE out for
-            logger.warning(f"Could not delete output files: {output_path}")
-
-        # step 6: save the results back to the DANE index
-        self.save_to_dane_index(
-            doc,
-            task,
-            get_s3_base_url(source_id),
-            provenance=provenance,
-        )
+        validated_output: CallbackResponse = apply_desired_io_on_output(
+            input_file_path,
+            proc_result,
+            self.DELETE_INPUT_ON_COMPLETION,
+            self.DELETE_OUTPUT_ON_COMPLETION,
+            self.TRANSFER_OUTPUT_ON_COMPLETION,
+        )
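+        # validated_output is a CallbackResponse dict ("state"/"message");
+        # a 200 state means the I/O handling succeeded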
-        return {
-            "state": 200,
-            "message": "Successfully generated VisXP data for the next worker",
-        }
+
+        if validated_output.get("state", 500) == 200:
+            logger.info(
+                "applying IO on output went well, now finally saving to DANE index"
+            )
+            # Lastly save the results back to the DANE index
+            self.save_to_dane_index(
+                doc,
+                task,
+                get_s3_base_url(get_source_id(input_file_path)),
+                provenance=provenance,
+            )
+        return validated_output

     # TODO adapt to VisXP
     def save_to_dane_index(
