Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor TransferProcessRunner #270

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions api/src/main/java/care/smith/fts/api/cda/BundleSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@
import care.smith.fts.api.TransferProcessStep;
import care.smith.fts.api.TransferProcessStepFactory;
import care.smith.fts.api.TransportBundle;
import reactor.core.publisher.Flux;
import jakarta.validation.constraints.NotNull;
import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;

public interface BundleSender extends TransferProcessStep {

Mono<Result> send(Flux<TransportBundle> bundles);
Mono<ResponseEntity<Void>> send(@NotNull TransportBundle bundle);

interface Factory<C> extends TransferProcessStepFactory<BundleSender, Config, C> {}

record Config() {}

record Result(int bundleCount) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,4 @@ void deserialization() throws JsonProcessingException {
void testInstantiateConfig() {
assertThat(new BundleSender.Config()).isNotNull();
}

@Test
void testInstantiateResult() {
assertThat(new BundleSender.Result(0)).isNotNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import care.smith.fts.api.*;
import care.smith.fts.api.cda.BundleSender;
import care.smith.fts.api.cda.BundleSender.Result;
import care.smith.fts.api.cda.CohortSelector;
import care.smith.fts.api.cda.DataSelector;
import care.smith.fts.api.cda.Deidentificator;
Expand All @@ -13,6 +12,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -70,22 +70,24 @@ public void execute() {
cohortSelector
.selectCohort(pids)
.doOnError(e -> phase.set(Phase.ERROR))
.flatMap(this::executePatient)
.doOnComplete(() -> phase.set(Phase.COMPLETED))
.onErrorComplete()
.subscribe();
}

private Mono<Result> executePatient(ConsentedPatient patient) {
return dataSelector
.select(patient)
.map(b -> new ConsentedPatientBundle(b, patient))
.flatMap(
patient ->
dataSelector.select(patient).map(b -> new ConsentedPatientBundle(b, patient)))
.flatMap(deidentificator::deidentify)
.as(bundleSender::send)
.doOnNext(r -> sentBundles.getAndAdd(r.bundleCount()))
.flatMap(bundleSender::send)
.doOnNext(r -> sentBundles.incrementAndGet())
.doOnError(e -> skippedPatients.incrementAndGet())
.doOnError(e -> log.error("Skipping patient: {}", e.getMessage()))
.onErrorResume(e -> Mono.just(new Result(0)));
// .doOnError(e -> phase.set(Phase.ERROR))
.onErrorResume(e -> Mono.just(ResponseEntity.ok().build()))
.doOnComplete(
() -> {
if (phase.get() != Phase.ERROR) {
phase.set(Phase.COMPLETED);
}
})
.onErrorComplete()
.subscribe();
}

public Status status(String processId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import care.smith.fts.util.MediaTypes;
import care.smith.fts.util.error.TransferProcessException;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.validation.constraints.NotNull;
import java.net.URI;
import java.time.Duration;
import java.util.Map;
Expand All @@ -25,7 +26,6 @@
import org.springframework.http.HttpStatusCode;
import org.springframework.http.ResponseEntity;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
Expand All @@ -42,12 +42,8 @@ public RDABundleSender(
}

@Override
public Mono<Result> send(Flux<TransportBundle> bundles) {
return bundles
.map(RDABundleSender::toPlainBundle)
.flatMap(this::sendBundle)
.reduce(0, (res, resp) -> res + 1)
.map(Result::new);
public Mono<ResponseEntity<Void>> send(@NotNull TransportBundle bundle) {
return sendBundle(toPlainBundle(bundle));
}

private static Bundle toPlainBundle(TransportBundle transportBundle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@

import care.smith.fts.api.*;
import care.smith.fts.api.ConsentedPatient;
import care.smith.fts.api.cda.BundleSender;
import java.util.List;
import org.hl7.fhir.r4.model.Bundle;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.http.ResponseEntity;

class DefaultTransferProcessRunnerTest {

Expand All @@ -28,14 +28,13 @@ void setUp() {

@Test
void runMockTestSuccessfully() throws InterruptedException {
BundleSender.Result result = new BundleSender.Result(1);
var process =
new TransferProcessDefinition(
"test",
pids -> fromIterable(List.of(PATIENT)),
p -> fromIterable(List.of(new Bundle())),
b -> just(new TransportBundle(new Bundle(), "tIDMapName")),
b -> just(result));
b -> just(ResponseEntity.ok().build()));

var processId = runner.start(process, List.of());
sleep(500L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static care.smith.fts.util.FhirUtils.toBundle;
import static care.smith.fts.util.auth.HttpClientAuthMethod.AuthMethod.NONE;
import static java.util.stream.Stream.generate;
import static org.mockserver.matchers.Times.once;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
Expand All @@ -12,18 +11,13 @@
import static org.springframework.http.HttpStatus.BAD_REQUEST;
import static org.springframework.http.HttpStatus.CREATED;
import static org.springframework.http.HttpStatus.OK;
import static reactor.core.publisher.Flux.fromIterable;
import static reactor.core.publisher.Flux.fromStream;
import static reactor.test.StepVerifier.create;

import care.smith.fts.api.ConsentedPatient;
import care.smith.fts.api.TransportBundle;
import care.smith.fts.api.cda.BundleSender;
import care.smith.fts.test.MockServerUtil;
import care.smith.fts.util.HttpClientConfig;
import care.smith.fts.util.error.TransferProcessException;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.List;
import java.util.stream.Stream;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Patient;
Expand All @@ -36,8 +30,8 @@
import org.mockserver.junit.jupiter.MockServerExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.ResponseEntity;
import org.springframework.web.reactive.function.client.*;
import reactor.core.publisher.Flux;

@SpringBootTest
@ExtendWith(MockitoExtension.class)
Expand All @@ -47,7 +41,6 @@ class RDABundleSenderTest {
@Autowired MeterRegistry meterRegistry;

private static final String PATIENT_ID = "patient-102931";
private static final ConsentedPatient PATIENT = new ConsentedPatient(PATIENT_ID);

private final HttpClientConfig server = new HttpClientConfig("http://localhost", NONE);
private final RDABundleSenderConfig config = new RDABundleSenderConfig(server, "example");
Expand All @@ -60,15 +53,17 @@ void setUp(MockServerClient mockServer, @Autowired WebClient.Builder builder) {
client = server.createClient(builder, null);
}

@Test
void nullBundleErrors(MockServerClient mockServer) {
mockServer.when(request().withMethod("POST")).respond(response().withStatusCode(OK.value()));
var bundleSender = new RDABundleSender(config, client, meterRegistry);

create(bundleSender.send(fromStream(generate(() -> null))))
.expectError(NullPointerException.class)
.verify();
}
//
// @Test
// void nullBundleErrors(MockServerClient mockServer) {
//
// mockServer.when(request().withMethod("POST")).respond(response().withStatusCode(OK.value()));
// var bundleSender = new RDABundleSender(config, client, meterRegistry);
//
// create(bundleSender.send(new TransportBundle(null, "tIDMapName")))
// .expectError(NullPointerException.class)
// .verify();
// }

@Test
void badRequest(MockServerClient mockServer) {
Expand All @@ -78,7 +73,7 @@ void badRequest(MockServerClient mockServer) {

var bundleSender = new RDABundleSender(config, client, meterRegistry);

create(bundleSender.send(Flux.just(new TransportBundle(new Bundle(), "tIDMapName"))))
create(bundleSender.send(new TransportBundle(new Bundle(), "tIDMapName")))
.expectError(WebClientResponseException.class)
.verify();
}
Expand All @@ -93,7 +88,7 @@ void contentLocationIsNull(MockServerClient mockServer) {
var bundleSender = new RDABundleSender(config, client, meterRegistry);

var bundle = Stream.of(new Patient().setId(PATIENT_ID)).collect(toBundle());
create(bundleSender.send(fromIterable(List.of(new TransportBundle(bundle, "tIDMapName")))))
create(bundleSender.send(new TransportBundle(bundle, "tIDMapName")))
.expectErrorMessage("Missing Content-Location")
.verify();
}
Expand All @@ -108,7 +103,7 @@ void contentLocationIsEmpty(MockServerClient mockServer) {
var bundleSender = new RDABundleSender(config, client, meterRegistry);

var bundle = Stream.of(new Patient().setId(PATIENT_ID)).collect(toBundle());
create(bundleSender.send(fromIterable(List.of(new TransportBundle(bundle, "tIDMapName")))))
create(bundleSender.send(new TransportBundle(bundle, "tIDMapName")))
.expectErrorMessage("Missing Content-Location")
.verify();
}
Expand All @@ -128,8 +123,8 @@ void bundleSent(MockServerClient mockServer) {
var bundleSender = new RDABundleSender(config, client, meterRegistry);

var bundle = Stream.of(new Patient().setId(PATIENT_ID)).collect(toBundle());
create(bundleSender.send(fromIterable(List.of(new TransportBundle(bundle, "tIDMapName")))))
.expectNext(new BundleSender.Result(1))
create(bundleSender.send(new TransportBundle(bundle, "tIDMapName")))
.expectNext(ResponseEntity.ok().contentLength(0).header("Connection", "keep-alive").build())
.verifyComplete();
}

Expand All @@ -144,7 +139,7 @@ void withStatusUnequalAcceptedInWaitForRDACompleted(MockServerClient mockServer)

var bundleSender = new RDABundleSender(config, client, meterRegistry);
var bundle = Stream.of(new Patient().setId(PATIENT_ID)).collect(toBundle());
create(bundleSender.send(fromIterable(List.of(new TransportBundle(bundle, "tIDMapName")))))
create(bundleSender.send(new TransportBundle(bundle, "tIDMapName")))
.expectErrorMessage("Require ACCEPTED status")
.verify();
}
Expand All @@ -170,8 +165,8 @@ void withNumberFormatExceptionInGetRetryAfter(MockServerClient mockServer) {

var bundleSender = new RDABundleSender(config, client, meterRegistry);
var bundle = Stream.of(new Patient().setId(PATIENT_ID)).collect(toBundle());
create(bundleSender.send(fromIterable(List.of(new TransportBundle(bundle, "tIDMapName")))))
.expectNext(new BundleSender.Result(1))
create(bundleSender.send(new TransportBundle(bundle, "tIDMapName")))
.expectNext(ResponseEntity.ok().contentLength(0).header("Connection", "keep-alive").build())
.verifyComplete();
}

Expand All @@ -196,8 +191,8 @@ void withNumberFormatExceptionInGetRetryAfterWithParsingException(MockServerClie

var bundleSender = new RDABundleSender(config, client, meterRegistry);
var bundle = Stream.of(new Patient().setId(PATIENT_ID)).collect(toBundle());
create(bundleSender.send(fromIterable(List.of(new TransportBundle(bundle, "tIDMapName")))))
.expectNext(new BundleSender.Result(1))
create(bundleSender.send(new TransportBundle(bundle, "tIDMapName")))
.expectNext(ResponseEntity.ok().contentLength(0).header("Connection", "keep-alive").build())
.verifyComplete();
}

Expand All @@ -215,7 +210,7 @@ void withNumberFormatExceptionInGetRetryAfterWithRetriesExhausted(MockServerClie

var bundleSender = new RDABundleSender(config, client, meterRegistry);
var bundle = Stream.of(new Patient().setId(PATIENT_ID)).collect(toBundle());
create(bundleSender.send(fromIterable(List.of(new TransportBundle(bundle, "tIDMapName")))))
create(bundleSender.send(new TransportBundle(bundle, "tIDMapName")))
.expectError(TransferProcessException.class)
.verify();
}
Expand Down