diff --git a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/HeatPump.java b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/HeatPump.java index 7aa3ace6e..564a07829 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/HeatPump.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/device/actuator/HeatPump.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import java.io.IOException; @@ -56,6 +57,8 @@ public class HeatPump extends AbstractHvacDevice { private final boolean reverseFan; private final Duration modeChangeDelay; + private final Scheduler scheduler; + /** * Requested device state. * @@ -144,6 +147,8 @@ public HeatPump( logger.warn("using default mode change delay of {}", DEFAULT_MODE_CHANGE_DELAY); return DEFAULT_MODE_CHANGE_DELAY; }); + + scheduler = Schedulers.newSingle("HeatPump(" + getAddress() + ")"); } @Override @@ -168,7 +173,7 @@ public Flux> compute(Flux> setMode(HvacMode mode, boolean need return Flux .concat(condenserOff, forceMode) + .doOnNext(s -> logger.debug("{}: setMode: {}", getAddress(), s.getValue().command)) .doOnComplete(() -> logger.info("{}: mode changed to: {}", getAddress(), mode)); } @@ -243,14 +249,20 @@ private Flux> stopCondenser() { .just(new StateCommand(switchRunning, reverseRunning)) .doOnNext(ignore -> logger.info("{}: stopping the condenser", getAddress())) .flatMap(this::setState) + .doOnNext(ignore -> logger.warn("{}: letting the hardware settle for modeChangeDelay={}", getAddress(), modeChangeDelay)) + + // VT: FIXME: This doesn't work where as it should (see test cases) and allows the next main sequence element to jump ahead, why? +// .delayElements(modeChangeDelay, scheduler) +// .publishOn(scheduler) + .flatMap(ignore -> Mono.create(sink -> { - // Can't afford to just call delayElement() of Flux or Mono, that will change the scheduler - logger.warn("{}: letting the hardware settle for modeChangeDelay={}", getAddress(), modeChangeDelay); + // VT: NOTE: Calling delayElement() of Flux or Mono breaks things, need to figure out why try { // VT: FIXME: Need to find a lasting solution for this // For now, this should be fine as long as the output from this flux is used in a sane way. logger.warn("{}: BLOCKING WAIT FOR {}", getAddress(), modeChangeDelay); Thread.sleep(modeChangeDelay.toMillis()); + logger.warn("{}: blocking wait for {} DONE", getAddress(), modeChangeDelay); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); logger.warn("interrupted, nothing we can do about it", ex); @@ -345,6 +357,7 @@ private Flux> setState(HvacCommand command) { return Flux .zip(runningFlux, fanFlux) + .doOnNext(z -> logger.debug("{}: zip(running, fan) received: ({}, {})", getAddress(), z.getT1(), z.getT2())) .map(pair -> // If we're here, this means that the operation was carried out successfully new Signal<>(clock.instant(), diff --git a/dz3r-model/src/main/java/net/sf/dz3r/signal/hvac/HvacDeviceStatus.java b/dz3r-model/src/main/java/net/sf/dz3r/signal/hvac/HvacDeviceStatus.java index 79136fe9b..e39d4aa08 100644 --- a/dz3r-model/src/main/java/net/sf/dz3r/signal/hvac/HvacDeviceStatus.java +++ b/dz3r-model/src/main/java/net/sf/dz3r/signal/hvac/HvacDeviceStatus.java @@ -28,6 +28,6 @@ public HvacDeviceStatus(HvacCommand command, Duration uptime) { @Override public String toString() { - return "{requested=" + command + ", uptime=" + uptime + "}"; + return "{command=" + command + ", uptime=" + uptime + "}"; } } diff --git a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/HeatPumpTest.java b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/HeatPumpTest.java index 17e25514e..ddbfa31cd 100644 --- a/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/HeatPumpTest.java +++ b/dz3r-model/src/test/java/net/sf/dz3r/device/actuator/HeatPumpTest.java @@ -3,9 +3,13 @@ import net.sf.dz3r.model.HvacMode; import net.sf.dz3r.signal.Signal; import net.sf.dz3r.signal.hvac.HvacCommand; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.tools.agent.ReactorDebugAgent; @@ -16,6 +20,9 @@ class HeatPumpTest { + private final Logger logger = LogManager.getLogger(); + private final Duration delay = Duration.ofMillis(500); + @BeforeAll static void init() { ReactorDebugAgent.init(); @@ -35,7 +42,7 @@ void empty() { // NOSONAR It's not complex, it's just mundane switchMode, false, switchRunning, false, switchFan, false, - Duration.ofSeconds(1)); + delay); Flux> sequence = Flux.empty(); var result = d.compute(sequence).log(); @@ -73,7 +80,7 @@ void demandBeforeMode() { // NOSONAR It's not complex, it's just mundane switchMode, false, switchRunning, false, switchFan, false, - Duration.ofSeconds(1)); + delay); var sequence = Flux.just( // This will fail new Signal(Instant.now(), new HvacCommand(null, 0.8, null)), @@ -138,7 +145,7 @@ void setMode() { // NOSONAR It's not complex, it's just mundane switchMode, false, switchRunning, false, switchFan, false, - Duration.ofSeconds(1)); + delay); var sequence = Flux.just( new Signal(Instant.now(), new HvacCommand(HvacMode.HEATING, 0.8, null)) ); @@ -189,7 +196,7 @@ void changeMode() { // NOSONAR It's not complex, it's just mundane switchMode, false, switchRunning, false, switchFan, false, - Duration.ofSeconds(1)); + delay); var sequence = Flux.just( new Signal(Instant.now(), new HvacCommand(HvacMode.HEATING, 0.8, null)), new Signal(Instant.now(), new HvacCommand(HvacMode.COOLING, 0.7, null)) @@ -265,7 +272,7 @@ void boot() { // NOSONAR It's not complex, it's just mundane switchMode, false, switchRunning, false, switchFan, false, - Duration.ofSeconds(1)); + delay); var sequence = Flux.just( new Signal(Instant.now(), new HvacCommand(null, 0d, null)), new Signal(Instant.now(), new HvacCommand(HvacMode.COOLING, null, null)), @@ -319,4 +326,102 @@ void boot() { // NOSONAR It's not complex, it's just mundane }) .verifyComplete(); } + + @Test + void delayElementsFromFlux1() { + + var s = Schedulers.newSingle("single"); + var head = Flux.range(0, 3).publishOn(s); + var detour = Flux.just(42).delayElements(delay, s); + var tail = Flux.range(3, 3); + + var result = Flux.concat( + head, + detour.flatMap(Flux::just), + tail) + .doOnNext(e -> logger.info("{}", e)); + + StepVerifier + .create(result) + .assertNext(e -> assertThat(e).isZero()) + .assertNext(e -> assertThat(e).isEqualTo(1)) + .assertNext(e -> assertThat(e).isEqualTo(2)) + .assertNext(e -> assertThat(e).isEqualTo(42)) + .assertNext(e -> assertThat(e).isEqualTo(3)) + .assertNext(e -> assertThat(e).isEqualTo(4)) + .assertNext(e -> assertThat(e).isEqualTo(5)) + .verifyComplete(); + } + + @Test + void delayElementsFromFlux2() { + + var s = Schedulers.newSingle("single"); + var head = Flux.range(0, 3).publishOn(s); + var detour = Flux.just(42).delayElements(delay); + var tail = Flux.range(3, 3); + + var result = Flux.concat( + head, + detour.flatMap(Flux::just).publishOn(s), + tail) + .doOnNext(e -> logger.info("{}", e)); + + StepVerifier + .create(result) + .assertNext(e -> assertThat(e).isZero()) + .assertNext(e -> assertThat(e).isEqualTo(1)) + .assertNext(e -> assertThat(e).isEqualTo(2)) + .assertNext(e -> assertThat(e).isEqualTo(42)) + .assertNext(e -> assertThat(e).isEqualTo(3)) + .assertNext(e -> assertThat(e).isEqualTo(4)) + .assertNext(e -> assertThat(e).isEqualTo(5)) + .verifyComplete(); + } + + @Test + void delayElementsFromMono1() { + + var s = Schedulers.newSingle("single"); + var head = Flux.range(0, 3).publishOn(s); + var detour = Flux.just(-1).flatMap(ignore -> Mono.just(42)).delayElements(delay, s); + var tail = Flux.range(3, 3); + + var result = Flux.concat(head, detour, tail) + .doOnNext(e -> logger.info("{}", e)); + + StepVerifier + .create(result) + .assertNext(e -> assertThat(e).isZero()) + .assertNext(e -> assertThat(e).isEqualTo(1)) + .assertNext(e -> assertThat(e).isEqualTo(2)) + .assertNext(e -> assertThat(e).isEqualTo(42)) + .assertNext(e -> assertThat(e).isEqualTo(3)) + .assertNext(e -> assertThat(e).isEqualTo(4)) + .assertNext(e -> assertThat(e).isEqualTo(5)) + .verifyComplete(); + } + + @Test + void delayElementsFromMono2() { + + var s = Schedulers.newSingle("single"); + var head = Flux.range(0, 3).publishOn(s); + var detour = Flux.just(-1).flatMap(ignore -> Mono.just(42).delayElement(delay)).publishOn(s); + var tail = Flux.range(3, 3); + + var result = Flux.concat(head, detour, tail) + .doOnNext(e -> logger.info("{}", e)); + + StepVerifier + .create(result) + .assertNext(e -> assertThat(e).isZero()) + .assertNext(e -> assertThat(e).isEqualTo(1)) + .assertNext(e -> assertThat(e).isEqualTo(2)) + .assertNext(e -> assertThat(e).isEqualTo(42)) + .assertNext(e -> assertThat(e).isEqualTo(3)) + .assertNext(e -> assertThat(e).isEqualTo(4)) + .assertNext(e -> assertThat(e).isEqualTo(5)) + .verifyComplete(); + } }