Skip to content

Commit

Permalink
chore: add attributes to long running sync metric. (#14838)
Browse files Browse the repository at this point in the history
Adds sourceImg, destImg, and connectionId attributes to the long-running sync metric to support connector rollouts — this lets us catch hanging syncs per connector image.
  • Loading branch information
tryangul committed Dec 18, 2024
1 parent 70138e5 commit 0dc2470
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import jakarta.inject.Singleton;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -131,12 +133,23 @@ public Duration getDuration() {

@SuppressWarnings("OneTopLevelClass")
@Singleton
final class NumUnusuallyLongSyncs extends Emitter {
final class UnusuallyLongSyncs extends Emitter {

NumUnusuallyLongSyncs(final MetricClient client, final MetricRepository db) {
UnusuallyLongSyncs(final MetricClient client, final MetricRepository db) {
super(client, () -> {
final var count = db.numberOfJobsRunningUnusuallyLong();
client.gauge(OssMetricsRegistry.NUM_UNUSUALLY_LONG_SYNCS, count);
final var longRunningJobs = db.unusuallyLongRunningJobs();
longRunningJobs.forEach(job -> {
final List<MetricAttribute> attributes = new ArrayList<>();
// job might be null if we fail to map the row to the model under rare circumstances
if (job != null) {
attributes.add(new MetricAttribute(MetricTags.SOURCE_IMAGE, job.sourceDockerImage()));
attributes.add(new MetricAttribute(MetricTags.DESTINATION_IMAGE, job.destinationDockerImage()));
attributes.add(new MetricAttribute(MetricTags.CONNECTION_ID, job.connectionId()));
}

client.count(OssMetricsRegistry.NUM_UNUSUALLY_LONG_SYNCS, 1, attributes.toArray(new MetricAttribute[0]));
});

return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType;
import io.airbyte.db.instance.jobs.jooq.generated.enums.AttemptStatus;
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus;
import io.airbyte.metrics.reporter.model.LongRunningJobMetadata;
import jakarta.inject.Singleton;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.RecordMapper;
import org.jooq.impl.DSL;

@Singleton
Expand Down Expand Up @@ -221,34 +224,37 @@ having count(*) < 1440 / cast(c.schedule::jsonb->'units' as integer)
+ ctx.fetchOne(queryForAbnormalSyncInMinutesInLastDay).get("cnt", long.class);
}

long numberOfJobsRunningUnusuallyLong() {
List<LongRunningJobMetadata> unusuallyLongRunningJobs() {
// Definition of unusually long means runtime is more than 2x historic avg run time or 15
// minutes more than avg run time, whichever is greater.
// It will skip jobs with fewer than 4 runs in last week to make sure the historic avg run is
// meaningful and consistent.
final var query =
"""
-- pick average running time and last sync running time in attempts table.
select
select
current_running_attempts.connection_id,
current_running_attempts.running_time,
current_running_attempts.source_image,
current_running_attempts.dest_image,
current_running_attempts.running_time_sec,
historic_avg_running_attempts.avg_run_sec
from
(
-- Sub-query-1: query the currently running attempt's running time.
(
select
jobs.scope as connection_id,
extract(epoch from age(NOW(), attempts.created_at)) as running_time
extract(epoch from age(NOW(), attempts.created_at)) as running_time_sec,
jobs.config->'sync'->>'sourceDockerImage' as source_image,
jobs.config->'sync'->>'destinationDockerImage' as dest_image
from
jobs
join attempts on
jobs.id = attempts.job_id
where
jobs.status = 'running'
and attempts.status = 'running'
and jobs.config_type = 'sync' )
as current_running_attempts
and jobs.config_type = 'sync'
) as current_running_attempts
join
-- Sub-query-2: query historic attempts' average running time within last week.
(
Expand All @@ -275,13 +281,27 @@ long numberOfJobsRunningUnusuallyLong() {
where
-- Find if currently running time takes 2x more time than average running time,
-- and it's 15 minutes (900 seconds) more than average running time so it won't alert on noises for quick sync jobs.
current_running_attempts.running_time > greatest(
current_running_attempts.running_time_sec > greatest(
historic_avg_running_attempts.avg_run_sec * 2,
historic_avg_running_attempts.avg_run_sec + 900
)
""";
final var queryResults = ctx.fetch(query);
return queryResults.getValues("connection_id").size();
return queryResults.map(new RecordMapper<Record, LongRunningJobMetadata>() {

@Override
public LongRunningJobMetadata map(final Record rec) {
try {
return new LongRunningJobMetadata(
rec.getValue("source_image").toString(),
rec.getValue("dest_image").toString(),
rec.getValue("connection_id").toString());
} catch (final Exception e) {
return null;
}
}

});
}

Map<JobStatus, Double> overallJobRuntimeForTerminalJobsInLastHour() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
 * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.metrics.reporter.model;

/**
 * Metadata for a sync job that has been running unusually long. Carried from the metric
 * repository to the reporter so the long-running-sync metric can be tagged with
 * per-connector attributes (connector images and connection id).
 *
 * @param sourceDockerImage docker image of the source connector, as read from the job's
 *        sync config (e.g. {@code airbyte/source-postgres-1.1.0})
 * @param destinationDockerImage docker image of the destination connector, as read from the
 *        job's sync config
 * @param connectionId string form of the connection id the job belongs to
 */
public record LongRunningJobMetadata(
String sourceDockerImage,
String destinationDockerImage,
String connectionId) {}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -14,13 +15,15 @@
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.metrics.reporter.model.LongRunningJobMetadata;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@SuppressWarnings("MethodName")
@SuppressWarnings({"MethodName", "PMD.JUnitTestsShouldIncludeAssert"})
class EmitterTest {

private MetricClient client;
Expand Down Expand Up @@ -184,4 +187,52 @@ void TestTotalJobRuntimeByTerminalState() {
verify(client).count(OssMetricsRegistry.EST_NUM_METRICS_EMITTED_BY_REPORTER, 1);
}

@Test
void unusuallyLongSyncs() {
// Repository reports three well-formed long-running jobs.
final var jobs = List.of(
new LongRunningJobMetadata("sourceImg1", "destImg1", "connection1"),
new LongRunningJobMetadata("sourceImg2", "destImg2", "connection2"),
new LongRunningJobMetadata("sourceImg3", "destImg3", "connection3"));
when(repo.unusuallyLongRunningJobs()).thenReturn(jobs);

new UnusuallyLongSyncs(client, repo).emit();

// One count per job, tagged with that job's source image, destination image, and connection id.
for (final LongRunningJobMetadata job : jobs) {
verify(client).count(
OssMetricsRegistry.NUM_UNUSUALLY_LONG_SYNCS, 1,
new MetricAttribute(MetricTags.SOURCE_IMAGE, job.sourceDockerImage()),
new MetricAttribute(MetricTags.DESTINATION_IMAGE, job.destinationDockerImage()),
new MetricAttribute(MetricTags.CONNECTION_ID, job.connectionId()));
}
}

@Test
void unusuallyLongSyncsHandlesNullMetadata() {
// Repository result containing a null entry, which simulates a row that failed to map
// to LongRunningJobMetadata.
final List<LongRunningJobMetadata> results = new ArrayList<>();
results.add(new LongRunningJobMetadata("sourceImg1", "destImg1", "connection1"));
results.add(null); // specifically add a null to simulate a mapping failure
results.add(new LongRunningJobMetadata("sourceImg2", "destImg2", "connection2"));
when(repo.unusuallyLongRunningJobs()).thenReturn(results);

new UnusuallyLongSyncs(client, repo).emit();

// Well-formed rows increment the metric with their full attribute set.
verify(client).count(
OssMetricsRegistry.NUM_UNUSUALLY_LONG_SYNCS, 1,
new MetricAttribute(MetricTags.SOURCE_IMAGE, "sourceImg1"),
new MetricAttribute(MetricTags.DESTINATION_IMAGE, "destImg1"),
new MetricAttribute(MetricTags.CONNECTION_ID, "connection1"));

verify(client).count(
OssMetricsRegistry.NUM_UNUSUALLY_LONG_SYNCS, 1,
new MetricAttribute(MetricTags.SOURCE_IMAGE, "sourceImg2"),
new MetricAttribute(MetricTags.DESTINATION_IMAGE, "destImg2"),
new MetricAttribute(MetricTags.CONNECTION_ID, "connection2"));

// The null row still increments the metric, but with no attributes at all.
verify(client, times(1)).count(OssMetricsRegistry.NUM_UNUSUALLY_LONG_SYNCS, 1, new MetricAttribute[0]);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -517,13 +517,65 @@ void shouldNotCountNormalJobsInAbnormalMetric() {
class UnusuallyLongJobs {

@Test
void shouldCountInJobsWithUnusuallyLongTime() throws SQLException {
// Happy path: a currently-running sync whose runtime far exceeds its historic average is
// returned by unusuallyLongRunningJobs(), with the connector images and connection id
// extracted from the job's sync config.
void shouldCountJobsWithUnusuallyLongTime() throws SQLException {
final var connectionId = UUID.randomUUID();
final var syncConfigType = JobConfigType.sync;
// Sync config carrying the connector images the repository reads via
// jobs.config->'sync'->>'sourceDockerImage' / 'destinationDockerImage'.
final var config = JSONB.valueOf("""
{
"sync": {
"sourceDockerImage": "airbyte/source-postgres-1.1.0",
"destinationDockerImage": "airbyte/destination-s3-1.4.0"
}
}
""");

// The current job (id 5) has been running for 12 hours while each of the previous 5 jobs
// ran for 2 hours, so the historic average is 2 hours and the latest job qualifies as
// unusually long-running (more than 2x the average and more than 15 minutes over it).
ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT, JOBS.UPDATED_AT, JOBS.CONFIG_TYPE, JOBS.CONFIG)
.values(100L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.HOURS),
OffsetDateTime.now().minus(26, ChronoUnit.HOURS), syncConfigType, config)
.values(1L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.HOURS),
OffsetDateTime.now().minus(18, ChronoUnit.HOURS), syncConfigType, config)
.values(2L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(18, ChronoUnit.HOURS),
OffsetDateTime.now().minus(16, ChronoUnit.HOURS), syncConfigType, config)
.values(3L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(16, ChronoUnit.HOURS),
OffsetDateTime.now().minus(14, ChronoUnit.HOURS), syncConfigType, config)
.values(4L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(14, ChronoUnit.HOURS),
OffsetDateTime.now().minus(12, ChronoUnit.HOURS), syncConfigType, config)
.values(5L, connectionId.toString(), JobStatus.running, OffsetDateTime.now().minus(12, ChronoUnit.HOURS),
OffsetDateTime.now().minus(12, ChronoUnit.HOURS), syncConfigType, config)
.execute();

// One attempt per job, mirroring each job's timestamps; the running attempt's created_at
// is what the query measures the current running time against.
ctx.insertInto(ATTEMPTS, ATTEMPTS.ID, ATTEMPTS.JOB_ID, ATTEMPTS.STATUS, ATTEMPTS.CREATED_AT, ATTEMPTS.UPDATED_AT)
.values(100L, 100L, AttemptStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.HOURS),
OffsetDateTime.now().minus(26, ChronoUnit.HOURS))
.values(1L, 1L, AttemptStatus.succeeded, OffsetDateTime.now().minus(20, ChronoUnit.HOURS),
OffsetDateTime.now().minus(18, ChronoUnit.HOURS))
.values(2L, 2L, AttemptStatus.succeeded, OffsetDateTime.now().minus(18, ChronoUnit.HOURS),
OffsetDateTime.now().minus(16, ChronoUnit.HOURS))
.values(3L, 3L, AttemptStatus.succeeded, OffsetDateTime.now().minus(16, ChronoUnit.HOURS),
OffsetDateTime.now().minus(14, ChronoUnit.HOURS))
.values(4L, 4L, AttemptStatus.succeeded, OffsetDateTime.now().minus(14, ChronoUnit.HOURS),
OffsetDateTime.now().minus(12, ChronoUnit.HOURS))
.values(5L, 5L, AttemptStatus.running, OffsetDateTime.now().minus(12, ChronoUnit.HOURS),
OffsetDateTime.now().minus(12, ChronoUnit.HOURS))
.execute();

// Exactly one long-running job is reported, carrying the metadata from the sync config.
final var longRunningJobs = db.unusuallyLongRunningJobs();
assertEquals(1, longRunningJobs.size());
final var job = longRunningJobs.get(0);
assertEquals("airbyte/source-postgres-1.1.0", job.sourceDockerImage());
assertEquals("airbyte/destination-s3-1.4.0", job.destinationDockerImage());
assertEquals(connectionId.toString(), job.connectionId());
}

@Test
void handlesNullConfigRows() throws SQLException {
final var connectionId = UUID.randomUUID();
final var syncConfigType = JobConfigType.sync;

// same as above but no value passed for `config`
ctx.insertInto(JOBS, JOBS.ID, JOBS.SCOPE, JOBS.STATUS, JOBS.CREATED_AT, JOBS.UPDATED_AT, JOBS.CONFIG_TYPE)
.values(100L, connectionId.toString(), JobStatus.succeeded, OffsetDateTime.now().minus(28, ChronoUnit.HOURS),
OffsetDateTime.now().minus(26, ChronoUnit.HOURS), syncConfigType)
Expand Down Expand Up @@ -554,8 +606,8 @@ void shouldCountInJobsWithUnusuallyLongTime() throws SQLException {
OffsetDateTime.now().minus(12, ChronoUnit.HOURS))
.execute();

final var numOfJubsRunningUnusallyLong = db.numberOfJobsRunningUnusuallyLong();
assertEquals(1, numOfJubsRunningUnusallyLong);
final var longRunningJobs = db.unusuallyLongRunningJobs();
assertEquals(1, longRunningJobs.size());
}

@Test
Expand Down Expand Up @@ -597,8 +649,8 @@ void shouldNotCountInJobsWithinFifteenMinutes() throws SQLException {
OffsetDateTime.now().minus(14, ChronoUnit.MINUTES))
.execute();

final var numOfJubsRunningUnusallyLong = db.numberOfJobsRunningUnusuallyLong();
assertEquals(0, numOfJubsRunningUnusallyLong);
final var numOfUnusuallyLongRunningJobs = db.unusuallyLongRunningJobs().size();
assertEquals(0, numOfUnusuallyLongRunningJobs);
}

@Test
Expand All @@ -625,8 +677,8 @@ void shouldSkipInsufficientJobRuns() throws SQLException {
OffsetDateTime.now().minus(1, ChronoUnit.HOURS))
.execute();

final var numOfJubsRunningUnusallyLong = db.numberOfJobsRunningUnusuallyLong();
assertEquals(0, numOfJubsRunningUnusallyLong);
final var numOfUnusuallyLongRunningJobs = db.unusuallyLongRunningJobs().size();
assertEquals(0, numOfUnusuallyLongRunningJobs);
}

}
Expand Down

0 comments on commit 0dc2470

Please sign in to comment.