diff --git a/.dockerignore b/.dockerignore
index 2a70125..6f4230b 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -11,4 +11,5 @@
 .mypy_cache
 .pytest_cache
 .coverage
-__pycache__
\ No newline at end of file
+__pycache__
+s3-creds.env
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index d2c1cd1..9421911 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,4 +14,5 @@ __pycache__
 .coverage
 /model/*
 !/model/README.md
-/tests/data/data.provenance
\ No newline at end of file
+/tests/data/data.provenance
+s3-creds.env
\ No newline at end of file
diff --git a/data/input-files/testob/visxp_prep__testob.tar.gz b/data/input-files/testob/visxp_prep__testob.tar.gz
deleted file mode 100644
index e4de141..0000000
Binary files a/data/input-files/testob/visxp_prep__testob.tar.gz and /dev/null differ
diff --git a/docker-compose.yml b/docker-compose.yml
index 5d206c6..01f7611 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -8,6 +8,8 @@ services:
       - ./config:/root/.DANE
     container_name: visxp_worker_2
     command: --run-test-file # NOTE: comment out this line to spin up the worker
+    env_file:
+      - s3-creds.env
     logging:
       options:
         max-size: 20m
diff --git a/feature_extraction.py b/feature_extraction.py
index 97adf7b..51fdace 100644
--- a/feature_extraction.py
+++ b/feature_extraction.py
@@ -39,13 +39,17 @@ def run(
     device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
     logger.info(f"Device is: {device}")

+    # Step 2: verify the input file's existence
     input_file_path = feature_extraction_input.input_file_path
-    source_id = feature_extraction_input.source_id
+    if not os.path.exists(input_file_path):
+        logger.error(f"Input path does not exist: {input_file_path}")
+        return None

-    # Step 1: this is the "processing ID" if you will
+    # This is the "processing ID" if you will
+    source_id = feature_extraction_input.source_id
     logger.info(f"Extracting features for: {source_id}.")

-    # Step 2: check the type of input (tar.gz vs a directory)
+    # Step 3: check the type of input (tar.gz vs a directory)
     if input_file_path.find(".tar.gz") != -1:
         logger.info("Input is an archive, uncompressing it")
         untar_input_file(input_file_path)  # extracts contents in same dir
@@ -54,12 +58,12 @@ def run(
         )  # change the input path to the parent dir
         logger.info(f"Changed input_file_path to: {input_file_path}")

-    # Step 3: Load spectograms + keyframes from file & preprocess
+    # Step 4: Load spectrograms + keyframes from file & preprocess
     dataset = VisXPData(
         Path(input_file_path), model_config_file=model_config_file, device=device
     )

-    # Step 4: Load model from file
+    # Step 5: Load model from file
     model = load_model_from_file(
         checkpoint_file=model_path,
         config_file=model_config_file,
@@ -68,7 +72,7 @@ def run(
     # Switch model mode: in training mode, model layers behave differently!
     model.eval()

-    # Step 5: Apply model to data
+    # Step 6: Apply model to data
     logger.info(f"Going to extract features for {dataset.__len__()} items.")

     result_list = []
@@ -76,7 +80,7 @@ def run(
         batch_result = apply_model(batch=batch, model=model, device=device)
         result_list.append(batch_result)

-    # concatenate results and save to file
+    # Step 7: concatenate results and save to file
     result = torch.cat(result_list)
     file_saved = _save_features_to_file(result, output_file_path)
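Reviewer note: step 3 above leans on the worker's own untar_input_file() helper, which is not shown in this diff. A minimal, self-contained sketch of the same behaviour using only the standard library; normalize_input_path is a hypothetical name, not part of this PR:

import tarfile
from pathlib import Path


def normalize_input_path(input_file_path: str) -> str:
    # If the input is a visxp_prep tar.gz, extract it next to itself
    # (mirroring "extracts contents in same dir" above) and return the
    # parent dir that now holds the spectrograms + keyframes.
    if input_file_path.endswith(".tar.gz"):
        with tarfile.open(input_file_path) as archive:
            archive.extractall(Path(input_file_path).parent)
        return str(Path(input_file_path).parent)
    return input_file_path  # already a directory, use as-is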
") result_list = [] @@ -76,7 +80,7 @@ def run( batch_result = apply_model(batch=batch, model=model, device=device) result_list.append(batch_result) - # concatenate results and save to file + # Step 7: concatenate results and save to file result = torch.cat(result_list) file_saved = _save_features_to_file(result, output_file_path) diff --git a/io_util.py b/io_util.py index 1ccc26f..efb8f38 100644 --- a/io_util.py +++ b/io_util.py @@ -1,9 +1,10 @@ import logging import os from pathlib import Path +import shutil import tarfile from time import time -from typing import Dict +from typing import Dict, List from dane import Document from dane.config import cfg @@ -19,7 +20,11 @@ DANE_VISXP_PREP_TASK_KEY = "VISXP_PREP" OUTPUT_FILE_BASE_NAME = "visxp_features" INPUT_FILE_BASE_NAME = "visxp_prep" -INPUT_FILE_EXTENSION = ".tar.gz" +TAR_GZ_EXTENSION = ".tar.gz" +S3_OUTPUT_TYPES: List[OutputType] = [ + OutputType.FEATURES, + OutputType.PROVENANCE, +] # only upload this output to S3 # for each OutputType a subdir is created inside the base output dir @@ -42,6 +47,14 @@ def get_base_output_dir(source_id: str = "") -> str: return os.path.join(*path_elements) +# output file name of the final tar.gz that will be uploaded to S3 +def get_archive_file_path(source_id: str) -> str: + return os.path.join( + get_base_output_dir(source_id), + f"{OUTPUT_FILE_BASE_NAME}__{source_id}{TAR_GZ_EXTENSION}", + ) + + def get_output_file_name(source_id: str, output_type: OutputType) -> str: output_file_name = "" match output_type: @@ -80,7 +93,7 @@ def get_s3_output_file_uri(source_id: str, output_type: OutputType) -> str: def get_source_id_from_tar(input_path: str) -> str: fn = os.path.basename(input_path) tmp = fn.split("__") - source_id = tmp[1][: -len(INPUT_FILE_EXTENSION)] + source_id = tmp[1][: -len(TAR_GZ_EXTENSION)] logger.info(f"Using source_id: {source_id}") return source_id @@ -94,22 +107,84 @@ def source_id_from_s3_uri(s3_uri: str) -> str: def delete_local_output(source_id: str) -> bool: - # TODO: implement + output_dir = get_base_output_dir(source_id) + logger.info(f"Deleting output folder: {output_dir}") + if output_dir == os.sep or output_dir == ".": + logger.warning(f"Rejected deletion of: {output_dir}") + return False + + if not _is_valid_visxp_output(output_dir): + logger.warning( + f"Tried to delete a dir that did not contain VisXP output: {output_dir}" + ) + return False + + try: + shutil.rmtree(output_dir) + logger.info(f"Cleaned up folder {output_dir}") + except Exception: + logger.exception(f"Failed to delete output dir {output_dir}") + return False return True -def transfer_output(output_dir: str) -> bool: - logger.warning(f"Transferring {output_dir} to S3 requires implementation") - # TODO: implement +# TODO implement some more, now checks presence of provenance dir +def _is_valid_visxp_output(output_dir: str) -> bool: + return os.path.exists(os.path.join(output_dir, OutputType.PROVENANCE.value)) + + +def _validate_transfer_config() -> bool: + if any( + [ + not x + for x in [ + cfg.OUTPUT.S3_ENDPOINT_URL, + cfg.OUTPUT.S3_BUCKET, + cfg.OUTPUT.S3_FOLDER_IN_BUCKET, + ] + ] + ): + logger.warning( + "TRANSFER_ON_COMPLETION configured without all the necessary S3 settings" + ) + return False + return True + + +# compresses all desired output dirs into a single tar and uploads it to S3 +def transfer_output(source_id: str) -> bool: + output_dir = get_base_output_dir(source_id) + logger.info(f"Transferring {output_dir} to S3 (asset={source_id})") + if not _validate_transfer_config(): + return False + + s3 = 
diff --git a/main_data_processor.py b/main_data_processor.py
index 61c2c9a..c130117 100644
--- a/main_data_processor.py
+++ b/main_data_processor.py
@@ -8,6 +8,7 @@
     get_output_file_path,
     transfer_output,
     delete_local_output,
+    delete_input_file,
 )
 from models import (
     CallbackResponse,
@@ -21,7 +22,9 @@
 logger = logging.getLogger(__name__)


-def extract_visual_features(feature_extraction_input: VisXPFeatureExtractionInput):
+def extract_visual_features(
+    feature_extraction_input: VisXPFeatureExtractionInput,
+) -> VisXPFeatureExtractionOutput:
     logger.info("Starting VisXP visual feature extraction")
     start_time = time()
     feature_extraction_provenance = feature_extraction.run(
@@ -88,8 +91,16 @@ def apply_desired_io_on_output(
         # NOTE: just a warning for now, but one to keep an eye out for
         logger.warning(f"Could not delete output files: {output_path}")

-    if delete_input_on_completion:
-        logger.warning("Deletion of input not supported yet")
+    # Step 8: clean up the input file (if so configured)
+    if not delete_input_file(
+        feature_extraction_input.input_file_path,
+        feature_extraction_input.source_id,
+        delete_input_on_completion,
+    ):
+        return {
+            "state": 500,
+            "message": "Generated VisXP features, but could not delete the input file",
+        }

     return {
         "state": 200,
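Reviewer note on step 8: delete_input_file() logs "Configured to leave the input alone, skipping deletion" when actually_delete is False and, per the lines elided between the two io_util.py hunks, presumably returns True in that case, so the new 500 branch should only fire on a genuinely failed delete. A hedged illustration with hypothetical paths:

from io_util import delete_input_file

# Deletion disabled by config: expected to report success without touching
# the file, so apply_desired_io_on_output() still returns state 200.
ok = delete_input_file(
    "/data/input-files/visxp_prep__testob.tar.gz",  # hypothetical input tar
    "testob",
    actually_delete=False,
)
assert ok is True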