Skip to content

Commit

Permalink
By patient (#48)
Browse files Browse the repository at this point in the history
* by patient export

* Add/fix tests and add comments

* Apply suggestions from code review

Typo and remove unnecessary parameter

Co-authored-by: Elsa Perelli <[email protected]>

* Update import manifest for kickoff-import

* Simplify parameter gathering

* Remove TODO and add note comment

---------

Co-authored-by: Elsa Perelli <[email protected]>
  • Loading branch information
lmd59 and elsaperelli authored Aug 6, 2024
1 parent dd47085 commit 2824aae
Show file tree
Hide file tree
Showing 13 changed files with 295 additions and 49 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ The server supports the following query parameters:
- `_outputFormat`: The server supports the following formats: `application/fhir+ndjson`, `application/ndjson+fhir`, `application/ndjson`, `ndjson`
- `_typeFilter`: Filters the response to only include resources that meet the criteria of the specified comma-delimited FHIR REST queries. Returns an error for queries specified by the client that are unsupported by the server. Supports queries on the ValueSets (`type:in`, `code:in`, etc.) of a given resource type.
- `patient`: Only applicable to POST requests for group-level and patient-level requests. When provided, the server SHALL NOT return resources in the patient compartment definition belonging to patients outside the list. Can support multiple patient references in a single request.
- `_bySubject`: Only applicable for group-level and patient-level requests. Creates export results, separating resources into files based on what subject they are associated with (rather than based on type). The only `_bySubject` value supported is `Patient`. This will result in an ndjson file for each patient in the returned data. If the `_type` parameter is used in conjunction with this parameter, `Patient` must be one of the types included in the passed value list.
- `_elements`: Filters the content of the responses to omit unlisted, non-mandatory elements from the resources returned. These elements should be provided in the form `[resource type].[element name]` (e.g., `Patient.id`) which only filters the contents of those specified resources or in the form `[element name]` (e.g., `id`) which filters the contents of all of the returned resources.

#### `_elements` Query Parameter
Expand Down
11 changes: 5 additions & 6 deletions src/server/exportWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@ const exportQueue = new Queue('export', {

// This handler pulls down the jobs on Redis to handle
exportQueue.process(async job => {
// Payload of createJob exists on job.data
const { clientEntry, types, typeFilter, patient, systemLevelExport, patientIds, elements } = job.data;
console.log(`export-worker-${process.pid}: Processing Request: ${clientEntry}`);
console.log(`export-worker-${process.pid}: Processing Request: ${job.data.clientEntry}`);
await client.connect();
// Call the existing export ndjson function that writes the files

const result = await exportToNDJson(clientEntry, types, typeFilter, patient, systemLevelExport, patientIds, elements);
// Payload of createJob exists on job.data
const result = await exportToNDJson(job.data);
if (result) {
console.log(`export-worker-${process.pid}: Completed Export Request: ${clientEntry}`);
console.log(`export-worker-${process.pid}: Completed Export Request: ${job.data.clientEntry}`);
} else {
console.log(`export-worker-${process.pid}: Failed Export Request: ${clientEntry}`);
console.log(`export-worker-${process.pid}: Failed Export Request: ${job.data.clientEntry}`);
}
await client.close();
});
12 changes: 12 additions & 0 deletions src/services/bulkstatus.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ async function kickoffImport(request, reply) {
]
};
});
// check if bulk status is byPatient, if so, add inputdetails
if (bulkStatus.byPatient) {
importManifest.parameter.push({
name: 'inputDetails',
part: [
{
name: 'subjectType',
valueCode: 'Patient'
}
]
});
}

// TODO: add provenance?
const headers = {
Expand Down
48 changes: 40 additions & 8 deletions src/services/export.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,30 @@ const bulkExport = async (request, reply) => {
})
);
}
if (parameters._bySubject) {
reply.code(400).send(
createOperationOutcome('The "_bySubject" parameter cannot be used in a system-level export request.', {
issueCode: 400,
severity: 'error'
})
);
}
if (validateExportParams(parameters, reply)) {
request.log.info('Base >>> $export');
const clientEntry = await addPendingBulkExportRequest();

const types = request.query._type?.split(',') || parameters._type?.split(',');
const types = parameters._type?.split(',');

const elements = request.query._elements?.split(',') || parameters._elements?.split(',');
const elements = parameters._elements?.split(',');

// Enqueue a new job into Redis for handling
const job = {
clientEntry: clientEntry,
types: types,
typeFilter: request.query._typeFilter,
systemLevelExport: true,
elements: elements
elements: elements,
byPatient: parameters._bySubject === 'Patient'
};
await exportQueue.createJob(job).save();
reply.code(202).header('Content-location', `${process.env.BULK_BASE_URL}/bulkstatus/${clientEntry}`).send();
Expand Down Expand Up @@ -65,7 +74,7 @@ const patientBulkExport = async (request, reply) => {
await validatePatientReferences(parameters.patient, reply);
}
request.log.info('Patient >>> $export');
const clientEntry = await addPendingBulkExportRequest();
const clientEntry = await addPendingBulkExportRequest(parameters._bySubject === 'Patient');

let types = request.query._type?.split(',') || parameters._type?.split(',');
if (types) {
Expand All @@ -83,7 +92,8 @@ const patientBulkExport = async (request, reply) => {
typeFilter: parameters._typeFilter,
patient: parameters.patient,
systemLevelExport: false,
elements: elements
elements: elements,
byPatient: parameters._bySubject === 'Patient'
};
await exportQueue.createJob(job).save();
reply.code(202).header('Content-location', `${process.env.BULK_BASE_URL}/bulkstatus/${clientEntry}`).send();
Expand Down Expand Up @@ -122,7 +132,7 @@ const groupBulkExport = async (request, reply) => {
return splitRef[splitRef.length - 1];
});

const clientEntry = await addPendingBulkExportRequest();
const clientEntry = await addPendingBulkExportRequest(parameters._bySubject === 'Patient');
let types = request.query._type?.split(',') || parameters._type?.split(',');
if (types) {
types = filterPatientResourceTypes(request, reply, types);
Expand All @@ -140,7 +150,8 @@ const groupBulkExport = async (request, reply) => {
patient: parameters.patient,
systemLevelExport: false,
patientIds: patientIds,
elements: elements
elements: elements,
byPatient: parameters._bySubject === 'Patient'
};
await exportQueue.createJob(job).save();
reply.code(202).header('Content-location', `${process.env.BULK_BASE_URL}/bulkstatus/${clientEntry}`).send();
Expand Down Expand Up @@ -172,6 +183,16 @@ function validateExportParams(parameters, reply) {
}
}

if (parameters._bySubject && parameters._bySubject !== 'Patient') {
reply.code(400).send(
createOperationOutcome(`Server does not support the _bySubject parameter for values other than Patient.`, {
issueCode: 400,
severity: 'error'
})
);
return false;
}

if (parameters._type) {
// type filter is comma-delimited
const requestTypes = parameters._type.split(',');
Expand All @@ -194,6 +215,17 @@ function validateExportParams(parameters, reply) {
);
return false;
}
if (parameters._bySubject === 'Patient' && !requestTypes.includes('Patient')) {
reply
.code(400)
.send(
createOperationOutcome(
`When _type is specified with _bySubject Patient, the Patient type must be included in the _type parameter.`,
{ issueCode: 400, severity: 'error' }
)
);
return false;
}
}

if (parameters._typeFilter) {
Expand Down Expand Up @@ -280,7 +312,7 @@ function validateExportParams(parameters, reply) {

let unrecognizedParams = [];
Object.keys(parameters).forEach(param => {
if (!['_outputFormat', '_type', '_typeFilter', 'patient', '_elements'].includes(param)) {
if (!['_outputFormat', '_type', '_typeFilter', 'patient', '_elements', '_bySubject'].includes(param)) {
unrecognizedParams.push(param);
}
});
Expand Down
94 changes: 76 additions & 18 deletions src/util/exportToNDJson.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,18 @@ const buildSearchParamList = resourceType => {
* If the _type parameter doesn't exist, the function will simply export all resource types included in the supportedResources list.
* If the _typeFilter parameter is defined but the _type parameter is *not* defined, the function will export all resource types
* included in the supportedResources list, but the resource types specified in the _typeFilter query will be filtered.
* @param {string} clientId an id to add to the file name so the client making the request can be tracked
* @param {Object} jobOptions options object allowing for the following members:
* @param {string} clientEntry an id to add to the file name so the client making the request can be tracked
* @param {Array} types Array of types to be queried for, retrieved from request params
* @param {string} typeFilter String of comma separated FHIR REST search queries
* @param {string | Array} patient Patient references from the "patient" param, used to filter results
* @param {boolean} systemLevelExport boolean flag from job that signals whether request is for system-level export (determines filtering)
* @param {Array} patientIds Array of patient ids for patients relevant to this export (undefined if all patients)
* @param {Array} elements Array of elements parameters that indicate how to omit any unlisted, non-mandatory elements
* @param {boolean} byPatient boolean flag from job that signals whether resulting files should be grouped by patient (versus by type)
*/
const exportToNDJson = async (clientId, types, typeFilter, patient, systemLevelExport, patientIds, elements) => {
const exportToNDJson = async jobOptions => {
const { clientEntry, types, typeFilter, patient, systemLevelExport, patientIds, elements, byPatient } = jobOptions;
try {
const dirpath = './tmp/';
fs.mkdirSync(dirpath, { recursive: true });
Expand Down Expand Up @@ -185,19 +189,73 @@ const exportToNDJson = async (clientId, types, typeFilter, patient, systemLevelE
return splitRef[splitRef.length - 1];
});

let docs = exportTypes.map(async collectionName => {
return getDocuments(
collectionName,
searchParameterQueries[collectionName],
valueSetQueries[collectionName],
patientParamIds || patientIds,
elementsQueries[collectionName]
);
});
let docs;
if (byPatient) {
// use parameter patient or group patient ids or pull all patient ids from database
const ids =
patientParamIds ||
patientIds ||
(
await getDocuments(
'Patient',
searchParameterQueries['Patient'],
valueSetQueries['Patient'],
patientParamIds || patientIds,
elementsQueries['Patient']
)
).document.map(p => p.id);

docs = ids.map(async patientId => {
// create patient-based subject header
const subjectHeader = {
resourceType: 'Parameters',
parameter: [
{
name: 'subject',
valueReference: {
reference: `Patient/${patientId}`
}
}
]
};
// for each patient, collect resource documents from each export type collection
const typeDocs = exportTypes.map(async collectionName => {
return (
await getDocuments(
collectionName,
searchParameterQueries[collectionName],
valueSetQueries[collectionName],
[patientId],
elementsQueries[collectionName]
)
).document;
});

//flatten all exportType arrays into a single array
const patientDocuments = (await Promise.all(typeDocs)).flat();
// append subject header as the first resource
patientDocuments.unshift(subjectHeader);
return {
// use the patient id as the document name (will be "{patientId}.ndjson")
name: patientId,
document: patientDocuments
};
});
} else {
docs = exportTypes.map(async collectionName => {
return getDocuments(
collectionName,
searchParameterQueries[collectionName],
valueSetQueries[collectionName],
patientParamIds || patientIds,
elementsQueries[collectionName]
);
});
}
docs = await Promise.all(docs);
docs.forEach(doc => {
if (doc.document) {
writeToFile(doc.document, doc.collectionName, clientId);
writeToFile(doc.document, doc.name, clientEntry);
}
});

Expand All @@ -217,10 +275,10 @@ const exportToNDJson = async (clientId, types, typeFilter, patient, systemLevelE
*/

// mark bulk status job as complete after all files have been written
await updateBulkExportStatus(clientId, BULKSTATUS_COMPLETED);
await updateBulkExportStatus(clientEntry, BULKSTATUS_COMPLETED);
return true;
} catch (e) {
await updateBulkExportStatus(clientId, BULKSTATUS_FAILED, { message: e.message, code: 500 });
await updateBulkExportStatus(clientEntry, BULKSTATUS_FAILED, { message: e.message, code: 500 });
return false;
}
};
Expand Down Expand Up @@ -317,21 +375,21 @@ const getDocuments = async (collectionName, searchParameterQueries, valueSetQuer
});
}

return { document: docs, collectionName: collectionName };
return { document: docs, name: collectionName };
};

/**
* Writes the contents of a mongo document to an ndjson file with the appropriate resource
* name, stored in a directory under the client's id
* @param {Object} doc A mongodb document containing fhir resources
* @param {string} type The fhir resourceType contained in the mongo document
* @param {string} filebase The base filename: either the fhir resourceType or patient id
* @param {string} clientId The id of the client making the export request
* @returns
*/
const writeToFile = function (doc, type, clientId) {
const writeToFile = function (doc, filebase, clientId) {
let dirpath = './tmp/' + clientId;
fs.mkdirSync(dirpath, { recursive: true });
const filename = path.join(dirpath, `${type}.ndjson`);
const filename = path.join(dirpath, `${filebase}.ndjson`);

let lineCount = 0;
if (Object.keys(doc).length > 0) {
Expand Down
6 changes: 4 additions & 2 deletions src/util/mongo.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,10 @@ const findResourcesWithAggregation = async (query, resourceType, options = {}) =
/**
* Called as a result of export request. Adds a new clientId to db
* which can be queried to get updates on the status of the bulk export
* @param {boolean} byPatient indicates whether this export request groups data by patient (versus by type)
* @returns the id of the inserted client
*/
const addPendingBulkExportRequest = async () => {
const addPendingBulkExportRequest = async (byPatient = false) => {
const collection = db.collection('bulkExportStatuses');
const clientId = uuidv4();
const bulkExportClient = {
Expand All @@ -120,7 +121,8 @@ const addPendingBulkExportRequest = async () => {
numberOfRequestsInWindow: 0,
timeOfFirstValidRequest: null,
error: {},
warnings: []
warnings: [],
byPatient: byPatient
};
await collection.insertOne(bulkExportClient);
return clientId;
Expand Down
7 changes: 7 additions & 0 deletions test/fixtures/testBulkStatus.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,12 @@
"status": "Completed",
"error": {},
"warnings": []
},
{
"id": "BY_PATIENT",
"status": "Completed",
"error": {},
"warnings": [],
"byPatient": true
}
]
8 changes: 8 additions & 0 deletions test/fixtures/testEncounter.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,13 @@
"id": "testEncounter",
"subject": {
"reference": "Patient/testPatient"
},
"type": {
"coding": [
{
"system": "http://example.com",
"code": "example-code-1"
}
]
}
}
10 changes: 9 additions & 1 deletion test/fixtures/testPatient.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
{
"resourceType": "Patient",
"id": "testPatient"
"id": "testPatient",
"maritalStatus": {
"coding": [
{
"system": "http://example.com",
"code": "example-code-1"
}
]
}
}
10 changes: 9 additions & 1 deletion test/fixtures/testServiceRequest.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
{
"resourceType": "ServiceRequest",
"id": "testServiceRequest"
"id": "testServiceRequest",
"code": {
"coding": [
{
"system": "http://example.com",
"code": "example-code-1"
}
]
}
}
Loading

0 comments on commit 2824aae

Please sign in to comment.