Add fetch error reporting (#21)
Added a fetch error model, a helper function for recording error information to it, and display of fetch errors on the storage service page.
mcantelon committed Mar 26, 2024
1 parent fcc9d26 commit 7764155
Showing 7 changed files with 256 additions and 15 deletions.
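
In outline, the error-reporting pattern this commit introduces is: wrap each failure-prone step of a fetch job in try/except, persist the exception message with the new helper, and bail out of the task so the failure surfaces on the storage service page. A minimal sketch, with a hypothetical do_risky_fetch_step standing in for the real work (creating locations and pipelines, downloading METS):

from AIPscan.Aggregator.task_helpers import store_fetch_job_error_information

def do_risky_fetch_step():
    # Hypothetical stand-in for the real work done by the fetch tasks.
    raise Exception("Non-200 HTTP code from METS download")

def run_fetch_step(fetch_job_id):
    try:
        do_risky_fetch_step()
    except Exception as err:
        # Persist the failure so the storage service page can surface it.
        store_fetch_job_error_information(fetch_job_id, str(err))
        return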
3 changes: 3 additions & 0 deletions AIPscan/Aggregator/mets_parse_helpers.py
@@ -94,6 +94,9 @@ def download_mets(
        get_mets_url(storage_service, package_uuid, relative_path_to_mets)
    )

    if mets_response.status_code != 200:
        raise Exception("Non-200 HTTP code from METS download")

    # Create a directory to download the METS to.
    numbered_subdir = create_numbered_subdirs(timestamp, package_list_no)

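
With this hunk, download_mets raises on a non-200 response rather than writing the error body to disk as if it were METS. A caller sketch under that assumption (storage_service, package_uuid, and fetch_job_id come from the surrounding task, as in tasks.py below):

try:
    download_file = download_mets(
        storage_service, package_uuid, relative_path_to_mets, timestamp_str, package_list_no
    )
except Exception as err:
    # err reads "Non-200 HTTP code from METS download" for an HTTP failure
    store_fetch_job_error_information(fetch_job_id, str(err))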
11 changes: 11 additions & 0 deletions AIPscan/Aggregator/task_helpers.py
@@ -9,7 +9,9 @@

from dateutil.parser import ParserError, parse

from AIPscan import db
from AIPscan.Aggregator.types import StorageServicePackage
from AIPscan.models import FetchJobError


def format_api_url_with_limit_offset(storage_service):
@@ -158,3 +160,12 @@ def write_mets(http_response, package_uuid, subdir):
    with open(download_file, "wb") as file:
        file.write(http_response.content)
    return download_file


def store_fetch_job_error_information(fetch_job_id, message):
    """Record a fetch job error in the database."""
    fetch_error = FetchJobError()
    fetch_error.fetch_job_id = fetch_job_id
    fetch_error.message = message

    db.session.add(fetch_error)
    db.session.commit()
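
The helper commits immediately, so the error row is visible as soon as the call returns. A quick sanity check from a Flask shell might look like this (the fetch job id 1 is illustrative):

from AIPscan.Aggregator.task_helpers import store_fetch_job_error_information
from AIPscan.models import FetchJobError

store_fetch_job_error_information(1, "Non-200 HTTP code from METS download")
error = FetchJobError.query.filter_by(fetch_job_id=1).first()
assert error.message == "Non-200 HTTP code from METS download"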
47 changes: 34 additions & 13 deletions AIPscan/Aggregator/tasks.py
@@ -19,6 +19,7 @@
    format_api_url_with_limit_offset,
    parse_package_list_file,
    process_package_object,
    store_fetch_job_error_information,
)
from AIPscan.extensions import celery
from AIPscan.helpers import file_sha256_hash
@@ -63,13 +64,24 @@ def start_mets_task(
    celery database.
    """
    storage_service = StorageService.query.get(storage_service_id)
    storage_location = database_helpers.create_or_update_storage_location(
        current_location, storage_service
    )

    pipeline = database_helpers.create_or_update_pipeline(
        origin_pipeline, storage_service
    )
    try:
        storage_location = database_helpers.create_or_update_storage_location(
            current_location, storage_service
        )
    except Exception as err:
        store_fetch_job_error_information(fetch_job_id, str(err))
        return

    try:
        pipeline = database_helpers.create_or_update_pipeline(
            origin_pipeline, storage_service
        )
    except Exception as err:
        store_fetch_job_error_information(fetch_job_id, str(err))
        return

    args = [
        package_uuid,
@@ -168,6 +180,8 @@ def workflow_coordinator(
            break

    if isinstance(package_lists_task.info, TaskError):
        store_fetch_job_error_information(fetch_job_id, str(package_lists_task.info))

        # Re-raise.
        raise (package_lists_task.info)

@@ -317,13 +331,20 @@ def get_mets(

    # Download METS file
    storage_service = StorageService.query.get(storage_service_id)
    download_file = download_mets(
        storage_service,
        package_uuid,
        relative_path_to_mets,
        timestamp_str,
        package_list_no,
    )

    try:
        download_file = download_mets(
            storage_service,
            package_uuid,
            relative_path_to_mets,
            timestamp_str,
            package_list_no,
        )
    except Exception as e:
        store_fetch_job_error_information(fetch_job_id, str(e))
        return

    mets_name = os.path.basename(download_file)
    mets_hash = file_sha256_hash(download_file)

2 changes: 2 additions & 0 deletions AIPscan/Aggregator/templates/storage_service.html
@@ -53,6 +53,7 @@
      <th><strong>Download duration</strong></th>
      <th><strong>Packages in SS</strong></th>
      <th><strong>New AIPs added</strong></th>
      <th><strong>Errors</strong></th>
      <th><strong>Action</strong></th>
    </tr>
  </thead>
@@ -76,6 +77,7 @@
        </div>
      </td>
      <td>{{ mets_fetch_job.aips|length }}</td>
      <td>{{ mets_fetch_job.errors|length }}</td>
      <td>
        <a href="{{ url_for('aggregator.delete_fetch_job', fetch_job_id=mets_fetch_job.id) }}"><button type="button" class="btn btn-danger">Delete</button></a>
      </td>
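
The new Errors column counts FetchJobError rows through the errors relationship added to FetchJob in models.py below; in Python terms, {{ mets_fetch_job.errors|length }} amounts to roughly this sketch (the job id is illustrative):

from AIPscan.models import FetchJob

fetch_job = FetchJob.query.get(1)
error_count = len(fetch_job.errors)  # what the template renders per row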
24 changes: 23 additions & 1 deletion AIPscan/Aggregator/tests/test_task_helpers.py
@@ -7,7 +7,7 @@

import pytest

from AIPscan import models
from AIPscan import models, test_helpers
from AIPscan.Aggregator import task_helpers
from AIPscan.Aggregator.types import StorageServicePackage

@@ -305,3 +305,25 @@ def test_process_package_object(packages, idx, storage_service_package):
    """
    package_obj = task_helpers.process_package_object(packages[idx])
    assert package_obj == storage_service_package, idx


def test_store_fetch_job_error_info(app_instance):
    """Test task helper for recording fetch job errors."""
    fetch_job = test_helpers.create_test_fetch_job()

    # Make sure no fetch job errors exist for test fetch job
    fetch_job_error = models.FetchJobError.query.filter_by(
        fetch_job_id=fetch_job.id
    ).first()

    assert fetch_job_error is None

    # Attempt to store fetch job error and make sure it ends up in the database
    task_helpers.store_fetch_job_error_information(fetch_job.id, "Test")

    fetch_job_error = models.FetchJobError.query.filter_by(
        fetch_job_id=fetch_job.id
    ).first()

    assert fetch_job_error is not None
    assert fetch_job_error.fetch_job_id == fetch_job.id
169 changes: 168 additions & 1 deletion AIPscan/Aggregator/tests/test_tasks.py
@@ -17,6 +17,8 @@
    make_request,
    parse_package_list_file,
    parse_packages_and_load_mets,
    start_mets_task,
    workflow_coordinator,
)
from AIPscan.Aggregator.tests import (
    INVALID_JSON,
@@ -26,7 +28,7 @@
    VALID_JSON,
    MockResponse,
)
from AIPscan.models import AIP, Agent, FetchJob, StorageService
from AIPscan.models import AIP, Agent, FetchJob, FetchJobError, StorageService

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
FIXTURES_DIR = os.path.join(SCRIPT_DIR, "fixtures")
@@ -275,3 +277,168 @@ def test_delete_aip(app_instance):

    deleted_aip = AIP.query.filter_by(uuid=PACKAGE_UUID).first()
    assert deleted_aip is None


def test_store_fetch_job_error_during_get_mets(app_instance, tmpdir):
    """Test recording fetch job error during get_mets task."""
    storage_service = test_helpers.create_test_storage_service()

    fetch_job = test_helpers.create_test_fetch_job(
        storage_service_id=storage_service.id
    )

    # Make sure no fetch job errors exist for test fetch job
    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()

    assert fetch_job_error is None

    get_mets(
        "222",
        1000,
        tmpdir,
        "timestamp string",
        "1",
        storage_service.id,
        1,
        1,
        fetch_job.id,
        None,
    )

    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()

    assert fetch_job_error is not None
    assert fetch_job_error.message == "Non-200 HTTP code from METS download"
    assert fetch_job_error.fetch_job_id == fetch_job.id


def test_store_fetch_job_error_during_create_location(app_instance, tmpdir):
    """Test recording fetch job error during create location."""
    storage_service = test_helpers.create_test_storage_service()

    fetch_job = test_helpers.create_test_fetch_job(
        storage_service_id=storage_service.id
    )

    # Make sure no fetch job errors exist for test fetch job
    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()

    assert fetch_job_error is None

    package_uuid = str(uuid.uuid4())

    start_mets_task(
        package_uuid,
        1000,
        tmpdir,
        None,
        None,
        "timestamp src",
        "1",
        storage_service.id,
        fetch_job.id,
        False,
    )

    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()

    assert fetch_job_error is not None
    assert "Max retries exceeded" in fetch_job_error.message
    assert fetch_job_error.fetch_job_id == fetch_job.id


def test_store_fetch_job_error_during_create_pipeline(app_instance, tmpdir, mocker):
    """Test recording fetch job error during create pipeline."""
    storage_service = test_helpers.create_test_storage_service()
    storage_location = test_helpers.create_test_storage_location(
        storage_service_id=storage_service.id
    )
    pipeline = test_helpers.create_test_pipeline(storage_service_id=storage_service.id)

    fetch_job = test_helpers.create_test_fetch_job(
        storage_service_id=storage_service.id
    )

    # Make sure no fetch job errors exist for test fetch job
    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()

    assert fetch_job_error is None

    updater = mocker.patch(
        "AIPscan.Aggregator.database_helpers.create_or_update_storage_location"
    )
    updater.return_value = storage_location

    package_uuid = str(uuid.uuid4())

    start_mets_task(
        package_uuid,
        1000,
        tmpdir,
        storage_location.current_location,
        pipeline.id,
        "timestamp src",
        "1",
        1,
        fetch_job.id,
        False,
    )

    # Make sure we receive the expected results
    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()

    assert fetch_job_error is not None
    assert "Name or service not known" in fetch_job_error.message
    assert fetch_job_error.fetch_job_id == fetch_job.id


def test_store_fetch_job_error_during_workflow_coordinator(
    app_instance, tmpdir, mocker
):
    """Test recording fetch job error during workflow coordinator."""
    storage_service = test_helpers.create_test_storage_service()

    fetch_job = test_helpers.create_test_fetch_job(
        storage_service_id=storage_service.id
    )

    # Make sure no fetch job errors exist for test fetch job
    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()

    assert fetch_job_error is None

    # Mock result containing task error
    class MockPackageListsRequestDelayResult:
        id = None
        info = TaskError("Task error")

    mock_result = MockPackageListsRequestDelayResult()

    delay = mocker.patch("AIPscan.Aggregator.tasks.package_lists_request.delay")
    delay.return_value = mock_result

    # Skip writing of Celery update
    mocker.patch("AIPscan.Aggregator.tasks.write_celery_update")

    # Mock result indicating failure
    class MockPackageListsRequestAsyncResult:
        state = "FAILURE"

    mock_result = MockPackageListsRequestAsyncResult()

    async_result = mocker.patch(
        "AIPscan.Aggregator.tasks.package_lists_request.AsyncResult"
    )
    async_result.return_value = mock_result

    # Initiate workflow coordinator logic
    args = ["timestamp str", storage_service.id, fetch_job.id, tmpdir]

    workflow_coordinator.apply(args=args)

    # Make sure we receive the expected results
    fetch_job_error = FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()

    assert fetch_job_error is not None
    assert fetch_job_error.message == "Task error"
    assert fetch_job_error.fetch_job_id == fetch_job.id
15 changes: 15 additions & 0 deletions AIPscan/models.py
@@ -3,6 +3,8 @@
import re
from datetime import date, datetime

from sqlalchemy.sql import func

from AIPscan import db

UUID_REGEX = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
@@ -257,6 +259,9 @@ class FetchJob(db.Model):
        db.Integer(), db.ForeignKey("storage_service.id"), nullable=False
    )
    aips = db.relationship("AIP", cascade="all,delete", backref="fetch_job", lazy=True)
    errors = db.relationship(
        "FetchJobError", cascade="all,delete", backref="fetch_job", lazy=True
    )

    def __init__(
        self,
@@ -280,6 +285,16 @@ def __repr__(self):
        return "<Fetch Job '{}'>".format(self.download_start)


class FetchJobError(db.Model):
    __tablename__ = "fetch_job_error"
    id = db.Column(db.Integer(), primary_key=True)
    fetch_job_id = db.Column(
        db.Integer(), db.ForeignKey("fetch_job.id"), nullable=False
    )
    # No unique constraint: the same message (e.g. the fixed non-200 string)
    # can legitimately recur across fetch jobs.
    message = db.Column(db.String(255), index=True)
    create_date = db.Column(db.DateTime, server_default=func.now())

class Pipeline(db.Model):
    __tablename__ = "pipeline"
    id = db.Column(db.Integer(), primary_key=True)
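
With the model and relationship in place, errors recorded for a job can be inspected directly. A minimal sketch (the job id is illustrative):

from AIPscan.models import FetchJob

fetch_job = FetchJob.query.get(1)
for error in fetch_job.errors:  # backref from FetchJobError
    print(error.create_date, error.message)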
