-
Notifications
You must be signed in to change notification settings - Fork 151
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update request processor #593
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
package io.temporal.samples.hello; | ||
|
||
import io.temporal.activity.ActivityInterface; | ||
import io.temporal.activity.ActivityOptions; | ||
import io.temporal.api.enums.v1.WorkflowIdReusePolicy; | ||
import io.temporal.client.WorkflowClient; | ||
import io.temporal.client.WorkflowOptions; | ||
import io.temporal.client.WorkflowStub; | ||
import io.temporal.serviceclient.WorkflowServiceStubs; | ||
import io.temporal.worker.Worker; | ||
import io.temporal.worker.WorkerFactory; | ||
import io.temporal.workflow.CompletablePromise; | ||
import io.temporal.workflow.UpdateMethod; | ||
import io.temporal.workflow.Workflow; | ||
import io.temporal.workflow.WorkflowInterface; | ||
import io.temporal.workflow.WorkflowMethod; | ||
import java.time.Duration; | ||
import java.util.ArrayList; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
public class CommandProcessor { | ||
static final String TASK_QUEUE = "MyTaskQueue"; | ||
static final String WORKFLOW_ID = "MyWorkflowId"; | ||
|
||
@WorkflowInterface | ||
public interface CommandProcessorWorkflow { | ||
@WorkflowMethod | ||
String startProcessing(); | ||
|
||
@UpdateMethod | ||
String submitCommand(int command); | ||
} | ||
|
||
@ActivityInterface | ||
public interface MyActivities { | ||
String processCommand(int command); | ||
} | ||
|
||
public static class CommandProcessorWorkflowImpl implements CommandProcessorWorkflow { | ||
|
||
private static class QueuedCommand { | ||
public int command; | ||
public CompletablePromise<String> promise; | ||
|
||
public QueuedCommand(int command, CompletablePromise<String> promise) { | ||
this.command = command; | ||
this.promise = promise; | ||
} | ||
} | ||
|
||
private ArrayList<QueuedCommand> commandQueue; | ||
private boolean done; | ||
|
||
public CommandProcessorWorkflowImpl() { | ||
this.commandQueue = new ArrayList<>(); | ||
this.done = false; | ||
} | ||
|
||
private final MyActivities activities = | ||
Workflow.newActivityStub( | ||
MyActivities.class, | ||
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build()); | ||
|
||
@Override | ||
public String startProcessing() { | ||
while (true) { | ||
Workflow.await(() -> this.commandQueue.size() > 0 || this.done); | ||
if (this.done && this.commandQueue.size() == 0) { | ||
return "done"; | ||
} | ||
QueuedCommand queuedCommand = this.commandQueue.remove(0); | ||
String result = activities.processCommand(queuedCommand.command); | ||
queuedCommand.promise.complete(result); | ||
} | ||
} | ||
|
||
@Override | ||
public String submitCommand(int command) { | ||
if (command < 0) { | ||
this.done = true; | ||
return "stopping workflow"; | ||
} | ||
CompletablePromise<String> promise = Workflow.newPromise(); | ||
QueuedCommand queuedCommand = new QueuedCommand(command, promise); | ||
this.commandQueue.add(queuedCommand); | ||
return queuedCommand.promise.get(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Quinn-With-Two-Ns Is there a way here to free up the update thread? e.g. could the update handler return the promise, rather than block on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, I'd be interested to know this, but haven't looked into it yet. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah we can explore alternative approaches that use less threads. Given we only allow 10 concurrent updates it wasn't prioritized, virtual threads also make this not a big deal. |
||
} | ||
} | ||
|
||
static class MyActivitiesImpl implements MyActivities { | ||
@Override | ||
public String processCommand(int commandNum) { | ||
try { | ||
// Earlier commands are slower, so we must serialize if they are to complete in order of | ||
// receipt. | ||
Thread.sleep(1000L * (3 - commandNum)); | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
String result = commandNum + " [processed]"; | ||
System.out.println(result); | ||
return result; | ||
} | ||
} | ||
|
||
public static void main(String[] args) { | ||
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); | ||
WorkflowClient client = WorkflowClient.newInstance(service); | ||
WorkerFactory factory = WorkerFactory.newInstance(client); | ||
Worker worker = factory.newWorker(TASK_QUEUE); | ||
worker.registerWorkflowImplementationTypes(CommandProcessorWorkflowImpl.class); | ||
worker.registerActivitiesImplementations(new MyActivitiesImpl()); | ||
factory.start(); | ||
CommandProcessorWorkflow commandProcessor = | ||
client.newWorkflowStub( | ||
CommandProcessorWorkflow.class, | ||
WorkflowOptions.newBuilder() | ||
.setWorkflowId(WORKFLOW_ID) | ||
.setTaskQueue(TASK_QUEUE) | ||
.setWorkflowIdReusePolicy( | ||
// This is just for convenience during command-line development! | ||
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) | ||
.build()); | ||
|
||
WorkflowStub untypedWorkflowStub = WorkflowStub.fromTyped(commandProcessor); | ||
|
||
WorkflowClient.start(commandProcessor::startProcessing); | ||
|
||
CompletableFuture.allOf( | ||
untypedWorkflowStub.startUpdate("submitCommand", String.class, 1).getResultAsync(), | ||
untypedWorkflowStub.startUpdate("submitCommand", String.class, 2).getResultAsync()) | ||
.join(); | ||
commandProcessor.submitCommand(-1); | ||
untypedWorkflowStub.getResult(String.class); | ||
System.exit(0); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArrayList
works in this case, but preferWorkflowQueue
instead. It provides a more appropriate API, inspired from "Blocking Queue", but Workflow-safe (it internally useWorkflow.await()
to implement blocking ops).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Yes @Quinn-With-Two-Ns pointed that out to me also.