Skip to content

Commit

Permalink
[server] Augment completion report with previous ready to serve state
Browse files Browse the repository at this point in the history
This change persists into the offset record weather or not this partition originally marked itself as ready to serve.  This is then used on restart to determine if a node should report completion or not when coming online.

The intention is that when a node is lagged under normal conditions (things like heavy load or some other bad condition), we can still restart the node or do deployments and have the node come up and serve traffic.  This is done under the notion that it's generally preferable to be online and serving and stale as opposed to caught up/catching up, but unavailable.

This will not impact buffer replay on a version push because a replay won't pass a ready to serve check.
  • Loading branch information
ZacAttack committed Nov 6, 2024
1 parent 429e589 commit e9edc16
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
*/
public class PartitionConsumptionState {
private static final int MAX_INCREMENTAL_PUSH_ENTRY_NUM = 50;
private static final String PREVIOUSLY_READY_TO_SERVE = "previouslyReadyToServe";
private static final String TRUE = "true";

private final String replicaId;
private final int partition;
Expand Down Expand Up @@ -599,6 +601,19 @@ public void setSkipKafkaMessage(boolean skipKafkaMessage) {
this.skipKafkaMessage = skipKafkaMessage;
}

/**
* This persists to the offsetRecord associated to this partitionConsumptionState that the ready to serve check has
* passed. This will be persisted to disk once the offsetRecord is checkpointed, and subsequent restarts will
* consult this information when determining if the node should come online or not to serve traffic
*/
public void recordReadyToServeInOffsetRecord() {
offsetRecord.setPreviousStatusesEntry(PREVIOUSLY_READY_TO_SERVE, TRUE);
}

public boolean getReadyToServeInOffsetRecord() {
return offsetRecord.getPreviousStatusesEntry(PREVIOUSLY_READY_TO_SERVE).equals(TRUE);
}

/**
* This immutable class holds a association between a key and value and the source offset of the consumed message.
* The value could be either as received in kafka ConsumerRecord or it could be a write computed value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1979,7 +1979,8 @@ private void checkConsumptionStateWhenStart(
offsetLag,
previousOffsetLag,
offsetLagThreshold);
if (offsetLag < previousOffsetLag + offsetLagDeltaRelaxFactor * offsetLagThreshold) {
if ((offsetLag < previousOffsetLag + offsetLagDeltaRelaxFactor * offsetLagThreshold)
&& newPartitionConsumptionState.getReadyToServeInOffsetRecord()) {
newPartitionConsumptionState.lagHasCaughtUp();
reportCompleted(newPartitionConsumptionState, true);
isCompletedReport = true;
Expand Down Expand Up @@ -4025,6 +4026,7 @@ private ReadyToServeCheck getDefaultReadyToServeChecker() {
}
unSubscribePartition(new PubSubTopicPartitionImpl(versionTopic, partition));
}
partitionConsumptionState.recordReadyToServeInOffsetRecord();
} else {
ingestionNotificationDispatcher.reportProgress(partitionConsumptionState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class OffsetRecord {
public static final long DEFAULT_OFFSET_LAG = -1;
public static final String NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY = ""; // A place holder key
private static final String PARTITION_STATE_STRING = "PartitionState";
private static final String NULL_STRING = "null";
private final PartitionState partitionState;
private final InternalAvroSpecificSerializer<PartitionState> serializer;

Expand Down Expand Up @@ -85,6 +86,14 @@ public long getLocalVersionTopicOffset() {
return this.partitionState.offset;
}

public void setPreviousStatusesEntry(String key, String value) {
partitionState.getPreviousStatuses().put(key, value);
}

public String getPreviousStatusesEntry(String key) {
return partitionState.getPreviousStatuses().getOrDefault(key, NULL_STRING).toString();
}

public void setCheckpointLocalVersionTopicOffset(long offset) {
this.partitionState.offset = offset;
}
Expand Down

0 comments on commit e9edc16

Please sign in to comment.