Skip to content

Commit

Permalink
BFD-3738: Track RDA sequence number latency (#2535)
Browse files Browse the repository at this point in the history
  • Loading branch information
aschey-forpeople authored Feb 3, 2025
1 parent 3232fa9 commit eca4638
Show file tree
Hide file tree
Showing 24 changed files with 243 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ public final class AppConfiguration extends BaseAppConfiguration {
Set.of(
"FissClaimRdaSink.change.latency.millis",
"McsClaimRdaSink.change.latency.millis",
"FissClaimRdaSink.lastSeq",
"McsClaimRdaSink.lastSeq",
"FissClaimRdaSink.maxSeq",
"McsClaimRdaSink.maxSeq",
CcwRifLoadJob.Metrics.DATASET_PROCESSING_ACTIVE_TIMER_NAME,
CcwRifLoadJob.Metrics.DATASET_PROCESSING_TOTAL_TIMER_NAME,
CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_ACTIVE_TIMER_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import gov.cms.bfd.model.rda.MessageError;
import gov.cms.model.dsl.codegen.library.DataTransformer;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -191,6 +192,13 @@ Optional<TClaim> transformMessage(String apiVersion, TMessage message)
*/
int writeClaims(Collection<TClaim> objects) throws ProcessingException;

/**
* Updates the available range of sequence numbers.
*
* @param sequenceNumberRange Current range of sequence numbers.
*/
void updateSequenceNumberRange(ClaimSequenceNumberRange sequenceNumberRange);

/**
* Return count of records processed since the most recent call to a write method or this method.
* Calls to this method collect the current value and resets the counter. The sum of this method's
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gov.cms.bfd.pipeline.rda.grpc.server;

import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import java.util.NoSuchElementException;

/**
Expand Down Expand Up @@ -45,6 +46,11 @@ public T next() throws Exception {
throw new NoSuchElementException();
}

@Override
public ClaimSequenceNumberRange getSequenceNumberRange() {
return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(0).build();
}

@Override
public void close() throws Exception {}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gov.cms.bfd.pipeline.rda.grpc.server;

import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -62,6 +63,11 @@ public T next() throws Exception {
return source.next();
}

@Override
public ClaimSequenceNumberRange getSequenceNumberRange() {
return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(remainingBeforeThrow).build();
}

@Override
public void close() throws Exception {
source.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.io.CharSource;
import com.google.protobuf.util.JsonFormat;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import gov.cms.mpsm.rda.v1.FissClaimChange;
import gov.cms.mpsm.rda.v1.McsClaimChange;
import java.io.BufferedReader;
Expand Down Expand Up @@ -167,6 +168,12 @@ public T next() throws Exception {
return answer;
}

@Override
public ClaimSequenceNumberRange getSequenceNumberRange() {
// Can't easily support this since it would require deserializing the entire contents
return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(0).build();
}

@Override
public void close() throws Exception {
reader.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package gov.cms.bfd.pipeline.rda.grpc.server;

import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;

/**
* Interface for objects that produce message objects from some source (e.g. a file, an array, a
* database, etc). Mirrors the Iterator protocol but allows for unwrapped exceptions to be passed
Expand Down Expand Up @@ -36,4 +38,11 @@ public interface MessageSource<T> extends AutoCloseable {
* @throws Exception if there is an issue getting the next claim
*/
MessageSource<T> skipTo(long startingSequenceNumber) throws Exception;

/**
* Returns the current range of sequence numbers.
*
* @return sequence number range
*/
ClaimSequenceNumberRange getSequenceNumberRange();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.protobuf.Timestamp;
import gov.cms.mpsm.rda.v1.ChangeType;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import gov.cms.mpsm.rda.v1.FissClaimChange;
import gov.cms.mpsm.rda.v1.RecordSource;
import java.time.Clock;
Expand Down Expand Up @@ -89,6 +90,11 @@ public FissClaimChange next() {
return change;
}

@Override
public ClaimSequenceNumberRange getSequenceNumberRange() {
return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(maxToSend).build();
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.protobuf.Timestamp;
import gov.cms.mpsm.rda.v1.ChangeType;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import gov.cms.mpsm.rda.v1.McsClaimChange;
import gov.cms.mpsm.rda.v1.RecordSource;
import java.time.Clock;
Expand Down Expand Up @@ -87,6 +88,11 @@ public McsClaimChange next() {
return change;
}

@Override
public ClaimSequenceNumberRange getSequenceNumberRange() {
return ClaimSequenceNumberRange.newBuilder().setLower(0).setUpper(maxToSend).build();
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,42 @@ public void getVersion(Empty request, StreamObserver<ApiVersion> responseObserve
}
}

@Override
public void getFissClaimsSequenceNumberRange(
com.google.protobuf.Empty request,
StreamObserver<gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange> responseObserver) {
LOGGER.info("start getFissClaimsSequenceNumberRange");
try (MessageSource<FissClaimChange> source = messageSourceFactory.createFissMessageSource(0)) {
responseObserver.onNext(source.getSequenceNumberRange());
responseObserver.onCompleted();
LOGGER.info("end getFissClaimsSequenceNumberRange");
} catch (Exception ex) {
responseObserver.onError(Status.fromThrowable(ex).asException());
LOGGER.error(
"end getFissClaimsSequenceNumberRange call - call failed with exception: message={}",
ex.getMessage(),
ex);
}
}

@Override
public void getMcsClaimsSequenceNumberRange(
com.google.protobuf.Empty request,
StreamObserver<gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange> responseObserver) {
LOGGER.info("start getMcsClaimsSequenceNumberRange");
try (MessageSource<McsClaimChange> source = messageSourceFactory.createMcsMessageSource(0)) {
responseObserver.onNext(source.getSequenceNumberRange());
responseObserver.onCompleted();
LOGGER.info("end getMcsClaimsSequenceNumberRange");
} catch (Exception ex) {
responseObserver.onError(Status.fromThrowable(ex).asException());
LOGGER.error(
"end getMcsClaimsSequenceNumberRange call - call failed with exception: message={}",
ex.getMessage(),
ex);
}
}

@Override
public void getFissClaims(
ClaimRequest request, StreamObserver<FissClaimChange> responseObserver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.annotations.VisibleForTesting;
import gov.cms.bfd.pipeline.rda.grpc.RdaChange;
import gov.cms.bfd.pipeline.sharedutils.s3.S3DirectoryDao;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
Expand Down Expand Up @@ -186,6 +187,11 @@ public synchronized T next() throws Exception {
return current.next();
}

@Override
public ClaimSequenceNumberRange getSequenceNumberRange() {
return current.getSequenceNumberRange();
}

@Override
public synchronized void close() throws Exception {
current.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import gov.cms.bfd.pipeline.sharedutils.MultiCloser;
import gov.cms.bfd.pipeline.sharedutils.SequenceNumberTracker;
import gov.cms.model.dsl.codegen.library.DataTransformer;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import jakarta.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -272,6 +273,11 @@ public Optional<Long> readMaxExistingSequenceNumber() throws ProcessingException
return sink.readMaxExistingSequenceNumber();
}

@Override
public void updateSequenceNumberRange(ClaimSequenceNumberRange sequenceNumberRange) {
sink.updateSequenceNumberRange(sequenceNumberRange);
}

/**
* This method is not implemented since that would bypass the queue used to schedule writes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import gov.cms.bfd.pipeline.sharedutils.PipelineApplicationState;
import gov.cms.bfd.pipeline.sharedutils.TransactionManager;
import gov.cms.model.dsl.codegen.library.DataTransformer;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -238,6 +239,11 @@ public int writeClaims(Collection<RdaChange<TClaim>> claims) throws ProcessingEx
return claims.size();
}

@Override
public void updateSequenceNumberRange(ClaimSequenceNumberRange sequenceNumberRange) {
metrics.setMaxSequenceNumber(sequenceNumberRange.getUpper());
}

/**
* Always returns zero since all claims are written synchronously by writeMessages.
*
Expand Down Expand Up @@ -525,7 +531,7 @@ static class Metrics {
/** Tracks the number of updates per database transaction. */
private final DistributionSummary dbBatchSize;

/** Latest sequnce number from writing a batch. * */
/** Latest sequence number from writing a batch. * */
private final AtomicLong latestSequenceNumber;

/** The value returned by the latestSequenceNumber gauge. * */
Expand All @@ -534,6 +540,12 @@ static class Metrics {
/** The number of insert statements executed. */
private final DistributionSummary insertCount;

/** Maximum available sequence number. */
private final AtomicLong maxSequenceNumber;

/** The value returned by the maxSequenceNumber gauge. */
private final AtomicLong maxSequenceNumberValue;

/**
* Initializes all the metrics.
*
Expand All @@ -560,6 +572,9 @@ private Metrics(Class<?> klass, MeterRegistry appMetrics) {
latestSequenceNumber = GAUGES.getGaugeForName(appMetrics, latestSequenceNumberGaugeName);
latestSequenceNumberValue = GAUGES.getValueForName(latestSequenceNumberGaugeName);
insertCount = appMetrics.summary(MetricRegistry.name(base, "insertCount"));
String maxSequenceNumberGaugeName = MetricRegistry.name(base, "maxSeq");
maxSequenceNumber = GAUGES.getGaugeForName(appMetrics, maxSequenceNumberGaugeName);
maxSequenceNumberValue = GAUGES.getValueForName(maxSequenceNumberGaugeName);
}

/**
Expand All @@ -571,5 +586,14 @@ private Metrics(Class<?> klass, MeterRegistry appMetrics) {
void setLatestSequenceNumber(long value) {
latestSequenceNumberValue.set(value);
}

/**
* Sets the {@link #maxSequenceNumber}.
*
* @param value value to set
*/
void setMaxSequenceNumber(long value) {
maxSequenceNumberValue.set(value);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package gov.cms.bfd.pipeline.rda.grpc.source;

import com.google.common.base.Preconditions;
import com.google.protobuf.Empty;
import gov.cms.mpsm.rda.v1.ClaimRequest;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import gov.cms.mpsm.rda.v1.FissClaimChange;
import gov.cms.mpsm.rda.v1.RDAServiceGrpc;
import io.grpc.CallOptions;
Expand Down Expand Up @@ -45,4 +47,13 @@ public GrpcResponseStream<FissClaimChange> callService(
ClientCalls.blockingServerStreamingCall(call, request);
return new GrpcResponseStream<>(call, apiResults);
}

@Override
public ClaimSequenceNumberRange callSequenceNumberRangeService(
ManagedChannel channel, CallOptions callOptions) {
final MethodDescriptor<Empty, ClaimSequenceNumberRange> method =
RDAServiceGrpc.getGetFissClaimsSequenceNumberRangeMethod();
final ClientCall<Empty, ClaimSequenceNumberRange> call = channel.newCall(method, callOptions);
return ClientCalls.blockingUnaryCall(call, Empty.getDefaultInstance());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.protobuf.Empty;
import gov.cms.bfd.pipeline.rda.grpc.RdaServerJob;
import gov.cms.mpsm.rda.v1.ApiVersion;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import gov.cms.mpsm.rda.v1.RDAServiceGrpc;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
Expand Down Expand Up @@ -62,6 +63,16 @@ public abstract GrpcResponseStream<TResponse> callService(
ManagedChannel channel, CallOptions callOptions, long startingSequenceNumber)
throws Exception;

/**
* Calls the service to get the sequence number range.
*
* @param channel an already open channel to the service being called
* @param callOptions the CallOptions object to use for the API call
* @return sequence number range
*/
public abstract ClaimSequenceNumberRange callSequenceNumberRangeService(
ManagedChannel channel, CallOptions callOptions);

/**
* Make a call to the server's {@code getVersion()} service and return the version component. Will
* retry several times if the call fails. Retries allow the job to handle with a race condition
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package gov.cms.bfd.pipeline.rda.grpc.source;

import com.google.common.base.Preconditions;
import com.google.protobuf.Empty;
import gov.cms.mpsm.rda.v1.ClaimRequest;
import gov.cms.mpsm.rda.v1.ClaimSequenceNumberRange;
import gov.cms.mpsm.rda.v1.McsClaimChange;
import gov.cms.mpsm.rda.v1.RDAServiceGrpc;
import io.grpc.CallOptions;
Expand Down Expand Up @@ -44,4 +46,13 @@ public GrpcResponseStream<McsClaimChange> callService(
ClientCalls.blockingServerStreamingCall(call, request);
return new GrpcResponseStream<>(call, apiResults);
}

@Override
public ClaimSequenceNumberRange callSequenceNumberRangeService(
ManagedChannel channel, CallOptions callOptions) {
final MethodDescriptor<Empty, ClaimSequenceNumberRange> method =
RDAServiceGrpc.getGetMcsClaimsSequenceNumberRangeMethod();
final ClientCall<Empty, ClaimSequenceNumberRange> call = channel.newCall(method, callOptions);
return ClientCalls.blockingUnaryCall(call, Empty.getDefaultInstance());
}
}
Loading

0 comments on commit eca4638

Please sign in to comment.