Skip to content

Commit

Permalink
Register workflows and acitivities using instances along classes (#1201)
Browse files Browse the repository at this point in the history
  • Loading branch information
artur-ciocanu authored Feb 3, 2025
1 parent cf7405f commit 58d6218
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
public class DaprWorkflowsConfiguration implements ApplicationContextAware {
private static final Logger LOGGER = LoggerFactory.getLogger(DaprWorkflowsConfiguration.class);

private WorkflowRuntimeBuilder workflowRuntimeBuilder;
private final WorkflowRuntimeBuilder workflowRuntimeBuilder;

public DaprWorkflowsConfiguration(WorkflowRuntimeBuilder workflowRuntimeBuilder) {
this.workflowRuntimeBuilder = workflowRuntimeBuilder;
Expand All @@ -29,16 +29,21 @@ public DaprWorkflowsConfiguration(WorkflowRuntimeBuilder workflowRuntimeBuilder)
*/
private void registerWorkflowsAndActivities(ApplicationContext applicationContext) {
LOGGER.info("Registering Dapr Workflows and Activities");

Map<String, Workflow> workflowBeans = applicationContext.getBeansOfType(Workflow.class);
for (Workflow w : workflowBeans.values()) {
LOGGER.info("Dapr Workflow: '{}' registered", w.getClass().getName());
workflowRuntimeBuilder.registerWorkflow(w.getClass());

for (Workflow workflow : workflowBeans.values()) {
LOGGER.info("Dapr Workflow: '{}' registered", workflow.getClass().getName());

workflowRuntimeBuilder.registerWorkflow(workflow);
}

Map<String, WorkflowActivity> workflowActivitiesBeans = applicationContext.getBeansOfType(WorkflowActivity.class);
for (WorkflowActivity a : workflowActivitiesBeans.values()) {
LOGGER.info("Dapr Workflow Activity: '{}' registered", a.getClass().getName());
workflowRuntimeBuilder.registerActivity(a.getClass());

for (WorkflowActivity activity : workflowActivitiesBeans.values()) {
LOGGER.info("Dapr Workflow Activity: '{}' registered", activity.getClass().getName());

workflowRuntimeBuilder.registerActivity(activity);
}

try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* Wrapper for Durable Task Framework task activity factory.
*/
public class WorkflowActivityWrapper<T extends WorkflowActivity> implements TaskActivityFactory {
public class WorkflowActivityClassWrapper<T extends WorkflowActivity> implements TaskActivityFactory {
private final Constructor<T> activityConstructor;
private final String name;

Expand All @@ -32,7 +32,7 @@ public class WorkflowActivityWrapper<T extends WorkflowActivity> implements Task
*
* @param clazz Class of the activity to wrap.
*/
public WorkflowActivityWrapper(Class<T> clazz) {
public WorkflowActivityClassWrapper(Class<T> clazz) {
this.name = clazz.getCanonicalName();
try {
this.activityConstructor = clazz.getDeclaredConstructor();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.TaskActivity;
import com.microsoft.durabletask.TaskActivityFactory;
import io.dapr.workflows.WorkflowActivity;

/**
* Wrapper for Durable Task Framework task activity factory.
*/
public class WorkflowActivityInstanceWrapper<T extends WorkflowActivity> implements TaskActivityFactory {
private final T activity;
private final String name;

/**
* Constructor for WorkflowActivityWrapper.
*
* @param instance Instance of the activity to wrap.
*/
public WorkflowActivityInstanceWrapper(T instance) {
this.name = instance.getClass().getCanonicalName();
this.activity = instance;
}

@Override
public String getName() {
return name;
}

@Override
public TaskActivity create() {
return ctx -> activity.run(new DefaultWorkflowActivityContext(ctx));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
/**
* Wrapper for Durable Task Framework orchestration factory.
*/
class WorkflowWrapper<T extends Workflow> implements TaskOrchestrationFactory {
class WorkflowClassWrapper<T extends Workflow> implements TaskOrchestrationFactory {
private final Constructor<T> workflowConstructor;
private final String name;

public WorkflowWrapper(Class<T> clazz) {
public WorkflowClassWrapper(Class<T> clazz) {
this.name = clazz.getCanonicalName();
try {
this.workflowConstructor = clazz.getDeclaredConstructor();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.workflows.runtime;

import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.saga.Saga;

/**
* Wrapper for Durable Task Framework orchestration factory.
*/
class WorkflowInstanceWrapper<T extends Workflow> implements TaskOrchestrationFactory {
private final T workflow;
private final String name;

public WorkflowInstanceWrapper(T instance) {
this.name = instance.getClass().getCanonicalName();
this.workflow = instance;
}

@Override
public String getName() {
return name;
}

@Override
public TaskOrchestration create() {
return ctx -> {
if (workflow.getSagaOption() != null) {
Saga saga = new Saga(workflow.getSagaOption());
workflow.run(new DefaultWorkflowContext(ctx, saga));
} else {
workflow.run(new DefaultWorkflowContext(ctx));
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,30 @@ public WorkflowRuntime build() {
* @return the WorkflowRuntimeBuilder
*/
public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
this.builder.addOrchestration(new WorkflowWrapper<>(clazz));
this.builder.addOrchestration(new WorkflowClassWrapper<>(clazz));
this.workflowSet.add(clazz.getCanonicalName());
this.workflows.add(clazz.getSimpleName());

this.logger.info("Registered Workflow: " + clazz.getSimpleName());
this.logger.info("Registered Workflow: {}", clazz.getSimpleName());

return this;
}

/**
* Registers a Workflow object.
*
* @param <T> any Workflow type
* @param instance the workflow instance being registered
* @return the WorkflowRuntimeBuilder
*/
public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(T instance) {
Class<T> clazz = (Class<T>) instance.getClass();

this.builder.addOrchestration(new WorkflowInstanceWrapper<>(instance));
this.workflowSet.add(clazz.getCanonicalName());
this.workflows.add(clazz.getSimpleName());

this.logger.info("Registered Workflow: {}", clazz.getSimpleName());

return this;
}
Expand All @@ -109,11 +128,30 @@ public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> cla
* @return the WorkflowRuntimeBuilder
*/
public <T extends WorkflowActivity> WorkflowRuntimeBuilder registerActivity(Class<T> clazz) {
this.builder.addActivity(new WorkflowActivityWrapper<>(clazz));
this.builder.addActivity(new WorkflowActivityClassWrapper<>(clazz));
this.activitySet.add(clazz.getCanonicalName());
this.activities.add(clazz.getSimpleName());

this.logger.info("Registered Activity: {}", clazz.getSimpleName());

return this;
}

/**
* Registers an Activity object.
*
* @param <T> any WorkflowActivity type
* @param instance the class instance being registered
* @return the WorkflowRuntimeBuilder
*/
public <T extends WorkflowActivity> WorkflowRuntimeBuilder registerActivity(T instance) {
Class<T> clazz = (Class<T>) instance.getClass();

this.builder.addActivity(new WorkflowActivityInstanceWrapper<>(instance));
this.activitySet.add(clazz.getCanonicalName());
this.activities.add(clazz.getSimpleName());

this.logger.info("Registered Activity: " + clazz.getSimpleName());
this.logger.info("Registered Activity: {}", clazz.getSimpleName());

return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@
import com.microsoft.durabletask.TaskActivityContext;
import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;
import org.junit.Assert;
import org.junit.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


public class WorkflowActivityWrapperTest {
public class WorkflowActivityClassWrapperTest {
public static class TestActivity implements WorkflowActivity {
@Override
public Object run(WorkflowActivityContext ctx) {
Expand All @@ -22,24 +21,26 @@ public Object run(WorkflowActivityContext ctx) {
}

@Test
public void getName() throws NoSuchMethodException {
WorkflowActivityWrapper<TestActivity> wrapper = new WorkflowActivityWrapper<>(
WorkflowActivityWrapperTest.TestActivity.class);
Assert.assertEquals(
"io.dapr.workflows.runtime.WorkflowActivityWrapperTest.TestActivity",
public void getName() {
WorkflowActivityClassWrapper<TestActivity> wrapper = new WorkflowActivityClassWrapper<>(TestActivity.class);

assertEquals(
"io.dapr.workflows.runtime.WorkflowActivityClassWrapperTest.TestActivity",
wrapper.getName()
);
}

@Test
public void createWithClass() throws NoSuchMethodException {
public void createWithClass() {
TaskActivityContext mockContext = mock(TaskActivityContext.class);
WorkflowActivityWrapper<TestActivity> wrapper = new WorkflowActivityWrapper<>(
WorkflowActivityWrapperTest.TestActivity.class);
WorkflowActivityClassWrapper<TestActivity> wrapper = new WorkflowActivityClassWrapper<>(TestActivity.class);

when(mockContext.getInput(String.class)).thenReturn("Hello");
when(mockContext.getName()).thenReturn("TestActivityContext");

Object result = wrapper.create().run(mockContext);

verify(mockContext, times(1)).getInput(String.class);
Assert.assertEquals("Hello world! from TestActivityContext", result);
assertEquals("Hello world! from TestActivityContext", result);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.dapr.workflows.runtime;

import com.microsoft.durabletask.TaskActivityContext;
import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;
import org.junit.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class WorkflowActivityInstanceWrapperTest {
public static class TestActivity implements WorkflowActivity {
@Override
public Object run(WorkflowActivityContext ctx) {
String activityContextName = ctx.getName();
return ctx.getInput(String.class) + " world! from " + activityContextName;
}
}

@Test
public void getName() {
WorkflowActivityInstanceWrapper<TestActivity> wrapper = new WorkflowActivityInstanceWrapper<>(new TestActivity());

assertEquals(
"io.dapr.workflows.runtime.WorkflowActivityInstanceWrapperTest.TestActivity",
wrapper.getName()
);
}

@Test
public void createWithInstance() {
TaskActivityContext mockContext = mock(TaskActivityContext.class);
WorkflowActivityInstanceWrapper<TestActivity> wrapper = new WorkflowActivityInstanceWrapper<>(new TestActivity());

when(mockContext.getInput(String.class)).thenReturn("Hello");
when(mockContext.getName()).thenReturn("TestActivityContext");

Object result = wrapper.create().run(mockContext);

verify(mockContext, times(1)).getInput(String.class);
assertEquals("Hello world! from TestActivityContext", result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@

package io.dapr.workflows.runtime;


import com.microsoft.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowStub;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class WorkflowWrapperTest {
public class WorkflowClassWrapperTest {
public static class TestWorkflow implements Workflow {
@Override
public WorkflowStub create() {
Expand All @@ -36,20 +35,22 @@ public WorkflowStub create() {

@Test
public void getName() {
WorkflowWrapper<TestWorkflow> wrapper = new WorkflowWrapper<>(TestWorkflow.class);
Assertions.assertEquals(
"io.dapr.workflows.runtime.WorkflowWrapperTest.TestWorkflow",
WorkflowClassWrapper<TestWorkflow> wrapper = new WorkflowClassWrapper<>(TestWorkflow.class);

assertEquals(
"io.dapr.workflows.runtime.WorkflowClassWrapperTest.TestWorkflow",
wrapper.getName()
);
}

@Test
public void createWithClass() {
TaskOrchestrationContext mockContext = mock(TaskOrchestrationContext.class);
WorkflowWrapper<TestWorkflow> wrapper = new WorkflowWrapper<>(TestWorkflow.class);
WorkflowClassWrapper<TestWorkflow> wrapper = new WorkflowClassWrapper<>(TestWorkflow.class);

when(mockContext.getInstanceId()).thenReturn("uuid");
wrapper.create().run(mockContext);
verify(mockContext, times(1)).getInstanceId();
}

}
}
Loading

0 comments on commit 58d6218

Please sign in to comment.