Skip to content

Commit

Permalink
Merge pull request #102 from smart-on-fhir/mikix/oracle-support2
Browse files Browse the repository at this point in the history
oracle: actually let you connect against an Oracle server
  • Loading branch information
mikix authored Dec 19, 2022
2 parents d668d29 + 58adb7c commit 3b622ef
Show file tree
Hide file tree
Showing 18 changed files with 493 additions and 185 deletions.
4 changes: 3 additions & 1 deletion cumulus/deid/mstool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import subprocess # nosec: B404
import sys

from cumulus import errors

MSTOOL_CMD = 'Microsoft.Health.Fhir.Anonymizer.R4.CommandLineTool'  # executable name of the MS anonymizer tool (expected on PATH — see check_mstool in etl.py)


Expand Down Expand Up @@ -38,4 +40,4 @@ def run_mstool(input_dir: str, output_dir: str) -> None:
)
except subprocess.CalledProcessError as exc:
print(f'An error occurred while de-identifying the input resources:\n\n{exc.stderr}', file=sys.stderr)
raise SystemExit(1) from exc
raise SystemExit(errors.MSTOOL_FAILED) from exc
11 changes: 11 additions & 0 deletions cumulus/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""Exception classes and error handling"""

# Error return codes, mostly just distinguished for the benefit of tests.
# These start at 10 just to leave some room for future use.
SQL_USER_MISSING = 10
SQL_PASSWORD_MISSING = 11
MSTOOL_FAILED = 12
MSTOOL_MISSING = 13
CTAKES_MISSING = 14
SMART_CREDENTIALS_MISSING = 15
BULK_EXPORT_FAILED = 16
6 changes: 3 additions & 3 deletions cumulus/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from fhirclient.models.patient import Patient
from fhirclient.models.resource import Resource

from cumulus import common, context, ctakes, deid, formats, loaders, store
from cumulus import common, context, ctakes, deid, errors, formats, loaders, store
from cumulus.config import JobConfig, JobSummary

###############################################################################
Expand Down Expand Up @@ -302,7 +302,7 @@ def check_ctakes() -> None:
print(f'A running cTAKES server was not found at:\n {ctakes_url}\n\n'
'Please set the URL_CTAKES_REST environment variable to your server.',
file=sys.stderr)
raise SystemExit(1)
raise SystemExit(errors.CTAKES_MISSING)


def check_mstool() -> None:
Expand All @@ -314,7 +314,7 @@ def check_mstool() -> None:
'Please see https://github.com/microsoft/Tools-for-Health-Data-Anonymization\n'
'and install it into your PATH.',
file=sys.stderr)
raise SystemExit(1)
raise SystemExit(errors.MSTOOL_MISSING)


def check_requirements() -> None:
Expand Down
16 changes: 8 additions & 8 deletions cumulus/loaders/fhir/fhir_ndjson.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import sys
import tempfile

from cumulus import common, store
from cumulus import common, errors, store
from cumulus.loaders import base
from cumulus.loaders.fhir.backend_service import BackendServiceServer, FatalError
from cumulus.loaders.fhir.bulk_export import BulkExporter
Expand Down Expand Up @@ -48,14 +48,14 @@ class Dir:

def _load_from_bulk_export(self) -> tempfile.TemporaryDirectory:
# First, check that the extra arguments we need were provided
errors = []
error_list = []
if not self.client_id:
errors.append('You must provide a client ID with --smart-client-id to connect to a SMART FHIR server.')
error_list.append('You must provide a client ID with --smart-client-id to connect to a SMART FHIR server.')
if not self.jwks:
errors.append('You must provide a JWKS file with --smart-jwks to connect to a SMART FHIR server.')
if errors:
print('\n'.join(errors), file=sys.stderr)
raise SystemExit(1)
error_list.append('You must provide a JWKS file with --smart-jwks to connect to a SMART FHIR server.')
if error_list:
print('\n'.join(error_list), file=sys.stderr)
raise SystemExit(errors.SMART_CREDENTIALS_MISSING)

tmpdir = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with

Expand All @@ -73,6 +73,6 @@ def _load_from_bulk_export(self) -> tempfile.TemporaryDirectory:
bulk_exporter.export()
except FatalError as exc:
print(str(exc), file=sys.stderr)
raise SystemExit(2) from exc # just to differentiate from the 1 system exit above in tests
raise SystemExit(errors.BULK_EXPORT_FAILED) from exc

return tmpdir
108 changes: 84 additions & 24 deletions cumulus/loaders/i2b2/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,28 @@
import json
import os
import tempfile
from functools import partial
from typing import Callable, Iterable, Iterator, TypeVar

from fhirclient.models.resource import Resource

from cumulus import common
from cumulus.loaders.base import Loader
from cumulus.loaders.i2b2 import extract, schema, transform
from cumulus.loaders.i2b2.oracle import extract as oracle_extract

AnyDimension = TypeVar('AnyDimension', bound=schema.Dimension)
I2b2ExtractorCallable = Callable[[], Iterable[schema.Dimension]]
CsvToI2b2Callable = Callable[[str], Iterable[schema.Dimension]]
I2b2ToFhirCallable = Callable[[AnyDimension], Resource]


class I2b2Loader(Loader):
"""
Loader for i2b2 data.

Expected format is either a tcp:// URL pointing at an Oracle server or a local folder,
holding any number of csv files in the following subdirectories:
- csv_diagnosis
- csv_lab
- csv_note
Expand All @@ -29,61 +33,117 @@ class I2b2Loader(Loader):
"""

def load_all(self) -> tempfile.TemporaryDirectory:
    """Load all i2b2 input data and return a temporary folder of FHIR ndjson output."""
    # tcp:// roots point at an Oracle server; anything else is a local csv folder
    if self.root.protocol not in ['tcp']:
        return self._load_all_from_csv()
    return self._load_all_from_oracle()

def _load_all_with_extractors(
    self,
    conditions: I2b2ExtractorCallable,
    observations: I2b2ExtractorCallable,
    documentreferences: I2b2ExtractorCallable,
    patients: I2b2ExtractorCallable,
    encounters: I2b2ExtractorCallable,
) -> tempfile.TemporaryDirectory:
    """
    Load i2b2 content into a local folder as ndjson

    Argument names are short to encourage treating them as kwargs for easier readability.

    :param conditions: zero-arg callable yielding i2b2 diagnosis facts
    :param observations: zero-arg callable yielding i2b2 lab facts
    :param documentreferences: zero-arg callable yielding i2b2 note facts
    :param patients: zero-arg callable yielding i2b2 patient dimensions
    :param encounters: zero-arg callable yielding i2b2 visit dimensions
    :returns: a TemporaryDirectory holding one FHIR ndjson file per resource type
    """
    tmpdir = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with

    # Each call below lazily converts one kind of i2b2 entry into one FHIR ndjson file
    self._loop(
        conditions(),
        transform.to_fhir_condition,
        os.path.join(tmpdir.name, 'Condition.ndjson'),
    )

    self._loop(
        observations(),
        transform.to_fhir_observation_lab,
        os.path.join(tmpdir.name, 'Observation.ndjson'),
    )

    self._loop(
        documentreferences(),
        transform.to_fhir_documentreference,
        os.path.join(tmpdir.name, 'DocumentReference.ndjson'),
    )

    self._loop(
        patients(),
        transform.to_fhir_patient,
        os.path.join(tmpdir.name, 'Patient.ndjson'),
    )

    self._loop(
        encounters(),
        transform.to_fhir_encounter,
        os.path.join(tmpdir.name, 'Encounter.ndjson'),
    )

    return tmpdir

def _loop(self, i2b2_entries: Iterable[schema.Dimension], to_fhir: I2b2ToFhirCallable, output_path: str) -> None:
    """
    Takes one kind of i2b2 resource, loads them all up, and writes out a FHIR ndjson file

    :param i2b2_entries: lazily-iterated i2b2 entries of a single kind
    :param to_fhir: converter from one i2b2 entry to one FHIR resource
    :param output_path: where to write the resulting ndjson file
    """
    fhir_resources = (to_fhir(x) for x in i2b2_entries)

    ids = set()  # keep track of every ID we've seen so far, because sometimes i2b2 can have duplicates

    with open(output_path, 'w', encoding='utf8') as output_file:
        # Now write each FHIR resource line by line to the output
        # (we do this all line by line via generators to avoid loading everything in memory at once)
        for resource in fhir_resources:
            if resource.id in ids:
                continue  # skip duplicate IDs
            ids.add(resource.id)
            json.dump(resource.as_json(), output_file)
            output_file.write('\n')

###################################################################################################################
#
# CSV code
#
###################################################################################################################

@staticmethod
def _extract_csv_files(extractor: CsvToI2b2Callable, csv_files: Iterable[str]) -> Iterator[schema.Dimension]:
    """
    Generator method that lazily loads a list of input csv files

    :param extractor: converts one csv file path into an iterable of i2b2 entries
    :param csv_files: paths of the csv files to load
    """
    for csv_file in csv_files:
        # yield from is the idiomatic way to delegate to the extractor's iterable
        yield from extractor(csv_file)

def _extract_csv_dir(self, folder: str, extractor: CsvToI2b2Callable) -> Iterator[schema.Dimension]:
    """Generator method that lazily loads all input csv files in the given folder"""
    return self._extract_csv_files(extractor, common.list_csv(folder))

def _load_all_from_csv(self) -> tempfile.TemporaryDirectory:
    """Convert a local folder of i2b2 csv files (self.root) into a folder of FHIR ndjson."""
    folder = self.root.path

    def from_subdir(subdir: str, extractor: CsvToI2b2Callable):
        # Bind one csv subdirectory to its extractor, deferring the actual read
        return partial(self._extract_csv_dir, os.path.join(folder, subdir), extractor)

    return self._load_all_with_extractors(
        conditions=from_subdir('csv_diagnosis', extract.extract_csv_observation_facts),
        observations=from_subdir('csv_lab', extract.extract_csv_observation_facts),
        documentreferences=from_subdir('csv_note', extract.extract_csv_observation_facts),
        patients=from_subdir('csv_patient', extract.extract_csv_patients),
        encounters=from_subdir('csv_visit', extract.extract_csv_visits),
    )

###################################################################################################################
#
# Oracle SQL server code
#
###################################################################################################################

def _load_all_from_oracle(self) -> tempfile.TemporaryDirectory:
    """Pull i2b2 data from an Oracle server (a tcp:// root) and convert it to FHIR ndjson."""
    dsn = self.root.path
    # All three observation-fact categories share the same extractor, just different names
    list_facts = partial(oracle_extract.list_observation_fact, dsn)
    return self._load_all_with_extractors(
        conditions=partial(list_facts, 'Diagnosis'),
        observations=partial(list_facts, 'Lab View'),
        documentreferences=partial(list_facts, 'Notes'),
        patients=partial(oracle_extract.list_patient, dsn),
        encounters=partial(oracle_extract.list_visit, dsn),
    )
33 changes: 19 additions & 14 deletions cumulus/loaders/i2b2/oracle/connect.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,41 @@
"""Connects to oracle database"""

import getpass
import logging
import os
import sys

import oracledb

from cumulus import errors

def get_data_source_name() -> str:
    """Return the Oracle DSN from the I2B2_SQL_DSN environment variable (None if unset)."""
    return os.environ.get('I2B2_SQL_DSN')


def get_user() -> str:
def _get_user() -> str:
"""
:return: user for I2b2 database
"""
return os.environ.get('I2B2_SQL_USER', None)
user = os.environ.get('CUMULUS_SQL_USER')
if not user:
print('To connect to an Oracle SQL server, please set the environment variable CUMULUS_SQL_USER',
file=sys.stderr)
raise SystemExit(errors.SQL_USER_MISSING)
return user


def get_password() -> str:
def _get_password() -> str:
"""
:return: password for the DSN
"""
pwd = os.environ.get('I2B2_SQL_PASS', None)
if not pwd or len(pwd) < 4:
user = get_user()
return getpass.getpass(f'Enter password for I2B2_SQL_USER {user}: ')
pwd = os.environ.get('CUMULUS_SQL_PASSWORD')
if not pwd:
print('To connect to an Oracle SQL server, please set the environment variable CUMULUS_SQL_PASSWORD',
file=sys.stderr)
raise SystemExit(errors.SQL_PASSWORD_MISSING)
return pwd


def connect(dsn: str) -> oracledb.Connection:
    """
    Connect to an Oracle database.

    :param dsn: the data source name (server address) to connect to
    :return: connection to oracle database
    :raises SystemExit: if the CUMULUS_SQL_USER or CUMULUS_SQL_PASSWORD env vars are unset
    """
    logging.info('Attempting to connect to %s', dsn)
    return oracledb.connect(user=_get_user(), password=_get_password(), dsn=dsn)
Loading

0 comments on commit 3b622ef

Please sign in to comment.