Skip to content

Commit

Permalink
#45 - Enable CQL in direct broker
Browse files Browse the repository at this point in the history
- append "flare" to method name "DirectSpringConfig.directWebClientFlare" to clarify that it is only associated with the flare client
- replace custom "internal/json" media type from mock server response in DirectBrokerClientFlareIT and replace it with standard application/json header
- replace HashMap with EnumMap in DirectBrokerClient.DirectQuery
- rename SITE_1_NAME to SITE_NAME_LOCAL to be consistent with SITE_ID_LOCAL (and make both constants final)
- make the fhirClient a local var in the setup method in DirectBrokerClientCqlIT
- don't return anything from DirectBrokerClientCqlIT.createDummyPatient
- catch and wrap BaseServerResponseException when trying to read measure report from fhir server
- catch BaseServerResponseException instead of just FhirClientConnectionException when creating library and measure
- throw SiteNotFoundException instead of returning an empty string when the site name for an invalid id is requested
- add constructor for QueryDefinitionNotFoundException without QueryMediaType parameter to be thrown in DSFQueryManager, and change the queryMediaType Parameter from String to the respective enum
- move Measure.json and Library.json (as well as the 2 cql "query" files) to the direct broker package and use getResourceAsStream
- remove backendquery mapping from DirectBrokerClient and move the backendquery id directly to DirectBrokerClient.DirectQuery
- extract fhir connection from DirectBrokerClientCql to separate class FhirConnector
  • Loading branch information
michael-82 committed Jan 27, 2023
1 parent ccdb060 commit 89f0742
Show file tree
Hide file tree
Showing 17 changed files with 373 additions and 203 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package de.numcodex.feasibility_gui_backend.query.broker;

import de.numcodex.feasibility_gui_backend.query.QueryMediaType;

/**
* Indicates that a query does not contain the necessary query definition.
*/
public class QueryDefinitionNotFoundException extends Exception {

public QueryDefinitionNotFoundException(String queryId, String queryMediaType) {
public QueryDefinitionNotFoundException(String queryId) {
super("Query with ID '" + queryId
+ "' does not contain any query definitions of a known type." );
}

public QueryDefinitionNotFoundException(String queryId, QueryMediaType queryMediaType) {
super("Query with ID '" + queryId
+ "' does not contain a query definition for the mandatory type: " + queryMediaType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@

import de.numcodex.feasibility_gui_backend.query.QueryMediaType;
import de.numcodex.feasibility_gui_backend.query.broker.BrokerClient;
import de.numcodex.feasibility_gui_backend.query.broker.QueryDefinitionNotFoundException;
import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException;
import de.numcodex.feasibility_gui_backend.query.broker.SiteNotFoundException;
import de.numcodex.feasibility_gui_backend.query.collect.QueryStatus;
import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusListener;
import de.numcodex.feasibility_gui_backend.query.collect.QueryStatusUpdate;
import de.numcodex.feasibility_gui_backend.query.persistence.BrokerClientType;
import java.util.Collections;
import java.util.HashMap;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -23,13 +24,12 @@

public abstract class DirectBrokerClient implements BrokerClient {

static final String SITE_ID_LOCAL = "1";
private static final String SITE_ID_LOCAL = "1";

static final String SITE_1_NAME = "Local Server";
private static final String SITE_NAME_LOCAL = "Local Server";

protected List<QueryStatusListener> listeners;
protected Map<String, DirectQuery> brokerQueries;
protected Map<String, Long> brokerToBackendQueryIdMapping;

@Value("${app.broker.direct.obfuscateResultCount:false}")
protected boolean obfuscateResultCount;
Expand All @@ -46,10 +46,9 @@ public void addQueryStatusListener(QueryStatusListener queryStatusListener) {

@Override
public String createQuery(Long backendQueryId) {
var brokerQuery = DirectQuery.create();
var brokerQuery = DirectQuery.create(backendQueryId);
var brokerQueryId = brokerQuery.getQueryId();
brokerQueries.put(brokerQueryId, brokerQuery);
brokerToBackendQueryIdMapping.put(brokerQueryId, backendQueryId);

return brokerQueryId;
}
Expand All @@ -76,8 +75,12 @@ public List<String> getResultSiteIds(String brokerQueryId) throws QueryNotFoundE
return findQuery(brokerQueryId).hasResult() ? Collections.singletonList(SITE_ID_LOCAL) : Collections.emptyList();
}

public String getSiteName(String siteId) {
return SITE_ID_LOCAL.equals(siteId) ? SITE_1_NAME : "";
public String getSiteName(String siteId) throws SiteNotFoundException {
if (SITE_ID_LOCAL.equals(siteId)) {
return SITE_NAME_LOCAL;
} else {
throw new SiteNotFoundException("No site with id " + siteId + " found." );
}
}


Expand Down Expand Up @@ -109,26 +112,16 @@ protected int obfuscate(int resultCount) {
}
}

/**
* Updates a query status in all registered listeners.
* @param queryId the id of the query to update
* @param queryStatus the {@link QueryStatus} to publish to the listeners
*/
protected void updateQueryStatus(String queryId, QueryStatus queryStatus) {
var statusUpdate = new QueryStatusUpdate(this, queryId, SITE_ID_LOCAL, queryStatus);
var associatedBackendQueryId = brokerToBackendQueryIdMapping.get(queryId);
listeners.forEach(
l -> l.onClientUpdate(associatedBackendQueryId, statusUpdate)
);
}

/**
* Updates a query status in all registered listeners.
* @param query the query to update
* @param queryStatus the {@link QueryStatus} to publish to the listeners
*/
protected void updateQueryStatus(DirectQuery query, QueryStatus queryStatus) {
updateQueryStatus(query.getQueryId(), queryStatus);
var statusUpdate = new QueryStatusUpdate(this, query.getQueryId(), SITE_ID_LOCAL, queryStatus);
listeners.forEach(
l -> l.onClientUpdate(query.getBackendQueryId(), statusUpdate)
);
}

/**
Expand All @@ -138,22 +131,26 @@ public static class DirectQuery {

@Getter
private final String queryId;
@Getter
private final Long backendQueryId;
private final Map<QueryMediaType, String> queryDefinitions;
@Setter
private Integer result;

private DirectQuery(String queryId) {
private DirectQuery(String queryId, Long backendQueryId) {
this.queryId = queryId;
queryDefinitions = new HashMap<>();
this.backendQueryId = backendQueryId;
queryDefinitions = new EnumMap<>(QueryMediaType.class);
}

/**
* Creates a new {@link DirectQuery} with a random UUID as a query ID.
*
* @param backendQueryId The query id in the backend
* @return The created query.
*/
public static DirectQuery create() {
return new DirectQuery(UUID.randomUUID().toString());
public static DirectQuery create(Long backendQueryId) {
return new DirectQuery(UUID.randomUUID().toString(), backendQueryId);
}

/**
Expand All @@ -176,8 +173,11 @@ public void addQueryDefinition(QueryMediaType queryMediaType, String content) {
* @return The query in its string representation or null if there is no query definition
* associated with the specified mime type.
*/
public String getQueryDefinition(QueryMediaType queryMediaType) {
return queryDefinitions.get(queryMediaType);
public String getQueryDefinition(QueryMediaType queryMediaType)
throws QueryDefinitionNotFoundException {
return Optional.ofNullable(queryDefinitions.get(queryMediaType))
.orElseThrow(() -> new QueryDefinitionNotFoundException(queryId, queryMediaType)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,184 +1,61 @@
package de.numcodex.feasibility_gui_backend.query.broker.direct;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.client.exceptions.FhirClientConnectionException;
import ca.uhn.fhir.rest.param.DateParam;
import ca.uhn.fhir.rest.param.StringParam;
import de.numcodex.feasibility_gui_backend.query.broker.BrokerClient;
import de.numcodex.feasibility_gui_backend.query.broker.QueryDefinitionNotFoundException;
import de.numcodex.feasibility_gui_backend.query.collect.QueryStatus;
import de.numcodex.feasibility_gui_backend.query.broker.QueryNotFoundException;
import java.io.InputStream;
import lombok.extern.slf4j.Slf4j;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Library;
import org.hl7.fhir.r4.model.Measure;
import org.hl7.fhir.r4.model.MeasureReport;
import org.hl7.fhir.r4.model.Parameters;

import java.io.IOException;
import java.util.*;

import static de.numcodex.feasibility_gui_backend.query.QueryMediaType.CQL;
import static de.numcodex.feasibility_gui_backend.query.collect.QueryStatus.COMPLETED;
import static de.numcodex.feasibility_gui_backend.query.collect.QueryStatus.FAILED;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hl7.fhir.r4.model.Bundle.BundleType.TRANSACTION;
import static org.hl7.fhir.r4.model.Bundle.HTTPVerb.POST;

/**
* A {@link BrokerClient} to be used to directly communicate with a CQL-capable FHIR Server instance
* without the need for using any middleware (Aktin or DSF).
*/
@Slf4j
public class DirectBrokerClientCql extends DirectBrokerClient {
private final FhirContext fhirContext;
private final IGenericClient fhirClient;
private final FhirConnector fhirConnector;

/**
* Creates a new {@link DirectBrokerClientCql} instance that uses the given web client to
* communicate with a CQL capable FHIR server instance.
*
* @param fhirContext A FHIR context.
* @param fhirClient A FHIR client, configured with the correct url
* @param fhirConnector A FHIR connector.
*/
public DirectBrokerClientCql(FhirContext fhirContext, IGenericClient fhirClient) {
this.fhirContext = Objects.requireNonNull(fhirContext);
this.fhirClient = Objects.requireNonNull(fhirClient);
public DirectBrokerClientCql(FhirConnector fhirConnector) {
this.fhirConnector = Objects.requireNonNull(fhirConnector);
listeners = new ArrayList<>();
brokerQueries = new HashMap<>();
brokerToBackendQueryIdMapping = new HashMap<>();
}

@Override
public void publishQuery(String brokerQueryId)
throws QueryNotFoundException, IOException, QueryDefinitionNotFoundException {
var query = findQuery(brokerQueryId);
var queryContent = Optional.ofNullable(query.getQueryDefinition(CQL))
.orElseThrow(() -> new QueryDefinitionNotFoundException(query.getQueryId(),
CQL.getRepresentation())
);
var queryContent = query.getQueryDefinition(CQL);

updateQueryStatus(query, QueryStatus.EXECUTING);
String measureUri;
var libraryUri = "urn:uuid" + UUID.randomUUID();
var measureUri = "urn:uuid" + UUID.randomUUID();
MeasureReport measureReport;
try {
measureUri = createMeasureAndLibrary(queryContent);
Bundle bundle = fhirConnector.createBundle(queryContent, libraryUri, measureUri);
fhirConnector.transmitBundle(bundle);
measureReport = fhirConnector.evaluateMeasure(measureUri);
} catch (IOException e) {
updateQueryStatus(query, FAILED);
throw new IOException(
"An error occurred while publishing the query with ID: " + query.getQueryId()
+ " while trying to create measure and library", e);
throw (e);
}
var measureReport = evaluateMeasure(measureUri);

var resultCount = measureReport.getGroupFirstRep().getPopulationFirstRep().getCount();
query.setResult(obfuscateResultCount ? obfuscate(resultCount) : resultCount);
updateQueryStatus(query, COMPLETED);
}

/**
* Create FHIR {@link Measure} and {@link Library} resources and transmit them in a bundled transaction.
* @param cql the plaintext cql definition
* @return the randomly generated identifier of the {@link Measure} resource
*/
private String createMeasureAndLibrary(String cql) throws IOException {
var libraryUri = "urn:uuid" + UUID.randomUUID();
var library = appendCql(parseResource(Library.class,
getResourceFileAsString("query/cql/Library.json")).setUrl(libraryUri), cql);
var measureUri = "urn:uuid" + UUID.randomUUID();
var measure = parseResource(Measure.class,
getResourceFileAsString("query/cql/Measure.json"))
.setUrl(measureUri)
.addLibrary(libraryUri);
var bundle = createBundle(library, measure);

try {
fhirClient.transaction().withBundle(bundle).execute();
} catch (FhirClientConnectionException e) {
throw new IOException(e);
}

return measureUri;
}

/**
* Get the {@link MeasureReport} for a previously transmitted {@link Measure}
* @param measureUri the identifier of the {@link Measure}
* @return the retrieved {@link MeasureReport} from the server
*/
private MeasureReport evaluateMeasure(String measureUri) {
return fhirClient.operation()
.onType(Measure.class)
.named("evaluate-measure")
.withSearchParameter(Parameters.class, "measure", new StringParam(measureUri))
.andSearchParameter("periodStart", new DateParam("1900"))
.andSearchParameter("periodEnd", new DateParam("2100"))
.useHttpGet()
.returnResourceType(MeasureReport.class)
.execute();
}

/**
* Read file contents as String
* @param fileName name of the resource file
* @return the String contents of the file
*/
public static String getResourceFileAsString(String fileName) throws IOException {
InputStream is = getResourceFileAsInputStream(fileName);
if (is != null) {
return new String(is.readAllBytes(), UTF_8);
} else {
throw new RuntimeException("File not found in classpath: " + fileName);
}
}

/**
* Read file contents as {@link InputStream}
* @param fileName name of the resource file
* @return an {@link InputStream} of the file
*/
public static InputStream getResourceFileAsInputStream(String fileName) {
ClassLoader classLoader = DirectBrokerClientCql.class.getClassLoader();
return classLoader.getResourceAsStream(fileName);
}

/**
* Parse a String as an {@link IBaseResource} implementation
* @param type the concrete {@link IBaseResource} implementation class to parse to
* @param input the {@link String} to parse
* @return the wanted {@link IBaseResource} implementation object
* @param <T> any implementation of {@link IBaseResource}
*/
private <T extends IBaseResource> T parseResource(Class<T> type, String input) {
var parser = fhirContext.newJsonParser();
return type.cast(parser.parseResource(input));
}

/**
* Add the CQL query to a {@link Library}
* @param library the {@link Library} to add the CQL string to
* @param cql the CQL string to add
* @return the {@link Library} with the added CQL
*/
private Library appendCql(Library library, String cql) {
library.getContentFirstRep().setContentType(CQL.getRepresentation());
library.getContentFirstRep().setData(cql.getBytes(UTF_8));
return library;
}

/**
* Create a {@link Bundle} of a {@link Library} and a {@link Measure}
* @param library the {@link Library} to add to the {@link Bundle}
* @param measure the {@link Measure} to add to the {@link Bundle}
* @return the {@link Bundle}, consisting of the given {@link Library} and {@link Measure}
*/
private static Bundle createBundle(Library library, Measure measure) {
var bundle = new Bundle();
bundle.setType(TRANSACTION);
bundle.addEntry().setResource(library).getRequest().setMethod(POST).setUrl("Library");
bundle.addEntry().setResource(measure).getRequest().setMethod(POST).setUrl("Measure");
return bundle;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,12 @@ public DirectBrokerClientFlare(WebClient webClient) {
this.webClient = Objects.requireNonNull(webClient);
listeners = new ArrayList<>();
brokerQueries = new HashMap<>();
brokerToBackendQueryIdMapping = new HashMap<>();
}

@Override
public void publishQuery(String brokerQueryId) throws QueryNotFoundException, QueryDefinitionNotFoundException, IOException {
var query = findQuery(brokerQueryId);
var structuredQueryContent = Optional.ofNullable(query.getQueryDefinition(STRUCTURED_QUERY))
.orElseThrow(() -> new QueryDefinitionNotFoundException(brokerQueryId,
STRUCTURED_QUERY.getRepresentation()
));
var structuredQueryContent = query.getQueryDefinition(STRUCTURED_QUERY);

try {
webClient.post()
Expand All @@ -54,11 +50,11 @@ public void publishQuery(String brokerQueryId) throws QueryNotFoundException, Qu
.map(Integer::valueOf)
.doOnError(error -> {
log.error(error.getMessage(), error);
updateQueryStatus(brokerQueryId, FAILED);
updateQueryStatus(query, FAILED);
})
.subscribe(val -> {
query.setResult(obfuscateResultCount ? obfuscate(val) : val);
updateQueryStatus(brokerQueryId, COMPLETED);
updateQueryStatus(query, COMPLETED);
});
} catch (Exception e) {
throw new IOException("An error occurred while publishing the query with ID: " + brokerQueryId, e);
Expand Down
Loading

0 comments on commit 89f0742

Please sign in to comment.