Skip to content

Commit

Permalink
Merge pull request #244 from medizininformatik-initiative/release-v2.5.0
Browse files Browse the repository at this point in the history
Release v2.5.0
  • Loading branch information
alexanderkiel authored Jan 15, 2025
2 parents c7ba7f5 + 18c401c commit 63093a6
Show file tree
Hide file tree
Showing 23 changed files with 205 additions and 120 deletions.
4 changes: 2 additions & 2 deletions .github/integration-test/basic-auth/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
data-store:
image: "samply/blaze:0.30"
image: "samply/blaze:0.31"
healthcheck:
test: ["CMD-SHELL", "curl --fail -s http://localhost:8080/health"]
interval: "5s"
Expand Down Expand Up @@ -37,7 +37,7 @@ services:
depends_on:
- data-store
proxy:
image: "nginx:1.27.2"
image: "nginx:1.27.3"
volumes:
- "./nginx.conf:/etc/nginx/nginx.conf"
- "./proxy.htpasswd:/etc/auth/.htpasswd"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
data-store:
image: "samply/blaze:0.30"
image: "samply/blaze:0.31"
healthcheck:
test: ["CMD-SHELL", "curl --fail -s http://localhost:8080/health"]
interval: "5s"
Expand Down
2 changes: 1 addition & 1 deletion .github/integration-test/no-auth/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
data-store:
image: "samply/blaze:0.30"
image: "samply/blaze:0.31"
healthcheck:
test: ["CMD-SHELL", "curl --fail -s http://localhost:8080/health"]
interval: "5s"
Expand Down
4 changes: 2 additions & 2 deletions .github/integration-test/oauth/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ services:
volumes:
- "./realm-test.json:/opt/keycloak/data/import/realm-test.json"
proxy:
image: "nginx:1.27.2"
image: "nginx:1.27.3"
healthcheck:
test: ["CMD-SHELL", "curl --fail -s http://localhost:8080"]
interval: "5s"
Expand All @@ -72,7 +72,7 @@ services:
keycloak:
condition: service_healthy
data-store:
image: "samply/blaze:0.30"
image: "samply/blaze:0.31"
healthcheck:
test: ["CMD-SHELL", "curl --fail -s http://localhost:8080/health"]
interval: "5s"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
run: mvn -B verify

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
uses: codecov/codecov-action@v5

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## v2.5.0

### Enhancements

* Add Query Tracing to Logging ([#240](https://github.com/medizininformatik-initiative/flare/issues/240))

The full changelog can be found [here](https://github.com/medizininformatik-initiative/flare/milestone/16?closed=1).

## v2.4.1

### Enhancements
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM eclipse-temurin:21.0.5_11-jre
FROM eclipse-temurin:21.0.5_11-jre-jammy@sha256:5f8358c9d5615c18e95728e8b8528bda7ff40a7a5da2ac9a35b7a01f5d9b231a

RUN apt-get update && apt-get upgrade -y && \
apt-get purge wget libbinutils libctf0 libctf-nobfd0 libncurses6 -y && \
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
flare:
image: ghcr.io/medizininformatik-initiative/flare:2.4.1
image: ghcr.io/medizininformatik-initiative/flare:2.5.0
ports:
- 8080:8080
volumes:
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.0</version>
<version>3.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<groupId>de.medizininformatik-initiative</groupId>
<artifactId>flare</artifactId>
<version>2.4.1</version>
<version>2.5.0</version>

<name>Flare</name>
<description>Flare</description>
Expand Down Expand Up @@ -61,7 +61,7 @@
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>9.7.3</version>
<version>9.8.4</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package de.medizininformatikinitiative.flare.rest;

import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class IdGenerator {

public UUID generateRandom() {
return UUID.randomUUID();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import java.util.Objects;
import java.util.Set;

import static java.util.Objects.requireNonNull;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
Expand All @@ -33,14 +33,15 @@ public class QueryController {
private static final Logger logger = LoggerFactory.getLogger(QueryController.class);

private final StructuredQueryService queryService;
private final IdGenerator queryIdGenerator;

public QueryController(StructuredQueryService queryService) {
this.queryService = Objects.requireNonNull(queryService);
public QueryController(StructuredQueryService queryService, IdGenerator queryIdGenerator) {
this.queryService = requireNonNull(queryService);
this.queryIdGenerator = requireNonNull(queryIdGenerator);
}

@Bean
public RouterFunction<ServerResponse> queryRouter(@Value("${flare.cohort.enabled}") boolean cohortEnabled) {

var route = route(POST("query/execute").and(accept(MEDIA_TYPE_SQ)), this::execute)
.andRoute(POST("query/translate").and(accept(MEDIA_TYPE_SQ)), this::translate);

Expand All @@ -53,40 +54,42 @@ public RouterFunction<ServerResponse> queryRouter(@Value("${flare.cohort.enabled

public Mono<ServerResponse> execute(ServerRequest request) {
var startNanoTime = System.nanoTime();
logger.debug("Execute feasibility query");
var queryId = queryIdGenerator.generateRandom();
logger.debug("Execute feasibility query {}", queryId);
return request.bodyToMono(StructuredQuery.class)
.flatMap(queryService::execute).map(Set::size)
.flatMap(query -> queryService.execute(queryId, query)).map(Set::size)
.flatMap(count -> {
logger.debug("Finished feasibility query returning cohort size {} in {} seconds.", count,
logger.debug("Finished feasibility query {} returning cohort size {} in {} seconds.", queryId, count,
"%.1f".formatted(Util.durationSecondsSince(startNanoTime)));
return ok().bodyValue(count);
})
.onErrorResume(MappingException.class, e -> {
logger.warn("Mapping error: {}", e.getMessage());
logger.warn("Mapping error in feasibility query {}: {}", queryId, e.getMessage());
return badRequest().bodyValue(new Error(e.getMessage()));
})
.onErrorResume(WebClientRequestException.class, e -> {
logger.error("Service not available because of downstream web client errors: {}", e.getMessage());
logger.error("Service not available in feasibility query {} because of downstream web client errors: {}", queryId, e.getMessage());
return status(503).bodyValue(new Error(e.getMessage()));
});
}

public Mono<ServerResponse> executeCohort(ServerRequest request) {
var startNanoTime = System.nanoTime();
logger.debug("Execute cohort query");
var queryId = queryIdGenerator.generateRandom();
logger.debug("Execute cohort query {}", queryId);
return request.bodyToMono(StructuredQuery.class)
.flatMap(queryService::execute)
.flatMap(query -> queryService.execute(queryId, query))
.flatMap(population -> {
logger.debug("Finished cohort query returning cohort of {} patient IDs in {} seconds.", population.size(),
logger.debug("Finished cohort query {} returning cohort of {} patient IDs in {} seconds.", queryId, population.size(),
"%.1f".formatted(Util.durationSecondsSince(startNanoTime)));
return ok().bodyValue(population);
})
.onErrorResume(MappingException.class, e -> {
logger.warn("Mapping error: {}", e.getMessage());
logger.warn("Mapping error in cohort query {}: {}", queryId, e.getMessage());
return badRequest().bodyValue(new Error(e.getMessage()));
})
.onErrorResume(WebClientRequestException.class, e -> {
logger.error("Service not available because of downstream web client errors: {}", e.getMessage());
logger.error("Service not available in cohort query {} because of downstream web client errors: {}", queryId, e.getMessage());
return status(503).bodyValue(new Error(e.getMessage()));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.time.Clock;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;

import static de.medizininformatikinitiative.flare.model.fhir.QueryParams.stringValue;
Expand Down Expand Up @@ -48,9 +49,9 @@ public void init() {
}

@Override
public Mono<Population> execute(Query query, boolean ignoreCache) {
public Mono<Population> execute(UUID id, Query query, boolean ignoreCache) {
var startNanoTime = System.nanoTime();
logger.debug("Execute query: {}", query);
logger.debug("Execute query as part of query {}: {}", id, query);
return client.post()
.uri("/{type}/_search", query.type())
.contentType(APPLICATION_FORM_URLENCODED)
Expand All @@ -66,9 +67,9 @@ public Mono<Population> execute(Query query, boolean ignoreCache) {
.flatMap(bundle -> Flux.fromStream(bundle.entry().stream().flatMap(e -> e.resource().patientId().stream())))
.collect(Collectors.toSet())
.map(patientIds -> Population.copyOf(patientIds).withCreated(clock.instant()))
.doOnNext(p -> logger.debug("Finished query `{}` returning {} patients in {} seconds.", query, p.size(),
.doOnNext(p -> logger.debug("Finished query `{}` as part of query {} returning {} patients in {} seconds.", query, id, p.size(),
"%.1f".formatted(Util.durationSecondsSince(startNanoTime))))
.doOnError(e -> logger.error("Error while executing query `{}`: {}", query, e.getMessage()));
.doOnError(e -> logger.error("Error while executing query `{}` as part of query {}: {}", query, id, e.getMessage()));
}

private static boolean shouldRetry(HttpStatusCode code) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -54,16 +55,16 @@ public void init() throws RocksDBException {
}

@Override
public Mono<Population> execute(Query query, boolean ignoreCache) {
public Mono<Population> execute(UUID id, Query query, boolean ignoreCache) {
if (ignoreCache) {
return executeQuery(query, true);
return executeQuery(id, query, true);
} else {
return internalGet(query).switchIfEmpty(Mono.defer(() -> executeQuery(query, false)));
return internalGet(query).switchIfEmpty(Mono.defer(() -> executeQuery(id, query, false)));
}
}

private Mono<Population> executeQuery(Query query, boolean ignoreCache) {
return fhirQueryService.execute(query, ignoreCache).doOnNext(population -> put(query, population));
private Mono<Population> executeQuery(UUID id, Query query, boolean ignoreCache) {
return fhirQueryService.execute(id, query, ignoreCache).doOnNext(population -> put(query, population));
}

private Mono<Population> internalGet(Query query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,28 @@
import de.medizininformatikinitiative.flare.model.fhir.Query;
import reactor.core.publisher.Mono;

import java.util.UUID;

public interface FhirQueryService {

Mono<Population> execute(Query query, boolean ignoreCache);
/**
* Executes {@code query}.
*
* @param id the ID of the query used for tracing purposes
* @param query the query to execute
* @param ignoreCache if the cache shouldn't be used
* @return the population result
*/
Mono<Population> execute(UUID id, Query query, boolean ignoreCache);

default Mono<Population> execute(Query query) {
return execute(query, false);
/**
* Executes {@code query} using a potentially cache.
*
* @param id the ID of the query used for tracing purposes
* @param query the query to execute
* @return the population result
*/
default Mono<Population> execute(UUID id, Query query) {
return execute(id, query, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

Expand All @@ -21,12 +23,12 @@ public class MemCachingFhirQueryService implements FhirQueryService {

private static final Logger logger = LoggerFactory.getLogger(MemCachingFhirQueryService.class);

private static final Weigher<Query, Population> WEIGHER = (key, value) ->
key.toString().length() + value.memSize();
private static final Weigher<QueryWrapper, Population> WEIGHER = (key, value) ->
key.query.toString().length() + value.memSize();

private final FhirQueryService fhirQueryService;
private final Config config;
private AsyncLoadingCache<Query, Population> cache;
private AsyncLoadingCache<QueryWrapper, Population> cache;

public MemCachingFhirQueryService(FhirQueryService fhirQueryService, Config config) {
this.fhirQueryService = requireNonNull(fhirQueryService);
Expand All @@ -46,9 +48,9 @@ public void init() {
}

@Override
public Mono<Population> execute(Query query, boolean ignoreCache) {
logger.trace("Try loading population for query `{}` from memory.", query);
return Mono.fromFuture(cache.get(query));
public Mono<Population> execute(UUID id, Query query, boolean ignoreCache) {
logger.trace("Try loading population for query `{}` part of query {} from memory.", query, id);
return Mono.fromFuture(cache.get(new QueryWrapper(id, query)));
}

public CacheStats stats() {
Expand All @@ -67,21 +69,43 @@ public CacheStats stats() {
public record Config(long sizeInMebibytes, Duration expire, Duration refresh) {
}

private class CacheLoader implements AsyncCacheLoader<Query, Population> {
public record CacheStats(long estimatedEntryCount, long maxMemoryMiB, long usedMemoryMiB, long hitCount,
long missCount, long evictionCount, long loadSuccessCount, long loadFailureCount,
long totalLoadTimeNanos) {
}

private record QueryWrapper(UUID id, Query query) {

private QueryWrapper {
requireNonNull(id);
requireNonNull(query);
}

@Override
public CompletableFuture<Population> asyncLoad(Query query, Executor executor) {
return fhirQueryService.execute(query).toFuture();
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
QueryWrapper that = (QueryWrapper) o;
return Objects.equals(query, that.query);
}

@Override
public CompletableFuture<Population> asyncReload(Query query, Population oldValue, Executor executor) {
return fhirQueryService.execute(query, true).toFuture();
public int hashCode() {
return Objects.hashCode(query);
}
}

public record CacheStats(long estimatedEntryCount, long maxMemoryMiB, long usedMemoryMiB, long hitCount,
long missCount, long evictionCount, long loadSuccessCount, long loadFailureCount,
long totalLoadTimeNanos) {
private class CacheLoader implements AsyncCacheLoader<QueryWrapper, Population> {

@Override
public CompletableFuture<Population> asyncLoad(QueryWrapper query, Executor executor) {
logger.trace("Cache miss for query `{}` part of query {}.", query.query, query.id);
return fhirQueryService.execute(query.id, query.query).toFuture();
}

@Override
public CompletableFuture<Population> asyncReload(QueryWrapper query, Population oldValue, Executor executor) {
logger.trace("Refresh query `{}`.", query.query);
return fhirQueryService.execute(query.id, query.query, true).toFuture();
}
}
}
Loading

0 comments on commit 63093a6

Please sign in to comment.