Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kickoff import #47

Merged
merged 4 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,17 @@ Endpoint: `GET [fhir base]/$export`

Alternatively, a POST request (`POST [fhir base]/$export`) can be sent. The export parameters must be supplied using a FHIR [Parameters Resource](http://hl7.org/fhir/R4/parameters.html) in the request body.

For more information on the export endpoints, read this documentation on the [Export Request Flow](https://hl7.org/fhir/uv/bulkdata/export/index.html#request-flow).
For more information on the export endpoints, read this documentation on the [Export Request Flow](https://hl7.org/fhir/uv/bulkdata/export.html#bulk-data-export-operation-request-flow).

#### Bulk Status

This server supports the bulk status endpoint in support of the [Export Request Flow](https://hl7.org/fhir/uv/bulkdata/export.html#bulk-data-export-operation-request-flow).

Endpoint: `GET [fhir base]/bulkstatus/[client id]`

The server additionally supports a related convenience endpoint which kicks off an `$import` operation for an existing export request. The exported data is selected for import to a data receiver server. This import server location should be specifed with parameters using a FHIR [Parameters Resource](http://hl7.org/fhir/R4/parameters.html) with name `receiver` in the request body. The server will respond with the same bulk status information according to the progress of the existing export workflow.

Endpoint: `POST [fhir base]/bulkstatus/[client id]/kickoff-import`

## Supported Query Parameters

Expand Down
5 changes: 5 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,10 @@
"jest": "^27.3.1",
"prettier": "^2.4.1",
"typescript": "^4.4.4"
},
"jest": {
"moduleNameMapper": {
"^axios$": "axios/dist/node/axios.cjs"
}
}
}
3 changes: 2 additions & 1 deletion src/server/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const fastify = require('fastify');
const cors = require('@fastify/cors');

const { bulkExport, patientBulkExport, groupBulkExport } = require('../services/export.service');
const { checkBulkStatus } = require('../services/bulkstatus.service');
const { checkBulkStatus, kickoffImport } = require('../services/bulkstatus.service');
const { returnNDJsonContent } = require('../services/ndjson.service');
const { groupSearchById, groupSearch, groupCreate, groupUpdate } = require('../services/group.service');
const { uploadTransactionOrBatchBundle } = require('../services/bundle.service');
Expand All @@ -19,6 +19,7 @@ function build(opts) {
app.post('/Patient/$export', patientBulkExport);
app.get('/Group/:groupId/$export', groupBulkExport);
app.post('/Group/:groupId/$export', groupBulkExport);
app.post('/bulkstatus/:clientId/kickoff-import', kickoffImport);
app.get('/bulkstatus/:clientId', checkBulkStatus);
app.get('/:clientId/:fileName', returnNDJsonContent);
app.get('/Group/:groupId', groupSearchById);
Expand Down
82 changes: 81 additions & 1 deletion src/services/bulkstatus.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,91 @@
const fs = require('fs');
const path = require('path');
const { createOperationOutcome } = require('../util/errorUtils');
const { gatherParams } = require('../util/serviceUtils');
const axios = require('axios');
/** The time a client is expected to wait between bulkstatus requests in seconds*/
const RETRY_AFTER = 1;
/** The number of requests we allow inside the retry after window before throwing a 429 error */
const REQUEST_TOLERANCE = 10;

/**
* Kicks off an $import request to the data receiver specified in the passed parameters.
* @param {*} request the request object passed in by the user
* @param {*} reply the response object
*/
async function kickoffImport(request, reply) {
const clientId = request.params.clientId;
const bulkStatus = await getBulkExportStatus(clientId);
if (!bulkStatus) {
reply.code(404).send(new Error(`Could not find bulk export request with id: ${clientId}`));
}
if (bulkStatus.status === BULKSTATUS_COMPLETED) {
const parameters = gatherParams(request.method, request.query, request.body, reply);
if (parameters.receiver) {
const responseData = await getNDJsonURLs(reply, clientId);
const importManifest = {
resourceType: 'Parameters'
};

Check warning on line 35 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
importManifest.parameter = responseData.map(exportFile => {

Check warning on line 36 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🕹️ Function is not covered

Warning! Not covered function
return {
name: 'input',
part: [
{
name: 'url',
valueUrl: exportFile.url
}
]
};

Check warning on line 45 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
});

Check warning on line 46 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement

// TODO: add provenance?
const headers = {
accept: 'application/fhir+json',
'content-type': 'application/fhir+json'
};

Check warning on line 52 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
try {
// on success, pass through the response
const results = await axios.post(parameters.receiver, importManifest, { headers });
reply.code(results.status).header('content-location', results.headers['content-location']).send(results.body);

Check warning on line 56 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
} catch (e) {
// on fail, pass through wrapper error 400 that contains contained resource for the operationoutcome from the receiver
let receiverOutcome;
if (e.response.data.resourceType === 'OperationOutcome') {
receiverOutcome = e.response.data;
} else {
receiverOutcome = createOperationOutcome(e.message, { issueCode: e.status, severity: 'error' });

Check warning on line 63 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
}

Check warning on line 64 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement

Check warning on line 64 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🌿 Branch is not covered

Warning! Not covered branch

Check warning on line 64 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🌿 Branch is not covered

Warning! Not covered branch
const outcome = createOperationOutcome(
`Import request for id ${clientId} to receiver ${parameters.receiver} failed with the contained error.`,
{
issueCode: 400,
severity: 'error'
}
);

Check warning on line 71 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
outcome.contained = [receiverOutcome];
reply.code(400).send(outcome);
}

Check warning on line 74 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
} else {
reply.code(400).send(
createOperationOutcome(
'The kickoff-import endpoint requires a receiver location be specified in the request Parameters.',
{
issueCode: 400,
severity: 'error'
}
)
);

Check warning on line 84 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
}

Check warning on line 85 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement

Check warning on line 85 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🌿 Branch is not covered

Warning! Not covered branch

Check warning on line 85 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🌿 Branch is not covered

Warning! Not covered branch
} else {
reply.code(400).send(
createOperationOutcome(`Export request with id ${clientId} is not yet complete`, {
issueCode: 400,
severity: 'error'
})
);
}

Check warning on line 93 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement

Check warning on line 93 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🌿 Branch is not covered

Warning! Not covered branch

Check warning on line 93 in src/services/bulkstatus.service.js

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🌿 Branch is not covered

Warning! Not covered branch
}

/**
* Checks the status of the bulk export request.
* @param {*} request the request object passed in by the user
Expand Down Expand Up @@ -115,4 +195,4 @@
return output;
}

module.exports = { checkBulkStatus };
module.exports = { checkBulkStatus, kickoffImport };
49 changes: 1 addition & 48 deletions src/services/export.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const exportQueue = require('../resources/exportQueue');
const patientResourceTypes = require('../compartment-definition/patientExportResourceTypes.json');
const { createOperationOutcome } = require('../util/errorUtils');
const { verifyPatientsInGroup } = require('../util/groupUtils');
const { gatherParams } = require('../util/serviceUtils');

/**
* Exports data from a FHIR server, whether or not it is associated with a patient.
Expand Down Expand Up @@ -297,54 +298,6 @@ function validateExportParams(parameters, reply) {
return true;
}

/**
* Pulls query parameters from both the url query and request body and creates a new parameters map
* @param {string} method the request method (POST, GET, etc.)
* @param {Object} query the query terms on the request URL
* @param {Object} body http request body
* @param {Object} reply the response object
* @returns {Object} an object containing a combination of request parameters from both sources
*/
const gatherParams = (method, query, body, reply) => {
if (method === 'POST' && Object.keys(query).length > 0) {
reply.code(400).send(
createOperationOutcome('Parameters must be specified in a request body for POST requests.', {
issueCode: 400,
severity: 'error'
})
);
}
if (body) {
if (!body.resourceType || body.resourceType !== 'Parameters') {
reply.code(400).send(
createOperationOutcome('Parameters must be specified in a request body of resourceType "Parameters."', {
issueCode: 400,
severity: 'error'
})
);
}
}
const params = { ...query };
if (body && body.parameter) {
body.parameter.reduce((acc, e) => {
if (!e.resource) {
if (e.name === 'patient') {
if (!acc[e.name]) {
acc[e.name] = [e.valueReference];
} else {
acc[e.name].push(e.valueReference);
}
} else {
// For now, all usable params are expected to be stored under one of these fives keys
acc[e.name] = e.valueDate || e.valueString || e.valueId || e.valueCode || e.valueReference;
}
}
return acc;
}, params);
}
return params;
};

/**
* Checks provided types against the recommended resource types for patient-level export.
* Filters resource types that do not appear in the patient compartment definition and throws
Expand Down
51 changes: 51 additions & 0 deletions src/util/serviceUtils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
const { createOperationOutcome } = require('../util/errorUtils');

/**
* Pulls query parameters from both the url query and request body and creates a new parameters map
* @param {string} method the request method (POST, GET, etc.)
* @param {Object} query the query terms on the request URL
* @param {Object} body http request body
* @param {Object} reply the response object
* @returns {Object} an object containing a combination of request parameters from both sources
*/
function gatherParams(method, query, body, reply) {
if (method === 'POST' && Object.keys(query).length > 0) {
reply.code(400).send(
createOperationOutcome('Parameters must be specified in a request body for POST requests.', {
issueCode: 400,
severity: 'error'
})
);
}
if (body) {
if (!body.resourceType || body.resourceType !== 'Parameters') {
reply.code(400).send(
createOperationOutcome('Parameters must be specified in a request body of resourceType "Parameters."', {
issueCode: 400,
severity: 'error'
})
);
}
}
const params = { ...query };
if (body && body.parameter) {
body.parameter.reduce((acc, e) => {
if (!e.resource) {
if (e.name === 'patient') {
if (!acc[e.name]) {
acc[e.name] = [e.valueReference];
} else {
acc[e.name].push(e.valueReference);
}
} else {
// For now, all usable params are expected to be stored under one of these fives keys
acc[e.name] = e.valueDate || e.valueString || e.valueId || e.valueCode || e.valueReference;
}
}
return acc;
}, params);
}
return params;
}

module.exports = { gatherParams };
1 change: 0 additions & 1 deletion test/services/bulkstatus.service.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const fs = require('fs');
const queue = require('../../src/resources/exportQueue');
describe('checkBulkStatus logic', () => {
const clientId = 'testClient';

beforeAll(async () => {
await bulkStatusSetup();
fs.mkdirSync(`tmp/${clientId}`, { recursive: true });
Expand Down
Loading