Skip to content

Commit

Permalink
Rearranging chairs, dangerously (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Sep 7, 2023
1 parent 966ff60 commit 32c24bb
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 9 deletions.
19 changes: 16 additions & 3 deletions dz3r-model/src/main/java/net/sf/dz3r/device/actuator/HeatPump.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.logging.log4j.LogManager;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
Expand Down Expand Up @@ -56,6 +57,8 @@ public class HeatPump extends AbstractHvacDevice {
private final boolean reverseFan;
private final Duration modeChangeDelay;

private final Scheduler scheduler;

/**
* Requested device state.
*
Expand Down Expand Up @@ -144,6 +147,8 @@ public HeatPump(
logger.warn("using default mode change delay of {}", DEFAULT_MODE_CHANGE_DELAY);
return DEFAULT_MODE_CHANGE_DELAY;
});

scheduler = Schedulers.newSingle("HeatPump(" + getAddress() + ")");
}

@Override
Expand All @@ -168,7 +173,7 @@ public Flux<Signal<HvacDeviceStatus, Void>> compute(Flux<Signal<HvacCommand, Voi

return Flux
.concat(init, commands, shutdown)
.publishOn(Schedulers.newSingle("HeatPump(" + getAddress() + ")"))
.publishOn(scheduler)
.flatMap(this::process)
.doOnNext(this::broadcast);
}
Expand Down Expand Up @@ -231,6 +236,7 @@ private Flux<Signal<HvacDeviceStatus, Void>> setMode(HvacMode mode, boolean need

return Flux
.concat(condenserOff, forceMode)
.doOnNext(s -> logger.debug("{}: setMode: {}", getAddress(), s.getValue().command))
.doOnComplete(() -> logger.info("{}: mode changed to: {}", getAddress(), mode));
}

Expand All @@ -243,14 +249,20 @@ private Flux<Signal<HvacDeviceStatus, Void>> stopCondenser() {
.just(new StateCommand(switchRunning, reverseRunning))
.doOnNext(ignore -> logger.info("{}: stopping the condenser", getAddress()))
.flatMap(this::setState)
.doOnNext(ignore -> logger.warn("{}: letting the hardware settle for modeChangeDelay={}", getAddress(), modeChangeDelay))

// VT: FIXME: This doesn't work where as it should (see test cases) and allows the next main sequence element to jump ahead, why?
// .delayElements(modeChangeDelay, scheduler)
// .publishOn(scheduler)

.flatMap(ignore -> Mono.create(sink -> {
// Can't afford to just call delayElement() of Flux or Mono, that will change the scheduler
logger.warn("{}: letting the hardware settle for modeChangeDelay={}", getAddress(), modeChangeDelay);
// VT: NOTE: Calling delayElement() of Flux or Mono breaks things, need to figure out why
try {
// VT: FIXME: Need to find a lasting solution for this
// For now, this should be fine as long as the output from this flux is used in a sane way.
logger.warn("{}: BLOCKING WAIT FOR {}", getAddress(), modeChangeDelay);
Thread.sleep(modeChangeDelay.toMillis());
logger.warn("{}: blocking wait for {} DONE", getAddress(), modeChangeDelay);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.warn("interrupted, nothing we can do about it", ex);
Expand Down Expand Up @@ -345,6 +357,7 @@ private Flux<Signal<HvacDeviceStatus, Void>> setState(HvacCommand command) {

return Flux
.zip(runningFlux, fanFlux)
.doOnNext(z -> logger.debug("{}: zip(running, fan) received: ({}, {})", getAddress(), z.getT1(), z.getT2()))
.map(pair ->
// If we're here, this means that the operation was carried out successfully
new Signal<>(clock.instant(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ public HvacDeviceStatus(HvacCommand command, Duration uptime) {

@Override
public String toString() {
return "{requested=" + command + ", uptime=" + uptime + "}";
return "{command=" + command + ", uptime=" + uptime + "}";
}
}
115 changes: 110 additions & 5 deletions dz3r-model/src/test/java/net/sf/dz3r/device/actuator/HeatPumpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
import net.sf.dz3r.model.HvacMode;
import net.sf.dz3r.signal.Signal;
import net.sf.dz3r.signal.hvac.HvacCommand;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.tools.agent.ReactorDebugAgent;

Expand All @@ -16,6 +20,9 @@

class HeatPumpTest {

private final Logger logger = LogManager.getLogger();
private final Duration delay = Duration.ofMillis(500);

@BeforeAll
static void init() {
ReactorDebugAgent.init();
Expand All @@ -35,7 +42,7 @@ void empty() { // NOSONAR It's not complex, it's just mundane
switchMode, false,
switchRunning, false,
switchFan, false,
Duration.ofSeconds(1));
delay);
Flux<Signal<HvacCommand, Void>> sequence = Flux.empty();

var result = d.compute(sequence).log();
Expand Down Expand Up @@ -73,7 +80,7 @@ void demandBeforeMode() { // NOSONAR It's not complex, it's just mundane
switchMode, false,
switchRunning, false,
switchFan, false,
Duration.ofSeconds(1));
delay);
var sequence = Flux.just(
// This will fail
new Signal<HvacCommand, Void>(Instant.now(), new HvacCommand(null, 0.8, null)),
Expand Down Expand Up @@ -138,7 +145,7 @@ void setMode() { // NOSONAR It's not complex, it's just mundane
switchMode, false,
switchRunning, false,
switchFan, false,
Duration.ofSeconds(1));
delay);
var sequence = Flux.just(
new Signal<HvacCommand, Void>(Instant.now(), new HvacCommand(HvacMode.HEATING, 0.8, null))
);
Expand Down Expand Up @@ -189,7 +196,7 @@ void changeMode() { // NOSONAR It's not complex, it's just mundane
switchMode, false,
switchRunning, false,
switchFan, false,
Duration.ofSeconds(1));
delay);
var sequence = Flux.just(
new Signal<HvacCommand, Void>(Instant.now(), new HvacCommand(HvacMode.HEATING, 0.8, null)),
new Signal<HvacCommand, Void>(Instant.now(), new HvacCommand(HvacMode.COOLING, 0.7, null))
Expand Down Expand Up @@ -265,7 +272,7 @@ void boot() { // NOSONAR It's not complex, it's just mundane
switchMode, false,
switchRunning, false,
switchFan, false,
Duration.ofSeconds(1));
delay);
var sequence = Flux.just(
new Signal<HvacCommand, Void>(Instant.now(), new HvacCommand(null, 0d, null)),
new Signal<HvacCommand, Void>(Instant.now(), new HvacCommand(HvacMode.COOLING, null, null)),
Expand Down Expand Up @@ -319,4 +326,102 @@ void boot() { // NOSONAR It's not complex, it's just mundane
})
.verifyComplete();
}

@Test
void delayElementsFromFlux1() {

var s = Schedulers.newSingle("single");
var head = Flux.range(0, 3).publishOn(s);
var detour = Flux.just(42).delayElements(delay, s);
var tail = Flux.range(3, 3);

var result = Flux.concat(
head,
detour.flatMap(Flux::just),
tail)
.doOnNext(e -> logger.info("{}", e));

StepVerifier
.create(result)
.assertNext(e -> assertThat(e).isZero())
.assertNext(e -> assertThat(e).isEqualTo(1))
.assertNext(e -> assertThat(e).isEqualTo(2))
.assertNext(e -> assertThat(e).isEqualTo(42))
.assertNext(e -> assertThat(e).isEqualTo(3))
.assertNext(e -> assertThat(e).isEqualTo(4))
.assertNext(e -> assertThat(e).isEqualTo(5))
.verifyComplete();
}

@Test
void delayElementsFromFlux2() {

var s = Schedulers.newSingle("single");
var head = Flux.range(0, 3).publishOn(s);
var detour = Flux.just(42).delayElements(delay);
var tail = Flux.range(3, 3);

var result = Flux.concat(
head,
detour.flatMap(Flux::just).publishOn(s),
tail)
.doOnNext(e -> logger.info("{}", e));

StepVerifier
.create(result)
.assertNext(e -> assertThat(e).isZero())
.assertNext(e -> assertThat(e).isEqualTo(1))
.assertNext(e -> assertThat(e).isEqualTo(2))
.assertNext(e -> assertThat(e).isEqualTo(42))
.assertNext(e -> assertThat(e).isEqualTo(3))
.assertNext(e -> assertThat(e).isEqualTo(4))
.assertNext(e -> assertThat(e).isEqualTo(5))
.verifyComplete();
}

@Test
void delayElementsFromMono1() {

var s = Schedulers.newSingle("single");
var head = Flux.range(0, 3).publishOn(s);
var detour = Flux.just(-1).flatMap(ignore -> Mono.just(42)).delayElements(delay, s);
var tail = Flux.range(3, 3);

var result = Flux.concat(head, detour, tail)
.doOnNext(e -> logger.info("{}", e));

StepVerifier
.create(result)
.assertNext(e -> assertThat(e).isZero())
.assertNext(e -> assertThat(e).isEqualTo(1))
.assertNext(e -> assertThat(e).isEqualTo(2))
.assertNext(e -> assertThat(e).isEqualTo(42))
.assertNext(e -> assertThat(e).isEqualTo(3))
.assertNext(e -> assertThat(e).isEqualTo(4))
.assertNext(e -> assertThat(e).isEqualTo(5))
.verifyComplete();
}

@Test
void delayElementsFromMono2() {

var s = Schedulers.newSingle("single");
var head = Flux.range(0, 3).publishOn(s);
var detour = Flux.just(-1).flatMap(ignore -> Mono.just(42).delayElement(delay)).publishOn(s);
var tail = Flux.range(3, 3);

var result = Flux.concat(head, detour, tail)
.doOnNext(e -> logger.info("{}", e));

StepVerifier
.create(result)
.assertNext(e -> assertThat(e).isZero())
.assertNext(e -> assertThat(e).isEqualTo(1))
.assertNext(e -> assertThat(e).isEqualTo(2))
.assertNext(e -> assertThat(e).isEqualTo(42))
.assertNext(e -> assertThat(e).isEqualTo(3))
.assertNext(e -> assertThat(e).isEqualTo(4))
.assertNext(e -> assertThat(e).isEqualTo(5))
.verifyComplete();
}
}

0 comments on commit 32c24bb

Please sign in to comment.