Skip to content

Commit

Permalink
BFD-3808: Pipeline submits its own per-dataset metrics to CloudWatch (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
malessi authored Jan 27, 2025
1 parent abd1f22 commit 2798b52
Show file tree
Hide file tree
Showing 5 changed files with 420 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ public final class AppConfiguration extends BaseAppConfiguration {
Set.of(
"FissClaimRdaSink.change.latency.millis",
"McsClaimRdaSink.change.latency.millis",
CcwRifLoadJob.Metrics.DATASET_PROCESSING_ACTIVE_TIMER_NAME,
CcwRifLoadJob.Metrics.DATASET_PROCESSING_TOTAL_TIMER_NAME,
CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_ACTIVE_TIMER_NAME,
CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_TOTAL_TIMER_NAME,
DefaultDataSetMonitorListener.Metrics.RIF_FILE_PROCESSING_ACTIVE_TIMER_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ private PipelineOutcome createJobsAndRunPipeline(

PipelineOutcome pipelineOutcome = pipelineManager.awaitCompletion();

// Ensures that any CloudWatch metrics are published prior to the stop of the Pipeline
appMeters.close();

if (pipelineManager.getError() != null) {
throw new FatalAppException(
"Pipeline job threw exception", pipelineManager.getError(), EXIT_CODE_JOB_FAILED);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gov.cms.bfd.pipeline.ccw.rif;

import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import gov.cms.bfd.model.rif.RifFilesEvent;
import gov.cms.bfd.model.rif.entities.S3DataFile;
import gov.cms.bfd.model.rif.entities.S3ManifestFile;
Expand All @@ -27,7 +28,9 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -223,6 +226,14 @@ public PipelineJobOutcome call() throws Exception {
final Set<Instant> finalManifestTimestamps =
getTimestampsFromManifestLists(finalManifestLists);

// This is a failsafe in the possible case where the final manifest of a dataset was loaded
// but the final manifest list had not yet arrived. In that case the timers started for these
// dataset(s) were never stopped, and so we should make sure they're stopped now
finalManifestLists.stream()
.filter(l -> !dataSetQueue.hasIncompleteManifests(l.getManifests()))
.map(FinalManifestList::getTimestampText)
.forEach(dataset -> loadJobMetrics.stopTimersForDataset(dataset, false));

listener.noDataAvailable();
statusReporter.reportNothingToDo();
// Ensure all manifests from the manifest lists are accounted for and completed.
Expand Down Expand Up @@ -345,16 +356,35 @@ public PipelineJobOutcome call() throws Exception {
* processing multiple data sets in parallel (which would lead to data
* consistency problems).
*/
final var activeTimer =
loadJobMetrics.createActiveTimerForManifest(manifestToProcess).start();
final var totalTimer = Timer.start();
loadJobMetrics.startTimersForDataset(
manifestToProcess.getTimestampText(), manifestToProcess.isSyntheticData());
loadJobMetrics.startTimersForManifest(manifestToProcess);
statusReporter.reportProcessingManifestData(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsStarted(manifestRecord);
listener.dataAvailable(rifFilesEvent);
statusReporter.reportCompletedManifest(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsProcessed(manifestRecord);
activeTimer.stop();
totalTimer.stop(loadJobMetrics.createTotalTimerForManifest(manifestToProcess));
loadJobMetrics.stopTimersForManifest(manifestToProcess);
if (!manifestToProcess.isSyntheticData()) {
// Non-synthetic datasets are typically one manifest to one RIF, so we need to look for
// the final manifest list that corresponds to the just-loaded manifest and ensure, via the
// database, that the dataset associated with the manifest that was just loaded is
// fully complete before submitting dataset metrics. If there's no final manifest list, no
// corresponding list, or the database indicates not all manifests are loaded, the timers
// will not be stopped as the dataset has not completed loading. Note that there is an edge
// case if the current manifest was the last to be loaded for a dataset but the final
// manifest list has not yet arrived. There is a failsafe above for this possibility
dataSetQueue.readFinalManifestLists().stream()
.filter(l -> l.getTimestampText().equals(manifestToProcess.getTimestampText()))
.filter(l -> !dataSetQueue.hasIncompleteManifests(l.getManifests()))
.map(FinalManifestList::getTimestampText)
.forEach(dataset -> loadJobMetrics.stopTimersForDataset(dataset, false));
} else {
// Synthetic datasets contain only a single manifest, so if the currently loading manifest
// is synthetic we can stop the dataset timers immediately after it has loaded
loadJobMetrics.stopTimersForDataset(manifestToProcess.getTimestampText(), true);
}

LOGGER.info(LOG_MESSAGE_DATA_SET_COMPLETE);

/*
Expand Down Expand Up @@ -556,6 +586,19 @@ private boolean isProcessingRequired(S3DataFile dataFileRecord) {
/** Micrometer metrics and helpers for measuring {@link CcwRifLoadJob} operations. */
@RequiredArgsConstructor
public static final class Metrics {
/**
* Name of the per-dataset {@link LongTaskTimer}s that actively, at each Micrometer reporting
* interval, records and reports the duration of processing of a given dataset.
*/
public static final String DATASET_PROCESSING_ACTIVE_TIMER_NAME =
String.format("%s.dataset_processing.active", CcwRifLoadJob.class.getSimpleName());

/**
* Name of the per-dataset {@link Timer}s that report the final duration of processing once the
* dataset is processed.
*/
public static final String DATASET_PROCESSING_TOTAL_TIMER_NAME =
String.format("%s.dataset_processing.total", CcwRifLoadJob.class.getSimpleName());

/**
* Name of the per-{@link DataSetManifest} {@link LongTaskTimer}s that actively, at each
Expand All @@ -575,53 +618,145 @@ public static final class Metrics {
/**
* Tag indicating which data set (identified by its timestamp in S3) a given metric measured.
*/
private static final String TAG_DATA_SET_TIMESTAMP = "data_set_timestamp";
@VisibleForTesting static final String TAG_DATA_SET_TIMESTAMP = "data_set_timestamp";

/**
* Tag indicating whether the data load associated with the measured metric was synthetic or
* not.
*/
private static final String TAG_IS_SYNTHETIC = "is_synthetic";
@VisibleForTesting static final String TAG_IS_SYNTHETIC = "is_synthetic";

/** Tag indicating which {@link DataSetManifest} was associated with the measured metric. */
private static final String TAG_MANIFEST = "manifest";
@VisibleForTesting static final String TAG_MANIFEST = "manifest";

/** Micrometer {@link MeterRegistry} for the Pipeline application. */
private final MeterRegistry appMetrics;

/** Map of a {@link DataSetManifest} to its active {@link ManifestTimerSet} timer metrics. */
private final Map<DataSetManifest, ManifestTimerSet> activeManifestTimersMap = new HashMap<>();

/** Map of a dataset to its active {@link DatasetTimerSet} timer metrics. */
private final Map<String, DatasetTimerSet> activeDatasetTimersMap = new HashMap<>();

/**
* Starts the active and total processing time timers for a {@link DataSetManifest} that is
* beginning to be processed. Will not start new timers if the {@link DataSetManifest} is
* already being timed.
*
* @param manifest the {@link DataSetManifest} to measure processing time for
*/
void startTimersForManifest(DataSetManifest manifest) {
activeManifestTimersMap.computeIfAbsent(
manifest,
key -> new ManifestTimerSet(createActiveTimerForManifest(key).start(), Timer.start()));
}

/**
* Stops the active and total processing time timers for a {@link DataSetManifest}, if they
* exist.
*
* @param manifest the {@link DataSetManifest} for which its started timers will be stopped
*/
void stopTimersForManifest(DataSetManifest manifest) {
if (!activeManifestTimersMap.containsKey(manifest)) return;

final var manifestTimers = activeManifestTimersMap.get(manifest);
manifestTimers.activeTimer.stop();
manifestTimers.totalTimer.stop(createTotalTimerForManifest(manifest));
}

/**
* Starts the active and total processing time timers for a dataset that is beginning to be
* processed. Will not start new timers if the dataset is already being timed.
*
* @param datasetTimestampText the dataset to measure processing time for
* @param isSynthetic whether the dataset is synthetic
*/
void startTimersForDataset(String datasetTimestampText, boolean isSynthetic) {
activeDatasetTimersMap.computeIfAbsent(
datasetTimestampText,
key ->
new DatasetTimerSet(
createActiveTimerForDataset(key, isSynthetic).start(), Timer.start()));
}

/**
* Stops the active and total processing time timers for a {@link DataSetManifest}, if they
* exist.
*
* @param datasetTimestampText the dataset for which its processing time timers will be stopped
* @param isSynthetic whether the dataset is synthetic
*/
void stopTimersForDataset(String datasetTimestampText, boolean isSynthetic) {
if (!activeDatasetTimersMap.containsKey(datasetTimestampText)) return;

final var datasetTimers = activeDatasetTimersMap.get(datasetTimestampText);
datasetTimers.activeTimer.stop();
datasetTimers.totalTimer.stop(createTotalTimerForDataset(datasetTimestampText, isSynthetic));
}

/**
* Creates a {@link LongTaskTimer} for a given {@link DataSetManifest} so that the time it takes
* to process the manifest can be measured and recorded while processing is ongoing. Should be
* called prior to processing a {@link DataSetManifest}.
* to process the manifest can be measured and recorded while processing is ongoing.
*
* @param manifest the {@link DataSetManifest} to time
* @return the {@link LongTaskTimer} that will be used to actively measure and record the time
* taken to load the {@link DataSetManifest}
*/
LongTaskTimer createActiveTimerForManifest(DataSetManifest manifest) {
private LongTaskTimer createActiveTimerForManifest(DataSetManifest manifest) {
return LongTaskTimer.builder(MANIFEST_PROCESSING_ACTIVE_TIMER_NAME)
.tags(getTags(manifest))
.tags(getTagsForManifestMetrics(manifest))
.register(appMetrics);
}

/**
* Creates a {@link Timer} for a given {@link DataSetManifest} so that the total time it takes
* to process the manifest can be recorded. Should be used with {@link Timer.Sample#stop(Timer)}
* after processing a {@link DataSetManifest} to record the total duration.
* to process the manifest can be recorded.
*
* @param manifest the {@link DataSetManifest} to time
* @return the {@link LongTaskTimer} that will be used to record the total time taken to load
* the {@link DataSetManifest}
*/
Timer createTotalTimerForManifest(DataSetManifest manifest) {
private Timer createTotalTimerForManifest(DataSetManifest manifest) {
return Timer.builder(MANIFEST_PROCESSING_TOTAL_TIMER_NAME)
.tags(getTags(manifest))
.tags(getTagsForManifestMetrics(manifest))
.register(appMetrics);
}

/**
* Creates an "active" {@link LongTaskTimer} for the provided dataset so that the running time
* it takes to process the dataset can be recorded.
*
* @param datasetTimestamp the timestamp text of the dataset to time
* @param isSynthetic whether the dataset is synthetic
* @return the {@link LongTaskTimer} that will be used to actively record the time it is taking
* to processing the dataset
*/
private LongTaskTimer createActiveTimerForDataset(
String datasetTimestamp, boolean isSynthetic) {
return LongTaskTimer.builder(DATASET_PROCESSING_ACTIVE_TIMER_NAME)
.tags(getTagsForDatasetMetrics(datasetTimestamp, isSynthetic))
.register(appMetrics);
}

/**
* Creates a {@link Timer} for a given dataset so that the total time it takes to process the
* dataset can be recorded.
*
* @param datasetTimestamp the dataset to record the total processing time for
* @param isSynthetic whether the dataset is synthetic
* @return the {@link Timer} that will be used to record the total time taken to load the
* dataset
*/
private Timer createTotalTimerForDataset(String datasetTimestamp, boolean isSynthetic) {
return Timer.builder(DATASET_PROCESSING_TOTAL_TIMER_NAME)
.tags(getTagsForDatasetMetrics(datasetTimestamp, isSynthetic))
.register(appMetrics);
}

/**
* Returns a {@link List} of default {@link Tag}s that is used to disambiguate a given metric
* based on its corresponding {@link DataSetManifest}.
* for a manifest based on its corresponding {@link DataSetManifest}.
*
* @param manifest {@link DataSetManifest} from which the values of {@link
* DataSetManifest#getTimestampText()}, {@link DataSetManifest#isSyntheticData()} and {@link
Expand All @@ -630,7 +765,7 @@ Timer createTotalTimerForManifest(DataSetManifest manifest) {
* Tag}s, respectively
* @return a {@link List} of {@link Tag}s including relevant information from {@code manifest}
*/
private List<Tag> getTags(DataSetManifest manifest) {
private List<Tag> getTagsForManifestMetrics(DataSetManifest manifest) {
final var manifestFullpath = manifest.getIncomingS3Key();
final var manifestFilename =
manifestFullpath.substring(manifestFullpath.lastIndexOf("/") + 1);
Expand All @@ -639,5 +774,40 @@ private List<Tag> getTags(DataSetManifest manifest) {
Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(manifest.isSyntheticData())),
Tag.of(TAG_MANIFEST, manifestFilename));
}

/**
* Returns a {@link List} of default {@link Tag}s that is used to disambiguate a given metric
* for a dataset based on its corresponding dataset's timestamp text and whether its synthetic.
*
* @param datasetTimestamp the timestamp text of the dataset
* @param isSynthetic whether the dataset is synthetic
* @return a {@link List} of {@link Tag}s including the dataset's timestamp text and whether it
* is synthetic
*/
private List<Tag> getTagsForDatasetMetrics(String datasetTimestamp, boolean isSynthetic) {
return List.of(
Tag.of(TAG_DATA_SET_TIMESTAMP, datasetTimestamp),
Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(isSynthetic)));
}

/**
* A set of started timer metrics for a {@link DataSetManifest}.
*
* @param activeTimer a {@link LongTaskTimer.Sample} that is actively timing the processing time
* of the manifest
* @param totalTimer a {@link Timer.Sample} that will time the total time it takes to process
* the manifest
*/
private record ManifestTimerSet(LongTaskTimer.Sample activeTimer, Timer.Sample totalTimer) {}

/**
* A set of started timer metrics for a dataset.
*
* @param activeTimer a {@link LongTaskTimer.Sample} that is actively timing the processing time
* of the dataset
* @param totalTimer a {@link Timer.Sample} that will time the total time it takes to process
* the dataset
*/
private record DatasetTimerSet(LongTaskTimer.Sample activeTimer, Timer.Sample totalTimer) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ public class FinalManifestList {
/** Contained list of manifests. */
private final Set<String> manifests;

/** Timestamp text extracted from the S3 prefix. */
private final String timestampText;

/** Timestamp from the S3 prefix. */
private final Instant timestamp;

Expand All @@ -28,8 +31,9 @@ public FinalManifestList(byte[] downloadedFileContent, String key) {
String prefix = key.substring(0, key.lastIndexOf('/'));

String[] components = prefix.split("/");
timestamp = Instant.parse(components[components.length - 1]);
manifests =
this.timestampText = components[components.length - 1];
this.timestamp = Instant.parse(this.timestampText);
this.manifests =
Arrays.stream(fileString.split("\n"))
.filter(l -> !l.isBlank())
.map(l -> prefix + "/" + l)
Expand Down
Loading

0 comments on commit 2798b52

Please sign in to comment.