Skip to content

Commit

Permalink
Add test for JHOVE validation
Browse files Browse the repository at this point in the history
  • Loading branch information
Dhwaniartefact committed Feb 11, 2025
1 parent 827fba4 commit 6d2b183
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 12 deletions.
21 changes: 18 additions & 3 deletions features/black_box/validation.feature
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,21 @@ Feature: Alma wants to ensure that JHOVE validation in Archivematica works corre
When the transfer compliance is verified
Then the "Identify file format" microservice completes successfully
And the "Validate formats" job fails
And 17 "Validate formats" tasks were executed
And 8 "Validate formats" tasks failed
And 9 "Validate formats" tasks succeeded
And 17 "Validate formats" transfer tasks were executed
And 8 "Validate formats" transfer tasks failed
And 9 "Validate formats" transfer tasks succeeded

Scenario Outline: JHOVE Validation for a transfer files
Given a "standard" transfer type located in "<sample_transfer_path>"
When the transfer compliance is verified
Then the "Identify file format" microservice completes successfully
And the "Validation" microservice is executed
And the AIP can be successfully stored
And there are <validated_objects_count> original objects in the AIP METS with a validation event


Examples: sample transfers
| sample_transfer_path | validated_objects_count |
| SampleTransfers/DemoTransferCSV | 2 |
| SampleTransfers/JHOVEModulesValidation | 16 |

26 changes: 17 additions & 9 deletions features/steps/black_box_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1529,17 +1529,25 @@ def step(context):
assert uses_order_indexes == sorted(uses_order_indexes), error


@then('{task_count:d} "{job_name}" tasks were executed')
def step_impl(context, task_count, job_name):
unit_uuid = context.current_transfer["transfer_uuid"]
def get_uuid_value(context, unit_type):
return (
context.current_transfer["transfer_uuid"]
if unit_type == "transfer"
else context.current_transfer["sip_uuid"]
)


@then('{task_count:d} "{job_name}" {unit_type} tasks were executed')
def step_impl(context, task_count, job_name, unit_type):
unit_uuid = get_uuid_value(context, unit_type)
jobs = utils.get_jobs(context.api_clients_config, unit_uuid, job_name=job_name)
assert len(jobs), f"No jobs found for unit {unit_uuid}"
assert len(jobs[0]["tasks"]) == task_count


@then('{task_count:d} "{job_name}" tasks failed')
def step_impl(context, job_name, task_count):
unit_uuid = context.current_transfer["transfer_uuid"]
@then('{task_count:d} "{job_name}" {unit_type} tasks failed')
def step_impl(context, job_name, task_count, unit_type):
unit_uuid = get_uuid_value(context, unit_type)
jobs = utils.get_jobs(context.api_clients_config, unit_uuid, job_name=job_name)
assert len(jobs), f"No jobs found for unit {unit_uuid}"

Expand All @@ -1551,9 +1559,9 @@ def step_impl(context, job_name, task_count):
assert fail_task == task_count


@then('{task_count:d} "{job_name}" tasks succeeded')
def step_impl(context, job_name, task_count):
unit_uuid = context.current_transfer["transfer_uuid"]
@then('{task_count:d} "{job_name}" {unit_type} tasks succeeded')
def step_impl(context, job_name, task_count, unit_type):
unit_uuid = get_uuid_value(context, unit_type)
jobs = utils.get_jobs(context.api_clients_config, unit_uuid, job_name=job_name)
assert len(jobs), f"No jobs found for unit {unit_uuid}"

Expand Down

0 comments on commit 6d2b183

Please sign in to comment.