By patient #48

Merged 6 commits on Aug 6, 2024
1 change: 1 addition & 0 deletions README.md
@@ -164,6 +164,7 @@ The server supports the following query parameters:
- `_outputFormat`: The server supports the following formats: `application/fhir+ndjson`, `application/ndjson+fhir`, `application/ndjson`, `ndjson`
- `_typeFilter`: Filters the response to only include resources that meet the criteria of the specified comma-delimited FHIR REST queries. Returns an error for queries specified by the client that are unsupported by the server. Supports queries on the ValueSets (`type:in`, `code:in`, etc.) of a given resource type.
- `patient`: Only applicable to POST requests for group-level and patient-level requests. When provided, the server SHALL NOT return resources in the patient compartment definition belonging to patients outside the list. Can support multiple patient references in a single request.
- `_bySubject`: Only applicable to group-level and patient-level requests. Separates the exported resources into files based on the subject they are associated with, rather than on resource type. The only supported `_bySubject` value is `Patient`, which produces one ndjson file per patient in the returned data. If the `_type` parameter is used in conjunction with this parameter, `Patient` must be one of the types included in the passed value list.
- `_elements`: Filters the content of the responses to omit unlisted, non-mandatory elements from the resources returned. These elements should be provided in the form `[resource type].[element name]` (e.g., `Patient.id`) which only filters the contents of those specified resources or in the form `[element name]` (e.g., `id`) which filters the contents of all of the returned resources.

#### `_elements` Query Parameter
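As a sketch of how these query parameters combine, the snippet below builds a patient-level kick-off URL for a by-patient export; the base URL and port are placeholders for illustration, not values defined by this server:

```javascript
// Build a patient-level export kick-off URL that groups output files by
// patient. The base URL is a placeholder for illustration only.
const base = 'http://localhost:3000';
const params = new URLSearchParams({
  _outputFormat: 'ndjson',
  _type: 'Patient,Encounter', // Patient must be included when _bySubject=Patient
  _bySubject: 'Patient',      // one ndjson file per patient instead of per type
  _elements: 'Patient.id,Encounter.type'
});
const kickoffUrl = `${base}/Patient/$export?${params}`;
console.log(kickoffUrl);
```

Note that `URLSearchParams` percent-encodes the commas in `_type` and `_elements`; servers are expected to decode them before splitting on commas.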
11 changes: 5 additions & 6 deletions src/server/exportWorker.js
@@ -14,17 +14,16 @@ const exportQueue = new Queue('export', {

// This handler pulls down the jobs on Redis to handle
 exportQueue.process(async job => {
-  // Payload of createJob exists on job.data
-  const { clientEntry, types, typeFilter, patient, systemLevelExport, patientIds, elements } = job.data;
-  console.log(`export-worker-${process.pid}: Processing Request: ${clientEntry}`);
+  console.log(`export-worker-${process.pid}: Processing Request: ${job.data.clientEntry}`);
   await client.connect();
   // Call the existing export ndjson function that writes the files

-  const result = await exportToNDJson(clientEntry, types, typeFilter, patient, systemLevelExport, patientIds, elements);
+  // Payload of createJob exists on job.data
+  const result = await exportToNDJson(job.data);
   if (result) {
-    console.log(`export-worker-${process.pid}: Completed Export Request: ${clientEntry}`);
+    console.log(`export-worker-${process.pid}: Completed Export Request: ${job.data.clientEntry}`);
   } else {
-    console.log(`export-worker-${process.pid}: Failed Export Request: ${clientEntry}`);
+    console.log(`export-worker-${process.pid}: Failed Export Request: ${job.data.clientEntry}`);
   }
   await client.close();
 });
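The refactor above stops destructuring seven positional arguments in the worker and instead forwards the whole `job.data` payload to `exportToNDJson`, which destructures it itself. A minimal sketch of the idea, with a stub standing in for the real `exportToNDJson`:

```javascript
// Sketch of the worker refactor: the payload object travels wholesale and is
// destructured at the destination, so adding a field (like byPatient) no
// longer requires touching every call site. Stub implementation for illustration.
const exportToNDJson = async jobOptions => {
  const { clientEntry, types, byPatient } = jobOptions;
  // by-patient exports produce one file per patient; by-type, one per type
  return { clientEntry, groupedBy: byPatient ? 'patient' : 'type', typeCount: types.length };
};

const job = {
  data: {
    clientEntry: 'abc-123',
    types: ['Patient', 'Encounter'],
    systemLevelExport: false,
    byPatient: true
  }
};

exportToNDJson(job.data).then(result =>
  console.log(`export-worker: Completed Export Request: ${result.clientEntry}`)
);
```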
12 changes: 12 additions & 0 deletions src/services/bulkstatus.service.js
@@ -26,24 +26,36 @@
if (!bulkStatus) {
reply.code(404).send(new Error(`Could not find bulk export request with id: ${clientId}`));
}
if (bulkStatus.status === BULKSTATUS_COMPLETED) {
const parameters = gatherParams(request.method, request.query, request.body, reply);
if (parameters.receiver) {
const responseData = await getNDJsonURLs(reply, clientId);
const importManifest = {
resourceType: 'Parameters'
};
importManifest.parameter = responseData.map(exportFile => {
return {
name: 'input',
part: [
{
name: 'url',
valueUrl: exportFile.url
}
]
};
});
// if the bulk status entry is byPatient, add an inputDetails parameter with subjectType Patient
if (bulkStatus.byPatient) {
importManifest.parameter.push({
name: 'inputDetails',
part: [
{
name: 'subjectType',
valueCode: 'Patient'
}
]
});

}

// TODO: add provenance?
const headers = {
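The manifest-building logic above can be summarized as: one `input` part per ndjson file, plus an `inputDetails` part when the export was grouped by patient. A self-contained sketch (the URL is a placeholder):

```javascript
// Sketch of the import manifest assembled in bulkstatus.service.js: a FHIR
// Parameters resource with one "input" part per ndjson file, and an
// "inputDetails" part carrying subjectType when the export was byPatient.
const buildImportManifest = (ndjsonUrls, byPatient) => {
  const manifest = {
    resourceType: 'Parameters',
    parameter: ndjsonUrls.map(url => ({
      name: 'input',
      part: [{ name: 'url', valueUrl: url }]
    }))
  };
  if (byPatient) {
    manifest.parameter.push({
      name: 'inputDetails',
      part: [{ name: 'subjectType', valueCode: 'Patient' }]
    });
  }
  return manifest;
};

const manifest = buildImportManifest(
  ['http://example.com/bulkfiles/abc/testPatient.ndjson'],
  true
);
console.log(JSON.stringify(manifest, null, 2));
```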
48 changes: 40 additions & 8 deletions src/services/export.service.js
@@ -21,21 +21,30 @@ const bulkExport = async (request, reply) => {
})
);
}
if (parameters._bySubject) {
reply.code(400).send(
createOperationOutcome('The "_bySubject" parameter cannot be used in a system-level export request.', {
issueCode: 400,
severity: 'error'
})
);
}
if (validateExportParams(parameters, reply)) {
request.log.info('Base >>> $export');
const clientEntry = await addPendingBulkExportRequest();

-  const types = request.query._type?.split(',') || parameters._type?.split(',');
+  const types = parameters._type?.split(',');

-  const elements = request.query._elements?.split(',') || parameters._elements?.split(',');
+  const elements = parameters._elements?.split(',');

// Enqueue a new job into Redis for handling
const job = {
clientEntry: clientEntry,
types: types,
typeFilter: request.query._typeFilter,
systemLevelExport: true,
-    elements: elements
+    elements: elements,
+    byPatient: parameters._bySubject === 'Patient'
};
await exportQueue.createJob(job).save();
reply.code(202).header('Content-location', `${process.env.BULK_BASE_URL}/bulkstatus/${clientEntry}`).send();
@@ -65,7 +74,7 @@ const patientBulkExport = async (request, reply) => {
await validatePatientReferences(parameters.patient, reply);
}
request.log.info('Patient >>> $export');
-  const clientEntry = await addPendingBulkExportRequest();
+  const clientEntry = await addPendingBulkExportRequest(parameters._bySubject === 'Patient');

let types = request.query._type?.split(',') || parameters._type?.split(',');
if (types) {
@@ -83,7 +92,8 @@
typeFilter: parameters._typeFilter,
patient: parameters.patient,
systemLevelExport: false,
-    elements: elements
+    elements: elements,
+    byPatient: parameters._bySubject === 'Patient'
};
await exportQueue.createJob(job).save();
reply.code(202).header('Content-location', `${process.env.BULK_BASE_URL}/bulkstatus/${clientEntry}`).send();
@@ -122,7 +132,7 @@ const groupBulkExport = async (request, reply) => {
return splitRef[splitRef.length - 1];
});

-  const clientEntry = await addPendingBulkExportRequest();
+  const clientEntry = await addPendingBulkExportRequest(parameters._bySubject === 'Patient');
let types = request.query._type?.split(',') || parameters._type?.split(',');
if (types) {
types = filterPatientResourceTypes(request, reply, types);
@@ -140,7 +150,8 @@
patient: parameters.patient,
systemLevelExport: false,
patientIds: patientIds,
-    elements: elements
+    elements: elements,
+    byPatient: parameters._bySubject === 'Patient'
};
await exportQueue.createJob(job).save();
reply.code(202).header('Content-location', `${process.env.BULK_BASE_URL}/bulkstatus/${clientEntry}`).send();
@@ -172,6 +183,16 @@ function validateExportParams(parameters, reply) {
}
}

if (parameters._bySubject && parameters._bySubject !== 'Patient') {
reply.code(400).send(
createOperationOutcome(`Server does not support the _bySubject parameter for values other than Patient.`, {
issueCode: 400,
severity: 'error'
})
);
return false;
}

if (parameters._type) {
// type filter is comma-delimited
const requestTypes = parameters._type.split(',');
@@ -194,6 +215,17 @@
);
return false;
}
if (parameters._bySubject === 'Patient' && !requestTypes.includes('Patient')) {
reply
.code(400)
.send(
createOperationOutcome(
`When _type is specified with _bySubject Patient, the Patient type must be included in the _type parameter.`,
{ issueCode: 400, severity: 'error' }
)
);
return false;
}
}

if (parameters._typeFilter) {
@@ -280,7 +312,7 @@

let unrecognizedParams = [];
Object.keys(parameters).forEach(param => {
-    if (!['_outputFormat', '_type', '_typeFilter', 'patient', '_elements'].includes(param)) {
+    if (!['_outputFormat', '_type', '_typeFilter', 'patient', '_elements', '_bySubject'].includes(param)) {
unrecognizedParams.push(param);
}
});
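The validation added above enforces two rules: `_bySubject` may only be `Patient`, and when `_type` is also supplied it must include `Patient` so each per-patient file can contain its subject resource. A condensed, framework-free sketch of that logic (the real code replies with an OperationOutcome instead of returning a result object):

```javascript
// Condensed sketch of the _bySubject checks from validateExportParams; returns
// a result object rather than sending a 400 reply, for easy standalone testing.
const validateBySubject = parameters => {
  if (parameters._bySubject && parameters._bySubject !== 'Patient') {
    return { ok: false, reason: 'Server does not support the _bySubject parameter for values other than Patient.' };
  }
  if (parameters._bySubject === 'Patient' && parameters._type) {
    // _type is comma-delimited
    const requestTypes = parameters._type.split(',');
    if (!requestTypes.includes('Patient')) {
      return {
        ok: false,
        reason: 'When _type is specified with _bySubject Patient, the Patient type must be included in the _type parameter.'
      };
    }
  }
  return { ok: true };
};

console.log(validateBySubject({ _bySubject: 'Patient', _type: 'Patient,Encounter' }).ok); // true
console.log(validateBySubject({ _bySubject: 'Group' }).ok); // false
```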
94 changes: 76 additions & 18 deletions src/util/exportToNDJson.js
@@ -49,14 +49,18 @@ const buildSearchParamList = resourceType => {
* If the _type parameter doesn't exist, the function will simply export all resource types included in the supportedResources list.
* If the _typeFilter parameter is defined but the _type parameter is *not* defined, the function will export all resource types
* included in the supportedResources list, but the resource types specified in the _typeFilter query will be filtered.
- * @param {string} clientId an id to add to the file name so the client making the request can be tracked
+ * @param {Object} jobOptions options object allowing for the following members:
+ * @param {string} clientEntry an id to add to the file name so the client making the request can be tracked
* @param {Array} types Array of types to be queried for, retrieved from request params
* @param {string} typeFilter String of comma separated FHIR REST search queries
* @param {string | Array} patient Patient references from the "patient" param, used to filter results
* @param {boolean} systemLevelExport boolean flag from job that signals whether request is for system-level export (determines filtering)
* @param {Array} patientIds Array of patient ids for patients relevant to this export (undefined if all patients)
* @param {Array} elements Array of elements parameters that indicate how to omit any unlisted, non-mandatory elements
* @param {boolean} byPatient boolean flag from job that signals whether resulting files should be grouped by patient (versus by type)
*/
-const exportToNDJson = async (clientId, types, typeFilter, patient, systemLevelExport, patientIds, elements) => {
+const exportToNDJson = async jobOptions => {
+  const { clientEntry, types, typeFilter, patient, systemLevelExport, patientIds, elements, byPatient } = jobOptions;
try {
const dirpath = './tmp/';
fs.mkdirSync(dirpath, { recursive: true });
@@ -185,19 +189,73 @@
return splitRef[splitRef.length - 1];
});

-    let docs = exportTypes.map(async collectionName => {
-      return getDocuments(
-        collectionName,
-        searchParameterQueries[collectionName],
-        valueSetQueries[collectionName],
-        patientParamIds || patientIds,
-        elementsQueries[collectionName]
-      );
-    });
+    let docs;
+    if (byPatient) {
+      // use parameter patient or group patient ids or pull all patient ids from database
+      const ids =
+        patientParamIds ||
+        patientIds ||
+        (
+          await getDocuments(
+            'Patient',
+            searchParameterQueries['Patient'],
+            valueSetQueries['Patient'],
+            patientParamIds || patientIds,
+            elementsQueries['Patient']
+          )
+        ).document.map(p => p.id);
+
+      docs = ids.map(async patientId => {
+        // create patient-based subject header
+        const subjectHeader = {
+          resourceType: 'Parameters',
+          parameter: [
+            {
+              name: 'subject',
+              valueReference: {
+                reference: `Patient/${patientId}`
+              }
+            }
+          ]
+        };
+        // for each patient, collect resource documents from each export type collection
+        const typeDocs = exportTypes.map(async collectionName => {
+          return (
+            await getDocuments(
+              collectionName,
+              searchParameterQueries[collectionName],
+              valueSetQueries[collectionName],
+              [patientId],
+              elementsQueries[collectionName]
+            )
+          ).document;
+        });
+
+        // flatten all exportType arrays into a single array
+        const patientDocuments = (await Promise.all(typeDocs)).flat();
+        // prepend subject header as the first resource
+        patientDocuments.unshift(subjectHeader);
+        return {
+          // use the patient id as the document name (will be "{patientId}.ndjson")
+          name: patientId,
+          document: patientDocuments
+        };
+      });
+    } else {
+      docs = exportTypes.map(async collectionName => {
+        return getDocuments(
+          collectionName,
+          searchParameterQueries[collectionName],
+          valueSetQueries[collectionName],
+          patientParamIds || patientIds,
+          elementsQueries[collectionName]
+        );
+      });
+    }
docs = await Promise.all(docs);
docs.forEach(doc => {
if (doc.document) {
-        writeToFile(doc.document, doc.collectionName, clientId);
+        writeToFile(doc.document, doc.name, clientEntry);
}
});

@@ -217,10 +275,10 @@
*/

// mark bulk status job as complete after all files have been written
-    await updateBulkExportStatus(clientId, BULKSTATUS_COMPLETED);
+    await updateBulkExportStatus(clientEntry, BULKSTATUS_COMPLETED);
return true;
} catch (e) {
-    await updateBulkExportStatus(clientId, BULKSTATUS_FAILED, { message: e.message, code: 500 });
+    await updateBulkExportStatus(clientEntry, BULKSTATUS_FAILED, { message: e.message, code: 500 });
return false;
}
};
@@ -317,21 +375,21 @@ const getDocuments = async (collectionName, searchParameterQueries, valueSetQuer
});
}

-  return { document: docs, collectionName: collectionName };
+  return { document: docs, name: collectionName };
};

/**
* Writes the contents of a mongo document to an ndjson file with the appropriate resource
* name, stored in a directory under the client's id
* @param {Object} doc A mongodb document containing fhir resources
- * @param {string} type The fhir resourceType contained in the mongo document
+ * @param {string} filebase The base filename: either the fhir resourceType or patient id
* @param {string} clientId The id of the client making the export request
* @returns
*/
-const writeToFile = function (doc, type, clientId) {
+const writeToFile = function (doc, filebase, clientId) {
let dirpath = './tmp/' + clientId;
fs.mkdirSync(dirpath, { recursive: true });
-  const filename = path.join(dirpath, `${type}.ndjson`);
+  const filename = path.join(dirpath, `${filebase}.ndjson`);

let lineCount = 0;
if (Object.keys(doc).length > 0) {
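The byPatient branch above can be distilled to: for each patient id, gather the documents of every export type, flatten them into one array, prepend a `Parameters` subject header, and name the result after the patient so it lands in `{patientId}.ndjson`. A standalone sketch with a canned fetcher standing in for the real `getDocuments` call:

```javascript
// Sketch of the per-patient grouping in exportToNDJson. fetchDocs is an
// injected stand-in for the database query; the real code passes search
// parameter, ValueSet, and elements queries alongside the patient id.
const groupByPatient = async (patientIds, exportTypes, fetchDocs) => {
  const perPatient = patientIds.map(async patientId => {
    // subject header identifying whose resources follow in the file
    const subjectHeader = {
      resourceType: 'Parameters',
      parameter: [{ name: 'subject', valueReference: { reference: `Patient/${patientId}` } }]
    };
    // collect this patient's documents from every export type collection
    const typeDocs = await Promise.all(exportTypes.map(type => fetchDocs(type, patientId)));
    // flatten per-type arrays and prepend the subject header
    return { name: patientId, document: [subjectHeader, ...typeDocs.flat()] };
  });
  return Promise.all(perPatient);
};

// Usage with canned data: two patients, two resource types each.
const fakeFetch = async (type, patientId) => [{ resourceType: type, subject: `Patient/${patientId}` }];
groupByPatient(['p1', 'p2'], ['Encounter', 'Observation'], fakeFetch).then(files => {
  console.log(files.map(f => `${f.name}.ndjson (${f.document.length} resources)`));
});
```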
6 changes: 4 additions & 2 deletions src/util/mongo.controller.js
@@ -109,9 +109,10 @@ const findResourcesWithAggregation = async (query, resourceType, options = {}) =
/**
* Called as a result of export request. Adds a new clientId to db
* which can be queried to get updates on the status of the bulk export
* @param {boolean} byPatient indicates whether this export request groups data by patient (versus by type)
* @returns the id of the inserted client
*/
-const addPendingBulkExportRequest = async () => {
+const addPendingBulkExportRequest = async (byPatient = false) => {
const collection = db.collection('bulkExportStatuses');
const clientId = uuidv4();
const bulkExportClient = {
@@ -120,7 +121,8 @@
numberOfRequestsInWindow: 0,
timeOfFirstValidRequest: null,
error: {},
-    warnings: []
+    warnings: [],
+    byPatient: byPatient
};
await collection.insertOne(bulkExportClient);
return clientId;
7 changes: 7 additions & 0 deletions test/fixtures/testBulkStatus.json
@@ -34,5 +34,12 @@
"status": "Completed",
"error": {},
"warnings": []
},
{
"id": "BY_PATIENT",
"status": "Completed",
"error": {},
"warnings": [],
"byPatient": true
}
]
8 changes: 8 additions & 0 deletions test/fixtures/testEncounter.json
@@ -3,5 +3,13 @@
"id": "testEncounter",
"subject": {
"reference": "Patient/testPatient"
},
"type": {
"coding": [
{
"system": "http://example.com",
"code": "example-code-1"
}
]
}
}
10 changes: 9 additions & 1 deletion test/fixtures/testPatient.json
@@ -1,4 +1,12 @@
{
"resourceType": "Patient",
-  "id": "testPatient"
+  "id": "testPatient",
"maritalStatus": {
"coding": [
{
"system": "http://example.com",
"code": "example-code-1"
}
]
}
}
10 changes: 9 additions & 1 deletion test/fixtures/testServiceRequest.json
@@ -1,4 +1,12 @@
{
"resourceType": "ServiceRequest",
-  "id": "testServiceRequest"
+  "id": "testServiceRequest",
"code": {
"coding": [
{
"system": "http://example.com",
"code": "example-code-1"
}
]
}
}