Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
aschey-forpeople committed Jan 17, 2025
2 parents 96392d3 + ecfc0ae commit 5a571ac
Show file tree
Hide file tree
Showing 59 changed files with 1,343 additions and 84 deletions.
2 changes: 1 addition & 1 deletion apps/bfd-data-fda/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<groupId>gov.cms.bfd.data.fda.utility</groupId>
Expand Down
2 changes: 1 addition & 1 deletion apps/bfd-data-npi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<groupId>gov.cms.bfd.data.npi</groupId>
Expand Down
2 changes: 1 addition & 1 deletion apps/bfd-db-migrator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-db-migrator</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion apps/bfd-model/bfd-model-codebook-data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-model-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-model-codebook-data</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion apps/bfd-model/bfd-model-codebook-library/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-model-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-model-codebook-library</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion apps/bfd-model/bfd-model-codebook-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-model-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-model-codebook-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-model-dsl-codegen-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-model-dsl-codegen-library</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-model-dsl-codegen-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-model-dsl-codegen-plugin</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
<packaging>maven-plugin</packaging>

<description>
Expand Down
2 changes: 1 addition & 1 deletion apps/bfd-model/bfd-model-dsl-codegen/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-model-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-model-dsl-codegen-parent</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion apps/bfd-model/bfd-model-rda/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-model-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-model-rda</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion apps/bfd-model/bfd-model-rif-samples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-model-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-model-rif-samples</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion apps/bfd-model/bfd-model-rif/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-model-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-model-rif</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion apps/bfd-model/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-model-parent</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion apps/bfd-ops/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-ops</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion apps/bfd-pipeline/bfd-pipeline-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-pipeline-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-pipeline-app</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import gov.cms.bfd.model.rda.MessageError;
import gov.cms.bfd.model.rif.RifFileType;
import gov.cms.bfd.model.rif.RifRecordEvent;
import gov.cms.bfd.pipeline.ccw.rif.CcwRifLoadJob;
import gov.cms.bfd.pipeline.ccw.rif.CcwRifLoadOptions;
import gov.cms.bfd.pipeline.ccw.rif.extract.ExtractionOptions;
import gov.cms.bfd.pipeline.ccw.rif.extract.s3.DataSetManifest;
Expand Down Expand Up @@ -373,7 +374,13 @@ public final class AppConfiguration extends BaseAppConfiguration {
* auto-generated aggregate metric names with suffixes like {@code .avg}.
*/
public static final Set<String> MICROMETER_CW_ALLOWED_METRIC_NAMES =
Set.of("FissClaimRdaSink.change.latency.millis", "McsClaimRdaSink.change.latency.millis");
Set.of(
"FissClaimRdaSink.change.latency.millis",
"McsClaimRdaSink.change.latency.millis",
CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_ACTIVE_TIMER_NAME,
CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_TOTAL_TIMER_NAME,
DefaultDataSetMonitorListener.Metrics.RIF_FILE_PROCESSING_ACTIVE_TIMER_NAME,
DefaultDataSetMonitorListener.Metrics.RIF_FILE_PROCESSING_TOTAL_TIMER_NAME);

/**
* The CCW rif load options. This can be null if the CCW job is not configured, Optional is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@
import gov.cms.bfd.pipeline.ccw.rif.CcwRifLoadJob;
import gov.cms.bfd.pipeline.ccw.rif.extract.RifFileRecords;
import gov.cms.bfd.pipeline.ccw.rif.extract.RifFilesProcessor;
import gov.cms.bfd.pipeline.ccw.rif.extract.s3.DataSetManifest;
import gov.cms.bfd.pipeline.ccw.rif.extract.s3.DataSetMonitorListener;
import gov.cms.bfd.pipeline.ccw.rif.extract.s3.S3RifFile;
import gov.cms.bfd.pipeline.ccw.rif.load.RifLoader;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,6 +37,9 @@ public final class DefaultDataSetMonitorListener implements DataSetMonitorListen
/** Metrics for this class. */
private final MetricRegistry appMetrics;

/** Micrometer metrics for this class. */
private final Metrics metrics;

/** Handles processing of new RIF files. */
private final RifFilesProcessor rifProcessor;

Expand All @@ -40,14 +50,19 @@ public final class DefaultDataSetMonitorListener implements DataSetMonitorListen
* Initializes the instance.
*
* @param appMetrics the {@link MetricRegistry} for the application
* @param micrometerMetrics the {@link MeterRegistry} for the application
* @param rifProcessor the {@link RifFilesProcessor} for the application
* @param rifLoader the {@link RifLoader} for the application
*/
DefaultDataSetMonitorListener(
MetricRegistry appMetrics, RifFilesProcessor rifProcessor, RifLoader rifLoader) {
MetricRegistry appMetrics,
MeterRegistry micrometerMetrics,
RifFilesProcessor rifProcessor,
RifLoader rifLoader) {
this.appMetrics = appMetrics;
this.rifProcessor = rifProcessor;
this.rifLoader = rifLoader;
this.metrics = new Metrics(micrometerMetrics);
}

@Override
Expand All @@ -66,6 +81,10 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) throws Exception {
Slf4jReporter.forRegistry(rifFileEvent.getEventMetrics()).outputTo(LOGGER).build();
dataSetFileMetricsReporter.start(2, TimeUnit.MINUTES);

final LongTaskTimer.Sample activeTimer = metrics.createActiveTimerForRif(rifFile).start();
final io.micrometer.core.instrument.Timer.Sample totalTimer =
io.micrometer.core.instrument.Timer.start();

try {
LOGGER.info("Processing file {}", rifFile.getDisplayName());
rifFile.markAsStarted();
Expand All @@ -82,6 +101,9 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) throws Exception {
failure = e;
}

activeTimer.stop();
totalTimer.stop(metrics.createTotalTimerForRif(rifFile));

dataSetFileMetricsReporter.stop();
dataSetFileMetricsReporter.report();

Expand All @@ -104,4 +126,110 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) throws Exception {
public void noDataAvailable() {
// Nothing to do here.
}

/** Metrics for the {@link DefaultDataSetMonitorListener}'s operations. */
@RequiredArgsConstructor
public static final class Metrics {
/**
* Name of the per-{@link RifFile} data processing {@link LongTaskTimer}s that actively, at each
* Micrometer reporting interval, records and reports the duration of processing of a given
* {@link RifFile}.
*
* @implNote We use the class name of {@link CcwRifLoadJob} as the metric prefix instead of
* {@link DefaultDataSetMonitorListener} as there are other CCW RIF-related metrics
* generated from the {@link CcwRifLoadJob}. Additionally, {@link
* DefaultDataSetMonitorListener} is indirectly invoked by {@link CcwRifLoadJob}
*/
public static final String RIF_FILE_PROCESSING_ACTIVE_TIMER_NAME =
String.format("%s.rif_file_processing.active", CcwRifLoadJob.class.getSimpleName());

/**
* Name of the per-{@link RifFile} data processing {@link Timer}s that report the final duration
* of processing once the {@link RifFile} is processed.
*
* @implNote We use the class name of {@link CcwRifLoadJob} as the metric prefix instead of
* {@link DefaultDataSetMonitorListener} as there are other CCW RIF-related metrics
* generated from the {@link CcwRifLoadJob}. Additionally, {@link
* DefaultDataSetMonitorListener} is indirectly invoked by {@link CcwRifLoadJob}
*/
public static final String RIF_FILE_PROCESSING_TOTAL_TIMER_NAME =
String.format("%s.rif_file_processing.total", CcwRifLoadJob.class.getSimpleName());

/**
* Tag indicating which data set (identified by its timestamp in S3) a given metric measured.
*/
private static final String TAG_DATA_SET_TIMESTAMP = "data_set_timestamp";

/** Tag indicating which RIF file a given metric measured. */
private static final String TAG_RIF_FILE = "rif_file";

/**
* Tag indicating whether the data load associated with the measured metric was synthetic or
* not.
*/
private static final String TAG_IS_SYNTHETIC = "is_synthetic";

/** Tag indicating which {@link DataSetManifest} was associated with the measured metric. */
private static final String TAG_MANIFEST = "manifest";

/** Micrometer {@link MeterRegistry} for the Pipeline application. */
private final MeterRegistry micrometerMetrics;

/**
* Creates a {@link LongTaskTimer} for a given {@link RifFile} so that the time it takes to
* process the RIF can be measured while processing is ongoing. Should be called prior to
* processing a {@link RifFile}.
*
* @param rifFile the {@link RifFile} to time
* @return the {@link LongTaskTimer} that will be used to measure the ongoing load time of the
* {@link RifFile}
*/
LongTaskTimer createActiveTimerForRif(RifFile rifFile) {
return LongTaskTimer.builder(RIF_FILE_PROCESSING_ACTIVE_TIMER_NAME)
.tags(getTags(rifFile))
.register(micrometerMetrics);
}

/**
* Creates a {@link io.micrometer.core.instrument.Timer} for a given {@link RifFile} so that the
* total time it takes to process the RIF can be recorded once a {@link RifFile} is done
* processing. Should be used with {@link
* io.micrometer.core.instrument.Timer.Sample#stop(io.micrometer.core.instrument.Timer)} after
* processing a {@link RifFile} to record the total duration.
*
* @param rifFile the {@link RifFile} to time
* @return the {@link LongTaskTimer} that will be used to measure the total time taken to load
* the {@link RifFile}
*/
io.micrometer.core.instrument.Timer createTotalTimerForRif(RifFile rifFile) {
return io.micrometer.core.instrument.Timer.builder(RIF_FILE_PROCESSING_TOTAL_TIMER_NAME)
.tags(getTags(rifFile))
.register(micrometerMetrics);
}

/**
* Returns a {@link List} of default {@link Tag}s that is used to disambiguate a given metric
* based on its corresponding {@link DataSetManifest}.
*
* @param rifFile {@link RifFile} from which several properties will be used to set relevant
* {@link Tag}s
* @return a {@link List} of {@link Tag}s including relevant information from {@code rifFile}
*/
private List<Tag> getTags(RifFile rifFile) {
final var rifFileTag = Tag.of(TAG_RIF_FILE, rifFile.getFileType().name().toLowerCase());
if (rifFile instanceof S3RifFile s3RifFile) {
final var manifest = s3RifFile.getManifestEntry().getParentManifest();
final var manifestFullpath = manifest.getIncomingS3Key();
final var manifestFilename =
manifestFullpath.substring(manifestFullpath.lastIndexOf("/") + 1);
return List.of(
Tag.of(TAG_DATA_SET_TIMESTAMP, manifest.getTimestampText()),
Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(manifest.isSyntheticData())),
rifFileTag,
Tag.of(TAG_MANIFEST, manifestFilename));
}

return List.of(rifFileTag);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,8 @@ private PipelineJob createCcwRifLoadJob(
* each data set that is found.
*/
DataSetMonitorListener dataSetMonitorListener =
new DefaultDataSetMonitorListener(appState.getMetrics(), rifProcessor, rifLoader);
new DefaultDataSetMonitorListener(
appState.getMetrics(), appState.getMeters(), rifProcessor, rifLoader);
var s3Factory = new AwsS3ClientFactory(loadOptions.getExtractionOptions().getS3ClientConfig());
// Tell SQ it's ok not to use try-finally here since this will be closed by the CcwRifLoadJob.
@SuppressWarnings("java:S2095")
Expand Down
2 changes: 1 addition & 1 deletion apps/bfd-pipeline/bfd-pipeline-ccw-rif/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>gov.cms.bfd</groupId>
<artifactId>bfd-pipeline-parent</artifactId>
<version>2.172.0-SNAPSHOT</version>
<version>2.175.0-SNAPSHOT</version>
</parent>

<artifactId>bfd-pipeline-ccw-rif</artifactId>
Expand Down
Loading

0 comments on commit 5a571ac

Please sign in to comment.