diff --git a/AIPscan/Aggregator/task_helpers.py b/AIPscan/Aggregator/task_helpers.py index f0a6db14..a706d665 100644 --- a/AIPscan/Aggregator/task_helpers.py +++ b/AIPscan/Aggregator/task_helpers.py @@ -9,7 +9,9 @@ from dateutil.parser import ParserError, parse +from AIPscan import db from AIPscan.Aggregator.types import StorageServicePackage +from AIPscan.models import FetchJobError def format_api_url_with_limit_offset(storage_service): @@ -158,3 +160,12 @@ def write_mets(http_response, package_uuid, subdir): with open(download_file, "wb") as file: file.write(http_response.content) return download_file + + +def store_fetch_job_error_infomation(fetch_job_id, message): + fetch_error = FetchJobError() + fetch_error.fetch_job_id = fetch_job_id + fetch_error.message = message + + db.session.add(fetch_error) + db.session.commit() diff --git a/AIPscan/Aggregator/tasks.py b/AIPscan/Aggregator/tasks.py index ff46ada2..9be422e8 100644 --- a/AIPscan/Aggregator/tasks.py +++ b/AIPscan/Aggregator/tasks.py @@ -19,6 +19,7 @@ format_api_url_with_limit_offset, parse_package_list_file, process_package_object, + store_fetch_job_error_infomation, ) from AIPscan.extensions import celery from AIPscan.helpers import file_sha256_hash @@ -63,13 +64,20 @@ def start_mets_task( celery database. """ storage_service = StorageService.query.get(storage_service_id) - storage_location = database_helpers.create_or_update_storage_location( - current_location, storage_service - ) - pipeline = database_helpers.create_or_update_pipeline( - origin_pipeline, storage_service - ) + try: + storage_location = database_helpers.create_or_update_storage_location( + current_location, storage_service + ) + except Exception as err: + store_fetch_job_error_infomation(fetch_job_id, str(err)) + + try: + pipeline = database_helpers.create_or_update_pipeline( + origin_pipeline, storage_service + ) + except Exception as err: + store_fetch_job_error_infomation(fetch_job_id, str(err)) args = [ package_uuid, @@ -168,6 +176,8 @@ def workflow_coordinator( break if isinstance(package_lists_task.info, TaskError): + store_fetch_job_error_infomation(fetch_job_id, str(package_lists_task.info)) + # Re-raise. raise (package_lists_task.info) @@ -315,13 +325,18 @@ def get_mets( # Download METS file storage_service = StorageService.query.get(storage_service_id) - download_file = download_mets( - storage_service, - package_uuid, - relative_path_to_mets, - timestamp_str, - package_list_no, - ) + + try: + download_file = download_mets( + storage_service, + package_uuid, + relative_path_to_mets, + timestamp_str, + package_list_no, + ) + except Exception as err: + store_fetch_job_error_infomation(fetch_job_id, str(err)) + mets_name = os.path.basename(download_file) mets_hash = file_sha256_hash(download_file) diff --git a/AIPscan/Aggregator/templates/storage_service.html b/AIPscan/Aggregator/templates/storage_service.html index 97d3a8f6..b20061cb 100644 --- a/AIPscan/Aggregator/templates/storage_service.html +++ b/AIPscan/Aggregator/templates/storage_service.html @@ -53,6 +53,7 @@ Download duration Packages in SS New AIPs added + Errors Action @@ -76,6 +77,7 @@ {{ mets_fetch_job.aips|length }} + {{ mets_fetch_job.errors|length }} diff --git a/AIPscan/Aggregator/tests/test_task_helpers.py b/AIPscan/Aggregator/tests/test_task_helpers.py index f85fe864..be1daf53 100644 --- a/AIPscan/Aggregator/tests/test_task_helpers.py +++ b/AIPscan/Aggregator/tests/test_task_helpers.py @@ -7,7 +7,7 @@ import pytest -from AIPscan import models +from AIPscan import models, test_helpers from AIPscan.Aggregator import task_helpers from AIPscan.Aggregator.types import StorageServicePackage @@ -305,3 +305,9 @@ def test_process_package_object(packages, idx, storage_service_package): """ package_obj = task_helpers.process_package_object(packages[idx]) assert package_obj == storage_service_package, idx + + +def test_store_fetch_job_error_infomation(app_instance): + fetch_job = test_helpers.create_test_fetch_job() + + task_helpers.store_fetch_job_error_infomation(fetch_job.id, "Test") diff --git a/AIPscan/models.py b/AIPscan/models.py index 932f7c3b..4b0ac246 100644 --- a/AIPscan/models.py +++ b/AIPscan/models.py @@ -3,6 +3,8 @@ import re from datetime import date, datetime +from sqlalchemy.sql import func + from AIPscan import db UUID_REGEX = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}" @@ -257,6 +259,9 @@ class FetchJob(db.Model): db.Integer(), db.ForeignKey("storage_service.id"), nullable=False ) aips = db.relationship("AIP", cascade="all,delete", backref="fetch_job", lazy=True) + errors = db.relationship( + "FetchJobError", cascade="all,delete", backref="fetch_job", lazy=True + ) def __init__( self, @@ -280,6 +285,16 @@ def __repr__(self): return "".format(self.download_start) +class FetchJobError(db.Model): + __tablename__ = "fetch_job_error" + id = db.Column(db.Integer(), primary_key=True) + fetch_job_id = db.Column( + db.Integer(), db.ForeignKey("fetch_job.id"), nullable=False + ) + message = db.Column(db.String(255), index=True, unique=True) + create_date = db.Column(db.DateTime, server_default=func.now()) + + class Pipeline(db.Model): __tablename__ = "pipeline" id = db.Column(db.Integer(), primary_key=True)