Add fetch error reporting (#21)
mcantelon committed Mar 21, 2024
1 parent f3313fd commit ac54b64
Showing 5 changed files with 63 additions and 14 deletions.
11 changes: 11 additions & 0 deletions AIPscan/Aggregator/task_helpers.py
@@ -9,7 +9,9 @@
 
 from dateutil.parser import ParserError, parse
 
+from AIPscan import db
 from AIPscan.Aggregator.types import StorageServicePackage
+from AIPscan.models import FetchJobError
 
 
 def format_api_url_with_limit_offset(storage_service):
@@ -158,3 +160,12 @@ def write_mets(http_response, package_uuid, subdir):
     with open(download_file, "wb") as file:
         file.write(http_response.content)
     return download_file
+
+
+def store_fetch_job_error_infomation(fetch_job_id, message):
+    fetch_error = FetchJobError()
+    fetch_error.fetch_job_id = fetch_job_id
+    fetch_error.message = message
+
+    db.session.add(fetch_error)
+    db.session.commit()
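For context, a minimal sketch of how errors recorded by this helper could be read back later, assuming an active Flask application context and a known fetch_job_id; this is illustrative only and not part of the commit.

# Illustrative sketch (not part of this commit): list errors stored for a fetch job.
from AIPscan.models import FetchJobError

errors = FetchJobError.query.filter_by(fetch_job_id=fetch_job_id).all()
for error in errors:
    print(error.create_date, error.message)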
41 changes: 28 additions & 13 deletions AIPscan/Aggregator/tasks.py
@@ -19,6 +19,7 @@
     format_api_url_with_limit_offset,
     parse_package_list_file,
     process_package_object,
+    store_fetch_job_error_infomation,
 )
 from AIPscan.extensions import celery
 from AIPscan.helpers import file_sha256_hash
@@ -63,13 +64,20 @@ def start_mets_task(
     celery database.
     """
     storage_service = StorageService.query.get(storage_service_id)
-    storage_location = database_helpers.create_or_update_storage_location(
-        current_location, storage_service
-    )
-
-    pipeline = database_helpers.create_or_update_pipeline(
-        origin_pipeline, storage_service
-    )
+    try:
+        storage_location = database_helpers.create_or_update_storage_location(
+            current_location, storage_service
+        )
+    except Exception as err:
+        store_fetch_job_error_infomation(fetch_job_id, str(err))
+
+    try:
+        pipeline = database_helpers.create_or_update_pipeline(
+            origin_pipeline, storage_service
+        )
+    except Exception as err:
+        store_fetch_job_error_infomation(fetch_job_id, str(err))
+
 
     args = [
         package_uuid,
@@ -168,6 +176,8 @@ def workflow_coordinator(
             break
 
     if isinstance(package_lists_task.info, TaskError):
+        store_fetch_job_error_infomation(fetch_job_id, str(package_lists_task.info))
+
         # Re-raise.
         raise (package_lists_task.info)
 
@@ -315,13 +325,18 @@ def get_mets(
 
     # Download METS file
     storage_service = StorageService.query.get(storage_service_id)
-    download_file = download_mets(
-        storage_service,
-        package_uuid,
-        relative_path_to_mets,
-        timestamp_str,
-        package_list_no,
-    )
+
+    try:
+        download_file = download_mets(
+            storage_service,
+            package_uuid,
+            relative_path_to_mets,
+            timestamp_str,
+            package_list_no,
+        )
+    except Exception as err:
+        store_fetch_job_error_infomation(fetch_job_id, str(err))
+
     mets_name = os.path.basename(download_file)
     mets_hash = file_sha256_hash(download_file)
 
2 changes: 2 additions & 0 deletions AIPscan/Aggregator/templates/storage_service.html
@@ -53,6 +53,7 @@
         <th><strong>Download duration</strong></th>
         <th><strong>Packages in SS</strong></th>
         <th><strong>New AIPs added</strong></th>
+        <th><strong>Errors</strong></th>
         <th><strong>Action</strong></th>
       </tr>
     </thead>
@@ -76,6 +77,7 @@
           </div>
         </td>
         <td>{{ mets_fetch_job.aips|length }}</td>
+        <td>{{ mets_fetch_job.errors|length }}</td>
         <td>
           <a href="{{ url_for('aggregator.delete_fetch_job', fetch_job_id=mets_fetch_job.id) }}"><button type="button" class="btn btn-danger">Delete</button></a>
         </td>
8 changes: 7 additions & 1 deletion AIPscan/Aggregator/tests/test_task_helpers.py
@@ -7,7 +7,7 @@
 
 import pytest
 
-from AIPscan import models
+from AIPscan import models, test_helpers
 from AIPscan.Aggregator import task_helpers
 from AIPscan.Aggregator.types import StorageServicePackage
 
@@ -305,3 +305,9 @@ def test_process_package_object(packages, idx, storage_service_package):
     """
     package_obj = task_helpers.process_package_object(packages[idx])
     assert package_obj == storage_service_package, idx
+
+
+def test_store_fetch_job_error_infomation(app_instance):
+    fetch_job = test_helpers.create_test_fetch_job()
+
+    task_helpers.store_fetch_job_error_infomation(fetch_job.id, "Test")
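The new test calls the helper but does not assert on the stored row. A possible extension, sketched here under the assumption that it runs inside the same app_instance fixture, would query the new table directly; it is illustrative and not part of the commit.

# Hypothetical follow-up assertions (not in this commit).
error = models.FetchJobError.query.filter_by(fetch_job_id=fetch_job.id).first()
assert error is not None
assert error.message == "Test"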
15 changes: 15 additions & 0 deletions AIPscan/models.py
@@ -3,6 +3,8 @@
 import re
 from datetime import date, datetime
 
+from sqlalchemy.sql import func
+
 from AIPscan import db
 
 UUID_REGEX = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
@@ -257,6 +259,9 @@ class FetchJob(db.Model):
         db.Integer(), db.ForeignKey("storage_service.id"), nullable=False
     )
     aips = db.relationship("AIP", cascade="all,delete", backref="fetch_job", lazy=True)
+    errors = db.relationship(
+        "FetchJobError", cascade="all,delete", backref="fetch_job", lazy=True
+    )
 
     def __init__(
         self,
@@ -280,6 +285,16 @@ def __repr__(self):
         return "<Fetch Job '{}'>".format(self.download_start)
 
 
+class FetchJobError(db.Model):
+    __tablename__ = "fetch_job_error"
+    id = db.Column(db.Integer(), primary_key=True)
+    fetch_job_id = db.Column(
+        db.Integer(), db.ForeignKey("fetch_job.id"), nullable=False
+    )
+    message = db.Column(db.String(255), index=True, unique=True)
+    create_date = db.Column(db.DateTime, server_default=func.now())
+
+
 class Pipeline(db.Model):
     __tablename__ = "pipeline"
     id = db.Column(db.Integer(), primary_key=True)
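The errors relationship added to FetchJob is what the template's new Errors column counts via {{ mets_fetch_job.errors|length }}. A minimal Python sketch of the same access, assuming an application context and a known fetch job id; illustrative only, not part of the commit.

# Illustrative sketch (not part of this commit): count errors through the new relationship.
from AIPscan.models import FetchJob

fetch_job = FetchJob.query.get(fetch_job_id)  # fetch_job_id is assumed to be known
error_count = len(fetch_job.errors)  # FetchJobError rows linked by the backref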
