Merge pull request #66 from SAFEHR-data/sk/waveform-dev
Live waveform data in Emap
jeremyestein authored Jan 22, 2025
2 parents 04bd6cc + ff9e0cb commit d7f74bc
Showing 91 changed files with 4,382 additions and 178 deletions.
31 changes: 30 additions & 1 deletion .github/workflows/emap-tests.yaml
@@ -3,7 +3,7 @@ on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main, develop ]
branches: [ main, develop, sk/waveform-dev ]
repository_dispatch:
permissions:
contents: read
@@ -18,6 +18,7 @@ jobs:
emap-interchange: ${{ steps.filter.outputs.emap-interchange }}
core: ${{ steps.filter.outputs.core }}
hl7-reader: ${{ steps.filter.outputs.hl7-reader }}
waveform-reader: ${{ steps.filter.outputs.waveform-reader }}
steps:
- uses: actions/checkout@v3
- uses: dorny/paths-filter@v2
@@ -44,6 +45,11 @@ jobs:
- 'emap-interchange/**'
- 'emap-star/**'
- 'hl7-reader/**'
waveform-reader:
- '.github/**'
- 'emap-checker.xml'
- 'emap-interchange/**'
- 'waveform-reader/**'
emap-star-tests:
needs: [filter]
runs-on: ubuntu-latest
@@ -138,3 +144,26 @@ jobs:
if: success() || failure()
with:
report_paths: '**/target/surefire-reports/TEST-*.xml'
waveform-reader-tests:
needs: [filter]
runs-on: ubuntu-latest
if: needs.filter.outputs.waveform-reader == 'true'
steps:
- uses: actions/checkout@v3
- name: Set up java
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
cache: 'maven'
- name: Build emap-interchange
working-directory: emap-interchange
run: mvn clean install
- name: Run waveform-reader tests
working-directory: waveform-reader
run: mvn clean test
- name: Publish Test Report
uses: mikepenz/action-junit-report@v2
if: success() || failure()
with:
report_paths: '**/target/surefire-reports/TEST-*.xml'
3 changes: 3 additions & 0 deletions cassandra-config-envs.EXAMPLE
@@ -0,0 +1,3 @@
# these settings work on the GAE but if the process is getting OOM killed you can reduce them
MAX_HEAP_SIZE=4G
HEAP_NEWSIZE=800M
2 changes: 1 addition & 1 deletion core/Dockerfile
@@ -4,7 +4,7 @@ SHELL ["/bin/bash", "-c"]
# Install zip & unzip for glowroot
RUN apt update; apt install -yy zip
# Set up the Maven proxy settings
COPY core/set_mvn_proxy.sh /app/
COPY docker/set_mvn_proxy.sh /app/
# Download and extract glowroot
WORKDIR /app/core
RUN curl -s https://api.github.com/repos/glowroot/glowroot/releases/latest \
2 changes: 2 additions & 0 deletions core/core-config-envs.EXAMPLE
@@ -14,4 +14,6 @@ SPRING_RABBITMQ_PORT=5672
SPRING_RABBITMQ_USERNAME=emap
SPRING_RABBITMQ_PASSWORD=yourstrongpassword
LOGGING_LEVEL_UK_AC_UCL_RITS_INFORM=INFO
CORE_WAVEFORM_RETENTION_HOURS=24
CORE_WAVEFORM_IS_NON_CURRENT_TEST_DATA=true
TZ=Europe/London
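
The two new CORE_WAVEFORM_* settings pair with the scheduling support this PR adds to core: the retention setting bounds how many hours of waveform rows are kept in the UDS, and the non-current-test-data flag appears to let the cleanup anchor its cutoff to the newest stored observation (see mostRecentObservationDatatime() in WaveformController below) rather than the wall clock when replaying old test data. The class that consumes these settings (WaveformProcessor) is not among the files shown, so the following is only a minimal sketch, assuming relaxed binding of the env var to a core.waveform.retention_hours property and an hourly @Scheduled cleanup:

```java
// Sketch only — NOT the WaveformProcessor merged in this PR (that file is not shown in this diff).
// The property key is an assumption based on relaxed binding of CORE_WAVEFORM_RETENTION_HOURS.
package uk.ac.ucl.rits.inform.datasinks.emapstar.dataprocessors;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import uk.ac.ucl.rits.inform.datasinks.emapstar.controllers.WaveformController;

@Component
public class WaveformRetentionSketch {
    private static final Logger logger = LoggerFactory.getLogger(WaveformRetentionSketch.class);

    private final WaveformController waveformController;

    @Value("${core.waveform.retention_hours}")
    private int retentionHours;

    WaveformRetentionSketch(WaveformController waveformController) {
        this.waveformController = waveformController;
    }

    /**
     * Delete waveform rows older than the configured retention window.
     * Relies on @EnableScheduling, which this PR adds to App.
     */
    @Scheduled(fixedDelay = 3600_000)  // hourly
    public void trimOldWaveformData() {
        Instant cutoff = Instant.now().minus(retentionHours, ChronoUnit.HOURS);
        int deleted = waveformController.deleteOldWaveformData(cutoff);
        logger.info("Deleted {} waveform rows older than {}", deleted, cutoff);
    }
}
```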
7 changes: 5 additions & 2 deletions core/docker-compose.fakeuds.yml
@@ -1,11 +1,14 @@
version: '3.2'
services:
fakeuds:
image: postgres:10.5-alpine
image: postgres:11-alpine
environment:
POSTGRES_DB: fakeuds
# should be from global config?
POSTGRES_USER: inform_user
POSTGRES_PASSWORD: inform
volumes:
- postgres-data-fakeuds:/var/lib/postgresql/data
- ./init/uds:/docker-entrypoint-initdb.d
restart: unless-stopped
ports:
- "${FAKEUDS_PORT}:5432"
19 changes: 13 additions & 6 deletions core/docker-compose.yml
@@ -1,4 +1,3 @@
version: '3.2'
services:
core:
build:
@@ -15,8 +14,10 @@ services:
driver: "json-file"
restart: on-failure
depends_on:
- glowroot-central
- rabbitmq
glowroot-central:
condition: service_started
rabbitmq:
condition: service_healthy
rabbitmq:
image: rabbitmq:management
env_file:
@@ -25,12 +26,18 @@
- "${RABBITMQ_PORT}:5672"
- "${RABBITMQ_ADMIN_PORT}:15672"
restart: on-failure
healthcheck:
# rabbitmq server crashes if any rabbitmq-diagnostics cmd is run very soon
# after starting, so can't check too aggressively here
test: rabbitmq-diagnostics -q check_running
interval: 30s
timeout: 10s
retries: 3
cassandra:
image: cassandra
restart: on-failure
environment:
- MAX_HEAP_SIZE=4G
- HEAP_NEWSIZE=800M
env_file:
- ../../config/cassandra-config-envs
glowroot-central:
build:
context: ..
3 changes: 3 additions & 0 deletions core/init/uds/uds-init.sql
@@ -0,0 +1,3 @@
-- For initialising a local DB in the case where the UDS/IDS does not exist (eg. Emap in a box),
-- NOT a fake for unit testing
CREATE SCHEMA IF NOT EXISTS uds_schema;
1 change: 0 additions & 1 deletion core/pom.xml
@@ -175,7 +175,6 @@
<version>${checkstyle.plugin.version}</version>
<configuration>
<configLocation>../emap-checker.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<linkXRef>false</linkXRef>
@@ -15,6 +15,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.EnableScheduling;
import uk.ac.ucl.rits.inform.datasinks.emapstar.exceptions.MessageIgnoredException;
import uk.ac.ucl.rits.inform.datasinks.emapstar.repos.IdsEffectLogging;
import uk.ac.ucl.rits.inform.datasinks.emapstar.repos.IdsEffectLoggingRepository;
@@ -34,6 +35,7 @@
"uk.ac.ucl.rits.inform.datasinks",
"uk.ac.ucl.rits.inform.informdb"})
@EnableCaching
@EnableScheduling
public class App {
private static final Logger logger = LoggerFactory.getLogger(App.class);

@@ -71,7 +73,7 @@ public static void main(String[] args) {
* @throws IOException if rabbitmq channel has a problem
*/
@Profile("default")
@RabbitListener(queues = {"hl7Queue", "databaseExtracts", "extensionProjects"})
@RabbitListener(queues = "#{'${core.rabbitmq.listen_queues}'.split(',')}")
public void receiveMessage(EmapOperationMessage msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
throws IOException {
IdsEffectLogging idsEffectLogging = new IdsEffectLogging();
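
Swapping the hard-coded queue list for a SpEL expression makes the set of queues deployment configuration, which is what lets a waveform queue be consumed alongside the existing HL7 and extract queues. A small illustration of what the expression evaluates to; the property name core.rabbitmq.listen_queues comes from the diff above, while the env-var spelling and the waveform queue name are assumptions:

```java
// Illustration only: the SpEL #{'${core.rabbitmq.listen_queues}'.split(',')} resolves the property
// and splits it into the array of queue names the listener binds to. The env-var spelling below and
// the "waveformQueue" name are assumptions, not taken from this diff.
public class ListenQueuesExample {
    public static void main(String[] args) {
        // e.g. CORE_RABBITMQ_LISTEN_QUEUES=hl7Queue,databaseExtracts,extensionProjects,waveformQueue
        String listenQueues = "hl7Queue,databaseExtracts,extensionProjects,waveformQueue";
        String[] queues = listenQueues.split(",");
        for (String queue : queues) {
            System.out.println(queue);
        }
    }
}
```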
@@ -15,13 +15,12 @@
import uk.ac.ucl.rits.inform.datasinks.emapstar.dataprocessors.FormProcessor;
import uk.ac.ucl.rits.inform.datasinks.emapstar.dataprocessors.LabProcessor;
import uk.ac.ucl.rits.inform.datasinks.emapstar.dataprocessors.PatientStateProcessor;
import uk.ac.ucl.rits.inform.datasinks.emapstar.dataprocessors.WaveformProcessor;
import uk.ac.ucl.rits.inform.interchange.AdvanceDecisionMessage;
import uk.ac.ucl.rits.inform.interchange.ConsultMetadata;
import uk.ac.ucl.rits.inform.interchange.ConsultRequest;
import uk.ac.ucl.rits.inform.interchange.EmapOperationMessageProcessingException;
import uk.ac.ucl.rits.inform.interchange.EmapOperationMessageProcessor;
import uk.ac.ucl.rits.inform.interchange.location.DepartmentMetadata;
import uk.ac.ucl.rits.inform.interchange.location.LocationMetadata;
import uk.ac.ucl.rits.inform.interchange.PatientAllergy;
import uk.ac.ucl.rits.inform.interchange.PatientInfection;
import uk.ac.ucl.rits.inform.interchange.PatientProblem;
@@ -39,8 +38,11 @@
import uk.ac.ucl.rits.inform.interchange.form.FormQuestionMetadataMsg;
import uk.ac.ucl.rits.inform.interchange.lab.LabMetadataMsg;
import uk.ac.ucl.rits.inform.interchange.lab.LabOrderMsg;
import uk.ac.ucl.rits.inform.interchange.location.DepartmentMetadata;
import uk.ac.ucl.rits.inform.interchange.location.LocationMetadata;
import uk.ac.ucl.rits.inform.interchange.visit_observations.Flowsheet;
import uk.ac.ucl.rits.inform.interchange.visit_observations.FlowsheetMetadata;
import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage;

import javax.annotation.PostConstruct;
import java.time.Instant;
@@ -69,6 +71,8 @@ public class InformDbOperations implements EmapOperationMessageProcessor {
private AdvanceDecisionProcessor advanceDecisionProcessor;
@Autowired
private FormProcessor formProcessor;
@Autowired
private WaveformProcessor waveformProcessor;

@Value("${features.sde:false}")
private boolean sdeFeatureEnabled;
@@ -103,6 +107,12 @@ public void processMessage(PatientAllergy msg) throws EmapOperationMessageProces
patientStateProcessor.processMessage(msg, storedFrom);
}

@Override
public void processMessage(WaveformMessage msg) throws EmapOperationMessageProcessingException {
Instant storedFrom = Instant.now();
waveformProcessor.processMessage(msg, storedFrom);
}

/**
* @param msg the ADT message to process
* @throws EmapOperationMessageProcessingException if message cannot be processed
@@ -19,6 +19,7 @@
import uk.ac.ucl.rits.inform.informdb.visit_recordings.VisitObservationTypeAudit;
import uk.ac.ucl.rits.inform.interchange.visit_observations.Flowsheet;
import uk.ac.ucl.rits.inform.interchange.visit_observations.FlowsheetMetadata;
import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage;

import javax.annotation.Resource;
import java.time.Instant;
@@ -122,6 +123,21 @@ public void processFlowsheet(Flowsheet msg, HospitalVisit visit, Instant storedF
}
}

/**
* Process the parts of a waveform message that relate to its visit observation type.
* @param msg waveform msg from which to extract VisitObservationType data
* @param storedFrom stored from
* @return new or existing VisitObservationType
*/
public VisitObservationType getOrCreateFromWaveform(WaveformMessage msg, Instant storedFrom) {
VisitObservationType observationType = cache.getOrCreatePersistedObservationType(
msg.getSourceStreamId(), msg.getSourceStreamId(), "waveform", msg.getObservationTime(), storedFrom);
observationType.setName(msg.getMappedStreamDescription());
observationType.setIsRealTime(true);
// Might want to consider updating/auditing if the name/description changes. See Issue #71
return observationType;
}

/**
* If mapping between internal id and interface id already exists, nothing (?) needs to change.
* @param interfaceId Identifier of observation type.
@@ -0,0 +1,95 @@
package uk.ac.ucl.rits.inform.datasinks.emapstar.controllers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import uk.ac.ucl.rits.inform.datasinks.emapstar.exceptions.MessageIgnoredException;
import uk.ac.ucl.rits.inform.datasinks.emapstar.repos.LocationVisitRepository;
import uk.ac.ucl.rits.inform.datasinks.emapstar.repos.visit_observations.WaveformRepository;
import uk.ac.ucl.rits.inform.informdb.movement.LocationVisit;
import uk.ac.ucl.rits.inform.informdb.visit_recordings.VisitObservationType;
import uk.ac.ucl.rits.inform.informdb.visit_recordings.Waveform;
import uk.ac.ucl.rits.inform.interchange.InterchangeValue;
import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

/**
* Controller for Waveform specific information.
* @author Jeremy Stein
*/
@Component
public class WaveformController {
private final Logger logger = LoggerFactory.getLogger(getClass());

private final WaveformRepository waveformRepository;
private final LocationVisitRepository locationVisitRepository;

WaveformController(
WaveformRepository waveformRepository,
LocationVisitRepository locationVisitRepository
) {
this.waveformRepository = waveformRepository;
this.locationVisitRepository = locationVisitRepository;
}

/**
* Process waveform data message.
* @param msg the interchange message
* @param visitObservationType to associate with this waveform data
* @param storedFrom stored from timestamp
* @throws MessageIgnoredException if message not processed
*/
@Transactional
public void processWaveform(
WaveformMessage msg,
VisitObservationType visitObservationType,
Instant storedFrom) throws MessageIgnoredException {
InterchangeValue<List<Double>> interchangeValue = msg.getNumericValues();
if (!interchangeValue.isSave()) {
throw new MessageIgnoredException("Updating/deleting waveform data is not supported");
}
// All given values are put into one new row. It's the responsibility of whoever is
// generating the message to choose an appropriate size of array.
List<Double> numericValues = interchangeValue.get();
Instant observationTime = msg.getObservationTime();
// Try to find the visit. We don't have enough information to create the visit if it doesn't already exist.
Optional<LocationVisit> inferredLocationVisit =
locationVisitRepository.findLocationVisitByLocationAndTime(observationTime, msg.getMappedLocationString());
// XXX: will have to do some sanity checks here to be sure that the HL7 feed hasn't gone down.
// See issue #36, and here for discussion:
// https://github.com/UCLH-DHCT/emap/blob/jeremy/hf-data/docs/dev/features/waveform_hf_data.md#core-processor-logic-orphan-data-problem
Waveform dataRow = new Waveform(
observationTime,
observationTime,
storedFrom);
inferredLocationVisit.ifPresent(dataRow::setLocationVisitId);
Double[] valuesAsArray = numericValues.toArray(new Double[0]);
dataRow.setSamplingRate(msg.getSamplingRate());
dataRow.setSourceLocation(msg.getSourceLocationString());
dataRow.setVisitObservationTypeId(visitObservationType);
dataRow.setUnit(msg.getUnit());
dataRow.setValuesArray(valuesAsArray);
waveformRepository.save(dataRow);
}

/**
* Delete waveform data before the cutoff date.
* @param olderThanCutoff cutoff date
* @return number of rows deleted
*/
@Transactional
public int deleteOldWaveformData(Instant olderThanCutoff) {
return waveformRepository.deleteAllInBatchByObservationDatetimeBefore(olderThanCutoff);
}

/**
* @return Return observation datetime of most recent waveform data.
*/
public Instant mostRecentObservationDatatime() {
return waveformRepository.mostRecentObservationDatatime();
}
}
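
InformDbOperations hands WaveformMessage processing to a WaveformProcessor whose source is not among the files shown, so the glue between the two controllers has to be inferred. A minimal sketch of how that processor plausibly chains getOrCreateFromWaveform and processWaveform; the class name, package placement, and the VisitObservationController name are assumptions, and only the two controller method calls are taken from the diff:

```java
// Sketch only — NOT the WaveformProcessor merged in this PR; that file is not shown in this diff.
// "VisitObservationController" is the assumed name of the controller whose getOrCreateFromWaveform
// method appears above.
package uk.ac.ucl.rits.inform.datasinks.emapstar.dataprocessors;

import java.time.Instant;

import org.springframework.stereotype.Component;

import uk.ac.ucl.rits.inform.datasinks.emapstar.controllers.VisitObservationController;
import uk.ac.ucl.rits.inform.datasinks.emapstar.controllers.WaveformController;
import uk.ac.ucl.rits.inform.datasinks.emapstar.exceptions.MessageIgnoredException;
import uk.ac.ucl.rits.inform.informdb.visit_recordings.VisitObservationType;
import uk.ac.ucl.rits.inform.interchange.visit_observations.WaveformMessage;

@Component
public class WaveformProcessorSketch {
    private final WaveformController waveformController;
    private final VisitObservationController visitObservationController;

    WaveformProcessorSketch(WaveformController waveformController,
                            VisitObservationController visitObservationController) {
        this.waveformController = waveformController;
        this.visitObservationController = visitObservationController;
    }

    /**
     * Resolve (or create) the visit observation type for the stream, then persist the samples.
     * @param msg        waveform message from the interchange queue
     * @param storedFrom time the core processor started handling the message
     * @throws MessageIgnoredException if the waveform data cannot be stored
     */
    public void processMessage(WaveformMessage msg, Instant storedFrom) throws MessageIgnoredException {
        VisitObservationType observationType =
                visitObservationController.getOrCreateFromWaveform(msg, storedFrom);
        waveformController.processWaveform(msg, observationType, storedFrom);
    }
}
```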