implemented and tested transfer to S3 and delete output #22

Merged 2 commits on Nov 3, 2023
Changes from all commits
3 changes: 2 additions & 1 deletion .dockerignore
@@ -11,4 +11,5 @@
.mypy_cache
.pytest_cache
.coverage
__pycache__
__pycache__
s3-creds.env
3 changes: 2 additions & 1 deletion .gitignore
@@ -14,4 +14,5 @@ __pycache__
.coverage
/model/*
!/model/README.md
/tests/data/data.provenance
/tests/data/data.provenance
s3-creds.env
Binary file removed data/input-files/testob/visxp_prep__testob.tar.gz
2 changes: 2 additions & 0 deletions docker-compose.yml
@@ -8,6 +8,8 @@ services:
- ./config:/root/.DANE
container_name: visxp_worker_2
command: --run-test-file # NOTE: comment this line to spin up the worker
env_file:
- s3-creds.env
logging:
options:
max-size: 20m
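For local testing, s3-creds.env must exist next to docker-compose.yml; it is deliberately excluded from both the image and the repo by the .dockerignore and .gitignore changes above. A minimal sketch of its expected contents; the variable names are an assumption based on common S3 client (boto3-style) conventions, not something this PR confirms:

# s3-creds.env (never commit this file)
# NOTE: variable names are assumed; use whatever the DANE S3Store actually reads
AWS_ACCESS_KEY_ID=<access-key>
AWS_SECRET_ACCESS_KEY=<secret-key>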
18 changes: 11 additions & 7 deletions feature_extraction.py
@@ -39,13 +39,17 @@ def run(
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
logger.info(f"Device is: {device}")

# Step 2: verify the input file's existence
input_file_path = feature_extraction_input.input_file_path
source_id = feature_extraction_input.source_id
if not os.path.exists(input_file_path):
logger.error(f"Input path does not exist {input_file_path}")
return None

# Step 1: this is the "processing ID" if you will
# This is the "processing ID" if you will
source_id = feature_extraction_input.source_id
logger.info(f"Extracting features for: {source_id}.")

# Step 2: check the type of input (tar.gz vs a directory)
# Step 3: check the type of input (tar.gz vs a directory)
if input_file_path.find(".tar.gz") != -1:
logger.info("Input is an archive, uncompressing it")
untar_input_file(input_file_path) # extracts contents in same dir
@@ -54,12 +58,12 @@
) # change the input path to the parent dir
logger.info(f"Changed input_file_path to: {input_file_path}")

# Step 3: Load spectrograms + keyframes from file & preprocess
# Step 4: Load spectrograms + keyframes from file & preprocess
dataset = VisXPData(
Path(input_file_path), model_config_file=model_config_file, device=device
)

# Step 4: Load model from file
# Step 5: Load model from file
model = load_model_from_file(
checkpoint_file=model_path,
config_file=model_config_file,
@@ -68,15 +72,15 @@
# Switch model mode: in training mode, model layers behave differently!
model.eval()

# Step 5: Apply model to data
# Step 6: Apply model to data
logger.info(f"Going to extract features for {dataset.__len__()} items. ")

result_list = []
for i, batch in enumerate(dataset.batches(batch_size=256)):
batch_result = apply_model(batch=batch, model=model, device=device)
result_list.append(batch_result)

# concatenate results and save to file
# Step 7: concatenate results and save to file
result = torch.cat(result_list)
file_saved = _save_features_to_file(result, output_file_path)

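The loop above accumulates one result tensor per batch and concatenates them with torch.cat before saving. A minimal, self-contained sketch of that pattern; the linear layer and random data are stand-ins, not the actual VisXP model or dataset:

import torch

model = torch.nn.Linear(16, 8)  # stand-in for the loaded checkpoint
model.eval()  # layers like dropout/batchnorm behave differently in training mode

result_list = []
with torch.no_grad():
    for batch in torch.rand(1024, 16).split(256):  # four batches of 256 items
        result_list.append(model(batch))

result = torch.cat(result_list)  # shape (1024, 8): one feature vector per item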
121 changes: 102 additions & 19 deletions io_util.py
@@ -1,9 +1,10 @@
import logging
import os
from pathlib import Path
import shutil
import tarfile
from time import time
from typing import Dict
from typing import Dict, List

from dane import Document
from dane.config import cfg
@@ -19,7 +20,11 @@
DANE_VISXP_PREP_TASK_KEY = "VISXP_PREP"
OUTPUT_FILE_BASE_NAME = "visxp_features"
INPUT_FILE_BASE_NAME = "visxp_prep"
INPUT_FILE_EXTENSION = ".tar.gz"
TAR_GZ_EXTENSION = ".tar.gz"
S3_OUTPUT_TYPES: List[OutputType] = [
OutputType.FEATURES,
OutputType.PROVENANCE,
] # only upload this output to S3


# for each OutputType a subdir is created inside the base output dir
@@ -42,6 +47,14 @@ def get_base_output_dir(source_id: str = "") -> str:
return os.path.join(*path_elements)


# output file name of the final tar.gz that will be uploaded to S3
def get_archive_file_path(source_id: str) -> str:
return os.path.join(
get_base_output_dir(source_id),
f"{OUTPUT_FILE_BASE_NAME}__{source_id}{TAR_GZ_EXTENSION}",
)


def get_output_file_name(source_id: str, output_type: OutputType) -> str:
output_file_name = ""
match output_type:
@@ -80,7 +93,7 @@ def get_s3_output_file_uri(source_id: str, output_type: OutputType) -> str:
def get_source_id_from_tar(input_path: str) -> str:
fn = os.path.basename(input_path)
tmp = fn.split("__")
source_id = tmp[1][: -len(INPUT_FILE_EXTENSION)]
source_id = tmp[1][: -len(TAR_GZ_EXTENSION)]
logger.info(f"Using source_id: {source_id}")
return source_id

@@ -94,22 +107,84 @@ def source_id_from_s3_uri(s3_uri: str) -> str:


def delete_local_output(source_id: str) -> bool:
# TODO: implement
output_dir = get_base_output_dir(source_id)
logger.info(f"Deleting output folder: {output_dir}")
if output_dir == os.sep or output_dir == ".":
logger.warning(f"Rejected deletion of: {output_dir}")
return False

if not _is_valid_visxp_output(output_dir):
logger.warning(
f"Tried to delete a dir that did not contain VisXP output: {output_dir}"
)
return False

try:
shutil.rmtree(output_dir)
logger.info(f"Cleaned up folder {output_dir}")
except Exception:
logger.exception(f"Failed to delete output dir {output_dir}")
return False
return True


def transfer_output(output_dir: str) -> bool:
logger.warning(f"Transferring {output_dir} to S3 requires implementation")
# TODO: implement
# TODO: extend this check; for now it only verifies that the provenance dir exists
def _is_valid_visxp_output(output_dir: str) -> bool:
return os.path.exists(os.path.join(output_dir, OutputType.PROVENANCE.value))


def _validate_transfer_config() -> bool:
if any(
[
not x
for x in [
cfg.OUTPUT.S3_ENDPOINT_URL,
cfg.OUTPUT.S3_BUCKET,
cfg.OUTPUT.S3_FOLDER_IN_BUCKET,
]
]
):
logger.warning(
"TRANSFER_ON_COMPLETION configured without all the necessary S3 settings"
)
return False
return True


# compresses all desired output dirs into a single tar and uploads it to S3
def transfer_output(source_id: str) -> bool:
output_dir = get_base_output_dir(source_id)
logger.info(f"Transferring {output_dir} to S3 (asset={source_id})")
if not _validate_transfer_config():
return False

s3 = S3Store(cfg.OUTPUT.S3_ENDPOINT_URL)
file_list = [os.path.join(output_dir, ot.value) for ot in S3_OUTPUT_TYPES]
tar_file = get_archive_file_path(source_id)

success = s3.transfer_to_s3(
cfg.OUTPUT.S3_BUCKET,
os.path.join(
cfg.OUTPUT.S3_FOLDER_IN_BUCKET, source_id
), # assets/<program ID>__<carrier ID>
file_list, # this list of subdirs will be compressed into the tar below
tar_file, # this file will be uploaded
)
if not success:
logger.error(f"Failed to upload: {tar_file}")
return False
return True


def get_download_dir():
def get_download_dir() -> str:
return os.path.join(cfg.FILE_SYSTEM.BASE_MOUNT, cfg.FILE_SYSTEM.INPUT_DIR)


# NOTE: untested
def delete_input_file(input_file: str, actually_delete: bool) -> bool:
def get_base_input_dir(source_id: str) -> str:
return os.path.join(get_download_dir(), source_id)


def delete_input_file(input_file: str, source_id: str, actually_delete: bool) -> bool:
logger.info(f"Verifying deletion of input file: {input_file}")
if actually_delete is False:
logger.info("Configured to leave the input alone, skipping deletion")
@@ -118,18 +193,22 @@ def delete_input_file(input_file: str, actually_delete: bool) -> bool:
# first remove the input file
try:
os.remove(input_file)
logger.info(f"Deleted VisXP input file: {input_file}")
logger.info(f"Deleted VisXP input tar file: {input_file}")
except OSError:
logger.exception("Could not delete input file")
return False

# now remove the "chunked path" from /mnt/dane-fs/input-files/03/d2/8a/03d28a03643a981284b403b91b95f6048576c234/xyz.mp4
# now remove the folders that were extracted from the input tar file
base_input_dir = get_base_input_dir(source_id)
try:
os.chdir(get_download_dir()) # cd /mnt/dane-fs/input-files
os.removedirs(
f".{input_file[len(get_download_dir()):input_file.rfind(os.sep)]}"
) # /03/d2/8a/03d28a03643a981284b403b91b95f6048576c234
logger.info("Deleted empty input dirs too")
for root, dirs, files in os.walk(base_input_dir):
for d in dirs:
dir_path = os.path.join(root, d)
logger.info(f"Deleting {dir_path}")
shutil.rmtree(dir_path)
logger.info("Deleted extracted input dirs")
os.removedirs(base_input_dir)
logger.info(f"Finally deleted the base_input_dir: {base_input_dir}")
except OSError:
logger.exception("OSError while removing empty input file dirs")
except FileNotFoundError:
@@ -145,13 +224,17 @@ def obtain_input_file(handler, doc: Document) -> VisXPFeatureExtractionInput:
if not validate_s3_uri(s3_uri):
return VisXPFeatureExtractionInput(500, f"Invalid S3 URI: {s3_uri}")

source_id = source_id_from_s3_uri(s3_uri)
start_time = time()
output_folder = get_download_dir()
output_folder = get_base_input_dir(source_id)

# TODO download the content into get_download_dir()
s3 = S3Store(cfg.OUTPUT.S3_ENDPOINT_URL)
bucket, object_name = parse_s3_uri(s3_uri)
input_file_path = os.path.join(output_folder, os.path.basename(object_name))
input_file_path = os.path.join(
get_download_dir(),
os.path.basename(object_name), # source_id is part of the object_name
)
success = s3.download_file(bucket, object_name, output_folder)
if success:
# TODO uncompress the visxp_prep.tar.gz
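transfer_output delegates the actual compress-and-upload to S3Store.transfer_to_s3 from the dane package. As a rough sketch of what that call is assumed to do (tar the listed subdirs, then upload the archive), using tarfile and boto3; this is an assumption about S3Store's behaviour, not its real implementation:

import os
import tarfile

import boto3


def tar_and_upload(bucket: str, key: str, file_list: list, tar_file: str) -> bool:
    # compress the selected output subdirs (features/, provenance/) into one tar.gz
    with tarfile.open(tar_file, "w:gz") as tar:
        for path in file_list:
            tar.add(path, arcname=os.path.basename(path))
    # upload the archive to s3://<bucket>/<key>/<tar filename>
    client = boto3.client("s3")  # endpoint URL and credentials omitted here
    try:
        client.upload_file(tar_file, bucket, f"{key}/{os.path.basename(tar_file)}")
    except Exception:
        return False
    return True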
17 changes: 14 additions & 3 deletions main_data_processor.py
@@ -8,6 +8,7 @@
get_output_file_path,
transfer_output,
delete_local_output,
delete_input_file,
)
from models import (
CallbackResponse,
Expand All @@ -21,7 +22,9 @@
logger = logging.getLogger(__name__)


def extract_visual_features(feature_extraction_input: VisXPFeatureExtractionInput):
def extract_visual_features(
feature_extraction_input: VisXPFeatureExtractionInput,
) -> VisXPFeatureExtractionOutput:
logger.info("Starting VisXP visual feature extraction")
start_time = time()
feature_extraction_provenance = feature_extraction.run(
@@ -88,8 +91,16 @@ def apply_desired_io_on_output(
# NOTE: just a warning for now, but one to keep an eye out for
logger.warning(f"Could not delete output files: {output_path}")

if delete_input_on_completion:
logger.warning("Deletion of input not supported yet")
# step 8: clean up the input file (if so configured)
if not delete_input_file(
feature_extraction_input.input_file_path,
feature_extraction_input.source_id,
delete_input_on_completion,
):
return {
"state": 500,
"message": "Generated VISXP_PREP output, but could not delete the input file",
}

return {
"state": 200,
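Taken together with io_util.py, the happy path of apply_desired_io_on_output now reads roughly as below. This is a simplified sketch of the control flow, not the actual function body; only the signatures and the CallbackResponse dict shape are taken from the code above:

def finish(source_id: str, input_file: str) -> CallbackResponse:
    if not transfer_output(source_id):
        return {"state": 500, "message": "Could not transfer output to S3"}
    if not delete_local_output(source_id):
        return {"state": 500, "message": "Could not delete local output"}
    if not delete_input_file(input_file, source_id, actually_delete=True):
        return {"state": 500, "message": "Could not delete the input file"}
    return {"state": 200, "message": "Succeeded"}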