Skip to content

Commit

Permalink
refactor to use interceptor
Browse files Browse the repository at this point in the history
Signed-off-by: Tihomir Surdilovic <[email protected]>
  • Loading branch information
tsurdilo committed Feb 10, 2025
1 parent c4b02d4 commit f7aba20
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class AutoHeartbeater {
public class AutoHeartbeatUtil {
private final long period;
private final long initialDelay;
private final TimeUnit periodTimeUnit;
Expand All @@ -38,7 +38,7 @@ public class AutoHeartbeater {
private final Object details;
private String heartbeaterId;

public AutoHeartbeater(
public AutoHeartbeatUtil(
long period,
long initialDelay,
TimeUnit periodTimeUnit,
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.samples.autoheartbeat.activities.AutoActivitiesImpl;
import io.temporal.samples.autoheartbeat.interceptor.AutoHeartbeatWorkerInterceptor;
import io.temporal.samples.autoheartbeat.workflows.AutoWorkflow;
import io.temporal.samples.autoheartbeat.workflows.AutoWorkflowImpl;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.worker.WorkerFactoryOptions;

public class Starter {
static final String TASK_QUEUE = "AutoheartbeatTaskQueue";
Expand All @@ -32,7 +37,16 @@ public class Starter {
public static void main(String[] args) {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
WorkerFactory factory = WorkerFactory.newInstance(client);

// Configure our auto heartbeat workflow interceptor which will apply
// AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat
// timeout configured
WorkerFactoryOptions wfo =
WorkerFactoryOptions.newBuilder()
.setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor())
.build();

WorkerFactory factory = WorkerFactory.newInstance(client, wfo);
Worker worker = factory.newWorker(TASK_QUEUE);

worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat;
package io.temporal.samples.autoheartbeat.activities;

import io.temporal.activity.ActivityInterface;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,26 @@
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat;
package io.temporal.samples.autoheartbeat.activities;

import io.temporal.activity.Activity;
import io.temporal.client.ActivityCompletionException;
import java.util.concurrent.TimeUnit;

public class AutoActivitiesImpl implements AutoActivities {

@Override
public String runActivityOne(String input) {
return runActivity("runActivityOne - " + input);
return runActivity("runActivityOne - " + input, 20);
}

@Override
public String runActivityTwo(String input) {
return runActivity("runActivityTwo - " + input);
return runActivity("runActivityTwo - " + input, 10);
}

@SuppressWarnings("FutureReturnValueIgnored")
private String runActivity(String input) {
// Start Autoheartbeater
AutoHeartbeater autoHearbeater =
new AutoHeartbeater(
getHeartbeatPeriod(), 0, TimeUnit.SECONDS, Activity.getExecutionContext(), input);
autoHearbeater.start();

// For sample our activity just sleeps for a second for 20 seconds
for (int i = 0; i < 20; i++) {
private String runActivity(String input, int seconds) {
for (int i = 0; i < seconds; i++) {
try {
sleep(1);
} catch (ActivityCompletionException e) {
Expand All @@ -58,24 +50,13 @@ private String runActivity(String input) {
+ "Workflow runid: "
+ e.getRunId().get()
+ " was canceled. Shutting down auto heartbeats");
autoHearbeater.stop();
// We want to rethrow the cancel failure
throw e;
}
}
return "Activity completed: " + input;
}

private long getHeartbeatPeriod() {
// Note you can add checks if heartbeat timeout is set if not and
// decide to log / fail activity / not start autoheartbeater based on your business logic

// For sample we want to heartbeat 1 seconds less than heartbeat timeout
return Activity.getExecutionContext().getInfo().getHeartbeatTimeout().getSeconds() <= 1
? 1
: Activity.getExecutionContext().getInfo().getHeartbeatTimeout().getSeconds() - 1;
}

private void sleep(int seconds) {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat.interceptor;

import io.temporal.activity.ActivityExecutionContext;
import io.temporal.common.interceptors.ActivityInboundCallsInterceptor;
import io.temporal.common.interceptors.ActivityInboundCallsInterceptorBase;
import io.temporal.samples.autoheartbeat.AutoHeartbeatUtil;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class AutoHeartbeatActivityInboundCallsInterceptor
extends ActivityInboundCallsInterceptorBase {
private ActivityExecutionContext activityExecutionContext;
private Duration activityHeartbeatTimeout;

public AutoHeartbeatActivityInboundCallsInterceptor(ActivityInboundCallsInterceptor next) {
super(next);
}

@Override
public void init(ActivityExecutionContext context) {
this.activityExecutionContext = context;
activityHeartbeatTimeout = activityExecutionContext.getInfo().getHeartbeatTimeout();
super.init(context);
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public ActivityOutput execute(ActivityInput input) {
// If activity has heartbeat timeout defined we want to apply auto-heartbeter
AutoHeartbeatUtil autoHearbeater = null;
if (activityHeartbeatTimeout != null) {
System.out.println(
"Auto heartbeating applied for activity: "
+ activityExecutionContext.getInfo().getActivityType());
autoHearbeater =
new AutoHeartbeatUtil(
getHeartbeatPeriod(activityHeartbeatTimeout),
0,
TimeUnit.SECONDS,
activityExecutionContext,
input);
autoHearbeater.start();
} else {
System.out.println(
"Auto heartbeating not being applied for activity: "
+ activityExecutionContext.getInfo().getActivityType());
}

try {
return super.execute(input);
} catch (Exception e) {
throw e;
} finally {
if (autoHearbeater != null) {
autoHearbeater.stop();
}
}
}

private long getHeartbeatPeriod(Duration activityHeartbeatTimeout) {
// For sample we want to heartbeat 1 seconds less than heartbeat timeout
return activityHeartbeatTimeout.getSeconds() <= 1
? 1
: activityHeartbeatTimeout.getSeconds() - 1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat.interceptor;

import io.temporal.common.interceptors.ActivityInboundCallsInterceptor;
import io.temporal.common.interceptors.WorkerInterceptorBase;

public class AutoHeartbeatWorkerInterceptor extends WorkerInterceptorBase {
@Override
public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) {
return new AutoHeartbeatActivityInboundCallsInterceptor(next);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat;
package io.temporal.samples.autoheartbeat.workflows;

import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
* permissions and limitations under the License.
*/

package io.temporal.samples.autoheartbeat;
package io.temporal.samples.autoheartbeat.workflows;

import io.temporal.activity.ActivityOptions;
import io.temporal.failure.ActivityFailure;
import io.temporal.failure.CanceledFailure;
import io.temporal.samples.autoheartbeat.activities.AutoActivities;
import io.temporal.workflow.Async;
import io.temporal.workflow.CancellationScope;
import io.temporal.workflow.Promise;
Expand All @@ -42,15 +43,15 @@ public String exec(String input) {
AutoActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(22))
.setHeartbeatTimeout(Duration.ofSeconds(3))
.setHeartbeatTimeout(Duration.ofSeconds(5))
.build());

AutoActivities activitiesTwo =
Workflow.newActivityStub(
AutoActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(22))
.setHeartbeatTimeout(Duration.ofSeconds(5))
.setStartToCloseTimeout(Duration.ofSeconds(12))
.setHeartbeatTimeout(Duration.ofSeconds(3))
.build());

// Start our activity in CancellationScope so we can cancel it if needed
Expand Down

0 comments on commit f7aba20

Please sign in to comment.