Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(#138): move polling to openhim channel config #139

Open
wants to merge 7 commits into
base: openmrs-mediator
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cht-config/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"test-targets": "npm run eslint && TZ=Africa/Nairobi mocha --reporter progress test/targets/*.spec.js --timeout 10000",
"test-contact-summary": "npm run eslint && TZ=Africa/Nairobi mocha --reporter progress test/contact-summary/*.spec.js --timeout 10000",
"test-unit": "TZ=Africa/Nairobi mocha --recursive --reporter spec test --timeout 20000",
"deploy": "wait-on http://api:5988/ && sleep 100 && sh ./script.sh"
"deploy": "wait-on http://api:5988/ && sh ./script.sh"
},
"devDependencies": {
"chai": "^4.2.0",
Expand Down
3 changes: 3 additions & 0 deletions docker/docker-compose.cht-core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ services:
- COUCH_URL=http://${COUCHDB_USER:-admin}:${COUCHDB_PASSWORD:-password}@haproxy:${HAPROXY_PORT:-5984}/medic
- BUILDS_URL=${MARKET_URL_READ:-https://staging.dev.medicmobile.org}/${BUILDS_SERVER:-_couch/builds_4}
- UPGRADE_SERVICE_URL=${UPGRADE_SERVICE_URL:-http://localhost:5100}
ports:
- "5988:5988"
logging:
driver: "local"
options:
Expand Down Expand Up @@ -117,6 +119,7 @@ services:
- "COUCHDB_PASSWORD=${COUCHDB_PASSWORD:-password}"
depends_on:
- couchdb
- api
networks:
- cht-net

Expand Down
10 changes: 5 additions & 5 deletions docker/docker-compose.mediator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ services:
- "OPENHIM_API_URL=${OPENHIM_API_URL:-https://openhim-core:8080}"
- "PORT=${PORT:-6000}"
- "FHIR_URL=${FHIR_URL:-http://openhim-core:5001/fhir}"
- "FHIR_USERNAME=${MEDIATORS_USERNAME:-interop-client}"
- "FHIR_PASSWORD=${MEDIATORS_PASSWORD:-interop-password}"
- "OPENMRS_URL=${OPENMRS_CHANNEL_URL}"
- "OPENMRS_USERNAME=${MEDIATORS_USERNAME}"
- "OPENMRS_PASSWORD=${MEDIATORS_PASSWORD}"
- "FHIR_USERNAME=${FHIR_USERNAME:-interop-client}"
- "FHIR_PASSWORD=${FHIR_PASSWORD:-interop-password}"
- "OPENMRS_CHANNEL_URL=${OPENMRS_CHANNEL_URL:-http://openhim-core:5001/openmrs}"
- "OPENMRS_CHANNEL_USERNAME=${OPENMRS_CHANNEL_USERNAME:-interop-client}"
- "OPENMRS_CHANNEL_PASSWORD=${OPENMRS_CHANNEL_PASSWORD:-interop-password}"
- "CHT_URL=${CHT_URL:-https://nginx}"
- "CHT_USERNAME=${CHT_USERNAME:-admin}"
- "CHT_PASSWORD=${CHT_PASSWORD:-password}"
Expand Down
20 changes: 12 additions & 8 deletions mediator/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as dotenv from 'dotenv';
dotenv.config();

export const PORT = process.env.PORT || 6000;
const REQUEST_TIMEOUT = Number(getEnvironmentVariable('REQUEST_TIMEOUT', '5000'));

export const OPENHIM = {
username: getEnvironmentVariable('OPENHIM_USERNAME', '[email protected]'),
Expand All @@ -11,27 +12,30 @@ export const OPENHIM = {
};

export const FHIR = {
url: getEnvironmentVariable('FHIR_URL', 'http://openhim-core:5001/fhir'),
url: getEnvironmentVariable('FHIR_URL', 'https://openhim-core:5001/fhir'),
username: getEnvironmentVariable('FHIR_USERNAME', 'interop-client'),
password: getEnvironmentVariable('FHIR_PASSWORD', 'interop-password'),
timeout: Number(getEnvironmentVariable('REQUEST_TIMEOUT', '5000'))
timeout: REQUEST_TIMEOUT
};

export const CHT = {
url: getEnvironmentVariable('CHT_URL', 'https://nginx'),
username: getEnvironmentVariable('CHT_USERNAME', 'admin'),
password: getEnvironmentVariable('CHT_PASSWORD', 'password'),
timeout: Number(getEnvironmentVariable('REQUEST_TIMEOUT', '5000'))
timeout: REQUEST_TIMEOUT
};

export const OPENMRS = {
url: getEnvironmentVariable('OPENMRS_URL', ''),
username: getEnvironmentVariable('OPENMRS_USERNAME', ''),
password: getEnvironmentVariable('OPENMRS_PASSWORD', ''),
timeout: Number(getEnvironmentVariable('REQUEST_TIMEOUT', '5000'))
url: getEnvironmentVariable('OPENMRS_CHANNEL_URL', 'https://openhim-core:5001/openmrs'),
username: getEnvironmentVariable('OPENMRS_CHANNEL_USERNAME', 'interop-client'),
password: getEnvironmentVariable('OPENMRS_CHANNEL_PASSWORD', 'interop-password'),
timeout: REQUEST_TIMEOUT
};

export const SYNC_INTERVAL = getEnvironmentVariable('SYNC_INTERVAL', '60000');
// hard code sync interval to 1 minute because it is hard coded in mediator config
export const SYNC_INTERVAL = '60';
// how far back shoudl the sync look for new resources

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the default sync period to the comment in a human-readable manner would be helpful. e.g. How far back should the sync look for new resources. Defaults to 1 hour.

export const SYNC_PERIOD = getEnvironmentVariable('SYNC_PERIOD', '3600');

function getEnvironmentVariable(env: string, def: string) {
if (process.env.NODE_ENV === 'test') {
Expand Down
21 changes: 20 additions & 1 deletion mediator/config/openmrs_mediator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,30 @@ export const openMRSMediatorConfig = {
version: '1.0.0',
name: 'OpenMRS Mediator',
description: 'A mediator to sync CHT data with OpenMRS',
defaultChannelConfig: [
{
name: 'OpenMRS Sync',
urlPattern: '^/trigger$',
routes: [
{
name: 'OpenMRS polling Mediator',
host: 'mediator',
path: '/openmrs/sync',
port: 6000,
primary: true,
type: 'http',
},
],
allow: ['interop'],
type: 'polling',
pollingSchedule: '1 minute'
},
],
endpoints: [
{
name: 'OpenMRS Mediator',
host: 'mediator',
path: '/',
path: '/openmrs/sync',
port: '6000',
primary: true,
type: 'http',
Expand Down
8 changes: 5 additions & 3 deletions mediator/env.template
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ OPENHIM_API_URL = "https://openhim-core:8080"
PORT = 6000

FHIR_URL = http://openhim-core:5001/fhir
OPENMRS_URL = http://openhim-core:5001/openmrs
FHIR_USERNAME = interop-client
FHIR_PASSWORD = interop-password

MEDIATORS_USERNAME = interop-client
MEDIATORS_PASSWORD = interop-password
OPENMRS_CHANNEL_URL = http://openhim-core:5001/openmrs
OPENMRS_CHANNEL_USERNAME = interop-client
OPENMRS_CHANNEL_PASSWORD = interop-password

CHT_URL = http://nginx
CHT_USERNAME = admin
Expand Down
24 changes: 8 additions & 16 deletions mediator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ import { mediatorConfig } from './config/mediator';
import { openMRSMediatorConfig } from './config/openmrs_mediator';
import { logger } from './logger';
import bodyParser from 'body-parser';
import {PORT, OPENHIM, SYNC_INTERVAL, OPENMRS} from './config';
import {PORT, OPENHIM, OPENMRS} from './config';
import patientRoutes from './src/routes/patient';
import serviceRequestRoutes from './src/routes/service-request';
import encounterRoutes from './src/routes/encounter';
import organizationRoutes from './src/routes/organization';
import endpointRoutes from './src/routes/endpoint';
import chtRoutes from './src/routes/cht';
import openMRSRoutes from './src/routes/openmrs';
import { registerMediatorCallback } from './src/utils/openhim';
import { syncPatients, syncEncounters } from './src/utils/openmrs_sync'
import os from 'os';

const {registerMediator} = require('openhim-mediator-utils');
Expand All @@ -21,11 +21,12 @@ const app = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({extended: true}));

/*

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this code rather than commenting it out.

app.get('*', (_: Request, res: Response) => {
const osUptime = os.uptime();
const processUptime = process.uptime();
res.send({status: 'success', osuptime: osUptime, processuptime: processUptime});
});
});*/

// routes for valid fhir resources
app.use('/patient', patientRoutes);
Expand All @@ -34,30 +35,21 @@ app.use('/encounter', encounterRoutes);
app.use('/organization', organizationRoutes);
app.use('/endpoint', endpointRoutes);

// routes for cht docs
// routes for CHT docs
app.use('/cht', chtRoutes);

// routes for OpenMRS
app.use('/openmrs', openMRSRoutes);

if (process.env.NODE_ENV !== 'test') {
app.listen(PORT, () => logger.info(`Server listening on port ${PORT}`));

// TODO => inject the 'port' and 'http scheme' into 'mediatorConfig'
registerMediator(OPENHIM, mediatorConfig, registerMediatorCallback);

// if OPENMRS is specified, register its mediator
// and start the sync background task
if (OPENMRS.url) {
registerMediator(OPENHIM, openMRSMediatorConfig, registerMediatorCallback);
// start patient and ecnounter sync in the background
setInterval(async () => {
try {
const startTime = new Date();
startTime.setHours(startTime.getHours() - 1);
await syncPatients(startTime);
await syncEncounters(startTime);
} catch (error: any) {
logger.error(error);
}
}, Number(SYNC_INTERVAL));
}
}

Expand Down
13 changes: 1 addition & 12 deletions mediator/src/controllers/cht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,7 @@ export async function createPatient(chtPatientDoc: any) {
}

const fhirPatient = buildFhirPatientFromCht(chtPatientDoc.doc);
const patientResponse = await getFHIRPatientResource(fhirPatient.id || '');
if (patientResponse.status != 200){
// any error, just return it to caller
return patientResponse;
} else if (patientResponse.data.total > 0) {
// updates not currently supported
return patientResponse;
} else {
// create or update in the FHIR Server
// even for create, sends a PUT request
return updateFhirResource({ ...fhirPatient, resourceType: 'Patient' });
}
return updateFhirResource({ ...fhirPatient, resourceType: 'Patient' });
}

export async function updatePatientIds(chtFormDoc: any) {
Expand Down
18 changes: 18 additions & 0 deletions mediator/src/controllers/openmrs.ts
witash marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { logger } from '../../logger';
import { syncPatients, syncEncounters } from '../utils/openmrs_sync'
import { SYNC_PERIOD } from '../../config'

export async function sync() {
try {
let now = Date.now();
let syncPeriod = parseInt(SYNC_PERIOD, 10);
let startTime = new Date(now - syncPeriod);

await syncPatients(startTime);
await syncEncounters(startTime);
witash marked this conversation as resolved.
Show resolved Hide resolved
return { status: 200, data: { message: `OpenMRS sync completed successfully`} };
} catch(error: any) {
logger.error(error);
return { status: 500, data: { message: `Error during OpenMRS Sync`} };
}
}
12 changes: 12 additions & 0 deletions mediator/src/routes/openmrs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Router } from 'express';
import { requestHandler } from '../utils/request';
import { sync } from '../controllers/openmrs'

const router = Router();

router.get(
'/sync',
requestHandler((req) => sync())
);

export default router;
4 changes: 0 additions & 4 deletions mediator/src/routes/tests/cht.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ jest.mock('axios');

describe('POST /cht/patient', () => {
it('accepts incoming request with valid patient resource', async () => {
jest.spyOn(fhir, 'getFHIRPatientResource').mockResolvedValueOnce({
data: {},
status: 200,
});
jest.spyOn(fhir, 'updateFhirResource').mockResolvedValueOnce({
data: {},
status: 200,
Expand Down
35 changes: 35 additions & 0 deletions mediator/src/routes/tests/openmrs.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import request from 'supertest';
import app from '../../..';
import * as openmrs_sync from '../../utils/openmrs_sync';
import axios from 'axios';

jest.mock('axios');

describe('GET /openmrs/sync', () => {
it('calls syncPatients and syncEncouners', async () => {
jest.spyOn(openmrs_sync, 'syncPatients').mockImplementation(async (startTime) => {
});

jest.spyOn(openmrs_sync, 'syncEncounters').mockImplementation(async (startTime) => {
});

const res = await request(app).get('/openmrs/sync').send();

expect(res.status).toBe(200);

expect(openmrs_sync.syncPatients).toHaveBeenCalled();
expect(openmrs_sync.syncEncounters).toHaveBeenCalled();
});

it('returns 500 if syncPatients throws an error', async () => {
jest.spyOn(openmrs_sync, 'syncPatients').mockImplementation(async (startTime) => {
throw new Error('Sync Failed');
});

const res = await request(app).get('/openmrs/sync').send();

expect(res.status).toBe(500);

expect(openmrs_sync.syncPatients).toHaveBeenCalled();
});
});
2 changes: 1 addition & 1 deletion mediator/src/utils/fhir.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ export function copyIdToNamedIdentifier(fromResource: any, toResource: fhir4.Pat
}

export function getIdType(resource: fhir4.Patient | fhir4.Encounter, idType: fhir4.CodeableConcept): string{
return resource?.identifier?.find((id: any) => id?.type.text == idType.text)?.value || '';
return resource?.identifier?.find((id: any) => id?.type?.text == idType.text)?.value || '';
}

export function addId(resource: fhir4.Patient | fhir4.Encounter, idType: fhir4.CodeableConcept, value: string){
Expand Down
4 changes: 4 additions & 0 deletions mediator/src/utils/openmrs.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { OPENMRS } from '../../config';
import axios from 'axios';
import { logger } from '../../logger';
import https from 'https';

const axiosOptions = {
auth: {
username: OPENMRS.username,
password: OPENMRS.password,
},
httpsAgent: new https.Agent({
rejectUnauthorized: false,
}),
timeout: OPENMRS.timeout
};

Expand Down
35 changes: 19 additions & 16 deletions mediator/src/utils/openmrs_sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,31 +83,34 @@ export async function compare(
// get the key for each resource and create a Map
const fhirIds = new Map(comparison.fhirResources.map(resource => [getKey(resource), resource]));

function isValidDate(resource: fhir4.Resource) {
// if lastUpdated is missing or invalid, cannot proceed, throw an error
if (!resource.meta?.lastUpdated) {
throw new Error("Last updated missing");
}
const lastUpdated = new Date(resource.meta.lastUpdated);
if (isNaN(lastUpdated.getTime()) || isNaN(startTime.getTime())) {
throw new Error("Invalid date format");
}

// don't sync resources created with 2 * SYNC_INTERVAL of start time
const syncWindow = (Number(SYNC_INTERVAL) * 1000) * 2
const diff = lastUpdated.getTime() - startTime.getTime();
return diff > syncWindow;
}

comparison.openMRSResources.forEach((openMRSResource) => {
const key = getKey(openMRSResource);
if (fhirIds.has(key)) {
// ok so the fhir server already has it
results.toupdate.push(openMRSResource);
fhirIds.delete(key);
} else {
const lastUpdated = new Date(openMRSResource.meta?.lastUpdated!);
if (isNaN(lastUpdated.getTime()) || isNaN(startTime.getTime())) {
throw new Error("Invalid date format");
}
const diff = lastUpdated.getTime() - startTime.getTime();
if (diff > (Number(SYNC_INTERVAL) * 2)){
results.incoming.push(openMRSResource);
}
} else if (isValidDate(openMRSResource)){
results.incoming.push(openMRSResource);
}
});

fhirIds.forEach((resource, key) => {
const lastUpdated = new Date(resource.meta?.lastUpdated || '');
if (isNaN(lastUpdated.getTime()) || isNaN(startTime.getTime())) {
throw new Error("Invalid date format");
}
const diff = lastUpdated.getTime() - startTime.getTime();
if (diff > (Number(SYNC_INTERVAL) * 2)){
if (isValidDate(resource)) {
results.outgoing.push(resource);
}
});
Expand Down
Loading
Loading