Skip to content

Commit

Permalink
Add fetch error reporting (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
mcantelon committed Mar 23, 2024
1 parent fcc9d26 commit b2b032d
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 15 deletions.
3 changes: 3 additions & 0 deletions AIPscan/Aggregator/mets_parse_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def download_mets(
get_mets_url(storage_service, package_uuid, relative_path_to_mets)
)

if mets_response.status_code != 200:
raise Exception("Non-200 HTTP code from METS download")

# Create a directory to download the METS to.
numbered_subdir = create_numbered_subdirs(timestamp, package_list_no)

Expand Down
11 changes: 11 additions & 0 deletions AIPscan/Aggregator/task_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

from dateutil.parser import ParserError, parse

from AIPscan import db
from AIPscan.Aggregator.types import StorageServicePackage
from AIPscan.models import FetchJobError


def format_api_url_with_limit_offset(storage_service):
Expand Down Expand Up @@ -158,3 +160,12 @@ def write_mets(http_response, package_uuid, subdir):
with open(download_file, "wb") as file:
file.write(http_response.content)
return download_file


def store_fetch_job_error_infomation(fetch_job_id, message):
    """Persist an error message raised during a fetch job.

    NOTE(review): "infomation" is a typo for "information", but the name
    is imported by tasks.py and the test suite, so it is kept unchanged
    for backward compatibility.

    :param fetch_job_id: Primary key of the FetchJob the error belongs to.
    :param message: Human-readable description of the error.
    """
    fetch_error = FetchJobError()
    fetch_error.fetch_job_id = fetch_job_id
    fetch_error.message = message

    db.session.add(fetch_error)
    db.session.commit()
43 changes: 30 additions & 13 deletions AIPscan/Aggregator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
format_api_url_with_limit_offset,
parse_package_list_file,
process_package_object,
store_fetch_job_error_infomation,
)
from AIPscan.extensions import celery
from AIPscan.helpers import file_sha256_hash
Expand Down Expand Up @@ -63,13 +64,20 @@ def start_mets_task(
celery database.
"""
storage_service = StorageService.query.get(storage_service_id)
storage_location = database_helpers.create_or_update_storage_location(
current_location, storage_service
)

pipeline = database_helpers.create_or_update_pipeline(
origin_pipeline, storage_service
)
try:
storage_location = database_helpers.create_or_update_storage_location(

Check warning on line 69 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L68-L69

Added lines #L68 - L69 were not covered by tests
current_location, storage_service
)
except Exception as err:
store_fetch_job_error_infomation(fetch_job_id, str(err))

Check warning on line 73 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L72-L73

Added lines #L72 - L73 were not covered by tests

try:
pipeline = database_helpers.create_or_update_pipeline(

Check warning on line 76 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L75-L76

Added lines #L75 - L76 were not covered by tests
origin_pipeline, storage_service
)
except Exception as err:
store_fetch_job_error_infomation(fetch_job_id, str(err))

Check warning on line 80 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L79-L80

Added lines #L79 - L80 were not covered by tests

args = [
package_uuid,
Expand Down Expand Up @@ -168,6 +176,8 @@ def workflow_coordinator(
break

if isinstance(package_lists_task.info, TaskError):
store_fetch_job_error_infomation(fetch_job_id, str(package_lists_task.info))

Check warning on line 179 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L179

Added line #L179 was not covered by tests

# Re-raise.
raise (package_lists_task.info)

Expand Down Expand Up @@ -317,13 +327,20 @@ def get_mets(

# Download METS file
storage_service = StorageService.query.get(storage_service_id)
download_file = download_mets(
storage_service,
package_uuid,
relative_path_to_mets,
timestamp_str,
package_list_no,
)

try:
download_file = download_mets(
storage_service,
package_uuid,
relative_path_to_mets,
timestamp_str,
package_list_no,
)
except Exception as e:
store_fetch_job_error_infomation(fetch_job_id, str(e))

return

mets_name = os.path.basename(download_file)
mets_hash = file_sha256_hash(download_file)

Expand Down
2 changes: 2 additions & 0 deletions AIPscan/Aggregator/templates/storage_service.html
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
<th><strong>Download duration</strong></th>
<th><strong>Packages in SS</strong></th>
<th><strong>New AIPs added</strong></th>
<th><strong>Errors</strong></th>
<th><strong>Action</strong></th>
</tr>
</thead>
Expand All @@ -76,6 +77,7 @@
</div>
</td>
<td>{{ mets_fetch_job.aips|length }}</td>
<td>{{ mets_fetch_job.errors|length }}</td>
<td>
<a href="{{ url_for('aggregator.delete_fetch_job', fetch_job_id=mets_fetch_job.id) }}"><button type="button" class="btn btn-danger">Delete</button></a>
</td>
Expand Down
58 changes: 56 additions & 2 deletions AIPscan/Aggregator/tests/test_task_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import pytest

from AIPscan import models
from AIPscan.Aggregator import task_helpers
from AIPscan import models, test_helpers
from AIPscan.Aggregator import task_helpers, tasks
from AIPscan.Aggregator.types import StorageServicePackage

FIXTURES_DIR = "fixtures"
Expand Down Expand Up @@ -305,3 +305,57 @@ def test_process_package_object(packages, idx, storage_service_package):
"""
package_obj = task_helpers.process_package_object(packages[idx])
assert package_obj == storage_service_package, idx


def test_store_fetch_job_error_info(app_instance):
    """Test task helper for recording fetch job errors."""
    job = test_helpers.create_test_fetch_job()

    def first_error_for_job():
        # First FetchJobError row attached to our test fetch job, if any.
        query = models.FetchJobError.query.filter_by(fetch_job_id=job.id)
        return query.first()

    # A freshly created fetch job should have no errors recorded yet.
    assert first_error_for_job() is None

    # Record an error and confirm it was written to the database.
    task_helpers.store_fetch_job_error_infomation(job.id, "Test")

    stored = first_error_for_job()
    assert stored is not None
    assert stored.fetch_job_id == job.id


def test_store_fetch_job_error_info_in_get_mets(app_with_populated_files):
    """Test recording fetch job errors."""
    job = test_helpers.create_test_fetch_job()

    def lookup_error():
        # First FetchJobError row recorded against our test fetch job.
        return models.FetchJobError.query.filter_by(fetch_job_id=job.id).first()

    # No errors should exist before the task runs.
    assert lookup_error() is None

    # Run the METS fetch task; the test expects it to fail for this
    # package and record a FetchJobError (asserted below).
    tasks.get_mets(
        "222",
        1000,
        "/",
        "timestamp string",
        "1",
        1,
        1,
        1,
        job.id,
        None,
    )

    recorded = lookup_error()
    assert recorded is not None
    assert recorded.fetch_job_id == job.id
15 changes: 15 additions & 0 deletions AIPscan/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import re
from datetime import date, datetime

from sqlalchemy.sql import func

from AIPscan import db

UUID_REGEX = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
Expand Down Expand Up @@ -257,6 +259,9 @@ class FetchJob(db.Model):
db.Integer(), db.ForeignKey("storage_service.id"), nullable=False
)
aips = db.relationship("AIP", cascade="all,delete", backref="fetch_job", lazy=True)
errors = db.relationship(
"FetchJobError", cascade="all,delete", backref="fetch_job", lazy=True
)

def __init__(
self,
Expand All @@ -280,6 +285,16 @@ def __repr__(self):
return "<Fetch Job '{}'>".format(self.download_start)


class FetchJobError(db.Model):
    """An error recorded while running a fetch job.

    One FetchJob may accumulate many errors (see FetchJob.errors).
    """

    __tablename__ = "fetch_job_error"
    id = db.Column(db.Integer(), primary_key=True)
    # The fetch job this error was raised during.
    fetch_job_id = db.Column(
        db.Integer(), db.ForeignKey("fetch_job.id"), nullable=False
    )
    # Error description. Not unique: the same fixed message (e.g.
    # "Non-200 HTTP code from METS download") is stored once per failing
    # package, so a unique constraint here would make the second
    # identical error fail with an IntegrityError on commit.
    message = db.Column(db.String(255), index=True)
    # Set by the database when the row is inserted.
    create_date = db.Column(db.DateTime, server_default=func.now())


class Pipeline(db.Model):
__tablename__ = "pipeline"
id = db.Column(db.Integer(), primary_key=True)
Expand Down

0 comments on commit b2b032d

Please sign in to comment.