Validation and setup script fixes #68

Merged
merged 10 commits into from
Jan 21, 2025
2 changes: 2 additions & 0 deletions core/core-config-envs.EXAMPLE
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ SPRING_RABBITMQ_PORT=5672
SPRING_RABBITMQ_USERNAME=emap
SPRING_RABBITMQ_PASSWORD=yourstrongpassword
LOGGING_LEVEL_UK_AC_UCL_RITS_INFORM=INFO
CORE_WAVEFORM_RETENTION_HOURS=24
CORE_WAVEFORM_IS_NON_CURRENT_TEST_DATA=true
TZ=Europe/London
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void processMessage(final WaveformMessage msg, final Instant storedFrom)
/**
* To keep the overall database size down to something reasonable, periodically delete old data.
*/
@Scheduled(fixedRate = 60 * 1000)
@Scheduled(fixedRate = 60 * 60 * 1000)
public void deleteOldWaveformData() {
logger.info("deleteOldWaveformData: Checking for old waveform data for deletion");
Instant baselineDatetime;
Expand Down
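The retention setting and the hourly schedule above can be illustrated with a small Python sketch (the real code is Java). It assumes the deletion cutoff is simply a baseline instant minus `CORE_WAVEFORM_RETENTION_HOURS`. How the baseline is chosen is not visible in this hunk, so it is passed in as a parameter here, and `retention_cutoff` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def retention_cutoff(baseline: datetime, retention_hours: int) -> datetime:
    """Return the instant before which waveform rows would be eligible for deletion.

    Assumption: cutoff = baseline - retention period. The real baseline
    selection logic is not shown in the diff.
    """
    return baseline - timedelta(hours=retention_hours)


# fixedRate is expressed in milliseconds, so 60 * 60 * 1000 means one run per hour
# (the previous value, 60 * 1000, meant one run per minute).
FIXED_RATE_MS = 60 * 60 * 1000

baseline = datetime(2025, 1, 21, 12, 0, tzinfo=timezone.utc)
cutoff = retention_cutoff(baseline, retention_hours=24)
print(cutoff.isoformat())
```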
24 changes: 19 additions & 5 deletions docs/dev/features/waveform_hf_data.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,32 @@ Core:

Waveform Generator:
- `waveform.hl7.send_host`, `waveform.hl7.send_port` - the host and port to send the generated data to
- `test.synthetic.num_patients` - number of different patients (locations) to generate data for
- `test.synthetic.start_datetime` observation date to start generating data from or null to simulate live data
- `waveform.synthetic.num_patients` - number of different patients (locations) to generate data for
- `waveform.synthetic.start_datetime` observation date to start generating data from or null to simulate live data
(see comment in code for more)
- `test.synthetic.end_datetime` if not null, exit the generator when observation date hits this date
- `test.synthetic.warp_factor` How many times real time to generate data at. Live mode implies warp factor = 1.
- `waveform.synthetic.end_datetime` if not null, exit the generator when observation date hits this date
- `waveform.synthetic.warp_factor` How many times real time to generate data at. Live mode implies warp factor = 1.
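
As a rough illustration of how the start datetime, end datetime and warp factor interact, here is a minimal Python sketch. It assumes that in catch-up mode the observation time advances at `warp_factor` times wall-clock time and that the generator exits once the observation time reaches the end datetime. The function names are hypothetical and the real generator is written in Java.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def expected_progress(start: datetime, elapsed_seconds: float, warp_factor: int) -> datetime:
    """Observation time the generator should have reached after elapsed_seconds of wall-clock time."""
    return start + timedelta(seconds=elapsed_seconds * warp_factor)


def should_exit(progress: datetime, end: Optional[datetime]) -> bool:
    """A null end datetime means the generator never exits on its own."""
    return end is not None and progress >= end


start = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
end = datetime(2024, 1, 3, 12, 0, tzinfo=timezone.utc)

# With warp factor 6, one hour of wall-clock time covers six hours of observations,
# so the 24 hours between start and end take four hours to generate.
after_one_hour = expected_progress(start, 3600, warp_factor=6)
after_four_hours = expected_progress(start, 4 * 3600, warp_factor=6)
print(after_one_hour.isoformat(), should_exit(after_one_hour, end))
print(after_four_hours.isoformat(), should_exit(after_four_hours, end))
```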

Waveform Reader:
- `waveform.hl7.listen_port` port inside the container to listen on for waveform HL7 generator
- `waveform.hl7.source_address_allow_list` comma-separated list of source IP addresses to accept connections from.
If the listen contains the value "ALL", then all source IP addresses are allowed.
Enter your Smartlinx server IP address(es) here.
For testing only: if the list contains the value "ALL", then all source IP addresses are allowed. This is
the only form of authentication so don't use this setting in production.
Not currently supported: hostnames or IP ranges.
- `waveform.hl7.test_dump_file` If specified, read messages from this file and then exit - intended for validation
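
The allow-list behaviour described for `waveform.hl7.source_address_allow_list` can be sketched as follows. This is an illustrative Python version, not the reader's actual (Java) implementation: `parse_allow_list` and `is_allowed` are hypothetical names, and rejecting hostnames and ranges at parse time is an assumption about how "not currently supported" might be enforced.

```python
import ipaddress
from typing import Set


def parse_allow_list(raw: str) -> Set[str]:
    """Split a comma-separated allow list into entries, validating each one."""
    entries = {entry.strip() for entry in raw.split(",") if entry.strip()}
    for entry in entries:
        if entry != "ALL":
            # Raises ValueError for hostnames and for ranges such as 10.0.0.0/24.
            ipaddress.ip_address(entry)
    return entries


def is_allowed(source_ip: str, allow_list: Set[str]) -> bool:
    """Accept a connection if its source IP is listed, or if "ALL" is present (testing only)."""
    return "ALL" in allow_list or source_ip in allow_list


allow_list = parse_allow_list("127.0.0.1, 10.0.0.5")
print(is_allowed("10.0.0.5", allow_list))
print(is_allowed("10.0.0.6", allow_list))
```

Because the source address is the only form of authentication, a strict exact-match check like this fails closed: anything that is not a listed literal address is refused.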

## Container housekeeping (setup script)
The waveform processing feature is enabled or disabled in the global configuration file, in a new
"features" section. The same section also controls the `fakeuds` container, making it easier
to turn on and off.

Because the setup script uses the waveform feature flag to include or exclude the relevant docker compose
files from the docker commands it generates, you can continue to
run a simple `emap docker up -d` to bring up a production instance of emap without the waveform containers coming up.
The feature flag is implemented differently here than for, say, SDEs: that is a feature within the core
processor, whereas disabling waveform data requires an entire container to be disabled.
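
The include/exclude behaviour described above can be sketched roughly as follows. The feature keys (`waveform`, `waveform_generator`, `fake_uds`) and compose file locations follow this pull request, except that the core compose path (`core/docker-compose.yml`) is an assumption, and `compose_paths` is a hypothetical standalone function rather than the setup script's real API.

```python
from pathlib import Path
from typing import Dict, List


def compose_paths(emap_dir: Path, features: Dict[str, bool]) -> List[Path]:
    """Return the docker compose files to pass to docker, given the feature flags."""
    paths = [
        emap_dir / "core" / "docker-compose.yml",  # assumed location of the core compose file
        emap_dir / "hl7-reader" / "docker-compose.yml",
    ]
    # Missing keys are treated as "off", so an older config file brings up no extra containers.
    if features.get("fake_uds"):
        paths.append(emap_dir / "core" / "docker-compose.fakeuds.yml")
    if features.get("waveform"):
        paths.append(emap_dir / "waveform-reader" / "docker-compose.yml")
    if features.get("waveform_generator"):
        paths.append(emap_dir / "waveform-generator" / "docker-compose.yml")
    return paths


production = compose_paths(Path("emap"), {"waveform": False, "waveform_generator": False, "fake_uds": False})
for path in production:
    print(path)
```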

## Design details

### Emap DB design
Expand Down
31 changes: 26 additions & 5 deletions emap-setup/emap_runner/docker/docker_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,31 @@ class DockerRunnerException(EMAPRunnerException):
"""Exception for something breaking within docker"""


def first_not_none(*args):
for a in args:
if a is not None:
return a
return None

class DockerRunner:
"""Orchestration for multiple services using docker"""

def __init__(self, project_dir: Path, config: "GlobalConfiguration"):
def __init__(self,
project_dir: Path,
config: "GlobalConfiguration",
enable_waveform=None,
use_fake_waveform=None,
use_fake_uds=None,
):
"""Initialise a docker runner with docker-compose.yml files relative
to the main directory given a specific configuration"""

self.project_dir = project_dir
self.emap_dir = project_dir / "emap"
self.config = config
self.enable_waveform = first_not_none(enable_waveform, self.config.get("features", "waveform"))
self.use_fake_waveform = first_not_none(use_fake_waveform, self.config.get("features", "waveform_generator"))
self.use_fake_uds = first_not_none(use_fake_uds, self.config.get("features", "fake_uds"))

def run(
self,
Expand Down Expand Up @@ -82,12 +97,18 @@ def docker_compose_paths(self) -> List[Path]:

paths = [
self.core_docker_compose_path,
# need to make optional
Path(self.emap_dir, "core", "docker-compose.fakeuds.yml"),
Path(self.emap_dir, "hl7-reader", "docker-compose.yml"),
Path(self.emap_dir, "waveform-reader", "docker-compose.yml"),
Path(self.emap_dir, "waveform-generator", "docker-compose.yml"),
]
# Fakes are for testing only. Waveform is a real feature that is currently off
# by default, except for the waveform generator which is for testing waveform
# data only.
if self.use_fake_uds:
paths.append(Path(self.emap_dir, "core", "docker-compose.fakeuds.yml"))
if self.enable_waveform:
paths.append(Path(self.emap_dir, "waveform-reader", "docker-compose.yml"))
if self.use_fake_waveform:
paths.append(Path(self.emap_dir, "waveform-generator", "docker-compose.yml"))

# allow for hoover to be an optional compose path
if "hoover" in self.config["repositories"]:
paths.append(Path(self.project_dir, "hoover", "docker-compose.yml"))
Expand Down
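A short sketch of why `first_not_none` is used rather than a plain `or`: an explicit `False` passed by a caller (as the validation runner does for the fake services) must win over a `True` in the config file. `first_not_none` is copied from the diff. The `config_features` dict and `resolve` helper are hypothetical stand-ins for the real `GlobalConfiguration` lookup.

```python
def first_not_none(*args):
    """Return the first argument that is not None, or None if there is none."""
    for a in args:
        if a is not None:
            return a
    return None


# Stand-in for the "features" section of the global configuration file.
config_features = {"waveform": True, "waveform_generator": True, "fake_uds": True}


def resolve(explicit, key):
    """An explicit constructor argument takes precedence; None falls back to the config."""
    return first_not_none(explicit, config_features.get(key))


print(resolve(None, "waveform"))    # falls back to the config value
print(resolve(False, "fake_uds"))   # explicit False overrides the config
print(False or config_features["fake_uds"])  # a plain `or` would wrongly re-enable it
```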
8 changes: 7 additions & 1 deletion emap-setup/emap_runner/global_config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import re

import yaml

from typing import Optional
Expand All @@ -19,6 +21,7 @@ class GlobalConfiguration(dict):
"global",
"glowroot",
"common",
"features",
)

def __init__(self, filepath: Path):
Expand Down Expand Up @@ -122,10 +125,13 @@ def _substitute_vars(self, env_file: "EnvironmentFile") -> None:

for i, line in enumerate(env_file.lines):

if line.startswith("#"):
if re.match(r"\s*#", line):
env_file.set_comment_line_at(line, idx=i)
continue

if re.match(r"\s*$", line):
continue

key, value = line.split("=") # e.g. IDS_SCHEMA=schemaname

try:
Expand Down
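The effect of the two new `re.match` checks can be seen in isolation with a minimal sketch. `parse_env_lines` is a hypothetical helper, not part of `GlobalConfiguration`. One deliberate difference from the diff: this sketch splits on the first `=` only, so values that themselves contain `=` survive, whereas the `line.split("=")` in the original would raise a `ValueError` on such a line.

```python
import re
from typing import Dict, List


def parse_env_lines(lines: List[str]) -> Dict[str, str]:
    """Parse KEY=value lines, skipping comments (even if indented) and blank lines."""
    result: Dict[str, str] = {}
    for line in lines:
        if re.match(r"\s*#", line):
            continue  # comment line, possibly with leading whitespace
        if re.match(r"\s*$", line):
            continue  # empty or whitespace-only line
        # Assumption: split on the first "=" only (differs from the diff's split("=")).
        key, value = line.strip().split("=", 1)
        result[key] = value
    return result


example = [
    "# a comment",
    "   # an indented comment",
    "",
    "   ",
    "IDS_SCHEMA=schemaname",
    "UDS_JDBC_URL=jdbc:postgresql://fakeuds:5432/fakeuds?ssl=true",
]
print(parse_env_lines(example))
```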
9 changes: 8 additions & 1 deletion emap-setup/emap_runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,14 @@ def validation(self) -> ValidationRunner:
raise ValueError("hoover requested but is missing from repositories in global config")

runner = ValidationRunner(
docker_runner=DockerRunner(project_dir=Path.cwd(), config=self.global_config),
docker_runner=DockerRunner(project_dir=Path.cwd(),
config=self.global_config,
# must enable the compose file if we intend to ask for waveform container
enable_waveform=self.args.use_waveform,
# but never use fake waveform or fake UDS in validation
use_fake_waveform=False,
use_fake_uds=False,
),
time_window=TimeWindow(
start_date=self.args.start_date, end_date=self.args.end_date
),
Expand Down
13 changes: 7 additions & 6 deletions emap-setup/emap_runner/validation/validation_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,10 @@ def _run_emap(self) -> None:
the time delay required for RabbitMQ to be there. Should this prove not to be sufficient, then the running order
may need to change, i.e. the data sources to be started first (as background services though!).
"""
_ = Popen(
"sleep 180 && "
+ self.docker.base_docker_compose_command
+ "up -d core",
shell=True,
)
# not sure this sleep is necessary given the healthchecks we have now
time.sleep(180)
self.docker.run("up -d core")
self.docker.run("ps")

if self.use_hl7_reader:
self.docker.run(
Expand All @@ -162,6 +160,9 @@ def _run_emap(self) -> None:
# the waveform *reader* is brought up, which will be reading from an HL7 dump file
# and therefore doesn't require the waveform-generator.
if self.use_waveform:
# While de-orphaning remains unimplemented, must wait for ADT data to
# be present before adding waveform-data. (See issue #36)
self._wait_for_queue_to_empty()
self.docker.run(
"up --exit-code-from waveform-reader waveform-reader",
output_filename=f"{self.log_file_prefix}_waveform-reader.txt",
Expand Down
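The body of `_wait_for_queue_to_empty` is not part of this diff. As a purely hypothetical illustration of the idea (block until the ADT messages have drained before starting the waveform reader), a generic polling helper might look like the sketch below. The `get_queue_length` callable, the timeout behaviour and all parameter names are assumptions.

```python
import time
from typing import Callable


def wait_until_empty(
    get_queue_length: Callable[[], int],
    poll_seconds: float = 1.0,
    timeout_seconds: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll until the queue reports zero messages; return False if the timeout passes first."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        if get_queue_length() == 0:
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(poll_seconds)


# Simulated queue that drains over three polls.
lengths = iter([3, 1, 0])
drained = wait_until_empty(lambda: next(lengths), poll_seconds=0.0)
print(drained)
```

Injecting `get_queue_length` keeps the helper independent of how the queue is inspected, which also makes it easy to exercise without a running RabbitMQ.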
6 changes: 6 additions & 0 deletions emap-setup/global-configuration-EXAMPLE.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ repositories:
# hoover:
# branch: develop

# Feature flags for not quite ready features, or for turning fake services on and off
features:
waveform: false
waveform_generator: false
fake_uds: false

# Configuration data for the rabbitmq instance used by Spring in the pipeline
rabbitmq:
SPRING_RABBITMQ_HOST: rabbitmq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ public IdsOperations(
idsFactory = idsConfiguration.getSessionFactory();
idsEmptyOnInit = getIdsIsEmpty();
logger.info("IdsOperations() idsEmptyOnInit = {}", idsEmptyOnInit);
defaultStartUnid = getFirstMessageUnidFromDate(idsConfiguration.getStartDateTime(), 1);
endUnid = getFirstMessageUnidFromDate(idsConfiguration.getEndDatetime(), defaultStartUnid);
defaultStartUnid = getFirstMessageUnidFromDate(idsConfiguration.getStartDateTime());
endUnid = getFirstMessageUnidFromDate(idsConfiguration.getEndDatetime());

// Progress is stored as the unid (the date info is purely for human convenience),
logger.info(
Expand Down Expand Up @@ -156,22 +156,23 @@ private boolean getIdsIsEmpty() {
* timestamp.
*
* @param fromDateTime the timestamp to start from, or null for no boundary
* @param fromUnid starting unid for filtering
* @return the unid of the first message to be persisted at or after that time,
* or null if there are no such messages or no bound was requested (fromDateTime == null)
*/
private Integer getFirstMessageUnidFromDate(Instant fromDateTime, Integer fromUnid) {
private Integer getFirstMessageUnidFromDate(Instant fromDateTime) {
if (fromDateTime == null) {
// bypass this slow query if no bound was requested
return null;
}
logger.info("Querying IDS for first unid after {}, this can take a while", fromDateTime);
try (Session idsSession = idsFactory.openSession()) {
idsSession.setDefaultReadOnly(true);
// Including `unid` anywhere in this query seems to make it do an index scan on the unid (PK) column
// and thus make it very slow.
List<IdsMaster> msg = idsSession.createQuery(
"select i from IdsMaster i where i.unid >= :fromUnid and i.persistdatetime >= :fromDatetime order by i.unid",
"select i from IdsMaster i where i.persistdatetime >= :fromDatetime order by i.persistdatetime",
IdsMaster.class)
.setParameter("fromDatetime", fromDateTime)
.setParameter("fromUnid", fromUnid)
.setMaxResults(1)
.getResultList();
if (msg.isEmpty()) {
Expand Down
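The shape of the revised query (filter and order by the timestamp only, take the first row, and skip the query entirely when no bound is requested) can be tried out with an in-memory SQLite table. This is only an illustration in Python: the table name `ids_master` and its two columns are assumptions, and SQLite will not reproduce the query-planner behaviour that motivated the change in the real database.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ids_master (unid INTEGER PRIMARY KEY, persistdatetime TEXT)")
conn.executemany(
    "INSERT INTO ids_master VALUES (?, ?)",
    [
        (1, "2024-01-01T00:00:00Z"),
        (2, "2024-01-02T00:00:00Z"),
        (3, "2024-01-03T00:00:00Z"),
    ],
)


def first_unid_from_date(connection: sqlite3.Connection, from_datetime: Optional[str]) -> Optional[int]:
    """Return the unid of the first message persisted at or after from_datetime, or None."""
    if from_datetime is None:
        return None  # no bound requested, so bypass the query
    row = connection.execute(
        # ISO-8601 strings in one fixed format sort chronologically as text.
        "SELECT * FROM ids_master WHERE persistdatetime >= ? "
        "ORDER BY persistdatetime LIMIT 1",
        (from_datetime,),
    ).fetchone()
    return row[0] if row else None


print(first_unid_from_date(conn, "2024-01-01T12:00:00Z"))
```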
2 changes: 1 addition & 1 deletion waveform-generator/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ services:
https_proxy: ${https_proxy}
SERVICE_SRC_DIR: waveform-generator
env_file:
- ../../config/waveform-reader-config-envs
- ../../config/waveform-generator-config-envs
logging:
driver: "json-file"
restart: "no"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public class Hl7Generator {
private final Logger logger = LoggerFactory.getLogger(Hl7Generator.class);

@Value("${test.synthetic.num_patients:30}")
@Value("${waveform.synthetic.num_patients:30}")
private int numPatients;


Expand All @@ -44,7 +44,7 @@ public class Hl7Generator {
* In catch-up mode, warp factor can be used to control how quickly data
* will be fed in (measured in multiples of real time).
*/
@Value("${test.synthetic.start_datetime:#{null}}")
@Value("${waveform.synthetic.start_datetime:#{null}}")
private Instant startDatetime;
/**
* Where we are up to in generating data (observation time).
Expand All @@ -64,12 +64,12 @@ private Instant getExpectedProgressDatetime() {

// system time (not observation time) when we started running
private Long monotonicStartTimeNanos = null;
@Value("${test.synthetic.end_datetime:#{null}}")
@Value("${waveform.synthetic.end_datetime:#{null}}")
private Instant endDatetime;
@Value("${test.synthetic.warp_factor:1}")
@Value("${waveform.synthetic.warp_factor:1}")
private int warpFactor;

@Value("${test.synthetic.tcp_client_pool_size:1}")
@Value("${waveform.synthetic.tcp_client_pool_size:1}")
private int tcpClientPoolSize;

/**
Expand Down
7 changes: 7 additions & 0 deletions waveform-generator/waveform-generator-config-envs.EXAMPLE
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
WAVEFORM_SYNTHETIC_NUM_PATIENTS=30
WAVEFORM_SYNTHETIC_WARP_FACTOR=6
WAVEFORM_SYNTHETIC_START_DATETIME="2024-01-02T12:00:00Z"
WAVEFORM_SYNTHETIC_END_DATETIME="2024-01-03T12:00:00Z"
WAVEFORM_SYNTHETIC_TCP_CLIENT_POOL_SIZE=1
WAVEFORM_HL7_SEND_HOST="waveform-reader"
WAVEFORM_HL7_SEND_PORT=7777
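
The variable names in this example file line up with the renamed `waveform.synthetic.*` and `waveform.hl7.*` properties. A minimal sketch of that correspondence, assuming the usual Spring Boot convention for environment variables (periods become underscores, dashes are removed, and the result is upper-cased). `property_to_env_var` is a hypothetical helper written in Python for illustration.

```python
def property_to_env_var(name: str) -> str:
    """Convert a property name to the environment variable form assumed above."""
    return name.replace(".", "_").replace("-", "").upper()


for prop in (
    "waveform.synthetic.num_patients",
    "waveform.synthetic.warp_factor",
    "waveform.hl7.send_host",
):
    print(prop, "->", property_to_env_var(prop))
```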
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,21 @@ List<String> readFromFile(File hl7DumpFile) throws IOException {

@Scheduled(fixedRate = Long.MAX_VALUE) // do once only
void readOnceAndQueueScheduled() throws Hl7ParseException, WaveformCollator.CollationException, IOException {
readOnceAndQueue(hl7DumpFile);
}

void readOnceAndQueue(File hl7DumpFile) throws Hl7ParseException, WaveformCollator.CollationException, IOException {
if (hl7DumpFile == null) {
logger.info("No test HL7 file specified");
return;
}
readOnceAndQueue(hl7DumpFile);
// Not sure how to wait for Publisher to finish, so just sleep for a bit
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
logger.warn("Thread was interrupted", e);
}
System.exit(0);
}

void readOnceAndQueue(File hl7DumpFile) throws Hl7ParseException, WaveformCollator.CollationException, IOException {
List<String> messages = readFromFile(hl7DumpFile);
logger.info("Read {} HL7 messages from test dump file", messages.size());
for (int mi = 0; mi < messages.size(); mi++) {
Expand Down
11 changes: 11 additions & 0 deletions waveform-reader/waveform-reader-config-envs.EXAMPLE
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
UDS_JDBC_URL=jdbc:postgresql://fakeuds:5432/fakeuds
UDS_SCHEMA=uds_schema
UDS_USERNAME=inform_user
UDS_PASSWORD=inform
SPRING_RABBITMQ_HOST=rabbitmq
SPRING_RABBITMQ_PORT=5672
SPRING_RABBITMQ_USERNAME=my_name
SPRING_RABBITMQ_PASSWORD=my_pw

WAVEFORM_HL7_SOURCE_ADDRESS_ALLOW_LIST=127.0.0.1
WAVEFORM_HL7_TEST_DUMP_FILE=""