From f7aba205e30fe7314b56260001f41198c63d8c3f Mon Sep 17 00:00:00 2001 From: Tihomir Surdilovic Date: Mon, 10 Feb 2025 08:53:10 -0500 Subject: [PATCH] refactor to use interceptor Signed-off-by: Tihomir Surdilovic --- ...eartbeater.java => AutoHeartbeatUtil.java} | 4 +- .../temporal/samples/autoheartbeat/README.md | 0 .../samples/autoheartbeat/Starter.java | 16 +++- .../{ => activities}/AutoActivities.java | 2 +- .../{ => activities}/AutoActivitiesImpl.java | 29 ++----- ...rtbeatActivityInboundCallsInterceptor.java | 85 +++++++++++++++++++ .../AutoHeartbeatWorkerInterceptor.java | 30 +++++++ .../{ => workflows}/AutoWorkflow.java | 2 +- .../{ => workflows}/AutoWorkflowImpl.java | 9 +- 9 files changed, 144 insertions(+), 33 deletions(-) rename core/src/main/java/io/temporal/samples/autoheartbeat/{AutoHeartbeater.java => AutoHeartbeatUtil.java} (97%) create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/README.md rename core/src/main/java/io/temporal/samples/autoheartbeat/{ => activities}/AutoActivities.java (94%) rename core/src/main/java/io/temporal/samples/autoheartbeat/{ => activities}/AutoActivitiesImpl.java (63%) create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java create mode 100644 core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java rename core/src/main/java/io/temporal/samples/autoheartbeat/{ => workflows}/AutoWorkflow.java (95%) rename core/src/main/java/io/temporal/samples/autoheartbeat/{ => workflows}/AutoWorkflowImpl.java (94%) diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java similarity index 97% rename from core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java rename to core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java index e5cffe87..1bda80e8 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeater.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/AutoHeartbeatUtil.java @@ -28,7 +28,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -public class AutoHeartbeater { +public class AutoHeartbeatUtil { private final long period; private final long initialDelay; private final TimeUnit periodTimeUnit; @@ -38,7 +38,7 @@ public class AutoHeartbeater { private final Object details; private String heartbeaterId; - public AutoHeartbeater( + public AutoHeartbeatUtil( long period, long initialDelay, TimeUnit periodTimeUnit, diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/README.md b/core/src/main/java/io/temporal/samples/autoheartbeat/README.md new file mode 100644 index 00000000..e69de29b diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java index 0e7d799f..a488bcb6 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/Starter.java @@ -21,9 +21,14 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; +import io.temporal.samples.autoheartbeat.activities.AutoActivitiesImpl; +import io.temporal.samples.autoheartbeat.interceptor.AutoHeartbeatWorkerInterceptor; +import io.temporal.samples.autoheartbeat.workflows.AutoWorkflow; +import io.temporal.samples.autoheartbeat.workflows.AutoWorkflowImpl; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerFactoryOptions; public class Starter { static final String TASK_QUEUE = "AutoheartbeatTaskQueue"; @@ -32,7 +37,16 @@ public class Starter { public static void main(String[] args) { WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); WorkflowClient client = WorkflowClient.newInstance(service); - WorkerFactory factory = WorkerFactory.newInstance(client); + + // Configure our auto heartbeat workflow interceptor which will apply + // AutoHeartbeaterUtil to each activity workflow schedules which has a heartbeat + // timeout configured + WorkerFactoryOptions wfo = + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new AutoHeartbeatWorkerInterceptor()) + .build(); + + WorkerFactory factory = WorkerFactory.newInstance(client, wfo); Worker worker = factory.newWorker(TASK_QUEUE); worker.registerWorkflowImplementationTypes(AutoWorkflowImpl.class); diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivities.java b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java similarity index 94% rename from core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivities.java rename to core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java index 5b01950f..81726e05 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivities.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivities.java @@ -17,7 +17,7 @@ * permissions and limitations under the License. */ -package io.temporal.samples.autoheartbeat; +package io.temporal.samples.autoheartbeat.activities; import io.temporal.activity.ActivityInterface; diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java similarity index 63% rename from core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java rename to core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java index 6357e158..5a9507ab 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoActivitiesImpl.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/activities/AutoActivitiesImpl.java @@ -17,9 +17,8 @@ * permissions and limitations under the License. */ -package io.temporal.samples.autoheartbeat; +package io.temporal.samples.autoheartbeat.activities; -import io.temporal.activity.Activity; import io.temporal.client.ActivityCompletionException; import java.util.concurrent.TimeUnit; @@ -27,24 +26,17 @@ public class AutoActivitiesImpl implements AutoActivities { @Override public String runActivityOne(String input) { - return runActivity("runActivityOne - " + input); + return runActivity("runActivityOne - " + input, 20); } @Override public String runActivityTwo(String input) { - return runActivity("runActivityTwo - " + input); + return runActivity("runActivityTwo - " + input, 10); } @SuppressWarnings("FutureReturnValueIgnored") - private String runActivity(String input) { - // Start Autoheartbeater - AutoHeartbeater autoHearbeater = - new AutoHeartbeater( - getHeartbeatPeriod(), 0, TimeUnit.SECONDS, Activity.getExecutionContext(), input); - autoHearbeater.start(); - - // For sample our activity just sleeps for a second for 20 seconds - for (int i = 0; i < 20; i++) { + private String runActivity(String input, int seconds) { + for (int i = 0; i < seconds; i++) { try { sleep(1); } catch (ActivityCompletionException e) { @@ -58,7 +50,6 @@ private String runActivity(String input) { + "Workflow runid: " + e.getRunId().get() + " was canceled. Shutting down auto heartbeats"); - autoHearbeater.stop(); // We want to rethrow the cancel failure throw e; } @@ -66,16 +57,6 @@ private String runActivity(String input) { return "Activity completed: " + input; } - private long getHeartbeatPeriod() { - // Note you can add checks if heartbeat timeout is set if not and - // decide to log / fail activity / not start autoheartbeater based on your business logic - - // For sample we want to heartbeat 1 seconds less than heartbeat timeout - return Activity.getExecutionContext().getInfo().getHeartbeatTimeout().getSeconds() <= 1 - ? 1 - : Activity.getExecutionContext().getInfo().getHeartbeatTimeout().getSeconds() - 1; - } - private void sleep(int seconds) { try { Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java new file mode 100644 index 00000000..58d55736 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatActivityInboundCallsInterceptor.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.interceptor; + +import io.temporal.activity.ActivityExecutionContext; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.ActivityInboundCallsInterceptorBase; +import io.temporal.samples.autoheartbeat.AutoHeartbeatUtil; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +public class AutoHeartbeatActivityInboundCallsInterceptor + extends ActivityInboundCallsInterceptorBase { + private ActivityExecutionContext activityExecutionContext; + private Duration activityHeartbeatTimeout; + + public AutoHeartbeatActivityInboundCallsInterceptor(ActivityInboundCallsInterceptor next) { + super(next); + } + + @Override + public void init(ActivityExecutionContext context) { + this.activityExecutionContext = context; + activityHeartbeatTimeout = activityExecutionContext.getInfo().getHeartbeatTimeout(); + super.init(context); + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public ActivityOutput execute(ActivityInput input) { + // If activity has heartbeat timeout defined we want to apply auto-heartbeter + AutoHeartbeatUtil autoHearbeater = null; + if (activityHeartbeatTimeout != null) { + System.out.println( + "Auto heartbeating applied for activity: " + + activityExecutionContext.getInfo().getActivityType()); + autoHearbeater = + new AutoHeartbeatUtil( + getHeartbeatPeriod(activityHeartbeatTimeout), + 0, + TimeUnit.SECONDS, + activityExecutionContext, + input); + autoHearbeater.start(); + } else { + System.out.println( + "Auto heartbeating not being applied for activity: " + + activityExecutionContext.getInfo().getActivityType()); + } + + try { + return super.execute(input); + } catch (Exception e) { + throw e; + } finally { + if (autoHearbeater != null) { + autoHearbeater.stop(); + } + } + } + + private long getHeartbeatPeriod(Duration activityHeartbeatTimeout) { + // For sample we want to heartbeat 1 seconds less than heartbeat timeout + return activityHeartbeatTimeout.getSeconds() <= 1 + ? 1 + : activityHeartbeatTimeout.getSeconds() - 1; + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java new file mode 100644 index 00000000..9816fe64 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/interceptor/AutoHeartbeatWorkerInterceptor.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.autoheartbeat.interceptor; + +import io.temporal.common.interceptors.ActivityInboundCallsInterceptor; +import io.temporal.common.interceptors.WorkerInterceptorBase; + +public class AutoHeartbeatWorkerInterceptor extends WorkerInterceptorBase { + @Override + public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { + return new AutoHeartbeatActivityInboundCallsInterceptor(next); + } +} diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflow.java b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java similarity index 95% rename from core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflow.java rename to core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java index d749afe2..e8816c6a 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflow.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflow.java @@ -17,7 +17,7 @@ * permissions and limitations under the License. */ -package io.temporal.samples.autoheartbeat; +package io.temporal.samples.autoheartbeat.workflows; import io.temporal.workflow.SignalMethod; import io.temporal.workflow.WorkflowInterface; diff --git a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflowImpl.java b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java similarity index 94% rename from core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflowImpl.java rename to core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java index df913337..88b6969c 100644 --- a/core/src/main/java/io/temporal/samples/autoheartbeat/AutoWorkflowImpl.java +++ b/core/src/main/java/io/temporal/samples/autoheartbeat/workflows/AutoWorkflowImpl.java @@ -17,11 +17,12 @@ * permissions and limitations under the License. */ -package io.temporal.samples.autoheartbeat; +package io.temporal.samples.autoheartbeat.workflows; import io.temporal.activity.ActivityOptions; import io.temporal.failure.ActivityFailure; import io.temporal.failure.CanceledFailure; +import io.temporal.samples.autoheartbeat.activities.AutoActivities; import io.temporal.workflow.Async; import io.temporal.workflow.CancellationScope; import io.temporal.workflow.Promise; @@ -42,15 +43,15 @@ public String exec(String input) { AutoActivities.class, ActivityOptions.newBuilder() .setStartToCloseTimeout(Duration.ofSeconds(22)) - .setHeartbeatTimeout(Duration.ofSeconds(3)) + .setHeartbeatTimeout(Duration.ofSeconds(5)) .build()); AutoActivities activitiesTwo = Workflow.newActivityStub( AutoActivities.class, ActivityOptions.newBuilder() - .setStartToCloseTimeout(Duration.ofSeconds(22)) - .setHeartbeatTimeout(Duration.ofSeconds(5)) + .setStartToCloseTimeout(Duration.ofSeconds(12)) + .setHeartbeatTimeout(Duration.ofSeconds(3)) .build()); // Start our activity in CancellationScope so we can cancel it if needed