From 2798b5242227f3979e40a686beb77a35323235b8 Mon Sep 17 00:00:00 2001 From: Mitchell Alessio <5306896+malessi@users.noreply.github.com> Date: Mon, 27 Jan 2025 12:15:46 -0500 Subject: [PATCH] BFD-3808: Pipeline submits its own per-dataset metrics to CloudWatch (#2524) --- .../bfd/pipeline/app/AppConfiguration.java | 2 + .../bfd/pipeline/app/PipelineApplication.java | 3 + .../bfd/pipeline/ccw/rif/CcwRifLoadJob.java | 206 ++++++++++++++-- .../ccw/rif/extract/s3/FinalManifestList.java | 8 +- .../bfd/pipeline/ccw/rif/CcwRifLoadJobIT.java | 221 ++++++++++++++++++ 5 files changed, 420 insertions(+), 20 deletions(-) diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/AppConfiguration.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/AppConfiguration.java index be3aaa1fc8..919404f418 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/AppConfiguration.java +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/AppConfiguration.java @@ -377,6 +377,8 @@ public final class AppConfiguration extends BaseAppConfiguration { Set.of( "FissClaimRdaSink.change.latency.millis", "McsClaimRdaSink.change.latency.millis", + CcwRifLoadJob.Metrics.DATASET_PROCESSING_ACTIVE_TIMER_NAME, + CcwRifLoadJob.Metrics.DATASET_PROCESSING_TOTAL_TIMER_NAME, CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_ACTIVE_TIMER_NAME, CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_TOTAL_TIMER_NAME, DefaultDataSetMonitorListener.Metrics.RIF_FILE_PROCESSING_ACTIVE_TIMER_NAME, diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineApplication.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineApplication.java index 4bb6a61c4d..6693165fc9 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineApplication.java +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineApplication.java @@ -347,6 +347,9 @@ private PipelineOutcome createJobsAndRunPipeline( PipelineOutcome pipelineOutcome = pipelineManager.awaitCompletion(); + // Ensures that any CloudWatch metrics are published prior to the stop of the Pipeline + appMeters.close(); + if (pipelineManager.getError() != null) { throw new FatalAppException( "Pipeline job threw exception", pipelineManager.getError(), EXIT_CODE_JOB_FAILED); diff --git a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJob.java b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJob.java index 024b91a3a1..49aef1b75f 100644 --- a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJob.java +++ b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJob.java @@ -1,6 +1,7 @@ package gov.cms.bfd.pipeline.ccw.rif; import com.codahale.metrics.MetricRegistry; +import com.google.common.annotations.VisibleForTesting; import gov.cms.bfd.model.rif.RifFilesEvent; import gov.cms.bfd.model.rif.entities.S3DataFile; import gov.cms.bfd.model.rif.entities.S3ManifestFile; @@ -27,7 +28,9 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -223,6 +226,14 @@ public PipelineJobOutcome call() throws Exception { final 
Set finalManifestTimestamps = getTimestampsFromManifestLists(finalManifestLists);
+ // This is a failsafe in the possible case where the final manifest of a dataset was loaded
+ // but the final manifest list had not yet arrived. In that case the timers started for these
+ // dataset(s) were never stopped, so we should make sure they're stopped now.
+ finalManifestLists.stream()
+ .filter(l -> !dataSetQueue.hasIncompleteManifests(l.getManifests()))
+ .map(FinalManifestList::getTimestampText)
+ .forEach(dataset -> loadJobMetrics.stopTimersForDataset(dataset, false));
+
listener.noDataAvailable();
statusReporter.reportNothingToDo();
// Ensure all manifests from the manifest lists are accounted for and completed.
@@ -345,16 +356,35 @@ public PipelineJobOutcome call() throws Exception {
* processing multiple data sets in parallel (which would lead to data
* consistency problems).
*/
- final var activeTimer =
- loadJobMetrics.createActiveTimerForManifest(manifestToProcess).start();
- final var totalTimer = Timer.start();
+ loadJobMetrics.startTimersForDataset(
+ manifestToProcess.getTimestampText(), manifestToProcess.isSyntheticData());
+ loadJobMetrics.startTimersForManifest(manifestToProcess);
statusReporter.reportProcessingManifestData(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsStarted(manifestRecord);
listener.dataAvailable(rifFilesEvent);
statusReporter.reportCompletedManifest(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsProcessed(manifestRecord);
- activeTimer.stop();
- totalTimer.stop(loadJobMetrics.createTotalTimerForManifest(manifestToProcess));
+ loadJobMetrics.stopTimersForManifest(manifestToProcess);
+ if (!manifestToProcess.isSyntheticData()) {
+ // Non-synthetic datasets are typically one manifest to one RIF, so we need to look for
+ // the final manifest list that corresponds to the just-loaded manifest and ensure, via the
+ // database, that the dataset associated with the manifest that was just loaded is
+ // fully complete before submitting dataset metrics. If there's no corresponding final
+ // manifest list, or the database indicates not all manifests are loaded, the timers
+ // will not be stopped as the dataset has not completed loading. Note that there is an edge
+ // case if the current manifest was the last to be loaded for a dataset but the final
+ // manifest list has not yet arrived. There is a failsafe above for this possibility.
+ dataSetQueue.readFinalManifestLists().stream()
+ .filter(l -> l.getTimestampText().equals(manifestToProcess.getTimestampText()))
+ .filter(l -> !dataSetQueue.hasIncompleteManifests(l.getManifests()))
+ .map(FinalManifestList::getTimestampText)
+ .forEach(dataset -> loadJobMetrics.stopTimersForDataset(dataset, false));
+ } else {
+ // Synthetic datasets contain only a single manifest, so if the currently loading manifest
+ // is synthetic we can stop the dataset timers immediately after it has loaded.
+ loadJobMetrics.stopTimersForDataset(manifestToProcess.getTimestampText(), true);
+ }
+
LOGGER.info(LOG_MESSAGE_DATA_SET_COMPLETE);
/*
@@ -556,6 +586,19 @@ private boolean isProcessingRequired(S3DataFile dataFileRecord) {
/** Micrometer metrics and helpers for measuring {@link CcwRifLoadJob} operations. */
@RequiredArgsConstructor
public static final class Metrics {
+ /**
+ * Name of the per-dataset {@link LongTaskTimer}s that actively, at each Micrometer reporting
+ * interval, record and report the duration of processing of a given dataset. 
+ */
+ public static final String DATASET_PROCESSING_ACTIVE_TIMER_NAME =
+ String.format("%s.dataset_processing.active", CcwRifLoadJob.class.getSimpleName());
+
+ /**
+ * Name of the per-dataset {@link Timer}s that report the final duration of processing once the
+ * dataset is processed.
+ */
+ public static final String DATASET_PROCESSING_TOTAL_TIMER_NAME =
+ String.format("%s.dataset_processing.total", CcwRifLoadJob.class.getSimpleName());
/**
* Name of the per-{@link DataSetManifest} {@link LongTaskTimer}s that actively, at each
@@ -575,53 +618,145 @@ public static final class Metrics {
/**
* Tag indicating which data set (identified by its timestamp in S3) a given metric measured.
*/
- private static final String TAG_DATA_SET_TIMESTAMP = "data_set_timestamp";
+ @VisibleForTesting static final String TAG_DATA_SET_TIMESTAMP = "data_set_timestamp";
/**
* Tag indicating whether the data load associated with the measured metric was synthetic or
* not.
*/
- private static final String TAG_IS_SYNTHETIC = "is_synthetic";
+ @VisibleForTesting static final String TAG_IS_SYNTHETIC = "is_synthetic";
/** Tag indicating which {@link DataSetManifest} was associated with the measured metric. */
- private static final String TAG_MANIFEST = "manifest";
+ @VisibleForTesting static final String TAG_MANIFEST = "manifest";
/** Micrometer {@link MeterRegistry} for the Pipeline application. */
private final MeterRegistry appMetrics;
+ /** Map of a {@link DataSetManifest} to its active {@link ManifestTimerSet} timer metrics. */
+ private final Map<DataSetManifest, ManifestTimerSet> activeManifestTimersMap = new HashMap<>();
+
+ /** Map of a dataset to its active {@link DatasetTimerSet} timer metrics. */
+ private final Map<String, DatasetTimerSet> activeDatasetTimersMap = new HashMap<>();
+
+ /**
+ * Starts the active and total processing time timers for a {@link DataSetManifest} that is
+ * beginning to be processed. Will not start new timers if the {@link DataSetManifest} is
+ * already being timed.
+ *
+ * @param manifest the {@link DataSetManifest} to measure processing time for
+ */
+ void startTimersForManifest(DataSetManifest manifest) {
+ activeManifestTimersMap.computeIfAbsent(
+ manifest,
+ key -> new ManifestTimerSet(createActiveTimerForManifest(key).start(), Timer.start()));
+ }
+
+ /**
+ * Stops the active and total processing time timers for a {@link DataSetManifest}, if they
+ * exist.
+ *
+ * @param manifest the {@link DataSetManifest} whose started timers will be stopped
+ */
+ void stopTimersForManifest(DataSetManifest manifest) {
+ if (!activeManifestTimersMap.containsKey(manifest)) return;
+
+ final var manifestTimers = activeManifestTimersMap.get(manifest);
+ manifestTimers.activeTimer.stop();
+ manifestTimers.totalTimer.stop(createTotalTimerForManifest(manifest));
+ }
+
+ /**
+ * Starts the active and total processing time timers for a dataset that is beginning to be
+ * processed. Will not start new timers if the dataset is already being timed.
+ *
+ * @param datasetTimestampText the dataset to measure processing time for
+ * @param isSynthetic whether the dataset is synthetic
+ */
+ void startTimersForDataset(String datasetTimestampText, boolean isSynthetic) {
+ activeDatasetTimersMap.computeIfAbsent(
+ datasetTimestampText,
+ key ->
+ new DatasetTimerSet(
+ createActiveTimerForDataset(key, isSynthetic).start(), Timer.start()));
+ }
+
+ /**
+ * Stops the active and total processing time timers for a dataset, if they
+ * exist. 
+ *
+ * @param datasetTimestampText the dataset whose processing time timers will be stopped
+ * @param isSynthetic whether the dataset is synthetic
+ */
+ void stopTimersForDataset(String datasetTimestampText, boolean isSynthetic) {
+ if (!activeDatasetTimersMap.containsKey(datasetTimestampText)) return;
+
+ final var datasetTimers = activeDatasetTimersMap.get(datasetTimestampText);
+ datasetTimers.activeTimer.stop();
+ datasetTimers.totalTimer.stop(createTotalTimerForDataset(datasetTimestampText, isSynthetic));
+ }
+
/**
* Creates a {@link LongTaskTimer} for a given {@link DataSetManifest} so that the time it takes
- * to process the manifest can be measured and recorded while processing is ongoing. Should be
- * called prior to processing a {@link DataSetManifest}.
+ * to process the manifest can be measured and recorded while processing is ongoing.
*
* @param manifest the {@link DataSetManifest} to time
* @return the {@link LongTaskTimer} that will be used to actively measure and record the time
* taken to load the {@link DataSetManifest}
*/
- LongTaskTimer createActiveTimerForManifest(DataSetManifest manifest) {
+ private LongTaskTimer createActiveTimerForManifest(DataSetManifest manifest) {
return LongTaskTimer.builder(MANIFEST_PROCESSING_ACTIVE_TIMER_NAME)
- .tags(getTags(manifest))
+ .tags(getTagsForManifestMetrics(manifest))
.register(appMetrics);
}
/**
* Creates a {@link Timer} for a given {@link DataSetManifest} so that the total time it takes
- * to process the manifest can be recorded. Should be used with {@link Timer.Sample#stop(Timer)}
- * after processing a {@link DataSetManifest} to record the total duration.
+ * to process the manifest can be recorded.
*
* @param manifest the {@link DataSetManifest} to time
* @return the {@link LongTaskTimer} that will be used to record the total time taken to load
* the {@link DataSetManifest}
*/
- Timer createTotalTimerForManifest(DataSetManifest manifest) {
+ private Timer createTotalTimerForManifest(DataSetManifest manifest) {
return Timer.builder(MANIFEST_PROCESSING_TOTAL_TIMER_NAME)
- .tags(getTags(manifest))
+ .tags(getTagsForManifestMetrics(manifest))
+ .register(appMetrics);
+ }
+
+ /**
+ * Creates an "active" {@link LongTaskTimer} for the provided dataset so that the running time
+ * it takes to process the dataset can be recorded.
+ *
+ * @param datasetTimestamp the timestamp text of the dataset to time
+ * @param isSynthetic whether the dataset is synthetic
+ * @return the {@link LongTaskTimer} that will be used to actively record the time it is taking
+ * to process the dataset
+ */
+ private LongTaskTimer createActiveTimerForDataset(
+ String datasetTimestamp, boolean isSynthetic) {
+ return LongTaskTimer.builder(DATASET_PROCESSING_ACTIVE_TIMER_NAME)
+ .tags(getTagsForDatasetMetrics(datasetTimestamp, isSynthetic))
+ .register(appMetrics);
+ }
+
+ /**
+ * Creates a {@link Timer} for a given dataset so that the total time it takes to process the
+ * dataset can be recorded. 
+ *
+ * @param datasetTimestamp the dataset to record the total processing time for
+ * @param isSynthetic whether the dataset is synthetic
+ * @return the {@link Timer} that will be used to record the total time taken to load the
+ * dataset
+ */
+ private Timer createTotalTimerForDataset(String datasetTimestamp, boolean isSynthetic) {
+ return Timer.builder(DATASET_PROCESSING_TOTAL_TIMER_NAME)
+ .tags(getTagsForDatasetMetrics(datasetTimestamp, isSynthetic))
.register(appMetrics);
}
/**
* Returns a {@link List} of default {@link Tag}s that is used to disambiguate a given metric
- based on its corresponding {@link DataSetManifest}.
+ for a manifest based on its corresponding {@link DataSetManifest}.
*
* @param manifest {@link DataSetManifest} from which the values of {@link
* DataSetManifest#getTimestampText()}, {@link DataSetManifest#isSyntheticData()} and {@link
* Tag}s, respectively
* @return a {@link List} of {@link Tag}s including relevant information from {@code manifest}
*/
- private List<Tag> getTags(DataSetManifest manifest) {
+ private List<Tag> getTagsForManifestMetrics(DataSetManifest manifest) {
final var manifestFullpath = manifest.getIncomingS3Key();
final var manifestFilename = manifestFullpath.substring(manifestFullpath.lastIndexOf("/") + 1);
@@ -639,5 +774,40 @@ private List<Tag> getTags(DataSetManifest manifest) {
Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(manifest.isSyntheticData())),
Tag.of(TAG_MANIFEST, manifestFilename));
}
+
+ /**
+ * Returns a {@link List} of default {@link Tag}s that is used to disambiguate a given metric
+ * for a dataset based on the dataset's timestamp text and whether it is synthetic.
+ *
+ * @param datasetTimestamp the timestamp text of the dataset
+ * @param isSynthetic whether the dataset is synthetic
+ * @return a {@link List} of {@link Tag}s including the dataset's timestamp text and whether it
+ * is synthetic
+ */
+ private List<Tag> getTagsForDatasetMetrics(String datasetTimestamp, boolean isSynthetic) {
+ return List.of(
+ Tag.of(TAG_DATA_SET_TIMESTAMP, datasetTimestamp),
+ Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(isSynthetic)));
+ }
+
+ /**
+ * A set of started timer metrics for a {@link DataSetManifest}.
+ *
+ * @param activeTimer a {@link LongTaskTimer.Sample} that is actively timing the processing time
+ * of the manifest
+ * @param totalTimer a {@link Timer.Sample} that will time the total time it takes to process
+ * the manifest
+ */
+ private record ManifestTimerSet(LongTaskTimer.Sample activeTimer, Timer.Sample totalTimer) {}
+
+ /**
+ * A set of started timer metrics for a dataset. 
+ * + * @param activeTimer a {@link LongTaskTimer.Sample} that is actively timing the processing time + * of the dataset + * @param totalTimer a {@link Timer.Sample} that will time the total time it takes to process + * the dataset + */ + private record DatasetTimerSet(LongTaskTimer.Sample activeTimer, Timer.Sample totalTimer) {} } } diff --git a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/extract/s3/FinalManifestList.java b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/extract/s3/FinalManifestList.java index 7e153d4eb6..ce2e306b4d 100644 --- a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/extract/s3/FinalManifestList.java +++ b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/extract/s3/FinalManifestList.java @@ -13,6 +13,9 @@ public class FinalManifestList { /** Contained list of manifests. */ private final Set manifests; + /** Timestamp text extracted from the S3 prefix. */ + private final String timestampText; + /** Timestamp from the S3 prefix. */ private final Instant timestamp; @@ -28,8 +31,9 @@ public FinalManifestList(byte[] downloadedFileContent, String key) { String prefix = key.substring(0, key.lastIndexOf('/')); String[] components = prefix.split("/"); - timestamp = Instant.parse(components[components.length - 1]); - manifests = + this.timestampText = components[components.length - 1]; + this.timestamp = Instant.parse(this.timestampText); + this.manifests = Arrays.stream(fileString.split("\n")) .filter(l -> !l.isBlank()) .map(l -> prefix + "/" + l) diff --git a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/test/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJobIT.java b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/test/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJobIT.java index 53178d139f..faf955066f 100644 --- a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/test/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJobIT.java +++ b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/test/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJobIT.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -29,6 +30,8 @@ import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Optional; +import java.util.Set; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -46,6 +49,14 @@ final class CcwRifLoadJobIT extends AbstractLocalStackS3Test { /** Used to capture status updates from the job. */ @Mock private CcwRifLoadJobStatusReporter statusReporter; + /** Clear Micrometer meters after each test to ensure each test can verify its own metrics. */ + @AfterEach + void clearMeters() { + final var pipelineAppState = PipelineTestUtils.get().getPipelineApplicationState(); + final var meterRegistry = pipelineAppState.getMeters(); + meterRegistry.getMeters().forEach(meterRegistry::remove); + } + /** * Tests {@link CcwRifLoadJob} when run against an empty bucket. 
* @@ -763,6 +774,216 @@ public void shutdownWithManifestListDelayed() throws Exception { } } + /** + * Tests that {@link CcwRifLoadJob#call()} submits/creates Micrometer expected timer metrics for + * non-synthetic datasets. + * + * @throws Exception exception + */ + @Test + void submitsMetricsForNonSyntheticDataset() throws Exception { + String bucket = null; + try { + bucket = s3Dao.createTestBucket(); + final var datasetTimestamp = Instant.now().minus(1, ChronoUnit.HOURS); + final var options = + new ExtractionOptions(bucket, Optional.empty(), Optional.of(1), s3ClientConfig); + final var manifestA = + new DataSetManifest( + datasetTimestamp, + 0, + false, + CcwRifLoadJob.S3_PREFIX_PENDING_DATA_SETS, + new DataSetManifestEntry("beneficiaries.rif", RifFileType.BENEFICIARY)); + DataSetTestUtilities.putObject(s3Dao, bucket, manifestA); + DataSetTestUtilities.putObject( + s3Dao, + bucket, + manifestA, + manifestA.getEntries().getFirst(), + StaticRifResource.SAMPLE_A_BENES.getResourceUrl()); + final var manifestB = + new DataSetManifest( + datasetTimestamp, + 1, + false, + CcwRifLoadJob.S3_PREFIX_PENDING_DATA_SETS, + new DataSetManifestEntry("carrier.rif", RifFileType.CARRIER)); + DataSetTestUtilities.putObject(s3Dao, bucket, manifestB); + DataSetTestUtilities.putObject( + s3Dao, + bucket, + manifestB, + manifestB.getEntries().getFirst(), + StaticRifResource.SAMPLE_A_CARRIER.getResourceUrl()); + final var manifests = List.of(manifestA, manifestB); + putFinalManifestListInTestBucket(bucket, manifests); + + final var pipelineAppState = PipelineTestUtils.get().getPipelineApplicationState(); + final var listener = new MockDataSetMonitorListener(); + final var s3FilesDao = new S3ManifestDbDao(pipelineAppState.getEntityManagerFactory()); + final var s3FileCache = spy(new S3FileManager(pipelineAppState.getMetrics(), s3Dao, bucket)); + final var dataSetQueue = + new DataSetQueue( + pipelineAppState.getClock(), pipelineAppState.getMetrics(), s3FilesDao, s3FileCache); + try (CcwRifLoadJob ccwJob = + new CcwRifLoadJob( + pipelineAppState, + options, + dataSetQueue, + listener, + false, + Optional.empty(), + statusReporter)) { + for (final var ignored : manifests) { + assertEquals(PipelineJobOutcome.WORK_DONE, ccwJob.call()); + } + } + + final var allManifestMeters = + pipelineAppState.getMeters().getMeters().stream() + .filter( + meter -> + Set.of( + CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_ACTIVE_TIMER_NAME, + CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_TOTAL_TIMER_NAME) + .contains(meter.getId().getName())) + .toList(); + // There are 2 timer metrics per-manifest and 2 manifests, so 4 total manifest meters + assertEquals(4, allManifestMeters.size()); + // Assert all meters have the expected tags + for (final var manifestMeter : allManifestMeters) { + assertEquals( + manifestA.getTimestampText(), + manifestMeter.getId().getTag(CcwRifLoadJob.Metrics.TAG_DATA_SET_TIMESTAMP)); + assertEquals("false", manifestMeter.getId().getTag(CcwRifLoadJob.Metrics.TAG_IS_SYNTHETIC)); + assertTrue( + Set.of("0_manifest.xml", "1_manifest.xml") + .contains(manifestMeter.getId().getTag(CcwRifLoadJob.Metrics.TAG_MANIFEST))); + } + + final var allDatasetMeters = + pipelineAppState.getMeters().getMeters().stream() + .filter( + meter -> + Set.of( + CcwRifLoadJob.Metrics.DATASET_PROCESSING_ACTIVE_TIMER_NAME, + CcwRifLoadJob.Metrics.DATASET_PROCESSING_TOTAL_TIMER_NAME) + .contains(meter.getId().getName())) + .toList(); + // There are 2 timer metrics per-dataset, so only 2 timers should be expected + assertEquals(2, 
allDatasetMeters.size()); + // Assert all meters have the expected tags + for (final var datasetMeter : allDatasetMeters) { + assertEquals( + manifestA.getTimestampText(), + datasetMeter.getId().getTag(CcwRifLoadJob.Metrics.TAG_DATA_SET_TIMESTAMP)); + assertEquals("false", datasetMeter.getId().getTag(CcwRifLoadJob.Metrics.TAG_IS_SYNTHETIC)); + } + } finally { + if (StringUtils.isNotBlank(bucket)) s3Dao.deleteTestBucket(bucket); + } + } + + /** + * Tests that {@link CcwRifLoadJob#call()} submits/creates Micrometer expected timer metrics for + * synthetic datasets. + * + * @throws Exception exception + */ + @Test + void submitsMetricsForSyntheticDataset() throws Exception { + String bucket = null; + try { + bucket = s3Dao.createTestBucket(); + final var datasetTimestamp = Instant.now().minus(1, ChronoUnit.HOURS); + final var options = + new ExtractionOptions(bucket, Optional.empty(), Optional.of(1), s3ClientConfig); + final var manifest = + new DataSetManifest( + datasetTimestamp, + 0, + true, + CcwRifLoadJob.S3_PREFIX_PENDING_DATA_SETS, + new DataSetManifestEntry("beneficiaries.rif", RifFileType.BENEFICIARY), + new DataSetManifestEntry("carrier.rif", RifFileType.CARRIER)); + DataSetTestUtilities.putObject(s3Dao, bucket, manifest); + DataSetTestUtilities.putObject( + s3Dao, + bucket, + manifest, + manifest.getEntries().getFirst(), + StaticRifResource.SAMPLE_A_BENES.getResourceUrl()); + DataSetTestUtilities.putObject( + s3Dao, + bucket, + manifest, + manifest.getEntries().getLast(), + StaticRifResource.SAMPLE_A_CARRIER.getResourceUrl()); + + final var pipelineAppState = PipelineTestUtils.get().getPipelineApplicationState(); + final var listener = new MockDataSetMonitorListener(); + final var s3FilesDao = new S3ManifestDbDao(pipelineAppState.getEntityManagerFactory()); + final var s3FileCache = spy(new S3FileManager(pipelineAppState.getMetrics(), s3Dao, bucket)); + final var dataSetQueue = + new DataSetQueue( + pipelineAppState.getClock(), pipelineAppState.getMetrics(), s3FilesDao, s3FileCache); + try (CcwRifLoadJob ccwJob = + new CcwRifLoadJob( + pipelineAppState, + options, + dataSetQueue, + listener, + false, + Optional.empty(), + statusReporter)) { + assertEquals(PipelineJobOutcome.WORK_DONE, ccwJob.call()); + } + + final var allManifestMeters = + pipelineAppState.getMeters().getMeters().stream() + .filter( + meter -> + Set.of( + CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_ACTIVE_TIMER_NAME, + CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_TOTAL_TIMER_NAME) + .contains(meter.getId().getName())) + .toList(); + // There are 2 timer metrics per-manifest and only 1 manifest, so 2 total manifest meters + assertEquals(2, allManifestMeters.size()); + // Assert all meters have the expected tags + for (final var manifestMeter : allManifestMeters) { + assertEquals( + manifest.getTimestampText(), + manifestMeter.getId().getTag(CcwRifLoadJob.Metrics.TAG_DATA_SET_TIMESTAMP)); + assertEquals("true", manifestMeter.getId().getTag(CcwRifLoadJob.Metrics.TAG_IS_SYNTHETIC)); + assertEquals( + "0_manifest.xml", manifestMeter.getId().getTag(CcwRifLoadJob.Metrics.TAG_MANIFEST)); + } + + final var allDatasetMeters = + pipelineAppState.getMeters().getMeters().stream() + .filter( + meter -> + Set.of( + CcwRifLoadJob.Metrics.DATASET_PROCESSING_ACTIVE_TIMER_NAME, + CcwRifLoadJob.Metrics.DATASET_PROCESSING_TOTAL_TIMER_NAME) + .contains(meter.getId().getName())) + .toList(); + // There are 2 timer metrics per-dataset, so only 2 timers should be expected + assertEquals(2, allDatasetMeters.size()); + // Assert all meters have 
the expected tags + for (final var datasetMeter : allDatasetMeters) { + assertEquals( + manifest.getTimestampText(), + datasetMeter.getId().getTag(CcwRifLoadJob.Metrics.TAG_DATA_SET_TIMESTAMP)); + assertEquals("true", datasetMeter.getId().getTag(CcwRifLoadJob.Metrics.TAG_IS_SYNTHETIC)); + } + } finally { + if (StringUtils.isNotBlank(bucket)) s3Dao.deleteTestBucket(bucket); + } + } + /** * Validate load given the input location to load files and output location to look for the files * once they're loaded.