From 7dc005a9d38d24ed8205ca142ba98c62bc30717a Mon Sep 17 00:00:00 2001
From: Mike Cantelon
Date: Tue, 2 Jan 2024 17:01:52 -0800
Subject: [PATCH] Add fetch error reporting (#21)

Added fetch error model, a helper function to add error information to
it, and reporting of errors to the storage service page.
---
 AIPscan/Aggregator/mets_parse_helpers.py       |   3 +
 AIPscan/Aggregator/task_helpers.py             |  11 ++
 AIPscan/Aggregator/tasks.py                    |  47 +++--
 .../Aggregator/templates/storage_service.html  |   2 +
 AIPscan/Aggregator/tests/test_task_helpers.py  |  24 ++-
 AIPscan/Aggregator/tests/test_tasks.py         | 170 +++++++++++++++++-
 AIPscan/models.py                              |  15 ++
 7 files changed, 257 insertions(+), 15 deletions(-)

diff --git a/AIPscan/Aggregator/mets_parse_helpers.py b/AIPscan/Aggregator/mets_parse_helpers.py
index 96651d79..5b71059c 100644
--- a/AIPscan/Aggregator/mets_parse_helpers.py
+++ b/AIPscan/Aggregator/mets_parse_helpers.py
@@ -94,6 +94,9 @@ def download_mets(
         get_mets_url(storage_service, package_uuid, relative_path_to_mets)
     )
 
+    if mets_response.status_code != 200:
+        raise Exception("Non-200 HTTP code from METS download")
+
     # Create a directory to download the METS to.
     numbered_subdir = create_numbered_subdirs(timestamp, package_list_no)
 
diff --git a/AIPscan/Aggregator/task_helpers.py b/AIPscan/Aggregator/task_helpers.py
index f0a6db14..a706d665 100644
--- a/AIPscan/Aggregator/task_helpers.py
+++ b/AIPscan/Aggregator/task_helpers.py
@@ -9,7 +9,9 @@
 
 from dateutil.parser import ParserError, parse
 
+from AIPscan import db
 from AIPscan.Aggregator.types import StorageServicePackage
+from AIPscan.models import FetchJobError
 
 
 def format_api_url_with_limit_offset(storage_service):
@@ -158,3 +160,12 @@ def write_mets(http_response, package_uuid, subdir):
     with open(download_file, "wb") as file:
         file.write(http_response.content)
     return download_file
+
+
+def store_fetch_job_error_infomation(fetch_job_id, message):
+    fetch_error = FetchJobError()
+    fetch_error.fetch_job_id = fetch_job_id
+    fetch_error.message = message
+
+    db.session.add(fetch_error)
+    db.session.commit()
diff --git a/AIPscan/Aggregator/tasks.py b/AIPscan/Aggregator/tasks.py
index 7da9442f..7dc33e40 100644
--- a/AIPscan/Aggregator/tasks.py
+++ b/AIPscan/Aggregator/tasks.py
@@ -20,6 +20,7 @@
     format_api_url_with_limit_offset,
     parse_package_list_file,
     process_package_object,
+    store_fetch_job_error_infomation,
 )
 from AIPscan.extensions import celery
 from AIPscan.helpers import file_sha256_hash
@@ -71,13 +72,24 @@ def start_mets_task(
     celery database.
     """
     storage_service = StorageService.query.get(storage_service_id)
-    storage_location = database_helpers.create_or_update_storage_location(
-        current_location, storage_service
-    )
 
-    pipeline = database_helpers.create_or_update_pipeline(
-        origin_pipeline, storage_service
-    )
+    try:
+        storage_location = database_helpers.create_or_update_storage_location(
+            current_location, storage_service
+        )
+    except Exception as err:
+        store_fetch_job_error_infomation(fetch_job_id, str(err))
+
+        return
+
+    try:
+        pipeline = database_helpers.create_or_update_pipeline(
+            origin_pipeline, storage_service
+        )
+    except Exception as err:
+        store_fetch_job_error_infomation(fetch_job_id, str(err))
+
+        return
 
     args = [
         package_uuid,
@@ -176,6 +188,8 @@ def workflow_coordinator(
             break
 
     if isinstance(package_lists_task.info, TaskError):
+        store_fetch_job_error_infomation(fetch_job_id, str(package_lists_task.info))
+
         # Re-raise.
         raise (package_lists_task.info)
 
@@ -358,13 +372,20 @@ def get_mets(
 
     # Download METS file
     storage_service = StorageService.query.get(storage_service_id)
-    download_file = download_mets(
-        storage_service,
-        package_uuid,
-        relative_path_to_mets,
-        timestamp_str,
-        package_list_no,
-    )
+
+    try:
+        download_file = download_mets(
+            storage_service,
+            package_uuid,
+            relative_path_to_mets,
+            timestamp_str,
+            package_list_no,
+        )
+    except Exception as e:
+        store_fetch_job_error_infomation(fetch_job_id, str(e))
+
+        return
+
     mets_name = os.path.basename(download_file)
     mets_hash = file_sha256_hash(download_file)
diff --git a/AIPscan/Aggregator/templates/storage_service.html b/AIPscan/Aggregator/templates/storage_service.html
index 97d3a8f6..b20061cb 100644
--- a/AIPscan/Aggregator/templates/storage_service.html
+++ b/AIPscan/Aggregator/templates/storage_service.html
@@ -53,6 +53,7 @@
           <th>Download duration</th>
           <th>Packages in SS</th>
           <th>New AIPs added</th>
+          <th>Errors</th>
           <th>Action</th>
@@ -76,6 +77,7 @@
           <td>{{ mets_fetch_job.aips|length }}</td>
+          <td>{{ mets_fetch_job.errors|length }}</td>
diff --git a/AIPscan/Aggregator/tests/test_task_helpers.py b/AIPscan/Aggregator/tests/test_task_helpers.py
index f85fe864..17438414 100644
--- a/AIPscan/Aggregator/tests/test_task_helpers.py
+++ b/AIPscan/Aggregator/tests/test_task_helpers.py
@@ -7,7 +7,7 @@
 
 import pytest
 
-from AIPscan import models
+from AIPscan import models, test_helpers
 from AIPscan.Aggregator import task_helpers
 from AIPscan.Aggregator.types import StorageServicePackage
 
@@ -305,3 +305,25 @@ def test_process_package_object(packages, idx, storage_service_package):
     """
     package_obj = task_helpers.process_package_object(packages[idx])
     assert package_obj == storage_service_package, idx
+
+
+def test_store_fetch_job_error_info(app_instance):
+    """Test task helper for recording fetch job errors."""
+    fetch_job = test_helpers.create_test_fetch_job()
+
+    # Make sure no fetch job errors exist for test fetch job
+    fetch_job_error = models.FetchJobError.query.filter_by(
+        fetch_job_id=fetch_job.id
+    ).first()
+
+    assert fetch_job_error is None
+
+    # Attempt to store fetch job error and make sure it ends up in the database
+    task_helpers.store_fetch_job_error_infomation(fetch_job.id, "Test")
+
+    fetch_job_error = models.FetchJobError.query.filter_by(
+        fetch_job_id=fetch_job.id
+    ).first()
+
+    assert fetch_job_error is not None
+    assert fetch_job_error.fetch_job_id == fetch_job.id
diff --git a/AIPscan/Aggregator/tests/test_tasks.py b/AIPscan/Aggregator/tests/test_tasks.py
index c711c6d9..774fb487 100644
--- a/AIPscan/Aggregator/tests/test_tasks.py
+++ b/AIPscan/Aggregator/tests/test_tasks.py
@@ -20,6 +20,8 @@
     parse_package_list_file,
     parse_packages_and_load_mets,
     start_index_task,
+    start_mets_task,
+    workflow_coordinator,
 )
 from AIPscan.Aggregator.tests import (
     INVALID_JSON,
@@ -29,7 +31,7 @@
     VALID_JSON,
     MockResponse,
 )
-from AIPscan.models import AIP, Agent, FetchJob, StorageService, index_tasks
+from AIPscan.models import AIP, Agent, FetchJob, FetchJobError, StorageService, index_tasks
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 FIXTURES_DIR = os.path.join(SCRIPT_DIR, "fixtures")
@@ -300,6 +302,131 @@ class MockTask:
     mock_index_task.return_value = MockTask()
 
     # Create test fetch job
+
+
+def test_store_fetch_job_error_during_get_mets(app_instance, tmpdir):
+    """Test recording fetch job error during get_mets task."""
+    storage_service = test_helpers.create_test_storage_service()
+    pipeline = test_helpers.create_test_pipeline(storage_service_id=storage_service.id)
+
+    fetch_job = test_helpers.create_test_fetch_job(
+        storage_service_id=storage_service.id
+    )
+
+    # Make sure no fetch job errors exist for test fetch job
+    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()
+
+    assert fetch_job_error is None
+
+    package_uuid = str(uuid.uuid4())
+
+    get_mets(
+        package_uuid,
+        192643,
+        tmpdir,
+        "timestamp string",
+        "1",
+        storage_service.id,
+        None,
+        pipeline.id,
+        fetch_job.id,
+        None,
+    )
+
+    # Make sure we receive the expected results
+    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()
+
+    assert fetch_job_error is not None
+    assert fetch_job_error.message == "Non-200 HTTP code from METS download"
+    assert fetch_job_error.fetch_job_id == fetch_job.id
+
+
+def test_store_fetch_job_error_during_create_location(app_instance, tmpdir):
+    """Test recording fetch job error during create location."""
+    storage_service = test_helpers.create_test_storage_service()
+    pipeline = test_helpers.create_test_pipeline(storage_service_id=storage_service.id)
+
+    fetch_job = test_helpers.create_test_fetch_job(
+        storage_service_id=storage_service.id
+    )
+
+    # Make sure no fetch job errors exist for test fetch job
+    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()
+
+    assert fetch_job_error is None
+
+    package_uuid = str(uuid.uuid4())
+
+    start_mets_task(
+        package_uuid,
+        192643,
+        tmpdir,
+        None,
+        pipeline.id,
+        "timestamp src",
+        "1",
+        storage_service.id,
+        fetch_job.id,
+        False,
+    )
+
+    # Make sure we receive the expected results
+    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()
+
+    assert fetch_job_error is not None
+    assert "Max retries exceeded" in fetch_job_error.message
+    assert fetch_job_error.fetch_job_id == fetch_job.id
+
+
+def test_store_fetch_job_error_during_create_pipeline(app_instance, tmpdir, mocker):
+    """Test recording fetch job error during create pipeline."""
+    storage_service = test_helpers.create_test_storage_service()
+    storage_location = test_helpers.create_test_storage_location(
+        storage_service_id=storage_service.id
+    )
+    pipeline = test_helpers.create_test_pipeline(storage_service_id=storage_service.id)
+
+    fetch_job = test_helpers.create_test_fetch_job(
+        storage_service_id=storage_service.id
+    )
+
+    # Make sure no fetch job errors exist for test fetch job
+    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()
+
+    assert fetch_job_error is None
+
+    updater = mocker.patch(
+        "AIPscan.Aggregator.database_helpers.create_or_update_storage_location"
+    )
+    updater.return_value = storage_location
+
+    package_uuid = str(uuid.uuid4())
+
+    start_mets_task(
+        package_uuid,
+        1000,
+        tmpdir,
+        storage_location.current_location,
+        pipeline.id,
+        "timestamp src",
+        "1",
+        storage_service.id,
+        fetch_job.id,
+        False,
+    )
+
+    # Make sure we receive the expected results
+    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()
+
+    assert fetch_job_error is not None
+    assert "Name or service not known" in fetch_job_error.message
+    assert fetch_job_error.fetch_job_id == fetch_job.id
+
+
+def test_store_fetch_job_error_during_workflow_coordinator(
+    app_instance, tmpdir, mocker
+):
+    """Test recording fetch job error during workflow coordinator."""
     storage_service = test_helpers.create_test_storage_service()
 
     fetch_job = test_helpers.create_test_fetch_job(
@@ -346,3 +473,44 @@ def test_index_task(app_instance, enable_typesense, mocker):
 
     # Make sure finish bulk document creation function got called
     mock_finish_bulk_doc.assert_called()
+
+    # Make sure no fetch job errors exist for test fetch job
+    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()
+
+    assert fetch_job_error is None
+
+    # Mock result containing task error
+    class MockPackageListsRequestDelayResult:
+        id = None
+        info = TaskError("Task error")
+
+    mock_result = MockPackageListsRequestDelayResult()
+
+    delay = mocker.patch("AIPscan.Aggregator.tasks.package_lists_request.delay")
+    delay.return_value = mock_result
+
+    # Skip writing of Celery update
+    mocker.patch("AIPscan.Aggregator.tasks.write_celery_update")
+
+    # Mock result indicating failure
+    class MockPackageListsRequestAsyncResult:
+        state = "FAILURE"
+
+    mock_result = MockPackageListsRequestAsyncResult()
+
+    async_result = mocker.patch(
+        "AIPscan.Aggregator.tasks.package_lists_request.AsyncResult"
+    )
+    async_result.return_value = mock_result
+
+    # Initiate workflow coordinator logic
+    args = ["timestamp str", storage_service.id, fetch_job.id, tmpdir]
+
+    workflow_coordinator.apply(args=args)
+
+    # Make sure we receive the expected results
+    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()
+
+    assert fetch_job_error is not None
+    assert fetch_job_error.message == "Task error"
+    assert fetch_job_error.fetch_job_id == fetch_job.id
diff --git a/AIPscan/models.py b/AIPscan/models.py
index 4691c675..dcff2fd8 100644
--- a/AIPscan/models.py
+++ b/AIPscan/models.py
@@ -3,6 +3,8 @@
 import re
 from datetime import date, datetime
 
+from sqlalchemy.sql import func
+
 from AIPscan import db
 
 UUID_REGEX = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
@@ -271,6 +273,9 @@ class FetchJob(db.Model):
     index_tasks = db.relationship(
         "index_tasks", cascade="all,delete", backref="fetch_job", lazy=True
     )
+    errors = db.relationship(
+        "FetchJobError", cascade="all,delete", backref="fetch_job", lazy=True
+    )
 
     def __init__(
         self,
@@ -294,6 +299,16 @@ def __repr__(self):
         return "<FetchJob '{}'>".format(self.download_start)
 
 
+class FetchJobError(db.Model):
+    __tablename__ = "fetch_job_error"
+    id = db.Column(db.Integer(), primary_key=True)
+    fetch_job_id = db.Column(
+        db.Integer(), db.ForeignKey("fetch_job.id"), nullable=False
+    )
+    message = db.Column(db.String(255), index=True, unique=True)
+    create_date = db.Column(db.DateTime, server_default=func.now())
+
+
 class Pipeline(db.Model):
     __tablename__ = "pipeline"
     id = db.Column(db.Integer(), primary_key=True)
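
Usage sketch (illustrative, not part of the patch): the snippet below shows how
the helper and model introduced above fit together. It assumes an AIPscan Flask
application object named `app` and an existing FetchJob row with id 1; both are
hypothetical stand-ins, and only names this patch introduces are exercised.

    # Sketch only: records an error against a fetch job the same way the
    # Celery tasks now do on failure, then reads it back.
    from AIPscan.Aggregator.task_helpers import store_fetch_job_error_infomation
    from AIPscan.models import FetchJobError

    with app.app_context():
        # Store an error for fetch job 1 (hypothetical id).
        store_fetch_job_error_infomation(1, "Non-200 HTTP code from METS download")

        # The helper commits immediately, so the row is already visible to
        # queries and to the storage service page, which renders
        # {{ mets_fetch_job.errors|length }} per fetch job.
        error = FetchJobError.query.filter_by(fetch_job_id=1).first()
        print(error.message, error.create_date)

Because the helper commits on every call, an error recorded mid-task survives
even when the task then aborts with an early return, which is what the
try/except blocks in tasks.py rely on.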