Skip to content

Commit

Permalink
ingestion error responds with warnings on schema changes
Browse files Browse the repository at this point in the history
  • Loading branch information
v-rocheleau committed Sep 19, 2023
1 parent 0c1cb1b commit 9f8ff0c
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 35 deletions.
99 changes: 90 additions & 9 deletions chord_metadata_service/chord/ingest/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,111 @@
from typing import List
from typing import List, Optional
from jsonschema.exceptions import ValidationError
from chord_metadata_service import __version__
from chord_metadata_service.experiments.schemas import EXPERIMENT_SCHEMA_CHANGES
from chord_metadata_service.chord.data_types import DATA_TYPE_EXPERIMENT, DATA_TYPE_PHENOPACKET

__all__ = [
"IngestError",
]


def parse_validation_errors(errors: List[ValidationError]):
DATA_TYPE_SCHEMA_CHANGES = {
DATA_TYPE_EXPERIMENT: EXPERIMENT_SCHEMA_CHANGES,
DATA_TYPE_PHENOPACKET: None
}


def parse_validation_errors(errors: List[ValidationError]) -> Optional[List[dict]]:
"""
Accepts a list of jsonschema ValidationError and converts them to a client error format.
Parameters:
errors (List[ValidationError]): errors raised by jsonschema during validation
Returns:
List[dict]:
dict:
schema_path (str): Schema path string (e.g "properties.library_strategy")
faulty_value (str | obj): The value at the schema_path causing the error
property_schema (dict): JSON schema of the property (includes valid options)
message (str): The ValidationError.message
"""
error_descriptions = []
for error in errors:
schema_path = ".".join(error.schema_path)
error_descriptions.append({
"schema_path": schema_path,
"faulty_value": error.instance,
"valid_options": error.validator_value,
"field_schema": error.schema,
"message": error.message,
"property_schema": error.schema,
})
return error_descriptions
return error_descriptions if len(error_descriptions) else None


class IngestError(Exception):
def parse_property_warnings(data: dict, prop_name: str, property_changes: List[tuple]) -> Optional[dict]:
for (old_value, new_value) in property_changes:
value = data[prop_name]
property_warning = {
"property_name": prop_name,
"property_value": value,
"deprecated_value": old_value,
"suggested_replacement": new_value,
}

if value == old_value:
# Naive comparison for dicts
return property_warning

if isinstance(value, str) and isinstance(old_value, str):
# Lower case comparison for string values (JSON schema enum)
if value.lower() == old_value.lower():
return property_warning

Check warning on line 61 in chord_metadata_service/chord/ingest/exceptions.py

View check run for this annotation

Codecov / codecov/patch

chord_metadata_service/chord/ingest/exceptions.py#L61

Added line #L61 was not covered by tests

# Only warn when mecessary
return None


def __init__(self, schema_validation_errors: List[ValidationError]=[], message="An error occured during ingestion."):
def parse_schema_warnings(data: dict, schema: dict) -> Optional[List[dict]]:
"""
Schema warnings are issued on Katsu releases that include schema changes.
Warnings are returned to highlight schema changes that may be the root cause of an IngestionError.
Parameters:
data (dict): the data submitted for ingestion
Returns:
List[dict]:
dict:
property_name (str): The name of the property
property_value (str | dict)
deprecated_value (str | dict): The deprecated property option
suggested_replacement (str | dict): The new suggested property option
"""
if not data or not schema:
return None

data_type = schema.get("$id", "").split(":")[-1]
applicable_changes = DATA_TYPE_SCHEMA_CHANGES.get(data_type, None)

if not applicable_changes or __version__ not in applicable_changes:
# Skip if data type's schema is not affected in current Katsu version
return None

Check warning on line 91 in chord_metadata_service/chord/ingest/exceptions.py

View check run for this annotation

Codecov / codecov/patch

chord_metadata_service/chord/ingest/exceptions.py#L91

Added line #L91 was not covered by tests

warnings = []
for (prop_name, changes) in applicable_changes[__version__].get("properties", {}).items():
property_warning = parse_property_warnings(data=data, prop_name=prop_name, property_changes=changes)
if property_warning:
warnings.append(property_warning)
return warnings if len(warnings) else None


class IngestError(Exception):

errors_descriptions = parse_validation_errors(schema_validation_errors)
def __init__(self,
data: dict = None,
schema: dict = None,
schema_validation_errors: List[ValidationError] = [],
message="An error occured during ingestion."):

self.validation_errors = errors_descriptions
self.validation_errors = parse_validation_errors(schema_validation_errors)
self.schema_warnings = parse_schema_warnings(data=data, schema=schema)
self.message = message
3 changes: 2 additions & 1 deletion chord_metadata_service/chord/ingest/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ def validate_experiment(experiment_data, idx: Optional[int] = None) -> None:
# Validate experiment data against experiments schema.
val_errors = schema_validation(experiment_data, EXPERIMENT_SCHEMA)
if val_errors:
# TODO: Report more precise errors
raise IngestError(
data=experiment_data,
schema=EXPERIMENT_SCHEMA,
schema_validation_errors=val_errors,
message=f"Failed schema validation for experiment{(' ' + str(idx)) if idx is not None else ''} "
f"(check Katsu logs for more information)"
Expand Down
28 changes: 16 additions & 12 deletions chord_metadata_service/chord/ingest/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from typing import List

from bento_lib.schemas.bento import BENTO_INGEST_SCHEMA
from bento_lib.responses import errors

from . import WORKFLOW_INGEST_FUNCTION_MAP
from .exceptions import IngestError
Expand All @@ -33,7 +32,7 @@ def __init__(self, workflow_id: str, dataset_id: str):
self.workflow_id = workflow_id
self.dataset_id = dataset_id
self.success = False
self.errors = []
self.errors = []
self.warnings = []

def set_success(self, success: bool):
Expand All @@ -42,12 +41,21 @@ def set_success(self, success: bool):
def add_error(self, error):
self.errors.append(error)

def add_errors(self, errors: List[any]):
def add_errors(self, errors: List):
self.errors.extend(errors)

def add_warning(self, warnings: List[any]):
def add_warning(self, warnings: List):
self.warnings.extend(warnings)

Check warning on line 48 in chord_metadata_service/chord/ingest/views.py

View check run for this annotation

Codecov / codecov/patch

chord_metadata_service/chord/ingest/views.py#L48

Added line #L48 was not covered by tests

def add_ingest_error(self, error: IngestError):
if error.validation_errors:
self.add_errors(error.validation_errors)
else:
self.add_error(error.message)

Check warning on line 54 in chord_metadata_service/chord/ingest/views.py

View check run for this annotation

Codecov / codecov/patch

chord_metadata_service/chord/ingest/views.py#L54

Added line #L54 was not covered by tests

if error.schema_warnings:
self.warnings.extend(error.schema_warnings)

Check warning on line 57 in chord_metadata_service/chord/ingest/views.py

View check run for this annotation

Codecov / codecov/patch

chord_metadata_service/chord/ingest/views.py#L57

Added line #L57 was not covered by tests

def as_response(self, status_code: int):
body = {
"success": self.success,
Expand Down Expand Up @@ -82,10 +90,7 @@ def ingest_into_dataset(request, dataset_id: str, workflow_id: str):
WORKFLOW_INGEST_FUNCTION_MAP[workflow_id](request.data, dataset_id)

except IngestError as e:
if e.validation_errors:
response_builder.add_errors(e.validation_errors)
else:
response_builder.add_error(e.message)
response_builder.add_ingest_error(e)
return response_builder.as_response(400)

except ValidationError as e:
Expand All @@ -94,11 +99,10 @@ def ingest_into_dataset(request, dataset_id: str, workflow_id: str):

except Exception as e:
# Encountered some other error from the ingestion attempt, return a somewhat detailed message
error_message = f"Encountered an exception while processing an ingest attempt:\n{traceback.format_exc()}"
logger.error(error_message)
response_builder.add_error(error_message)
logger.error(f"Encountered an exception while processing an ingest attempt:\n{traceback.format_exc()}")
response_builder.add_error(f"Encountered an exception while processing an ingest attempt (error: {repr(e)})")
return response_builder.as_response(500)

# return Response(status=204)
response_builder.set_success(True)
return response_builder.as_response(204)
18 changes: 5 additions & 13 deletions chord_metadata_service/experiments/migrations/0009_v4_1_0.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
from typing import List, Tuple
from django.db import migrations
from chord_metadata_service.experiments.schemas import EXPERIMENT_SCHEMA_CHANGES

LIB_STRATEGY_CONVERSIONS: List[Tuple[str, str]] = [
# Convert WES -> WXS ...
("WES", "WXS"),
("Other", "OTHER"),
]

LIB_SELECTION_CONVERIONS: List[Tuple[str, str]] = [
("Random", "RANDOM"),
("Random PCR", "RANDOM PCR"),
("Exome capture", "Hybrid Selection"), # 'Exome capture' no longer supported
("Other", "other"),
]
V4_1_0_PROPERTIES = EXPERIMENT_SCHEMA_CHANGES["4.1.0"]["properties"]
LIB_STRATEGY_CONVERSIONS = V4_1_0_PROPERTIES["library_strategy"]
LIB_SELECTION_CONVERIONS = V4_1_0_PROPERTIES["library_selection"]


def set_experiment_library(apps, _schema_editor):
Expand All @@ -22,7 +14,7 @@ def set_experiment_library(apps, _schema_editor):
for exp in Experiment.objects.filter(library_strategy=old_val):
exp.library_strategy = new_val
exp.save()

for (old_val, new_val) in LIB_SELECTION_CONVERIONS:
# Modify library_selection if necessary
for exp in Experiment.objects.filter(library_selection=old_val):
Expand Down
21 changes: 21 additions & 0 deletions chord_metadata_service/experiments/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,24 @@
},
"required": ["id", "experiment_type"]
}, EXPERIMENT)


"""
Dictionary of schema changes for warnings.
"""
EXPERIMENT_SCHEMA_CHANGES = {
"4.1.0": {
"properties": {
"library_strategy": [
("WES", "WXS"),
("Other", "OTHER"),
],
"library_selection": [
("Random", "RANDOM"),
("Random PCR", "RANDOM PCR"),
("Exome capture", "Hybrid Selection"),
("Other", "other"),
]
}
}
}

0 comments on commit 9f8ff0c

Please sign in to comment.