Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add polling sample for infrequent poll with Retry-After delay - showc… #649

Merged
merged 2 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,59 @@

package io.temporal.samples.polling;

import java.util.concurrent.ThreadLocalRandom;

/**
* Test service that we want to poll. It simulates a service being down and then returning a result
* after 5 attempts
*/
public class TestService {
private int tryAttempt = 0;
private int errorAttempts = 5; // default to 5 attempts before returning result
private boolean doRetryAfter = false;
private int minRetryAfter = 1;
private int maxRetryAfter = 3;

public TestService() {}

public TestService(int errorAttempts) {
this.errorAttempts = errorAttempts;
}

public TestService(int errorAttempts, boolean doRetryAfter) {
this.errorAttempts = errorAttempts;
this.doRetryAfter = doRetryAfter;
}

public String getServiceResult() throws TestServiceException {
tryAttempt++;
if (tryAttempt % errorAttempts == 0) {
return "OK";
} else {
throw new TestServiceException("Service is down");
if (!doRetryAfter) {
throw new TestServiceException("Service is down");
} else {
throw new TestServiceException(
"Service is down",
ThreadLocalRandom.current().nextInt(minRetryAfter, maxRetryAfter + 1));
}
}
}

public static class TestServiceException extends Exception {
private int retryAfterInMinutes = 1;

public TestServiceException(String message) {
super(message);
}

public TestServiceException(String message, int retryAfterInMinutes) {
super(message);
this.retryAfterInMinutes = retryAfterInMinutes;
}

public int getRetryAfterInMinutes() {
return retryAfterInMinutes;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.polling.infrequentwithretryafter;

import io.temporal.activity.Activity;
import io.temporal.failure.ApplicationFailure;
import io.temporal.samples.polling.PollingActivities;
import io.temporal.samples.polling.TestService;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class InfrequentPollingWithRetryAfterActivityImpl implements PollingActivities {
private TestService service;
final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ISO_DATE_TIME;

public InfrequentPollingWithRetryAfterActivityImpl(TestService service) {
this.service = service;
}

@Override
public String doPoll() {
System.out.println(
"Attempt: "
+ Activity.getExecutionContext().getInfo().getAttempt()
+ " Poll time: "
+ LocalDateTime.now(ZoneId.systemDefault()).format(ISO_FORMATTER));

try {
return service.getServiceResult();
} catch (TestService.TestServiceException e) {
// we throw application failure that includes cause
// which is the test service exception
// and delay which is the interval to next retry based on test service retry-after directive
System.out.println("Activity next retry in: " + e.getRetryAfterInMinutes() + " minutes");
throw ApplicationFailure.newFailureWithCauseAndDelay(
e.getMessage(),
e.getClass().getName(),
e,
// here we set the next retry interval based on Retry-After duration given to us by our
// service
Duration.ofMinutes(e.getRetryAfterInMinutes()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.polling.infrequentwithretryafter;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.samples.polling.PollingWorkflow;
import io.temporal.samples.polling.TestService;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;

public class InfrequentPollingWithRetryAfterStarter {
private static final WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
private static final WorkflowClient client = WorkflowClient.newInstance(service);
private static final String taskQueue = "pollingSampleQueue";
private static final String workflowId = "InfrequentPollingWithRetryAfterWorkflow";

public static void main(String[] args) {
// Create our worker and register workflow and activities
createWorker();

// Create typed workflow stub and start execution (sync, wait for results)
PollingWorkflow workflow =
client.newWorkflowStub(
PollingWorkflow.class,
WorkflowOptions.newBuilder().setTaskQueue(taskQueue).setWorkflowId(workflowId).build());
String result = workflow.exec();
System.out.println("Result: " + result);
System.exit(0);
}

private static void createWorker() {
WorkerFactory workerFactory = WorkerFactory.newInstance(client);
Worker worker = workerFactory.newWorker(taskQueue);

// Register workflow and activities
worker.registerWorkflowImplementationTypes(InfrequentPollingWithRetryAfterWorkflowImpl.class);
worker.registerActivitiesImplementations(
new InfrequentPollingWithRetryAfterActivityImpl(new TestService(4, true)));

workerFactory.start();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.polling.infrequentwithretryafter;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.samples.polling.PollingActivities;
import io.temporal.samples.polling.PollingWorkflow;
import io.temporal.workflow.Workflow;
import java.time.Duration;

public class InfrequentPollingWithRetryAfterWorkflowImpl implements PollingWorkflow {
@Override
public String exec() {
/*
* Infrequent polling via activity can be implemented via activity retries. For this sample we
* want to poll the test service initially 60 seconds. After that we want to retry it based on
* the Retry-After directive from the downstream servie we are invoking from the activity.
*
* <ol>
* <li>Set RetryPolicy backoff coefficient of 1
* <li>Set initial interval to the poll frequency (since coefficient is 1, same interval will
* be used as default retry attempt)
* </ol>
*/
ActivityOptions options =
ActivityOptions.newBuilder()
// Set activity StartToClose timeout (single activity exec), does not include retries
.setStartToCloseTimeout(Duration.ofSeconds(2))
.setRetryOptions(
RetryOptions.newBuilder()
.setBackoffCoefficient(1)
// note we don't set initial interval here
.build())
.build();
// create our activities stub and start activity execution
PollingActivities activities = Workflow.newActivityStub(PollingActivities.class, options);
return activities.doPoll();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
## Infrequent polling with Service returning Retry-After time

* Note - for this sample to work you should use Temporal Service
version 1.24.2 or Temporal Cloud

* Note - for sample we assume that our downstream service returns a retry-after duration
that is longer than 1 minute

This sample shows how we can use Activity retries for infrequent polling of a third-party service (for example via REST).
This method can be used for infrequent polls of one minute or slower.
For this sample our test service also returns a Retry-After time (typically its done via response header but
for sample its just done in service exception)

We utilize activity retries for this option, setting Retries options:
* setBackoffCoefficient to 1
* here we do not set initial interval as its changed by the Retry-After duration
sent to us by the downstream service our activity invokes
*
This will allow us to retry our Activity based on the Retry-After duration our downstream service
tells us.

To run this sample:
```bash
./gradlew -q execute -PmainClass=io.temporal.samples.polling.infrequent.InfrequentPollingWithRetryAfterStarter
```

Since our test service simulates it being "down" for 3 polling attempts and then returns "OK" on the 4th poll attempt,
our Workflow is going to perform 3 activity retries
with different intervals based on the Retry-After time our serviec gives us,
and then return the service result on the successful 4th attempt.

Note that individual Activity retries are not recorded in
Workflow History, so we this approach we can poll for a very long time without affecting the history size.

### Sample result
If you run this sample you can see following in the logs for example:

```
Attempt: 1 Poll time: 2024-07-14T22:03:03.750506
Activity next retry in: 2 minutes
Attempt: 2 Poll time: 2024-07-14T22:05:03.780079
Activity next retry in: 3 minutes
Attempt: 3 Poll time: 2024-07-14T22:08:03.799703
Activity next retry in: 1 minutes
Attempt: 4 Poll time: 2024-07-14T22:09:03.817751
Result: OK
```
Loading