From cd04a46d9915f63e9cd307c9b87be7ceb8cafa08 Mon Sep 17 00:00:00 2001 From: lmd59 Date: Thu, 20 Jun 2024 13:03:58 -0400 Subject: [PATCH 1/4] Initial kickoff-import --- README.md | 10 ++++ src/server/app.js | 3 +- src/services/bulkstatus.service.js | 73 +++++++++++++++++++++++- src/services/export.service.js | 49 +--------------- src/util/serviceUtils.js | 52 +++++++++++++++++ test/services/bulkstatus.service.test.js | 47 ++++++++++++++- 6 files changed, 183 insertions(+), 51 deletions(-) create mode 100644 src/util/serviceUtils.js diff --git a/README.md b/README.md index 0e1206f..561b113 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,16 @@ Alternatively, a POST request (`POST [fhir base]/$export`) can be sent. The expo For more information on the export endpoints, read this documentation on the [Export Request Flow](https://hl7.org/fhir/uv/bulkdata/export/index.html#request-flow). +#### Bulk Status + +This server supports the bulk status endpoint in support of the [Export Request Flow](https://hl7.org/fhir/uv/bulkdata/export/index.html#request-flow). + +Endpoint: `GET [fhir base]/bulkstatus/[client id]` + +The server additionally supports a related convenience endpoint which kicks off an `$import` operation for an existing export request. The exported data is selected for import to a data receiver server. This import server location should be specified with parameters using a FHIR [Parameters Resource](http://hl7.org/fhir/R4/parameters.html) with name `receiver` in the request body. The server will respond with the same bulk status information according to the progress of the existing export workflow. 
+ +Endpoint: `POST [fhir base]/bulkstatus/[client id]/kickoff-import` + ## Supported Query Parameters The server supports the following query parameters: diff --git a/src/server/app.js b/src/server/app.js index e1b27f6..a736704 100644 --- a/src/server/app.js +++ b/src/server/app.js @@ -2,7 +2,7 @@ const fastify = require('fastify'); const cors = require('@fastify/cors'); const { bulkExport, patientBulkExport, groupBulkExport } = require('../services/export.service'); -const { checkBulkStatus } = require('../services/bulkstatus.service'); +const { checkBulkStatus, kickoffImport } = require('../services/bulkstatus.service'); const { returnNDJsonContent } = require('../services/ndjson.service'); const { groupSearchById, groupSearch, groupCreate, groupUpdate } = require('../services/group.service'); const { uploadTransactionOrBatchBundle } = require('../services/bundle.service'); @@ -19,6 +19,7 @@ function build(opts) { app.post('/Patient/$export', patientBulkExport); app.get('/Group/:groupId/$export', groupBulkExport); app.post('/Group/:groupId/$export', groupBulkExport); + app.get('/bulkstatus/:clientId/kickoff-import', kickoffImport); app.get('/bulkstatus/:clientId', checkBulkStatus); app.get('/:clientId/:fileName', returnNDJsonContent); app.get('/Group/:groupId', groupSearchById); diff --git a/src/services/bulkstatus.service.js b/src/services/bulkstatus.service.js index 3814ee1..f985021 100644 --- a/src/services/bulkstatus.service.js +++ b/src/services/bulkstatus.service.js @@ -8,11 +8,82 @@ const { const fs = require('fs'); const path = require('path'); const { createOperationOutcome } = require('../util/errorUtils'); +const { gatherParams } = require('../util/serviceUtils'); +const axios = require('axios'); /** The time a client is expected to wait between bulkstatus requests in seconds*/ const RETRY_AFTER = 1; /** The number of requests we allow inside the retry after window before throwing a 429 error */ const REQUEST_TOLERANCE = 10; +/** + * Kicks off an 
$import request to the data receiver specified in the passed parameters. + * @param {*} request the request object passed in by the user + * @param {*} reply the response object + */ +async function kickoffImport(request, reply) { + const clientId = request.params.clientId; + const bulkStatus = await getBulkExportStatus(clientId); + if (!bulkStatus) { + reply.code(404).send(new Error(`Could not find bulk export request with id: ${clientId}`)); + } + if (bulkStatus.status === BULKSTATUS_COMPLETED){ + const parameters = gatherParams(request.method, request.query, request.body, reply); + if(parameters.receiver){ + + const responseData = await getNDJsonURLs(reply, clientId); + const importManifest = { + "resourceType": "Parameters" + }; + importManifest.parameter = responseData.map(exportFile =>{ + return { + "name": "input", + "part": [ + { + "name": "url", + "valueUrl": exportFile.url + } + ] + }; + }); + + + // TODO: add provenance? + const headers = { + 'Accept': 'application/fhir+json', + 'Content-Type': 'application/fhir+json' + }; + try { + // on success, pass through the response + const results = await axios.post(parameters.receiver, importManifest, { headers }); + reply.code(results.status).send(results.body); + } catch (e) { + // on fail, pass through wrapper error 400 that contains contained resource for the operationoutcome from the receiver + const receiverOutcome = JSON.parse(e.message); + const outcome = createOperationOutcome(`Import request for id ${clientId} to receiver ${parameters.receiver} failed with the contained error.`, { + issueCode: 400, + severity: 'error' + }); + outcome.contained = [receiverOutcome]; + reply.code(400).send(outcome); + } + }else{ + reply.code(400).send( + createOperationOutcome('The kickoff-import endpoint requires a receiver location be specified in the request Parameters.', { + issueCode: 400, + severity: 'error' + }) + ); + } + }else{ + reply.code(400).send( + createOperationOutcome(`Export request with id ${clientId} 
is not yet complete`, { + issueCode: 400, + severity: 'error' + }) + ); + } +} + /** * Checks the status of the bulk export request. * @param {*} request the request object passed in by the user @@ -115,4 +186,4 @@ async function getNDJsonURLs(reply, clientId) { return output; } -module.exports = { checkBulkStatus }; +module.exports = { checkBulkStatus, kickoffImport }; diff --git a/src/services/export.service.js b/src/services/export.service.js index 63fcb1b..9490ff6 100644 --- a/src/services/export.service.js +++ b/src/services/export.service.js @@ -4,6 +4,7 @@ const exportQueue = require('../resources/exportQueue'); const patientResourceTypes = require('../compartment-definition/patientExportResourceTypes.json'); const { createOperationOutcome } = require('../util/errorUtils'); const { verifyPatientsInGroup } = require('../util/groupUtils'); +const { gatherParams } = require('../util/serviceUtils'); /** * Exports data from a FHIR server, whether or not it is associated with a patient. @@ -297,54 +298,6 @@ function validateExportParams(parameters, reply) { return true; } -/** - * Pulls query parameters from both the url query and request body and creates a new parameters map - * @param {string} method the request method (POST, GET, etc.) 
- * @param {Object} query the query terms on the request URL - * @param {Object} body http request body - * @param {Object} reply the response object - * @returns {Object} an object containing a combination of request parameters from both sources - */ -const gatherParams = (method, query, body, reply) => { - if (method === 'POST' && Object.keys(query).length > 0) { - reply.code(400).send( - createOperationOutcome('Parameters must be specified in a request body for POST requests.', { - issueCode: 400, - severity: 'error' - }) - ); - } - if (body) { - if (!body.resourceType || body.resourceType !== 'Parameters') { - reply.code(400).send( - createOperationOutcome('Parameters must be specified in a request body of resourceType "Parameters."', { - issueCode: 400, - severity: 'error' - }) - ); - } - } - const params = { ...query }; - if (body && body.parameter) { - body.parameter.reduce((acc, e) => { - if (!e.resource) { - if (e.name === 'patient') { - if (!acc[e.name]) { - acc[e.name] = [e.valueReference]; - } else { - acc[e.name].push(e.valueReference); - } - } else { - // For now, all usable params are expected to be stored under one of these fives keys - acc[e.name] = e.valueDate || e.valueString || e.valueId || e.valueCode || e.valueReference; - } - } - return acc; - }, params); - } - return params; -}; - /** * Checks provided types against the recommended resource types for patient-level export. * Filters resource types that do not appear in the patient compartment definition and throws diff --git a/src/util/serviceUtils.js b/src/util/serviceUtils.js new file mode 100644 index 0000000..4d416ba --- /dev/null +++ b/src/util/serviceUtils.js @@ -0,0 +1,52 @@ + +const { createOperationOutcome } = require('../util/errorUtils'); + +/** + * Pulls query parameters from both the url query and request body and creates a new parameters map + * @param {string} method the request method (POST, GET, etc.) 
+ * @param {Object} query the query terms on the request URL + * @param {Object} body http request body + * @param {Object} reply the response object + * @returns {Object} an object containing a combination of request parameters from both sources + */ +function gatherParams (method, query, body, reply){ + if (method === 'POST' && Object.keys(query).length > 0) { + reply.code(400).send( + createOperationOutcome('Parameters must be specified in a request body for POST requests.', { + issueCode: 400, + severity: 'error' + }) + ); + } + if (body) { + if (!body.resourceType || body.resourceType !== 'Parameters') { + reply.code(400).send( + createOperationOutcome('Parameters must be specified in a request body of resourceType "Parameters."', { + issueCode: 400, + severity: 'error' + }) + ); + } + } + const params = { ...query }; + if (body && body.parameter) { + body.parameter.reduce((acc, e) => { + if (!e.resource) { + if (e.name === 'patient') { + if (!acc[e.name]) { + acc[e.name] = [e.valueReference]; + } else { + acc[e.name].push(e.valueReference); + } + } else { + // For now, all usable params are expected to be stored under one of these fives keys + acc[e.name] = e.valueDate || e.valueString || e.valueId || e.valueCode || e.valueReference; + } + } + return acc; + }, params); + } + return params; +} + +module.exports = { gatherParams }; \ No newline at end of file diff --git a/test/services/bulkstatus.service.test.js b/test/services/bulkstatus.service.test.js index 1383183..7b5e4e2 100644 --- a/test/services/bulkstatus.service.test.js +++ b/test/services/bulkstatus.service.test.js @@ -7,8 +7,53 @@ const fs = require('fs'); // import queue to close open handles after tests pass // TODO: investigate why queues are leaving open handles in this file const queue = require('../../src/resources/exportQueue'); +const clientId = 'testClient'; + +// describe('kickoffImport logic', () => { +// beforeAll(async () => { +// await bulkStatusSetup(); +// 
fs.mkdirSync(`tmp/${clientId}`, { recursive: true }); +// fs.closeSync(fs.openSync(`tmp/${clientId}/Patient.ndjson`, 'w')); +// }); + +// beforeEach(async () => { +// await app.ready(); +// }); + +// test('check 200 returned for successful kickoff', async () => { +// // TODO: mock data receiver response +// await createTestResource(testPatient, 'Patient'); +// await supertest(app.server) +// .post(`/bulkstatus/${clientId}/kickoff-import`) +// .send({ +// resourceType: 'Parameters', +// parameter: [ +// { +// name: 'receiver', +// valueString: 'http://localhost:3001/4_0_1' +// } +// ] +// }) +// .expect(200) +// .then(response => { +// expect(response.headers.expires).toBeDefined(); +// expect(response.headers['content-type']).toEqual('application/json; charset=utf-8'); +// expect(response.body.output).toEqual([ +// { type: 'Patient', url: `http://localhost:3000/${clientId}/Patient.ndjson` } +// ]); +// }); +// }); + +// afterAll(async () => { +// await cleanUpDb(); +// fs.rmSync(`tmp/${clientId}`, { recursive: true, force: true }); +// }); + +// afterEach(async () => { +// await queue.close(); +// }); +// }); describe('checkBulkStatus logic', () => { - const clientId = 'testClient'; beforeAll(async () => { await bulkStatusSetup(); From 48999e875a2a49cd1938e985f150d52e28cf3aba Mon Sep 17 00:00:00 2001 From: LaurenD Date: Fri, 28 Jun 2024 15:36:01 -0400 Subject: [PATCH 2/4] Fix outcome handling and remove test --- package.json | 5 +++ src/server/app.js | 2 +- src/services/bulkstatus.service.js | 13 +++++-- test/services/bulkstatus.service.test.js | 48 +----------------------- 4 files changed, 16 insertions(+), 52 deletions(-) diff --git a/package.json b/package.json index c377e04..1769688 100644 --- a/package.json +++ b/package.json @@ -52,5 +52,10 @@ "jest": "^27.3.1", "prettier": "^2.4.1", "typescript": "^4.4.4" + }, + "jest": { + "moduleNameMapper": { + "^axios$": "axios/dist/node/axios.cjs" + } } } diff --git a/src/server/app.js b/src/server/app.js index 
a736704..f74f369 100644 --- a/src/server/app.js +++ b/src/server/app.js @@ -19,7 +19,7 @@ function build(opts) { app.post('/Patient/$export', patientBulkExport); app.get('/Group/:groupId/$export', groupBulkExport); app.post('/Group/:groupId/$export', groupBulkExport); - app.get('/bulkstatus/:clientId/kickoff-import', kickoffImport); + app.post('/bulkstatus/:clientId/kickoff-import', kickoffImport); app.get('/bulkstatus/:clientId', checkBulkStatus); app.get('/:clientId/:fileName', returnNDJsonContent); app.get('/Group/:groupId', groupSearchById); diff --git a/src/services/bulkstatus.service.js b/src/services/bulkstatus.service.js index f985021..f1f2f19 100644 --- a/src/services/bulkstatus.service.js +++ b/src/services/bulkstatus.service.js @@ -49,16 +49,21 @@ async function kickoffImport(request, reply) { // TODO: add provenance? const headers = { - 'Accept': 'application/fhir+json', - 'Content-Type': 'application/fhir+json' + 'accept': 'application/fhir+json', + 'content-type': 'application/fhir+json' }; try { // on success, pass through the response const results = await axios.post(parameters.receiver, importManifest, { headers }); - reply.code(results.status).send(results.body); + reply.code(results.status).header('content-location',results.headers['content-location']).send(results.body); } catch (e) { // on fail, pass through wrapper error 400 that contains contained resource for the operationoutcome from the receiver - const receiverOutcome = JSON.parse(e.message); + let receiverOutcome; + if(e.response.data.resourceType === 'OperationOutcome'){ + receiverOutcome = e.response.data; + }else{ + receiverOutcome = createOperationOutcome(e.message, {issueCode: e.status, severity: 'error'}); + } const outcome = createOperationOutcome(`Import request for id ${clientId} to receiver ${parameters.receiver} failed with the contained error.`, { issueCode: 400, severity: 'error' diff --git a/test/services/bulkstatus.service.test.js b/test/services/bulkstatus.service.test.js 
index 7b5e4e2..9089b41 100644 --- a/test/services/bulkstatus.service.test.js +++ b/test/services/bulkstatus.service.test.js @@ -7,54 +7,8 @@ const fs = require('fs'); // import queue to close open handles after tests pass // TODO: investigate why queues are leaving open handles in this file const queue = require('../../src/resources/exportQueue'); -const clientId = 'testClient'; - -// describe('kickoffImport logic', () => { -// beforeAll(async () => { -// await bulkStatusSetup(); -// fs.mkdirSync(`tmp/${clientId}`, { recursive: true }); -// fs.closeSync(fs.openSync(`tmp/${clientId}/Patient.ndjson`, 'w')); -// }); - -// beforeEach(async () => { -// await app.ready(); -// }); - -// test('check 200 returned for successful kickoff', async () => { -// // TODO: mock data receiver response -// await createTestResource(testPatient, 'Patient'); -// await supertest(app.server) -// .post(`/bulkstatus/${clientId}/kickoff-import`) -// .send({ -// resourceType: 'Parameters', -// parameter: [ -// { -// name: 'receiver', -// valueString: 'http://localhost:3001/4_0_1' -// } -// ] -// }) -// .expect(200) -// .then(response => { -// expect(response.headers.expires).toBeDefined(); -// expect(response.headers['content-type']).toEqual('application/json; charset=utf-8'); -// expect(response.body.output).toEqual([ -// { type: 'Patient', url: `http://localhost:3000/${clientId}/Patient.ndjson` } -// ]); -// }); -// }); - -// afterAll(async () => { -// await cleanUpDb(); -// fs.rmSync(`tmp/${clientId}`, { recursive: true, force: true }); -// }); - -// afterEach(async () => { -// await queue.close(); -// }); -// }); describe('checkBulkStatus logic', () => { - + const clientId = 'testClient'; beforeAll(async () => { await bulkStatusSetup(); fs.mkdirSync(`tmp/${clientId}`, { recursive: true }); From 5ddc1352f6c4cfb5d5eb31c6795b92b4dec264ae Mon Sep 17 00:00:00 2001 From: LaurenD Date: Fri, 28 Jun 2024 16:49:08 -0400 Subject: [PATCH 3/4] prettier fix --- src/services/bulkstatus.service.js | 54 
++++++++++++++++-------------- src/util/serviceUtils.js | 5 ++- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/src/services/bulkstatus.service.js b/src/services/bulkstatus.service.js index f1f2f19..066b2ca 100644 --- a/src/services/bulkstatus.service.js +++ b/src/services/bulkstatus.service.js @@ -26,60 +26,64 @@ async function kickoffImport(request, reply) { if (!bulkStatus) { reply.code(404).send(new Error(`Could not find bulk export request with id: ${clientId}`)); } - if (bulkStatus.status === BULKSTATUS_COMPLETED){ + if (bulkStatus.status === BULKSTATUS_COMPLETED) { const parameters = gatherParams(request.method, request.query, request.body, reply); - if(parameters.receiver){ - + if (parameters.receiver) { const responseData = await getNDJsonURLs(reply, clientId); const importManifest = { - "resourceType": "Parameters" + resourceType: 'Parameters' }; - importManifest.parameter = responseData.map(exportFile =>{ + importManifest.parameter = responseData.map(exportFile => { return { - "name": "input", - "part": [ + name: 'input', + part: [ { - "name": "url", - "valueUrl": exportFile.url + name: 'url', + valueUrl: exportFile.url } ] }; }); - // TODO: add provenance? 
const headers = { - 'accept': 'application/fhir+json', + accept: 'application/fhir+json', 'content-type': 'application/fhir+json' }; try { // on success, pass through the response const results = await axios.post(parameters.receiver, importManifest, { headers }); - reply.code(results.status).header('content-location',results.headers['content-location']).send(results.body); + reply.code(results.status).header('content-location', results.headers['content-location']).send(results.body); } catch (e) { // on fail, pass through wrapper error 400 that contains contained resource for the operationoutcome from the receiver let receiverOutcome; - if(e.response.data.resourceType === 'OperationOutcome'){ + if (e.response.data.resourceType === 'OperationOutcome') { receiverOutcome = e.response.data; - }else{ - receiverOutcome = createOperationOutcome(e.message, {issueCode: e.status, severity: 'error'}); + } else { + receiverOutcome = createOperationOutcome(e.message, { issueCode: e.status, severity: 'error' }); } - const outcome = createOperationOutcome(`Import request for id ${clientId} to receiver ${parameters.receiver} failed with the contained error.`, { - issueCode: 400, - severity: 'error' - }); + const outcome = createOperationOutcome( + `Import request for id ${clientId} to receiver ${parameters.receiver} failed with the contained error.`, + { + issueCode: 400, + severity: 'error' + } + ); outcome.contained = [receiverOutcome]; reply.code(400).send(outcome); } - }else{ + } else { reply.code(400).send( - createOperationOutcome('The kickoff-import endpoint requires a receiver location be specified in the request Parameters.', { - issueCode: 400, - severity: 'error' - }) + createOperationOutcome( + 'The kickoff-import endpoint requires a receiver location be specified in the request Parameters.', + { + issueCode: 400, + severity: 'error' + } + ) ); } - }else{ + } else { reply.code(400).send( createOperationOutcome(`Export request with id ${clientId} is not yet complete`, { 
issueCode: 400, diff --git a/src/util/serviceUtils.js b/src/util/serviceUtils.js index 4d416ba..9d28173 100644 --- a/src/util/serviceUtils.js +++ b/src/util/serviceUtils.js @@ -1,4 +1,3 @@ - const { createOperationOutcome } = require('../util/errorUtils'); /** @@ -9,7 +8,7 @@ const { createOperationOutcome } = require('../util/errorUtils'); * @param {Object} reply the response object * @returns {Object} an object containing a combination of request parameters from both sources */ -function gatherParams (method, query, body, reply){ +function gatherParams(method, query, body, reply) { if (method === 'POST' && Object.keys(query).length > 0) { reply.code(400).send( createOperationOutcome('Parameters must be specified in a request body for POST requests.', { @@ -49,4 +48,4 @@ function gatherParams (method, query, body, reply){ return params; } -module.exports = { gatherParams }; \ No newline at end of file +module.exports = { gatherParams }; From 614dbe84c555c378767c41502b52f60b7db863cd Mon Sep 17 00:00:00 2001 From: Chris Hossenlopp Date: Mon, 1 Jul 2024 17:40:37 -0400 Subject: [PATCH 4/4] Update bulkdata ig urls in readme --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 561b113..b316cbc 100644 --- a/README.md +++ b/README.md @@ -142,11 +142,11 @@ Endpoint: `GET [fhir base]/$export` Alternatively, a POST request (`POST [fhir base]/$export`) can be sent. The export parameters must be supplied using a FHIR [Parameters Resource](http://hl7.org/fhir/R4/parameters.html) in the request body. -For more information on the export endpoints, read this documentation on the [Export Request Flow](https://hl7.org/fhir/uv/bulkdata/export/index.html#request-flow). +For more information on the export endpoints, read this documentation on the [Export Request Flow](https://hl7.org/fhir/uv/bulkdata/export.html#bulk-data-export-operation-request-flow). 
#### Bulk Status -This server supports the bulk status endpoint in support of the [Export Request Flow](https://hl7.org/fhir/uv/bulkdata/export/index.html#request-flow). +This server supports the bulk status endpoint in support of the [Export Request Flow](https://hl7.org/fhir/uv/bulkdata/export.html#bulk-data-export-operation-request-flow). Endpoint: `GET [fhir base]/bulkstatus/[client id]`