diff --git a/AIPscan/Aggregator/mets_parse_helpers.py b/AIPscan/Aggregator/mets_parse_helpers.py
index 96651d79..5b71059c 100644
--- a/AIPscan/Aggregator/mets_parse_helpers.py
+++ b/AIPscan/Aggregator/mets_parse_helpers.py
@@ -94,6 +94,9 @@ def download_mets(
         get_mets_url(storage_service, package_uuid, relative_path_to_mets)
     )
 
+    if mets_response.status_code != 200:
+        raise Exception("Non-200 HTTP code from METS download")
+
     # Create a directory to download the METS to.
     numbered_subdir = create_numbered_subdirs(timestamp, package_list_no)
 
diff --git a/AIPscan/Aggregator/task_helpers.py b/AIPscan/Aggregator/task_helpers.py
index f0a6db14..a706d665 100644
--- a/AIPscan/Aggregator/task_helpers.py
+++ b/AIPscan/Aggregator/task_helpers.py
@@ -9,7 +9,9 @@
 
 from dateutil.parser import ParserError, parse
 
+from AIPscan import db
 from AIPscan.Aggregator.types import StorageServicePackage
+from AIPscan.models import FetchJobError
 
 
 def format_api_url_with_limit_offset(storage_service):
@@ -158,3 +160,13 @@ def write_mets(http_response, package_uuid, subdir):
     with open(download_file, "wb") as file:
         file.write(http_response.content)
     return download_file
+
+
+def store_fetch_job_error_information(fetch_job_id, message):
+    """Record a fetch job error in the database."""
+    fetch_error = FetchJobError()
+    fetch_error.fetch_job_id = fetch_job_id
+    fetch_error.message = message
+
+    db.session.add(fetch_error)
+    db.session.commit()
diff --git a/AIPscan/Aggregator/tasks.py b/AIPscan/Aggregator/tasks.py
index f12c7f78..012d729f 100644
--- a/AIPscan/Aggregator/tasks.py
+++ b/AIPscan/Aggregator/tasks.py
@@ -19,6 +19,7 @@
     format_api_url_with_limit_offset,
     parse_package_list_file,
     process_package_object,
+    store_fetch_job_error_information,
 )
 from AIPscan.extensions import celery
 from AIPscan.helpers import file_sha256_hash
@@ -63,13 +64,24 @@ def start_mets_task(
     celery database.
     """
     storage_service = StorageService.query.get(storage_service_id)
 
-    storage_location = database_helpers.create_or_update_storage_location(
-        current_location, storage_service
-    )
-    pipeline = database_helpers.create_or_update_pipeline(
-        origin_pipeline, storage_service
-    )
+    try:
+        storage_location = database_helpers.create_or_update_storage_location(
+            current_location, storage_service
+        )
+    except Exception as err:
+        store_fetch_job_error_information(fetch_job_id, str(err))
+
+        return
+
+    try:
+        pipeline = database_helpers.create_or_update_pipeline(
+            origin_pipeline, storage_service
+        )
+    except Exception as err:
+        store_fetch_job_error_information(fetch_job_id, str(err))
+
+        return
 
     args = [
         package_uuid,
@@ -168,6 +180,8 @@ def workflow_coordinator(
             break
 
     if isinstance(package_lists_task.info, TaskError):
+        store_fetch_job_error_information(fetch_job_id, str(package_lists_task.info))
+
        # Re-raise.
        raise (package_lists_task.info)
 
@@ -317,13 +331,20 @@ def get_mets(
 
     # Download METS file
     storage_service = StorageService.query.get(storage_service_id)
-    download_file = download_mets(
-        storage_service,
-        package_uuid,
-        relative_path_to_mets,
-        timestamp_str,
-        package_list_no,
-    )
+
+    try:
+        download_file = download_mets(
+            storage_service,
+            package_uuid,
+            relative_path_to_mets,
+            timestamp_str,
+            package_list_no,
+        )
+    except Exception as e:
+        store_fetch_job_error_information(fetch_job_id, str(e))
+
+        return
+
     mets_name = os.path.basename(download_file)
     mets_hash = file_sha256_hash(download_file)
 
diff --git a/AIPscan/Aggregator/templates/storage_service.html b/AIPscan/Aggregator/templates/storage_service.html
index 97d3a8f6..b20061cb 100644
--- a/AIPscan/Aggregator/templates/storage_service.html
+++ b/AIPscan/Aggregator/templates/storage_service.html
@@ -53,6 +53,7 @@
           <th>Download duration</th>
          <th>Packages in SS</th>
          <th>New AIPs added</th>
+          <th>Errors</th>
          <th>Action</th>
        </tr>
@@ -76,6 +77,7 @@
           <td>{{ mets_fetch_job.aips|length }}</td>
+          <td>{{ mets_fetch_job.errors|length }}</td>
diff --git a/AIPscan/Aggregator/tests/test_task_helpers.py b/AIPscan/Aggregator/tests/test_task_helpers.py
index f85fe864..17438414 100644
--- a/AIPscan/Aggregator/tests/test_task_helpers.py
+++ b/AIPscan/Aggregator/tests/test_task_helpers.py
@@ -7,7 +7,7 @@
 
 import pytest
 
-from AIPscan import models
+from AIPscan import models, test_helpers
 from AIPscan.Aggregator import task_helpers
 from AIPscan.Aggregator.types import StorageServicePackage
 
@@ -305,3 +305,25 @@ def test_process_package_object(packages, idx, storage_service_package):
     """
     package_obj = task_helpers.process_package_object(packages[idx])
     assert package_obj == storage_service_package, idx
+
+
+def test_store_fetch_job_error_info(app_instance):
+    """Test task helper for recording fetch job errors."""
+    fetch_job = test_helpers.create_test_fetch_job()
+
+    # Make sure no fetch job errors exist for the test fetch job.
+    fetch_job_error = models.FetchJobError.query.filter_by(
+        fetch_job_id=fetch_job.id
+    ).first()
+
+    assert fetch_job_error is None
+
+    # Store a fetch job error and make sure it ends up in the database.
+    task_helpers.store_fetch_job_error_information(fetch_job.id, "Test")
+
+    fetch_job_error = models.FetchJobError.query.filter_by(
+        fetch_job_id=fetch_job.id
+    ).first()
+
+    assert fetch_job_error is not None
+    assert fetch_job_error.fetch_job_id == fetch_job.id
diff --git a/AIPscan/Aggregator/tests/test_tasks.py b/AIPscan/Aggregator/tests/test_tasks.py
index b737aeba..6d56f837 100644
--- a/AIPscan/Aggregator/tests/test_tasks.py
+++ b/AIPscan/Aggregator/tests/test_tasks.py
@@ -17,6 +17,7 @@
     make_request,
     parse_package_list_file,
     parse_packages_and_load_mets,
+    start_mets_task,
 )
 from AIPscan.Aggregator.tests import (
     INVALID_JSON,
@@ -26,7 +27,7 @@
     VALID_JSON,
     MockResponse,
 )
-from AIPscan.models import AIP, Agent, FetchJob, StorageService
+from AIPscan.models import AIP, Agent, FetchJob, FetchJobError, StorageService
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 FIXTURES_DIR = os.path.join(SCRIPT_DIR, "fixtures")
@@ -275,3 +276,66 @@ def test_delete_aip(app_instance):
 
     deleted_aip = AIP.query.filter_by(uuid=PACKAGE_UUID).first()
     assert deleted_aip is None
+
+
+def test_store_fetch_job_error_during_get_mets(app_instance):
+    """Test recording a fetch job error during the get_mets task."""
+    storage_service = test_helpers.create_test_storage_service()
+
+    fetch_job = test_helpers.create_test_fetch_job(
+        storage_service_id=storage_service.id
+    )
+
+    # Make sure no fetch job errors exist for the test fetch job.
+    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()
+
+    assert fetch_job_error is None
+
+    get_mets(
+        "222",
+        1000,
+        "/",
+        "timestamp string",
+        "1",
+        storage_service.id,
+        1,
+        1,
+        fetch_job.id,
+        None,
+    )
+
+    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()
+
+    assert fetch_job_error is not None
+    assert fetch_job_error.fetch_job_id == fetch_job.id
+    assert fetch_job_error.message == "Non-200 HTTP code from METS download"
+
+
+def test_store_fetch_job_error_during_create_location(app_instance):
+    """Test recording a fetch job error during storage location creation."""
+    fetch_job = test_helpers.create_test_fetch_job()
+
+    # Make sure no fetch job errors exist for the test fetch job.
+    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()
+
+    assert fetch_job_error is None
+
+    package_uuid = str(uuid.uuid4())
+
+    start_mets_task(
+        package_uuid,
+        1000,
+        "/tmp",
+        None,
+        None,
+        "timestamp src",
+        "1",
+        1,
+        fetch_job.id,
+        False,
+    )
+
+    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()
+
+    assert fetch_job_error is not None
+    assert fetch_job_error.fetch_job_id == fetch_job.id
diff --git a/AIPscan/models.py b/AIPscan/models.py
index 932f7c3b..4b0ac246 100644
--- a/AIPscan/models.py
+++ b/AIPscan/models.py
@@ -3,6 +3,8 @@
 import re
 from datetime import date, datetime
 
+from sqlalchemy.sql import func
+
 from AIPscan import db
 
 UUID_REGEX = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
@@ -257,6 +259,9 @@ class FetchJob(db.Model):
         db.Integer(), db.ForeignKey("storage_service.id"), nullable=False
     )
     aips = db.relationship("AIP", cascade="all,delete", backref="fetch_job", lazy=True)
+    errors = db.relationship(
+        "FetchJobError", cascade="all,delete", backref="fetch_job", lazy=True
+    )
 
     def __init__(
         self,
@@ -280,6 +285,16 @@ def __repr__(self):
         return "<Fetch Job '{}'>".format(self.download_start)
 
 
+class FetchJobError(db.Model):
+    __tablename__ = "fetch_job_error"
+    id = db.Column(db.Integer(), primary_key=True)
+    fetch_job_id = db.Column(
+        db.Integer(), db.ForeignKey("fetch_job.id"), nullable=False
+    )
+    message = db.Column(db.String(255), index=True)
+    create_date = db.Column(db.DateTime, server_default=func.now())
+
+
 class Pipeline(db.Model):
     __tablename__ = "pipeline"
     id = db.Column(db.Integer(), primary_key=True)
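Note: the models.py change above introduces a new fetch_job_error table, but no schema migration is shown in the patch. Assuming the project manages its schema with Flask-Migrate/Alembic (an assumption here; "flask db migrate" would normally generate this file), a companion migration would look roughly like the sketch below. The revision identifiers are hypothetical placeholders.

```python
"""Minimal sketch of an Alembic migration for the new fetch_job_error table.

Assumes Flask-Migrate/Alembic is in use; revision identifiers are hypothetical
placeholders and would normally be generated by "flask db migrate".
"""
import sqlalchemy as sa
from alembic import op

# Hypothetical revision identifiers, used by Alembic.
revision = "000000000000"
down_revision = None  # set to the project's current head revision
branch_labels = None
depends_on = None


def upgrade():
    # Mirror the FetchJobError model: integer PK, FK to fetch_job, an indexed
    # message column, and a server-side default creation timestamp.
    op.create_table(
        "fetch_job_error",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("fetch_job_id", sa.Integer(), nullable=False),
        sa.Column("message", sa.String(length=255), nullable=True),
        sa.Column(
            "create_date", sa.DateTime(), server_default=sa.func.now(), nullable=True
        ),
        sa.ForeignKeyConstraint(["fetch_job_id"], ["fetch_job.id"]),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_index(
        op.f("ix_fetch_job_error_message"),
        "fetch_job_error",
        ["message"],
        unique=False,
    )


def downgrade():
    op.drop_index(op.f("ix_fetch_job_error_message"), table_name="fetch_job_error")
    op.drop_table("fetch_job_error")
```

A plain, non-unique index on message matches the model: the same error text (for example the non-200 METS download message) can legitimately recur across packages within a fetch job.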