From be02c98e39aadb5b11b75c37b08aaef9caebfad0 Mon Sep 17 00:00:00 2001 From: Pip Liggins Date: Thu, 29 Aug 2024 15:02:12 +0100 Subject: [PATCH 1/4] Add required extensions into various resources --- fhirflat/resources/immunization.py | 23 +++++-- .../resources/medicationadministration.py | 62 ++++++++++++++++++- fhirflat/resources/medicationstatement.py | 47 +++++++++++++- fhirflat/resources/procedure.py | 21 +++++-- 4 files changed, 143 insertions(+), 10 deletions(-) diff --git a/fhirflat/resources/immunization.py b/fhirflat/resources/immunization.py index da415c5..f332d99 100644 --- a/fhirflat/resources/immunization.py +++ b/fhirflat/resources/immunization.py @@ -15,17 +15,30 @@ from .base import FHIRFlatBase from .extension_types import ( dateTimeExtensionType, + presenceAbsenceType, + prespecifiedQueryType, timingPhaseDetailType, timingPhaseType, ) -from .extensions import timingPhase, timingPhaseDetail +from .extensions import ( + presenceAbsence, + prespecifiedQuery, + timingPhase, + timingPhaseDetail, +) JsonString: TypeAlias = str class Immunization(_Immunization, FHIRFlatBase): extension: list[ - Union[timingPhaseType, timingPhaseDetailType, fhirtypes.ExtensionType] + Union[ + presenceAbsenceType, + prespecifiedQueryType, + timingPhaseType, + timingPhaseDetailType, + fhirtypes.ExtensionType, + ] ] = Field( None, alias="extension", @@ -77,9 +90,11 @@ class Immunization(_Immunization, FHIRFlatBase): def validate_extension_contents(cls, extensions): timing_count = sum(isinstance(item, timingPhase) for item in extensions) detail_count = sum(isinstance(item, timingPhaseDetail) for item in extensions) + pa_count = sum(isinstance(item, presenceAbsence) for item in extensions) + pq_count = sum(isinstance(item, prespecifiedQuery) for item in extensions) - if timing_count > 1 or detail_count > 1: - raise ValueError("timingPhase and timingPhaseDetail can only appear once.") + if timing_count > 1 or detail_count > 1 or pa_count > 1 or pq_count > 1: + raise ValueError("Each extension can only appear once.") if timing_count > 0 and detail_count > 0: raise ValueError( diff --git a/fhirflat/resources/medicationadministration.py b/fhirflat/resources/medicationadministration.py index 138dc26..83b58a8 100644 --- a/fhirflat/resources/medicationadministration.py +++ b/fhirflat/resources/medicationadministration.py @@ -1,7 +1,8 @@ from __future__ import annotations -from typing import ClassVar, TypeAlias +from typing import ClassVar, TypeAlias, Union +from fhir.resources import fhirtypes from fhir.resources.medicationadministration import ( MedicationAdministration as _MedicationAdministration, ) @@ -9,13 +10,53 @@ MedicationAdministrationDosage, MedicationAdministrationPerformer, ) +from pydantic.v1 import Field, validator from .base import FHIRFlatBase +from .extension_types import ( + durationType, + presenceAbsenceType, + prespecifiedQueryType, + timingPhaseDetailType, + timingPhaseType, +) +from .extensions import ( + Duration, + presenceAbsence, + prespecifiedQuery, + timingPhase, + timingPhaseDetail, +) JsonString: TypeAlias = str class MedicationAdministration(_MedicationAdministration, FHIRFlatBase): + extension: list[ + Union[ + presenceAbsenceType, + prespecifiedQueryType, + timingPhaseType, + timingPhaseDetailType, + durationType, + fhirtypes.ExtensionType, + ] + ] = Field( + None, + alias="extension", + title="Additional content defined by implementations", + description=( + """ + Contains the G.H 'presenceAbsence', 'prespecifiedQuery', 'duration' and + 'timingPhaseDetail' extensions, and allows extensions from other + implementations to be included. + """ + ), + # if property is element of this resource. + element_property=True, + union_mode="smart", + ) + # attributes to exclude from the flat representation flat_exclusions: ClassVar[set[str]] = FHIRFlatBase.flat_exclusions | { "id", @@ -28,6 +69,25 @@ class MedicationAdministration(_MedicationAdministration, FHIRFlatBase): # required attributes that are not present in the FHIRflat representation flat_defaults: ClassVar[list[str]] = [*FHIRFlatBase.flat_defaults, "status"] + @validator("extension") + def validate_extension_contents(cls, extensions): + pa_count = sum(isinstance(item, presenceAbsence) for item in extensions) + pq_count = sum(isinstance(item, prespecifiedQuery) for item in extensions) + tp_count = sum(isinstance(item, timingPhase) for item in extensions) + tpd_count = sum(isinstance(item, timingPhaseDetail) for item in extensions) + dur_count = sum(isinstance(item, Duration) for item in extensions) + + if ( + pa_count > 1 + or pq_count > 1 + or tp_count > 1 + or tpd_count > 1 + or dur_count > 1 + ): + raise ValueError("Each extension can can only appear once.") + + return extensions + backbone_elements: ClassVar[dict] = { "performer": MedicationAdministrationPerformer, "dosage": MedicationAdministrationDosage, diff --git a/fhirflat/resources/medicationstatement.py b/fhirflat/resources/medicationstatement.py index 3d19589..a4bfa84 100644 --- a/fhirflat/resources/medicationstatement.py +++ b/fhirflat/resources/medicationstatement.py @@ -1,20 +1,51 @@ from __future__ import annotations -from typing import ClassVar, TypeAlias +from typing import ClassVar, TypeAlias, Union +from fhir.resources import fhirtypes from fhir.resources.medicationstatement import ( MedicationStatement as _MedicationStatement, ) from fhir.resources.medicationstatement import ( MedicationStatementAdherence, ) +from pydantic.v1 import Field, validator from .base import FHIRFlatBase +from .extension_types import ( + presenceAbsenceType, + prespecifiedQueryType, + timingPhaseDetailType, +) +from .extensions import presenceAbsence, prespecifiedQuery, timingPhaseDetail JsonString: TypeAlias = str class MedicationStatement(_MedicationStatement, FHIRFlatBase): + extension: list[ + Union[ + presenceAbsenceType, + prespecifiedQueryType, + timingPhaseDetailType, + fhirtypes.ExtensionType, + ] + ] = Field( + None, + alias="extension", + title="Additional content defined by implementations", + description=( + """ + Contains the G.H 'presenceAbsence', 'prespecifiedQuery' and + 'timingPhaseDetail' extensions, and allows extensions from other + implementations to be included. + """ + ), + # if property is element of this resource. + element_property=True, + union_mode="smart", + ) + # attributes to exclude from the flat representation flat_exclusions: ClassVar[set[str]] = FHIRFlatBase.flat_exclusions | { "id", @@ -28,6 +59,20 @@ class MedicationStatement(_MedicationStatement, FHIRFlatBase): backbone_elements: ClassVar[dict] = {"adherence": MedicationStatementAdherence} + @validator("extension") + def validate_extension_contents(cls, extensions): + pa_count = sum(isinstance(item, presenceAbsence) for item in extensions) + pq_count = sum(isinstance(item, prespecifiedQuery) for item in extensions) + tpd_count = sum(isinstance(item, timingPhaseDetail) for item in extensions) + + if pa_count > 1 or pq_count > 1 or tpd_count > 1: + raise ValueError( + "presenceAbsence, prespecifiedQuery and timingPhaseDetail can only " + "appear once." + ) + + return extensions + @classmethod def cleanup(cls, data: dict) -> dict: """ diff --git a/fhirflat/resources/procedure.py b/fhirflat/resources/procedure.py index 695266e..bd63030 100644 --- a/fhirflat/resources/procedure.py +++ b/fhirflat/resources/procedure.py @@ -16,11 +16,20 @@ from .extension_types import ( dateTimeExtensionType, durationType, + presenceAbsenceType, + prespecifiedQueryType, relativePeriodType, timingPhaseDetailType, timingPhaseType, ) -from .extensions import Duration, relativePeriod, timingPhase, timingPhaseDetail +from .extensions import ( + Duration, + presenceAbsence, + prespecifiedQuery, + relativePeriod, + timingPhase, + timingPhaseDetail, +) JsonString: TypeAlias = str @@ -32,6 +41,8 @@ class Procedure(_Procedure, FHIRFlatBase): timingPhaseType, timingPhaseDetailType, relativePeriodType, + presenceAbsenceType, + prespecifiedQueryType, fhirtypes.ExtensionType, ] ] = Field( @@ -86,16 +97,18 @@ def validate_extension_contents(cls, extensions): tim_phase_count = sum(isinstance(item, timingPhase) for item in extensions) rel_phase_count = sum(isinstance(item, relativePeriod) for item in extensions) detail_count = sum(isinstance(item, timingPhaseDetail) for item in extensions) + pa_count = sum(isinstance(item, presenceAbsence) for item in extensions) + pq_count = sum(isinstance(item, prespecifiedQuery) for item in extensions) if ( duration_count > 1 or tim_phase_count > 1 or rel_phase_count > 1 or detail_count > 1 + or pa_count > 1 + or pq_count > 1 ): - raise ValueError( - "duration, timingPhase, timingPhaseDetail and relativePeriod can only appear once." # noqa E501 - ) + raise ValueError("Each extension can only appear once.") if tim_phase_count > 0 and detail_count > 0: raise ValueError( From cd1257d1068397670a6e63f309ef421601b57581 Mon Sep 17 00:00:00 2001 From: Pip Liggins Date: Thu, 29 Aug 2024 15:08:39 +0100 Subject: [PATCH 2/4] Update get_fhirtype --- fhirflat/util.py | 52 +++++++++++++++++++++++++++++++++------------ tests/test_utils.py | 2 ++ 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/fhirflat/util.py b/fhirflat/util.py index 176170f..335e5ef 100644 --- a/fhirflat/util.py +++ b/fhirflat/util.py @@ -14,6 +14,7 @@ import fhirflat from fhirflat.resources import extensions +from fhirflat.resources.extensions import _ISARICExtension if TYPE_CHECKING: from .resources.base import FHIRFlatBase @@ -47,17 +48,19 @@ def get_fhirtype(t: str | list[str]): return getattr(getattr(fhir.resources, t.lower()), t) except AttributeError: file_words = re.findall(r"[A-Z](?:[a-z]+|[A-Z]*(?=[A-Z]|$))", t) - file = "".join(file_words[:-1]).lower() + for i in range(-1, (len(file_words) * -1), -1): + try: + file = "".join(file_words[:i]).lower() + return getattr(getattr(fhir.resources, file), t) + except AttributeError: + continue try: - return getattr(getattr(fhir.resources, file), t) - except AttributeError: - try: - module = importlib.import_module(f"fhir.resources.{t.lower()}") - return getattr(module, t) - except (ImportError, ModuleNotFoundError) as e: - # Handle the case where the module does not exist. - raise AttributeError(f"Could not find {t} in fhir.resources") from e + module = importlib.import_module(f"fhir.resources.{t.lower()}") + return getattr(module, t) + except (ImportError, ModuleNotFoundError) as e: + # Handle the case where the module does not exist. + raise AttributeError(f"Could not find {t} in fhir.resources") from e else: return get_local_extension_type(t) @@ -99,6 +102,10 @@ def find_data_class_options( a class with a matching title to k. k The property to search for within the data class + + Returns + ------- + The relevent FHIR class. """ if isinstance(data_class, list): @@ -126,11 +133,28 @@ def find_data_class_options( return get_fhirtype(base_class) -def code_or_codeable_concept( - col_name: str, resource: FHIRFlatBase | list[FHIRFlatBase] +def is_codeable_concept( + col_name: str, resource: FHIRFlatBase | list[FHIRFlatBase] | _ISARICExtension ) -> bool: + """ + Determines if a column is a code or a codeable concept. + """ search_terms = col_name.split(".") - fhir_type = find_data_class_options(resource, search_terms[0]) + + if not issubclass( + resource if not isinstance(resource, list) else type(str), + _ISARICExtension, + ): + fhir_type = find_data_class_options(resource, search_terms[0]) + + else: + prop = get_local_extension_type(search_terms[0]).schema()["properties"] + value_type = [key for key in prop.keys() if key.startswith("value")] + if len(value_type) == 1: + return True if "codeableconcept" in value_type[0].lower() else False + elif "valueCodeableConcept" in value_type: + # if longer it's a value with 'code' in it, e.g. Quantity. + return True if len(search_terms) == 2 else False if len(search_terms) == 2: # e.g. "code.code", "age.code" schema = fhir_type.schema()["properties"] @@ -144,7 +168,7 @@ def code_or_codeable_concept( else: return False else: - return code_or_codeable_concept(".".join(search_terms[1:]), fhir_type) + return is_codeable_concept(".".join(search_terms[1:]), fhir_type) def format_flat(flat_df: pd.DataFrame, resource: FHIRFlatBase) -> pd.DataFrame: @@ -176,7 +200,7 @@ def format_flat(flat_df: pd.DataFrame, resource: FHIRFlatBase) -> pd.DataFrame: for x in flat_df.columns if ( (x.lower().endswith(".code") or x.lower().endswith(".text")) - and code_or_codeable_concept(x, resource) + and is_codeable_concept(x, resource) ) ]: flat_df[coding_column] = flat_df[coding_column].apply( diff --git a/tests/test_utils.py b/tests/test_utils.py index 872d4f2..d90eddb 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -11,6 +11,7 @@ from fhir.resources.quantity import Quantity from fhir.resources.codeableconcept import CodeableConcept from fhir.resources.medicationstatement import MedicationStatementAdherence +from fhir.resources.immunization import ImmunizationProtocolApplied from fhirflat.resources.extensions import dateTimeExtension, Duration from fhirflat import MedicationStatement @@ -53,6 +54,7 @@ def test_group_keys(): ("MedicationStatementAdherence", MedicationStatementAdherence), ("dateTimeExtension", dateTimeExtension), ("duration", Duration), + ("ImmunizationProtocolApplied", ImmunizationProtocolApplied), ], ) def test_get_fhirtype(input, expected): From 9300fc249649d95b2e9e94fc67c8d55cbd20c294 Mon Sep 17 00:00:00 2001 From: Pip Liggins Date: Thu, 29 Aug 2024 15:30:12 +0100 Subject: [PATCH 3/4] Misc fixes and warnings from data ingestion testing --- fhirflat/flat2fhir.py | 3 ++ fhirflat/ingest.py | 42 +++++++++++++++++++------- fhirflat/resources/diagnosticreport.py | 1 + fhirflat/resources/encounter.py | 17 +++++++---- 4 files changed, 46 insertions(+), 17 deletions(-) diff --git a/fhirflat/flat2fhir.py b/fhirflat/flat2fhir.py index 977317e..a821199 100644 --- a/fhirflat/flat2fhir.py +++ b/fhirflat/flat2fhir.py @@ -251,6 +251,9 @@ def create_single_extension(k: str, v: dict | str | float | bool) -> dict: else: raise e # pragma: no cover + except KeyError: + continue + raise RuntimeError(f"extension not created from {k, v}") # pragma: no cover diff --git a/fhirflat/ingest.py b/fhirflat/ingest.py index 8969a67..3a911b3 100644 --- a/fhirflat/ingest.py +++ b/fhirflat/ingest.py @@ -5,6 +5,7 @@ import argparse import hashlib +import logging import os import shutil import timeit @@ -25,6 +26,8 @@ import fhirflat from fhirflat.util import get_local_resource, group_keys +logger = logging.getLogger(__name__) + # 1:1 (single row, single resource) mapping: Patient, Encounter # 1:M (single row, multiple resources) mapping: Observation, Condition, Procedure, ... @@ -95,12 +98,12 @@ def find_field_value( return return_val -def format_dates(date_str: str, date_format: str, timezone: str) -> str: +def format_dates(date_str: str | float, date_format: str, timezone: str) -> str: """ Converts dates into ISO8601 format with timezone information. """ - if date_str is None: + if date_str is None or date_str is np.nan: return date_str new_tz = ZoneInfo(timezone) @@ -124,7 +127,7 @@ def format_dates(date_str: str, date_format: str, timezone: str) -> str: f"Date {date_str} could not be converted using date format" f" {date_format}", UserWarning, - stacklevel=2, + stacklevel=1, ) return date_str @@ -167,7 +170,7 @@ def create_dict_wide( warnings.warn( f"No mapping for column {column} response {response}", UserWarning, - stacklevel=2, + stacklevel=1, ) continue else: @@ -261,11 +264,15 @@ def create_dict_long( except KeyError: # No mapping found for this column and response despite presence # in mapping file - warnings.warn( - f"No mapping for column {column} response {response}", - UserWarning, - stacklevel=2, - ) + if response == 0.0: + # mostly this is ignoring unfilled responses + logger.info(f"No mapping for column {column} response {response}") + else: + warnings.warn( + f"No mapping for column {column} response {response}", + UserWarning, + stacklevel=1, + ) return None return None @@ -329,7 +336,11 @@ def condense(x): # If the column contains a single non-nan value, return it non_nan_values = x.dropna() if non_nan_values.nunique() == 1: - return non_nan_values + return ( + non_nan_values + if len(non_nan_values) == 1 + else non_nan_values.unique()[0] + ) elif non_nan_values.empty: return np.nan else: @@ -337,6 +348,8 @@ def condense(x): else: if len(x) == 1: return x + elif x.nunique() == 1: + return x.unique()[0] else: raise ValueError("Multiple values found in one-to-one mapping") @@ -364,6 +377,13 @@ def condense(x): # Set multi-index for easier access map_df.set_index(["raw_variable", "raw_response"], inplace=True) + map_df.sort_index(inplace=True) # for performance improvements + + if not map_df.index.is_unique: + raise ValueError( + f"Mapping file for the {resource} resource has duplicate entries " + f"{map_df.index[map_df.index.duplicated()]}" + ) # Generate the flat_like dictionary if one_to_one: @@ -522,7 +542,7 @@ def convert_resource( date_format=date_format, timezone=timezone, ) - if df is None: + if df is None or df.empty: return None else: raise ValueError(f"Unknown mapping type {t}") diff --git a/fhirflat/resources/diagnosticreport.py b/fhirflat/resources/diagnosticreport.py index 8de5640..323326e 100644 --- a/fhirflat/resources/diagnosticreport.py +++ b/fhirflat/resources/diagnosticreport.py @@ -79,6 +79,7 @@ def cleanup(cls, data: dict) -> dict: { "basedOn", "subject", + "encounter", "performer", "resultsInterpreter", "specimen", diff --git a/fhirflat/resources/encounter.py b/fhirflat/resources/encounter.py index 6270d75..65d4e65 100644 --- a/fhirflat/resources/encounter.py +++ b/fhirflat/resources/encounter.py @@ -14,8 +14,13 @@ from pydantic.v1 import Field, validator from .base import FHIRFlatBase -from .extension_types import relativePeriodType, timingPhaseDetailType, timingPhaseType -from .extensions import relativePeriod, timingPhase, timingPhaseDetail +from .extension_types import ( + durationType, + relativePeriodType, + timingPhaseDetailType, + timingPhaseType, +) +from .extensions import Duration, relativePeriod, timingPhase, timingPhaseDetail JsonString: TypeAlias = str @@ -26,6 +31,7 @@ class Encounter(_Encounter, FHIRFlatBase): relativePeriodType, timingPhaseType, timingPhaseDetailType, + durationType, fhirtypes.ExtensionType, ] ] = Field( @@ -71,11 +77,10 @@ def validate_extension_contents(cls, extensions): rel_phase_count = sum(isinstance(item, relativePeriod) for item in extensions) timing_count = sum(isinstance(item, timingPhase) for item in extensions) detail_count = sum(isinstance(item, timingPhaseDetail) for item in extensions) + dur_count = sum(isinstance(item, Duration) for item in extensions) - if rel_phase_count > 1 or timing_count > 1 or detail_count > 1: - raise ValueError( - "relativePeriod, timingPhase and timingPhaseDetail can only appear once." # noqa E501 - ) + if rel_phase_count > 1 or timing_count > 1 or detail_count > 1 or dur_count > 1: + raise ValueError("Each extension can only appear once.") if timing_count > 0 and detail_count > 0: raise ValueError( From 46b6a194e5a8aae6b9094d9c2b440da62f9df12b Mon Sep 17 00:00:00 2001 From: Pip Liggins Date: Thu, 29 Aug 2024 15:47:20 +0100 Subject: [PATCH 4/4] Add tests for extensions in medication admin/statement --- .../resources/medicationadministration.py | 5 +++++ fhirflat/util.py | 2 +- .../test_medicationadministration_resource.py | 21 ++++++++++++++++++ tests/test_medicationstatement_resource.py | 22 +++++++++++++++++++ 4 files changed, 49 insertions(+), 1 deletion(-) diff --git a/fhirflat/resources/medicationadministration.py b/fhirflat/resources/medicationadministration.py index 83b58a8..e1c453c 100644 --- a/fhirflat/resources/medicationadministration.py +++ b/fhirflat/resources/medicationadministration.py @@ -86,6 +86,11 @@ def validate_extension_contents(cls, extensions): ): raise ValueError("Each extension can can only appear once.") + if tp_count > 0 and tpd_count > 0: + raise ValueError( + "timingPhase and timingPhaseDetail cannot appear together." + ) + return extensions backbone_elements: ClassVar[dict] = { diff --git a/fhirflat/util.py b/fhirflat/util.py index 335e5ef..b7a044d 100644 --- a/fhirflat/util.py +++ b/fhirflat/util.py @@ -17,7 +17,7 @@ from fhirflat.resources.extensions import _ISARICExtension if TYPE_CHECKING: - from .resources.base import FHIRFlatBase + from .resources.base import FHIRFlatBase # pragma: no cover def group_keys(data_keys: list[str] | KeysView) -> dict[str, list[str]]: diff --git a/tests/test_medicationadministration_resource.py b/tests/test_medicationadministration_resource.py index 7aa9a34..8db88f3 100644 --- a/tests/test_medicationadministration_resource.py +++ b/tests/test_medicationadministration_resource.py @@ -3,6 +3,7 @@ import os from fhirflat.resources.medicationadministration import MedicationAdministration import datetime +import pytest MEDS_DICT_INPUT = { "resourceType": "MedicationAdministration", @@ -171,3 +172,23 @@ def test_medicationadministration_from_flat(): ) assert meds == flat_meds + + +@pytest.mark.usefixtures( + "raises_phase_plus_detail_error", "raises_phase_duplicate_error" +) +def test_extension_raises_errors( + raises_phase_plus_detail_error, raises_phase_duplicate_error +): + fhir_input = { + "resourceType": "MedicationAdministration", + "status": "completed", + "medication": {"reference": {"reference": "#med0306"}}, + "subject": {"reference": "Patient/pat1"}, + "occurencePeriod": { + "start": "2015-01-15T04:30:00+01:00", + "end": "2015-01-15T14:30:00+01:00", + }, + } + raises_phase_plus_detail_error(fhir_input, MedicationAdministration) + raises_phase_duplicate_error(fhir_input, MedicationAdministration) diff --git a/tests/test_medicationstatement_resource.py b/tests/test_medicationstatement_resource.py index f5458e1..8e718a6 100644 --- a/tests/test_medicationstatement_resource.py +++ b/tests/test_medicationstatement_resource.py @@ -3,6 +3,7 @@ import os from fhirflat.resources.medicationstatement import MedicationStatement import datetime +import pytest MEDS_DICT_INPUT = { "resourceType": "MedicationStatement", @@ -193,3 +194,24 @@ def test_medicationstatement_from_flat(): flat_meds = MedicationStatement.from_flat("tests/data/medicationstat_flat.parquet") assert meds == flat_meds + + +@pytest.mark.usefixtures("raises_phase_duplicate_error") +def test_extension_raises_errors(raises_phase_duplicate_error): + fhir_input = { + "resourceType": "MedicationStatement", + "status": "recorded", + "medication": { + "concept": { + "coding": [ + { + "system": "http://snomed.info/sct", + "code": "27658006", + "display": "Amoxicillin (product)", + } + ] + } + }, + "subject": {"reference": "Patient/pat1"}, + } + raises_phase_duplicate_error(fhir_input, MedicationStatement)