
feat: experiment library schema changes, ingestion report with errors, json schema warnings #435

Open
wants to merge 26 commits into base: develop
e9cf81d
xml parsing
v-rocheleau Sep 13, 2023
2f765da
xsd ontologies utils
v-rocheleau Sep 14, 2023
e479e4c
lint
v-rocheleau Sep 14, 2023
26becc3
experiment library strategy data migration
v-rocheleau Sep 14, 2023
762d23a
xsd dir name change
v-rocheleau Sep 14, 2023
a32bd01
fix poetry.lock
v-rocheleau Sep 14, 2023
1390066
code clean
v-rocheleau Sep 14, 2023
d8d9a85
lint
v-rocheleau Sep 14, 2023
d030cb6
read library selection, add doc
v-rocheleau Sep 14, 2023
48ff585
fix test data
v-rocheleau Sep 14, 2023
2a93fe4
fix migration type
v-rocheleau Sep 14, 2023
eb93de0
migration fix, lint
v-rocheleau Sep 15, 2023
fae4540
add validation error descriptions to IngestError
v-rocheleau Sep 18, 2023
0c1cb1b
ingest response format
v-rocheleau Sep 18, 2023
9f8ff0c
ingestion error responds with warnings on schema changes
v-rocheleau Sep 19, 2023
2194a08
update api ingestion tests
v-rocheleau Sep 20, 2023
948f6bc
lint
v-rocheleau Sep 20, 2023
db378d2
add ingestion tests, exp workflow payload schema
v-rocheleau Sep 20, 2023
881f792
infer success from status code
v-rocheleau Sep 20, 2023
b359973
remove line call
v-rocheleau Sep 20, 2023
9b58dc4
get ingestion warnings from derived experiment results ingestion
v-rocheleau Sep 20, 2023
dd0f66c
save ingest report to file and output
v-rocheleau Sep 20, 2023
74e55fa
Merge branch 'develop' into features/library-strategies
v-rocheleau Jan 18, 2024
f9fa0f9
fix migrations, lint
v-rocheleau Jan 18, 2024
4148d36
schema changes version update
v-rocheleau Jan 18, 2024
bbef730
fix api ingest tests
v-rocheleau Jan 19, 2024
15 changes: 15 additions & 0 deletions README.md
@@ -71,6 +71,21 @@ Katsu Metadata Service is a service to store epigenomic metadata.
6. Rest api service handles all generic functionality shared among other services


## Schemas
### Clinical Data

Katsu implements the [Phenopacket V1.0.0](https://phenopacket-schema.readthedocs.io/en/1.0.0/) schema for clinical data.

The schema definition for the phenopacket object is located in [chord_metadata_service/phenopackets/schemas.py](https://github.com/bento-platform/katsu/blob/4ab3c55d6052994ef69b188fb872261c47de24e0/chord_metadata_service/phenopackets/schemas.py#L336).

### Experiments

Katsu's experiment schemas are based on the [IHEC metadata specification](https://github.com/IHEC/ihec-ecosystems/blob/master/docs/metadata/2.0/Ihec_metadata_specification.md#experiments), which is in turn based on the EBI/SRA schemas.

The value options for `library_strategy` and `library_selection` are read from [chord_metadata_service/ontologies/xsd/SRA.experiment.xsd.xml](./chord_metadata_service/ontologies/xsd/SRA.experiment.xsd.xml), downloaded from the EBI's [SRA v1.5 database](http://ftp.ebi.ac.uk/pub/databases/ena/doc/xsd/sra_1_5/).

The `SRA.experiment.xsd.xml` file is licensed under the Apache License V2.0; the full copyright text is included in the file's header.
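As an illustration of reading enum options from an SRA-style XSD (this is a standalone sketch using the standard library, not Katsu's actual parsing code; the tag layout mirrors the usual XML Schema structure, and the type name and values below are hypothetical):

```python
import xml.etree.ElementTree as ET

XS = "{http://www.w3.org/2001/XMLSchema}"

# Tiny stand-in for SRA.experiment.xsd.xml (hypothetical content)
XSD_SNIPPET = """<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:simpleType name="typeLibrarySelection">
    <xs:restriction base="xs:string">
      <xs:enumeration value="RANDOM"/>
      <xs:enumeration value="PCR"/>
      <xs:enumeration value="other"/>
    </xs:restriction>
  </xs:simpleType>
</xs:schema>"""


def enum_values(xsd_text: str, type_name: str) -> list:
    # Collect every xs:enumeration value under the named xs:simpleType.
    root = ET.fromstring(xsd_text)
    values = []
    for simple_type in root.iter(f"{XS}simpleType"):
        if simple_type.get("name") == type_name:
            for enum in simple_type.iter(f"{XS}enumeration"):
                values.append(enum.get("value"))
    return values


print(enum_values(XSD_SNIPPET, "typeLibrarySelection"))  # → ['RANDOM', 'PCR', 'other']
```

Namespaced tag strings (`{http://...}simpleType`) avoid having to register a namespace map for a one-off lookup.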

## REST API highlights

* Swagger schema docs can be found
108 changes: 107 additions & 1 deletion chord_metadata_service/chord/ingest/exceptions.py
@@ -1,7 +1,113 @@
from typing import List, Optional
from jsonschema.exceptions import ValidationError
from chord_metadata_service import __version__
from chord_metadata_service.experiments.schemas import EXPERIMENT_SCHEMA_CHANGES
from chord_metadata_service.chord.data_types import DATA_TYPE_EXPERIMENT, DATA_TYPE_PHENOPACKET

__all__ = [
"IngestError",
]


DATA_TYPE_SCHEMA_CHANGES = {
    DATA_TYPE_EXPERIMENT: EXPERIMENT_SCHEMA_CHANGES,
    DATA_TYPE_PHENOPACKET: None,
}


def parse_validation_errors(errors: List[ValidationError]) -> Optional[List[dict]]:
    """
    Accepts a list of jsonschema ValidationError and converts them to a client error format.

    Parameters:
        errors (List[ValidationError]): errors raised by jsonschema during validation
    Returns:
        List[dict]:
            dict:
                schema_path (str): Schema path string (e.g. "properties.library_strategy")
                faulty_value (str | obj): The value at the schema_path causing the error
                property_schema (dict): JSON schema of the property (includes valid options)
                message (str): The ValidationError.message
    """
    error_descriptions = []
    for error in errors:
        # schema_path elements may be ints (array indices), so stringify before joining
        schema_path = ".".join(str(p) for p in error.schema_path)
        error_descriptions.append({
            "schema_path": schema_path,
            "faulty_value": error.instance,
            "message": error.message,
            "property_schema": error.schema,
        })
    return error_descriptions if len(error_descriptions) else None


def parse_property_warnings(data: dict, prop_name: str, property_changes: List[tuple]) -> Optional[dict]:
    value = data.get(prop_name)  # tolerate properties absent from the submitted data
    for (old_value, new_value) in property_changes:
        property_warning = {
            "property_name": prop_name,
            "property_value": value,
            "deprecated_value": old_value,
            "suggested_replacement": new_value,
        }

        if value == old_value:
            # Naive comparison for dicts
            return property_warning

        if isinstance(value, str) and isinstance(old_value, str):
            # Lower-case comparison for string values (JSON schema enum)
            if value.lower() == old_value.lower():
                return property_warning

    # Only warn when necessary
    return None


def parse_schema_warnings(data: dict, schema: dict) -> Optional[List[dict]]:
    """
    Schema warnings are issued on Katsu releases that include schema changes.
    Warnings are returned to highlight schema changes that may be the root cause of an IngestError.

    Parameters:
        data (dict): the data submitted for ingestion
        schema (dict): the JSON schema the data was validated against

    Returns:
        List[dict]:
            dict:
                property_name (str): The name of the property
                property_value (str | dict)
                deprecated_value (str | dict): The deprecated property option
                suggested_replacement (str | dict): The new suggested property option
                version (str): The Katsu release version associated with the schema change
    """
    if not data or not schema:
        return None

    data_type = schema.get("$id", "").split("/")[-1]
    applicable_changes = DATA_TYPE_SCHEMA_CHANGES.get(data_type)

    if not applicable_changes or __version__ not in applicable_changes:
        # Skip if the data type's schema is not affected in the current Katsu version
        return None

    warnings = []
    for (version, version_changes) in applicable_changes.items():
        for (prop_name, changes) in version_changes.get("properties", {}).items():
            if property_warning := parse_property_warnings(data, prop_name, changes):
                property_warning["version"] = version
                warnings.append(property_warning)
    return warnings if len(warnings) else None


class IngestError(Exception):

    def __init__(self,
                 data: dict = None,
                 schema: dict = None,
                 schema_validation_errors: Optional[List[ValidationError]] = None,
                 message="An error occurred during ingestion."):
        super().__init__(message)
        self.validation_errors = parse_validation_errors(schema_validation_errors or [])
        self.schema_warnings = parse_schema_warnings(data=data, schema=schema)
        self.message = message
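The `EXPERIMENT_SCHEMA_CHANGES` map consumed above is not shown in this diff; the self-contained sketch below illustrates the shape the code implies (version → `properties` → list of `(deprecated, replacement)` pairs). The version number, property, and values are hypothetical, and `find_warnings` is a minimal stand-in for `parse_schema_warnings`, not Katsu's implementation:

```python
# Hypothetical illustration of the change-map shape (not Katsu's actual data)
EXAMPLE_SCHEMA_CHANGES = {
    "4.0.0": {
        "properties": {
            "library_selection": [("Random", "RANDOM")],
        }
    }
}


def find_warnings(data: dict, changes: dict):
    # Emit one warning per deprecated value still present in the submitted data.
    warnings = []
    for version, version_changes in changes.items():
        for prop, pairs in version_changes.get("properties", {}).items():
            value = data.get(prop)
            for old, new in pairs:
                if isinstance(value, str) and value.lower() == old.lower():
                    warnings.append({
                        "property_name": prop,
                        "deprecated_value": old,
                        "suggested_replacement": new,
                        "version": version,
                    })
    return warnings or None


print(find_warnings({"library_selection": "Random"}, EXAMPLE_SCHEMA_CHANGES))
```

Returning `None` instead of an empty list keeps the "no warnings" case falsy, matching the walrus-operator checks used in the PR.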
42 changes: 31 additions & 11 deletions chord_metadata_service/chord/ingest/experiments.py
@@ -4,7 +4,8 @@

from chord_metadata_service.chord.models import Dataset
from chord_metadata_service.experiments import models as em
-from chord_metadata_service.experiments.schemas import EXPERIMENT_SCHEMA, EXPERIMENT_RESULT_SCHEMA
+from chord_metadata_service.experiments.schemas import EXPERIMENT_SCHEMA, \
+    EXPERIMENT_RESULT_SCHEMA, EXPERIMENT_WORKFLOW_SCHEMA
from chord_metadata_service.phenopackets import models as pm

from typing import Optional
@@ -56,12 +57,24 @@ def create_experiment_result(er: dict) -> em.ExperimentResult:

def validate_experiment(experiment_data, idx: Optional[int] = None) -> None:
    # Validate experiment data against experiments schema.
-    validation = schema_validation(experiment_data, EXPERIMENT_SCHEMA)
-    if not validation:
-        # TODO: Report more precise errors
-        raise IngestError(
-            f"Failed schema validation for experiment{(' ' + str(idx)) if idx is not None else ''} "
-            f"(check Katsu logs for more information)")
+    if val_errors := schema_validation(experiment_data, EXPERIMENT_SCHEMA):
+        raise IngestError(
+            data=experiment_data,
+            schema=EXPERIMENT_SCHEMA,
+            schema_validation_errors=val_errors,
+            message=f"Failed schema validation for experiment{(' ' + str(idx)) if idx is not None else ''} "
+                    f"(check Katsu logs for more information)"
+        )
+
+
+def validate_experiment_workflow(json_data: dict) -> None:
+    if val_errors := schema_validation(json_data, EXPERIMENT_WORKFLOW_SCHEMA):
+        raise IngestError(
+            data=json_data,
+            schema=EXPERIMENT_WORKFLOW_SCHEMA,
+            schema_validation_errors=val_errors,
+            message="Failed schema validation for experiments ingestion workflow payload.",
+        )


def ingest_experiment(
@@ -139,14 +152,17 @@ def ingest_experiment(


def ingest_experiments_workflow(json_data, dataset_id: str) -> list[em.Experiment]:
+    # First, validate the workflow's json_data
+    validate_experiment_workflow(json_data)
+
    dataset = Dataset.objects.get(identifier=dataset_id)

    for rs in json_data.get("resources", []):
        dataset.additional_resources.add(ingest_resource(rs))

    exps = json_data.get("experiments", [])

-    # First, validate all experiments with the schema before creating anything in the database.
+    # Second, validate all experiments with the schema before creating anything in the database.
    for idx, exp in enumerate(exps):
        validate_experiment(exp, idx)
@@ -164,12 +180,16 @@ def ingest_derived_experiment_results(json_data: list[dict]) -> list[em.Experime
    # First, validate all experiment results with the schema before creating anything in the database.

    for idx, exp_result in enumerate(json_data):
-        validation = schema_validation(exp_result, EXPERIMENT_RESULT_SCHEMA)
-        if not validation:
+        val_errors = schema_validation(exp_result, EXPERIMENT_RESULT_SCHEMA)
+        if val_errors:
            # TODO: Report more precise errors
            raise IngestError(
-                f"Failed schema validation for experiment result {idx} "
-                f"(check Katsu logs for more information)")
+                data=exp_result,
+                schema=EXPERIMENT_RESULT_SCHEMA,
+                schema_validation_errors=val_errors,
+                message=f"Failed schema validation for experiment result {idx} "
+                        f"(check Katsu logs for more information)"
+            )

    # If everything passes, perform the actual ingestion next.
12 changes: 8 additions & 4 deletions chord_metadata_service/chord/ingest/phenopackets.py
@@ -85,12 +85,16 @@ def validate_phenopacket(phenopacket_data: dict[str, Any],
                         idx: Optional[int] = None) -> None:
    # Validate phenopacket data against phenopackets schema.
    # validation = schema_validation(phenopacket_data, PHENOPACKET_SCHEMA)
-    validation = schema_validation(phenopacket_data, schema, registry=VRS_REF_REGISTRY)
-    if not validation:
+    val_errors = schema_validation(phenopacket_data, schema, registry=VRS_REF_REGISTRY)
+    if val_errors:
        # TODO: Report more precise errors
        raise IngestError(
-            f"Failed schema validation for phenopacket{(' ' + str(idx)) if idx is not None else ''} "
-            f"(check Katsu logs for more information)")
+            data=phenopacket_data,
+            schema=PHENOPACKET_SCHEMA,
+            schema_validation_errors=val_errors,
+            message=f"Failed schema validation for phenopacket{(' ' + str(idx)) if idx is not None else ''} "
+                    f"(check Katsu logs for more information)"
+        )


def update_or_create_subject(subject: dict) -> pm.Individual:
4 changes: 2 additions & 2 deletions chord_metadata_service/chord/ingest/schema.py
Expand Up @@ -24,10 +24,10 @@ def schema_validation(obj, schema, registry=None):
    try:
        validator.validate(obj, schema)
        logger.info("JSON schema validation passed.")
-        return True
+        return None
    except ValidationError:
        errors = [e for e in validator.iter_errors(obj)]
        logger.info("JSON schema validation failed.")
        for i, error in enumerate(errors, 1):
            logger.error(f"{i} Validation error in {'.'.join(str(v) for v in error.path)}: {error.message}")
-        return False
+        return errors
61 changes: 49 additions & 12 deletions chord_metadata_service/chord/ingest/views.py
@@ -6,11 +6,11 @@

from django.core.exceptions import ValidationError
from django.db import transaction
+from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny
from rest_framework.response import Response

-from bento_lib.responses import errors
+from typing import List

from . import WORKFLOW_INGEST_FUNCTION_MAP
from .exceptions import IngestError
@@ -23,18 +23,55 @@
logger = logging.getLogger(__name__)


+class IngestResponseBuilder:
+
+    def __init__(self, workflow_id: str, dataset_id: str):
+        self.workflow_id = workflow_id
+        self.dataset_id = dataset_id
+        self.errors = []
+        self.warnings = []
+
+    def add_error(self, error):
+        self.errors.append(error)
+
+    def add_errors(self, errors: List):
+        self.errors.extend(errors)
+
+    def add_ingest_error(self, error: IngestError):
+        if error.validation_errors:
+            self.add_errors(error.validation_errors)
+        else:
+            self.add_error(error.message)
+
+        if error.schema_warnings:
+            self.warnings.extend(error.schema_warnings)
+
+    def as_response(self, status_code: int) -> Response:
+        body = {
+            "success": status_code < status.HTTP_400_BAD_REQUEST,
+            "warnings": self.warnings,
+            "errors": self.errors,
+        }
+        logger.info(f"Finished {self.workflow_id} ingest request for dataset {self.dataset_id}: {body}")
+        return Response(body, status=status_code)
+
+
@api_view(["POST"])
@permission_classes([AllowAny])
def ingest_into_dataset(request, dataset_id: str, workflow_id: str):
    logger.info(f"Received a {workflow_id} ingest request for dataset {dataset_id}.")

+    response_builder = IngestResponseBuilder(workflow_id=workflow_id, dataset_id=dataset_id)
+
    # Check that the workflow exists
    if workflow_id not in WORKFLOW_INGEST_FUNCTION_MAP:
-        return Response(errors.bad_request_error(f"Ingestion workflow ID {workflow_id} does not exist"), status=400)
+        response_builder.add_error(f"Ingestion workflow ID {workflow_id} does not exist")
+        return response_builder.as_response(status.HTTP_400_BAD_REQUEST)

    if dataset_id not in DATASET_ID_OVERRIDES:
        if not Dataset.objects.filter(identifier=dataset_id).exists():
-            return Response(errors.bad_request_error(f"Dataset with ID {dataset_id} does not exist"), status=400)
+            response_builder.add_error(f"Dataset with ID {dataset_id} does not exist")
+            return response_builder.as_response(status.HTTP_400_BAD_REQUEST)
        dataset_id = str(uuid.UUID(dataset_id))  # Normalize dataset ID to UUID's str format.

try:
@@ -43,17 +80,17 @@ def ingest_into_dataset(request, dataset_id: str, workflow_id: str):
        WORKFLOW_INGEST_FUNCTION_MAP[workflow_id](request.data, dataset_id)

    except IngestError as e:
-        return Response(errors.bad_request_error(f"Encountered ingest error: {e}"), status=400)
+        response_builder.add_ingest_error(e)
+        return response_builder.as_response(status.HTTP_400_BAD_REQUEST)

    except ValidationError as e:
-        return Response(errors.bad_request_error(
-            "Encountered validation errors during ingestion",
-            *(e.error_list if hasattr(e, "error_list") else e.error_dict.items()),
-        ))
+        response_builder.add_errors(e.error_list if hasattr(e, "error_list") else e.error_dict.items())
+        return response_builder.as_response(status.HTTP_400_BAD_REQUEST)

    except Exception as e:
        # Encountered some other error from the ingestion attempt, return a somewhat detailed message
        logger.error(f"Encountered an exception while processing an ingest attempt:\n{traceback.format_exc()}")
-        return Response(errors.internal_server_error(f"Encountered an exception while processing an ingest attempt "
-                                                     f"(error: {repr(e)}"), status=500)
-    return Response(status=204)
+        response_builder.add_error(f"Encountered an exception while processing an ingest attempt (error: {repr(e)})")
+        return response_builder.as_response(status.HTTP_500_INTERNAL_SERVER_ERROR)

+    return response_builder.as_response(status.HTTP_201_CREATED)
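The view now builds every response through one accumulator, with `success` inferred from the status code rather than tracked separately. A framework-free sketch of that design (the real `IngestResponseBuilder` above wraps DRF's `Response`; the class and values here are illustrative only):

```python
class ResponseBuilder:
    def __init__(self, workflow_id: str, dataset_id: str):
        self.workflow_id = workflow_id
        self.dataset_id = dataset_id
        self.errors = []
        self.warnings = []

    def add_error(self, error) -> None:
        self.errors.append(error)

    def as_body(self, status_code: int) -> dict:
        # Success is derived from the status code, so it can never disagree with it.
        return {
            "success": status_code < 400,
            "warnings": self.warnings,
            "errors": self.errors,
        }


rb = ResponseBuilder("experiments_json", "dataset-1")
rb.add_error("Dataset with ID dataset-1 does not exist")
print(rb.as_body(400))
```

Deriving `success` from the status code removes a class of bugs where a handler returns an error status with `"success": true` (or vice versa).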
4 changes: 2 additions & 2 deletions chord_metadata_service/chord/tests/example_experiment.json
@@ -13,7 +13,7 @@
  ],
  "library_strategy": "ChIP-Seq",
  "library_source": "Genomic",
-  "library_selection": "Random",
+  "library_selection": "RANDOM",
  "library_layout": "Single",
  "extraction_protocol": "NGS",
  "molecule": "genomic DNA",
@@ -75,4 +75,4 @@
"url": "http://purl.obolibrary.org/obo/so.owl"
}
]
}
}
@@ -13,7 +13,7 @@
  ],
  "library_strategy": "ChIP-Seq",
  "library_source": "Genomic",
-  "library_selection": "Random",
+  "library_selection": "RANDOM",
  "library_layout": "Single",
  "extraction_protocol": "NGS",
  "molecule": "genomic DNA",
@@ -75,4 +75,4 @@
"url": "http://purl.obolibrary.org/obo/so.owl"
}
]
}
}
@@ -11,9 +11,9 @@
"label": "ChIP-seq"
}
],
"library_strategy": "ChIP-Seq",
"library_strategy": "WES",
"library_source": "Genomic",
"library_selection": "Random",
"library_selection": "random",
"library_layout": "Single",
"extraction_protocol": "NGS",
"molecule": "genomic DNA",
@@ -75,4 +75,4 @@
"url": "http://purl.obolibrary.org/obo/so.owl"
}
]
}
}