Skip to content

Commit

Permalink
refactor: use failsafe for webhook retries (#13487)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Aug 12, 2024
1 parent 871ea3d commit 16114a9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 20 deletions.
1 change: 1 addition & 0 deletions airbyte-workers/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ dependencies {
implementation(libs.micrometer.statsd)
implementation(libs.bundles.datadog)
implementation(libs.sentry.java)
implementation(libs.failsafe)

implementation(project(":oss:airbyte-analytics"))
implementation(project(":oss:airbyte-api:server-api"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.model.generated.ScopeType;
import io.airbyte.api.client.model.generated.SecretPersistenceConfig;
Expand All @@ -33,6 +35,7 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -48,8 +51,13 @@ public class WebhookOperationActivityImpl implements WebhookOperationActivity {
private static final Logger LOGGER = LoggerFactory.getLogger(WebhookOperationActivityImpl.class);
private static final int MAX_RETRIES = 3;

private final HttpClient httpClient;
private static final RetryPolicy<Object> WEBHOOK_RETRY_POLICY = RetryPolicy.builder()
.withBackoff(1, 5, ChronoUnit.SECONDS)
.withMaxRetries(MAX_RETRIES)
.handle(Exception.class)
.build();

private final HttpClient httpClient;
private final SecretsRepositoryReader secretsRepositoryReader;
private final AirbyteApiClient airbyteApiClient;
private final FeatureFlagClient featureFlagClient;
Expand Down Expand Up @@ -98,6 +106,12 @@ public boolean invokeWebhook(final OperatorWebhookInput input) {
ApmTraceUtils.addTagsToTrace(Map.of(WEBHOOK_CONFIG_ID_KEY, input.getWebhookConfigId()));
LOGGER.info("Invoking webhook operation {}", webhookConfig.get().getName());

final HttpRequest.Builder requestBuilder = buildRequest(input, webhookConfig);

return Failsafe.with(WEBHOOK_RETRY_POLICY).get(() -> sendWebhook(requestBuilder, webhookConfig));
}

private HttpRequest.Builder buildRequest(final OperatorWebhookInput input, final Optional<WebhookConfig> webhookConfig) {
final HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(URI.create(input.getExecutionUrl()));
if (input.getExecutionBody() != null) {
Expand All @@ -109,24 +123,18 @@ public boolean invokeWebhook(final OperatorWebhookInput input) {
.header("Authorization", "Bearer " + webhookConfig.get().getAuthToken()).build();
}

Exception finalException = null;
// TODO(mfsiega-airbyte): replace this loop with retries configured on the HttpClient impl.
for (int i = 0; i < MAX_RETRIES; i++) {
try {
final HttpResponse<String> response = this.httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
LOGGER.debug("Webhook response: {}", response == null ? null : response.body());
LOGGER.info("Webhook response status: {}", response == null ? "empty response" : response.statusCode());
// Return true if the request was successful.
final boolean isSuccessful = response != null && response.statusCode() >= 200 && response.statusCode() <= 300;
LOGGER.info("Webhook {} execution status {}", webhookConfig.get().getName(), isSuccessful ? "successful" : "failed");
return isSuccessful;
} catch (final Exception e) {
LOGGER.warn(e.getMessage());
finalException = e;
}
}
// If we ever get here, it means we exceeded MAX_RETRIES without returning in the happy path.
throw new RuntimeException(finalException);
return requestBuilder;
}

private boolean sendWebhook(final HttpRequest.Builder requestBuilder, final Optional<WebhookConfig> webhookConfig)
throws IOException, InterruptedException {
final HttpResponse<String> response = this.httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
LOGGER.debug("Webhook response: {}", response == null ? null : response.body());
LOGGER.info("Webhook response status: {}", response == null ? "empty response" : response.statusCode());
// Return true if the request was successful.
final boolean isSuccessful = response != null && response.statusCode() >= 200 && response.statusCode() <= 300;
LOGGER.info("Webhook {} execution status {}", webhookConfig.get().getName(), isSuccessful ? "successful" : "failed");
return isSuccessful;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@

package io.airbyte.workers.temporal.sync;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import dev.failsafe.FailsafeException;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectionContext;
Expand All @@ -18,6 +21,7 @@
import io.airbyte.config.secrets.SecretsRepositoryReader;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.TestClient;
import io.micronaut.http.HttpStatus;
import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
Expand Down Expand Up @@ -53,7 +57,7 @@ void init() {
@Test
void webhookActivityInvokesConfiguredWebhook() throws IOException, InterruptedException {
final HttpResponse mockHttpResponse = mock(HttpResponse.class);
when(mockHttpResponse.statusCode()).thenReturn(200).thenReturn(200);
when(mockHttpResponse.statusCode()).thenReturn(HttpStatus.OK.getCode()).thenReturn(HttpStatus.OK.getCode());
when(secretsRepositoryReader.hydrateConfigFromDefaultSecretPersistence(any())).thenReturn(Jsons.jsonNode(WORKSPACE_WEBHOOK_CONFIGS));
final OperatorWebhookInput input = new OperatorWebhookInput()
.withExecutionBody(WEBHOOK_EXECUTION_BODY)
Expand All @@ -66,4 +70,18 @@ void webhookActivityInvokesConfiguredWebhook() throws IOException, InterruptedEx
assertTrue(success);
}

@Test
void webhookActivityFailsWhenRetriesExhausted() throws IOException, InterruptedException {
final IOException exception = new IOException("test");
when(httpClient.send(any(), any())).thenThrow(exception);
when(secretsRepositoryReader.hydrateConfigFromDefaultSecretPersistence(any())).thenReturn(Jsons.jsonNode(WORKSPACE_WEBHOOK_CONFIGS));
final OperatorWebhookInput input = new OperatorWebhookInput()
.withExecutionBody(WEBHOOK_EXECUTION_BODY)
.withExecutionUrl(WEBHOOK_EXECUTION_URL)
.withWebhookConfigId(WEBHOOK_ID)
.withConnectionContext(new ConnectionContext().withOrganizationId(ORGANIZATION_ID));
final Throwable t = assertThrows(FailsafeException.class, () -> webhookActivity.invokeWebhook(input));
assertEquals(exception, t.getCause());
}

}

0 comments on commit 16114a9

Please sign in to comment.