Skip to content

Commit

Permalink
Use health monitor for PCES backpressure.
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
Cody Littley committed May 26, 2024
1 parent 2daf25b commit 4b2feb1
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ public OutputWire<Duration> getHealthMonitorWire() {
return new NoOpOutputWire<>(this, "HealthMonitor");
}

/**
* {@inheritDoc}
*
* <p>This implementation always returns {@link Duration#ZERO}.
*/
@NonNull
@Override
public Duration getUnhealthyDuration() {
// Unconditionally report zero unhealthy time.
return Duration.ZERO;
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public class StandardWiringModel extends TraceableWiringModel {
*/
private final TaskScheduler<Duration> healthMonitorScheduler;

/**
* The health monitor.
*/
private HealthMonitor healthMonitor;

/**
* The input wire for the health monitor.
*/
Expand Down Expand Up @@ -154,6 +159,16 @@ public OutputWire<Duration> getHealthMonitorWire() {
return healthMonitorScheduler.getOutputWire();
}

/**
* {@inheritDoc}
*
* <p>Delegates to the {@link HealthMonitor} held in the {@code healthMonitor} field.
*/
@NonNull
@Override
public Duration getUnhealthyDuration() {
// The healthMonitor field is assigned in start(), so guard before dereferencing it.
// NOTE(review): throwIfNotStarted() is defined outside this view; presumably it throws if start() has not
// been called yet -- confirm.
throwIfNotStarted();
return healthMonitor.getUnhealthyDuration();
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -186,8 +201,7 @@ public void start() {
anchor.start();
}

final HealthMonitor healthMonitor =
new HealthMonitor(platformContext, schedulers, healthLogThreshold, healthLogPeriod);
healthMonitor = new HealthMonitor(platformContext, schedulers, healthLogThreshold, healthLogPeriod);
healthMonitorInputWire.bind(healthMonitor::checkSystemHealth);

markAsStarted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,17 @@ String generateWiringDiagram(
@NonNull
OutputWire<Duration> getHealthMonitorWire();

/**
* Get the duration that any particular scheduler has been concurrently unhealthy. This getter is intended for use
* by code outside of the wiring framework. For use within the framework, the proper way to access this value is
* via the wire returned by {@link #getHealthMonitorWire()}.
*
* @return the duration that any particular scheduler has been concurrently unhealthy, or {@link Duration#ZERO} if
* no scheduler is currently unhealthy; never null
*/
@NonNull
Duration getUnhealthyDuration();

/**
* Build a wire that produces an instant (reflecting current time) at the specified rate. Note that the exact rate
* of heartbeats may vary. This is a best effort algorithm, and actual rates may vary depending on a variety of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/**
* Monitors the health of a wiring model. A healthy wiring model is a model without too much work backed up in queues.
Expand Down Expand Up @@ -61,6 +62,11 @@ public class HealthMonitor {
*/
private final HealthMonitorLogger logger;

/**
* The longest duration that any single scheduler has been concurrently unhealthy.
*/
private final AtomicReference<Duration> longestUnhealthyDuration = new AtomicReference<>(Duration.ZERO);

/**
* Constructor.
*
Expand Down Expand Up @@ -114,11 +120,27 @@ public Duration checkSystemHealth(@NonNull final Instant now) {
}

try {
// Only report when there is a change in health status
metrics.reportUnhealthyDuration(longestUnhealthyDuration);
return longestUnhealthyDuration.equals(previouslyReportedDuration) ? null : longestUnhealthyDuration;
if (longestUnhealthyDuration.equals(previouslyReportedDuration)) {
// Only report when there is a change in health status
return null;
} else {
this.longestUnhealthyDuration.set(longestUnhealthyDuration);
metrics.reportUnhealthyDuration(longestUnhealthyDuration);
return longestUnhealthyDuration.equals(previouslyReportedDuration) ? null : longestUnhealthyDuration;
}
} finally {
previouslyReportedDuration = longestUnhealthyDuration;
}
}

/**
* Get the duration that any particular scheduler has been concurrently unhealthy.
*
* <p>This method does not perform a health check itself; it only reads the value currently held in the
* {@code longestUnhealthyDuration} atomic reference, which starts out as {@link Duration#ZERO}.
*
* @return the duration that any particular scheduler has been concurrently unhealthy, or {@link Duration#ZERO} if
* no scheduler is currently unhealthy
*/
@NonNull
public Duration getUnhealthyDuration() {
// Plain read of the atomic reference; the stored value is written by checkSystemHealth().
return longestUnhealthyDuration.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.swirlds.platform;

import static com.swirlds.common.threading.interrupt.Uninterruptable.abortAndThrowIfInterrupted;
import static com.swirlds.common.utility.CompareTo.isLessThan;
import static com.swirlds.logging.legacy.LogMarker.RECONNECT;
import static com.swirlds.logging.legacy.LogMarker.STARTUP;
import static com.swirlds.logging.legacy.LogMarker.STATE_TO_DISK;
Expand Down Expand Up @@ -56,6 +57,7 @@
import com.swirlds.platform.event.AncientMode;
import com.swirlds.platform.event.EventCounter;
import com.swirlds.platform.event.GossipEvent;
import com.swirlds.platform.event.preconsensus.PcesConfig;
import com.swirlds.platform.event.preconsensus.PcesFileTracker;
import com.swirlds.platform.event.preconsensus.PcesReplayer;
import com.swirlds.platform.event.validation.AddressBookUpdate;
Expand Down Expand Up @@ -101,6 +103,7 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -255,12 +258,17 @@ public SwirldsPlatform(@NonNull final PlatformComponentBuilder builder) {
.set(x -> platformWiring.getStatusActionSubmitter().submitStatusAction(x));

final StateSigner stateSigner = new StateSigner(new PlatformSigner(keysAndCerts), statusNexus);
final Duration replayHealthThreshold = platformContext
.getConfiguration()
.getConfigData(PcesConfig.class)
.replayHealthThreshold();
final PcesReplayer pcesReplayer = new PcesReplayer(
platformContext.getTime(),
platformWiring.getPcesReplayerEventOutput(),
platformWiring::flushIntakePipeline,
platformWiring::flushConsensusRoundHandler,
() -> latestImmutableStateNexus.getState("PCES replay"));
() -> latestImmutableStateNexus.getState("PCES replay"),
() -> isLessThan(blocks.model().getUnhealthyDuration(), replayHealthThreshold));

initializeState(initialState);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@
* has been stuck for too long
* @param suspiciousRoundDurabilityDuration the duration after which a round is considered stuck in the round
* durability buffer component
* @param replayHealthThreshold if the system is unhealthy (i.e. overloaded) for more than this amount of
* time, pause PCES replay until the system is able to catch up.
*/
@ConfigData("event.preconsensus")
public record PcesConfig(
Expand All @@ -102,4 +104,5 @@ public record PcesConfig(
@ConfigProperty(defaultValue = "true") boolean compactLastFileOnStartup,
@ConfigProperty(defaultValue = "false") boolean forceIgnorePcesSignatures,
@ConfigProperty(defaultValue = "1m") Duration roundDurabilityBufferHeartbeatPeriod,
@ConfigProperty(defaultValue = "1m") Duration suspiciousRoundDurabilityDuration) {}
@ConfigProperty(defaultValue = "1m") Duration suspiciousRoundDurabilityDuration,
@ConfigProperty(defaultValue = "5s") Duration replayHealthThreshold) {}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.swirlds.common.formatting.StringFormattingUtils.commaSeparatedNumber;
import static com.swirlds.common.units.TimeUnit.UNIT_MILLISECONDS;
import static com.swirlds.logging.legacy.LogMarker.STARTUP;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import com.swirlds.base.time.Time;
import com.swirlds.common.formatting.UnitFormatter;
Expand All @@ -38,8 +39,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// TODO this needs to be made to work with the health monitor

/**
* This class encapsulates the logic for replaying preconsensus events at boot up time.
*/
Expand All @@ -54,6 +53,7 @@ public class PcesReplayer {
private final Runnable flushTransactionHandling;

private final Supplier<ReservedSignedState> latestImmutableState;
private final Supplier<Boolean> isSystemHealthy;

/**
* Constructor
Expand All @@ -63,19 +63,23 @@ public class PcesReplayer {
* @param flushIntake a runnable that flushes the intake pipeline
* @param flushTransactionHandling a runnable that flushes the transaction handling pipeline
* @param latestImmutableState a supplier of the latest immutable state
* @param isSystemHealthy a supplier that returns true if the system is healthy and false if the system is
* overwhelmed
*/
public PcesReplayer(
final @NonNull Time time,
final @NonNull StandardOutputWire<GossipEvent> eventOutputWire,
final @NonNull Runnable flushIntake,
final @NonNull Runnable flushTransactionHandling,
final @NonNull Supplier<ReservedSignedState> latestImmutableState) {
final @NonNull Supplier<ReservedSignedState> latestImmutableState,
final @NonNull Supplier<Boolean> isSystemHealthy) {

this.time = Objects.requireNonNull(time);
this.eventOutputWire = Objects.requireNonNull(eventOutputWire);
this.flushIntake = Objects.requireNonNull(flushIntake);
this.flushTransactionHandling = Objects.requireNonNull(flushTransactionHandling);
this.latestImmutableState = Objects.requireNonNull(latestImmutableState);
this.isSystemHealthy = Objects.requireNonNull(isSystemHealthy);
}

/**
Expand Down Expand Up @@ -162,6 +166,10 @@ public NoInput replayPces(@NonNull final IOIterator<GossipEvent> eventIterator)
int transactionCount = 0;
try {
while (eventIterator.hasNext()) {
// If the system is not keeping up with the rate at which we are replaying PCES, we need to wait
// until it catches up before we can continue.
waitUntilHealthy();

final GossipEvent event = eventIterator.next();

eventCount++;
Expand All @@ -182,4 +190,19 @@ public NoInput replayPces(@NonNull final IOIterator<GossipEvent> eventIterator)

return NoInput.getInstance();
}

/**
 * Blocks until the system is in a healthy state. An unhealthy state is caused by the backlog of work growing too
 * large.
 *
 * <p>Polls the {@code isSystemHealthy} supplier, sleeping for 100 milliseconds between checks. Returns immediately
 * if the supplier already reports a healthy system.
 *
 * @throws RuntimeException if the calling thread is interrupted while waiting; the thread's interrupt flag is
 *                          restored before the exception is thrown
 */
private void waitUntilHealthy() {
    while (!isSystemHealthy.get()) {
        // wait until the system is healthy
        try {
            MILLISECONDS.sleep(100);
        } catch (final InterruptedException e) {
            // sleep() clears the interrupt flag when it throws. Restore it so that code further up the stack
            // (which only sees a RuntimeException) can still observe that an interrupt was requested.
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while replaying PCES", e);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
*/
@DisplayName("PcesReplayer Tests")
class PcesReplayerTests {

// TODO test health backpressure

@Test
@DisplayName("Test standard operation")
void testStandardOperation() {
Expand Down Expand Up @@ -75,7 +78,7 @@ void testStandardOperation() {
final Supplier<ReservedSignedState> latestImmutableStateSupplier = () -> latestImmutableState;

final PcesReplayer replayer = new PcesReplayer(
time, eventOutputWire, flushIntake, flushTransactionHandling, latestImmutableStateSupplier);
time, eventOutputWire, flushIntake, flushTransactionHandling, latestImmutableStateSupplier, () -> true);

final List<GossipEvent> events = new ArrayList<>();
final int eventCount = 100;
Expand Down

0 comments on commit 4b2feb1

Please sign in to comment.