Skip to content

Commit

Permalink
* Add options waitForSync,syncTimeout,watchHealth parameters for Upda…
Browse files Browse the repository at this point in the history
…teSpec and CreateUpdate operations

* Fixed timeout issues for waitForSync method by moving BuffereReader instance creation statement from try-with-resource block.
  • Loading branch information
g0h04k0 committed Jun 2, 2024
1 parent fe2334c commit 5c1f328
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
Expand Down Expand Up @@ -137,8 +138,8 @@ static V1alpha1Application getAppWatchEvent(String app,
WaitWatchParams p,
ApplicationServiceApi appApi) throws Exception {

try (CloseableHttpResponse response = client.getHttpClient().execute(request);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) {
try (CloseableHttpResponse response = client.getHttpClient().execute(request)) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(response.getEntity().getContent()));
String line;

while ((line = bufferedReader.readLine()) != null) {
Expand Down Expand Up @@ -235,7 +236,7 @@ public V1alpha1Application waitForSync(String appName, String resourceVersion, D
return Optional.empty();
};

return new CallRetry<>(mainAttempt, fallback).attemptWithRetry(RETRY_LIMIT);
return new CallRetry<>(mainAttempt, fallback, List.of(SocketTimeoutException.class) ).attemptWithRetry(RETRY_LIMIT);
}

private static String healthStatus(V1alpha1Application app) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ private TaskResult processUpdateSpecAction(TaskParams.UpdateSpecParams in) throw
appSpec = ConfigurationUtils.deepMerge(appSpec, in.spec());
V1alpha1ApplicationSpec specObject = objectMapper.mapToModel(appSpec, V1alpha1ApplicationSpec.class);
V1alpha1ApplicationSpec result = client.updateAppSpec(in.app(), specObject);

if(in.waitForSync()) {
client.waitForSync(in.app(), app.getMetadata().getResourceVersion(), in.syncTimeout(), toWatchParams(in.watchHealth()));
}
return TaskResult.success()
.value("spec", toMap(result));
} finally {
Expand Down Expand Up @@ -261,8 +263,10 @@ private TaskResult processCreateAction(TaskParams.CreateUpdateParams in) throws
V1alpha1Application application = objectMapper.buildApplicationObject(in);
ArgoCdClient client = new ArgoCdClient(in);
V1alpha1Application app = client.createApp(application, in.upsert());
app = client.waitForSync(in.app(), app.getMetadata().getResourceVersion(), in.syncTimeout(),
toWatchParams(false));
if(in.waitForSync()){
app = client.waitForSync(in.app(), app.getMetadata().getResourceVersion(), in.syncTimeout(),
toWatchParams(in.watchHealth()));
}
return TaskResult.success()
.value("app", toMap(app));
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.Callable;

public class CallRetry<R> {
private static final Logger log = LoggerFactory.getLogger(CallRetry.class);
private final Callable<R> mainAttempt;
private final Collection<Callable<Optional<R>>> fallbackAttempts;
private final List<Class<? extends Exception>> exceptionsToNotRetry;

/**
* @param mainAttempt Primary call to attempt to execute
* @param fallbackAttempt Fallback call which will be attempted if the main call throws an exception.
* @param exceptionsToNotRetry dont retry for these exceptions
*/
public CallRetry(Callable<R> mainAttempt, Callable<Optional<R>> fallbackAttempt) {
public CallRetry(Callable<R> mainAttempt, Callable<Optional<R>> fallbackAttempt, List<Class<? extends Exception>> exceptionsToNotRetry) {
this.mainAttempt = mainAttempt;
this.fallbackAttempts = Collections.singleton(fallbackAttempt);
this.exceptionsToNotRetry = exceptionsToNotRetry;
}

/**
Expand All @@ -50,23 +51,33 @@ public CallRetry(Callable<R> mainAttempt, Callable<Optional<R>> fallbackAttempt)
* @return Results of main call, or fallback result if data is present after main failure
*/
public R attemptWithRetry(int maxTries) {
log.info("attempt with retry start {}", new Date().toString());
Throwable lastError = null;

int attemptsMade = 0;
while (attemptsMade++ < maxTries) {
try {
return mainAttempt.call();
R out = mainAttempt.call();
log.info("after main attempt {}", new Date().toString());
return out;
} catch (Exception e) {
if(exceptionsToNotRetry.stream().anyMatch(exceptionToNotRetry -> e.getClass().equals(exceptionToNotRetry))) {
throw new RuntimeException(e);
}
log.info("Exception in main attempt {}, e", new Date().toString(), e);
lastError = e;
}

// if any of these returns cleanly then that's good enough
for (Callable<Optional<R>> c : fallbackAttempts) {
log.info("Running fallback attempt {}", new Date().toString());
try {
Optional<R> result = c.call();

if (result.isPresent()) {
return result.get();
R out = result.get();
log.info("after fallback attempt {}", new Date().toString());
return out;
}
} catch (Exception e) {
log.warn("Error attempting fallback: {}", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ default String project() {
@Nullable
Map<String, Object> spec();

@Value.Default
default boolean waitForSync() { return true; }

@Value.Default
default boolean watchHealth() { return false; }

interface GitRepo {
String repoUrl();

Expand Down Expand Up @@ -266,6 +272,15 @@ interface UpdateSpecParams extends TaskParams {
String app();

Map<String, Object> spec();

@Value.Default
default boolean waitForSync() { return false; }

@Value.Default
default boolean watchHealth() { return false; }

@Nullable
Duration syncTimeout();
}

interface SetAppParams extends TaskParams {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ public static class CreateParamsImpl extends TaskParamsImpl implements CreateUpd
private static final String SYNC_TIMEOUT_KEY = "syncTimeout";
private static final String SPEC_KEY = "spec";
private static final String UPSERT_KEY = "upsert";
private static final String WAIT_FOR_SYNC_KEY = "waitForSync";
private static final String WATCH_HEALTH_KEY = "watchHealth";

protected CreateParamsImpl(Variables variables) {
super(variables);
Expand All @@ -447,6 +449,16 @@ public String namespace() {
return variables.assertString(NAMESPACE_KEY);
}

@Override
public boolean waitForSync() {
return variables.getBoolean(WAIT_FOR_SYNC_KEY, CreateUpdateParams.super.waitForSync());
}

@Override
public boolean watchHealth() {
return variables.getBoolean(WATCH_HEALTH_KEY, CreateUpdateParams.super.watchHealth());
}

@Override
public boolean upsert() { return variables.getBoolean(UPSERT_KEY, false); }

Expand Down Expand Up @@ -759,11 +771,36 @@ private static final class UpdateSpecParamsImpl extends TaskParamsImpl implement

private static final String APP_KEY = "app";
private static final String SPEC_KEY = "spec";
private static final String WAIT_FOR_SYNC_KEY = "waitForSync";
private static final String WATCH_HEALTH_KEY = "watchHealth";
private static final String SYNC_TIMEOUT_KEY = "syncTimeout";

protected UpdateSpecParamsImpl(Variables variables) {
super(variables);
}

@Nullable
@Override
public Duration syncTimeout() {
String value = variables.getString(SYNC_TIMEOUT_KEY);
if (value == null) {
return null;
}

return Duration.parse(value);
}

@Override
public boolean waitForSync() {
return variables.getBoolean(WAIT_FOR_SYNC_KEY, UpdateSpecParams.super.waitForSync());
}

@Override
public boolean watchHealth() {
return variables.getBoolean(WATCH_HEALTH_KEY, UpdateSpecParams.super.watchHealth());
}


@Override
public String app() {
return variables.assertString(APP_KEY);
Expand Down Expand Up @@ -847,7 +884,7 @@ public String propagationPolicy() {
}
}

private static class SyncParamsImpl extends TaskParamsImpl implements SyncParams {
public static class SyncParamsImpl extends TaskParamsImpl implements SyncParams {

private static class ResourceImpl implements SyncParams.Resource {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;

import java.net.SocketTimeoutException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -47,7 +50,7 @@ class CallRetryTest {
void test() throws Exception {
when(primaryResp.call()).thenReturn("a");

String result = new CallRetry<>(primaryResp, fallbackResp).attemptWithRetry(2);
String result = new CallRetry<>(primaryResp, fallbackResp, Collections.emptyList()).attemptWithRetry(2);

assertEquals("a", result);
verify(primaryResp, times(1)).call();
Expand All @@ -59,13 +62,23 @@ void testPrimaryFail() throws Exception {
when(primaryResp.call()).thenThrow(new IllegalStateException("forced exception"));
when(fallbackResp.call()).thenReturn(Optional.of("b"));

String result = new CallRetry<>(primaryResp, fallbackResp).attemptWithRetry(2);
String result = new CallRetry<>(primaryResp, fallbackResp, Collections.emptyList()).attemptWithRetry(2);

assertEquals("b", result);
verify(primaryResp, times(1)).call();
verify(fallbackResp, times(1)).call();
}

@Test
void testPrimaryFailWithExpectedException() throws Exception {
when(primaryResp.call()).thenThrow(new SocketTimeoutException("forced exception"));
CallRetry<String> callRetry = new CallRetry<>(primaryResp, fallbackResp, List.of(SocketTimeoutException.class));
Exception e = assertThrows(RuntimeException.class, () -> callRetry.attemptWithRetry(2));
assertEquals(e.getMessage(), "java.net.SocketTimeoutException: forced exception");
verify(primaryResp, times(1)).call();
verify(fallbackResp, times(0)).call();
}

@Test
void testRetry() throws Exception {
when(primaryResp.call()).thenAnswer(new Answer<String>() {
Expand All @@ -81,7 +94,7 @@ public String answer(InvocationOnMock invocation) {
});
when(fallbackResp.call()).thenReturn(Optional.empty());

String result = new CallRetry<>(primaryResp, fallbackResp).attemptWithRetry(2);
String result = new CallRetry<>(primaryResp, fallbackResp, Collections.emptyList()).attemptWithRetry(2);

assertEquals("a", result);
verify(primaryResp, times(2)).call();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.walmartlabs.concord.runtime.v2.sdk.MapBackedVariables;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.*;

import static org.junit.jupiter.api.Assertions.*;
Expand Down Expand Up @@ -89,4 +90,37 @@ public void testAzureAuth() {
assertEquals("https://login.azure.com/cleint-1", ((TaskParams.AzureAuth)in.auth()).authority());
assertTrue(((TaskParams.AzureAuth)in.auth()).scope().contains("user.read"));
}


@Test
public void testUpdateSpecParams() {
Map<String, Object> authParams = new HashMap<>();

Map<String, Object> vars = new HashMap<>();
vars.put("action", TaskParams.Action.UPDATESPEC.name());
vars.put("auth", Collections.singletonMap("azure", authParams));
vars.put("syncTimeout", "PT5M");

TaskParams.UpdateSpecParams in = (TaskParams.UpdateSpecParams) TaskParamsImpl.of(new MapBackedVariables(vars), Collections.emptyMap());
assertFalse(in.waitForSync());
assertFalse(in.watchHealth());
assertEquals(in.syncTimeout(), Duration.parse("PT5M"));
}


@Test
public void testCreateSpecParams() {
Map<String, Object> authParams = new HashMap<>();

Map<String, Object> vars = new HashMap<>();
vars.put("action", TaskParams.Action.CREATE.name());
vars.put("auth", Collections.singletonMap("azure", authParams));
vars.put("syncTimeout", "PT5M");

TaskParams.CreateUpdateParams in = (TaskParams.CreateUpdateParams) TaskParamsImpl.of(new MapBackedVariables(vars), Collections.emptyMap());
assertTrue(in.waitForSync());
assertFalse(in.watchHealth());
assertEquals(in.syncTimeout(), Duration.parse("PT5M"));
}

}

0 comments on commit 5c1f328

Please sign in to comment.