Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMB 2091 - Added FHIR library and schema for validation checks #215

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 131 additions & 94 deletions backend/poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ cffi = "~1.16.0"
jsonpath-ng = "^1.6.0"
simplejson = "^3.19.2"
structlog = "^24.1.0"
marshmallow = "^3.21.3"

[build-system]
requires = ["poetry-core ~= 1.5.0"]
Expand Down
1 change: 1 addition & 0 deletions backend/src/create_imms_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def create_imms_handler(event, context):

def create_immunization(event, controller: FhirController):
try:
print(f"aws_event:{event}")
return controller.create_immunization(event)
except Exception as e:
exp_error = create_operation_outcome(resource_id=str(uuid.uuid4()), severity=Severity.error,
Expand Down
4 changes: 4 additions & 0 deletions backend/src/fhir_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def get_immunization_by_id(self, aws_event) -> dict:

def create_immunization(self, aws_event):
if response := self.authorize_request(EndpointOperation.CREATE, aws_event):
print(f"awsevent:{aws_event}")
return response

try:
Expand Down Expand Up @@ -151,14 +152,17 @@ def create_immunization(self, aws_event):
)

try:
print("1")
resource = self.fhir_service.create_immunization(imms,imms_vax_type_perms,app_id)
print(f"resource:{resource}")
if "diagnostics" in resource:
exp_error = create_operation_outcome(
resource_id=str(uuid.uuid4()),
severity=Severity.error,
code=Code.invariant,
diagnostics=resource["diagnostics"],
)
print(exp_error)
return self.create_response(400, json.dumps(exp_error))
location = f"{get_service_url()}/Immunization/{resource.id}"
return self.create_response(201, None, {"Location": location})
Expand Down
2 changes: 2 additions & 0 deletions backend/src/fhir_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ def get_immunization_by_id_all(self, imms_id: str,imms: Optional[dict],app_id: s

def create_immunization(self, immunization: dict, imms_vax_type_perms, app_id) -> Immunization:
try:
print("2")
self.validator.validate(immunization)
except (ValidationError, ValueError, MandatoryError, NotApplicableError) as error:
print(f"error:{error}")
raise CustomValidationError(message=str(error)) from error
patient = self._validate_patient(immunization)

Expand Down
79 changes: 53 additions & 26 deletions backend/src/models/fhir_immunization.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from fhir.resources.R4B.immunization import Immunization
from models.fhir_immunization_pre_validators import PreValidators
from models.fhir_immunization_post_validators import PostValidators
from models.utils.generic_utils import get_generic_questionnaire_response_value
from models.utils.generic_utils import get_generic_questionnaire_response_value,extract_value
from models.schema import ImmunizationSchema
import json


class ImmunizationValidator:
Expand All @@ -15,28 +17,56 @@ def __init__(self, add_post_validators: bool = True) -> None:
self.add_post_validators = add_post_validators
self.pre_validators: PreValidators
self.post_validators: PostValidators
self.immunization_schema: ImmunizationSchema
self.errors = []

def initialize_immunization_and_run_fhir_validators(self, json_data):
"""Initialize immunization with data after parsing it through the FHIR validator"""
self.immunization = Immunization.parse_obj(json_data)

print("4")
try:
self.immunization = Immunization.parse_obj(json_data)
except Exception as e:
raise ValueError(e)
def initialize_pre_validators(self, immunization):
"""Initialize pre validators with data."""
self.pre_validators = PreValidators(immunization)
self.immunization_schema= ImmunizationSchema(context={'contained': immunization['contained']})
print("5")
errors = self.immunization_schema.validate(immunization)
print(errors)
if errors:
error_list = []
for key, value in errors.items():
if isinstance(value, dict):
for sub_key, sub_value in value.items():
error_list.append({key: {sub_key: sub_value}})
else:
error_list.append({key: value})

# Convert each dictionary in the list to a JSON-formatted string
error_strings = [json.dumps(error) for error in error_list]

# Extract content within the first and last square brackets and update the array
updated_errors = [extract_value(item) for item in error_strings]
# for error_string in error_strings:
# match = re.search(r'\[(.*?)\]', error_string)
# if match:
# updated_errors.append(match.group(1))
error_string = '; '.join(updated_errors)
raise ValueError(error_string)


def initialize_post_validators(self, immunization):
"""Initialize post validators with data"""
self.post_validators = PostValidators(immunization)

def run_pre_validators(self):
"""Run custom pre validators to the data"""
error = self.pre_validators.validate()
if error:
raise ValueError(error)
# def run_pre_validators(self):
# """Run custom pre validators to the data"""
# error = self.pre_validators.validate()
# if error:
# raise ValueError(error)

def run_post_validators(self):
"""Run custom pre validators to the data"""
"""Run custom post validators to the data"""
error = self.post_validators.validate()
if error:
raise ValueError(error)
Expand All @@ -60,24 +90,21 @@ def set_reduce_validation_code(self, json_data):

def validate(self, json_data) -> Immunization:
"""Generate the Immunization model"""
self.set_reduce_validation_code(json_data)

# Pre-FHIR validations
self.initialize_pre_validators(json_data)
try:
self.run_pre_validators()
except Exception as e:
raise e

# self.set_reduce_validation_code(json_data)

print("3")
# FHIR validations
self.initialize_immunization_and_run_fhir_validators(json_data)


# Pre-FHIR validations
self.initialize_pre_validators(json_data)

# Post-FHIR validations
if self.add_post_validators and not self.reduce_validation_code:
self.initialize_post_validators(self.immunization)
try:
self.run_post_validators()
except Exception as e:
raise e
# if self.add_post_validators and not self.reduce_validation_code:
# self.initialize_post_validators(self.immunization)
# try:
# self.run_post_validators()
# except Exception as e:
# raise e

return self.immunization
Loading