Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support orchestration id reuse policy #188

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions .github/workflows/build-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,23 @@ jobs:
- name: Integration Tests with Gradle
run: ./gradlew integrationTest

- name: Stop Durable Task Sidecar
run: docker stop durabletask-sidecar

- name: Initialize Durable Task GO
run: docker run --name durabletask-go -p 4001:4001 -d kaibocai/durabletask-go:latest

- name: Integration GO Tests with Gradle
run: ./gradlew integrationGoTest

- name: Archive test report
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: Integration test report
path: client/build/reports/tests/integrationTest

- name: Upload JAR output
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: Package
path: client/build/libs
Expand Down Expand Up @@ -105,7 +114,7 @@ jobs:
arguments: endToEndTest

- name: Archive test report
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: Integration test report
path: client/build/reports/tests/endToEndTest
Expand Down Expand Up @@ -146,7 +155,7 @@ jobs:
arguments: sampleTest

- name: Archive test report
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: Integration test report
path: client/build/reports/tests/endToEndTest
27 changes: 25 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,45 @@
## placeholder
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v1.6.0 (unreleased)

### New

* Support orchestration id reuse policy ([#188](https://github.com/microsoft/durabletask-java/pull/188))

### Updates

* Fix infinite loop when use continueasnew after wait external event ([#183](https://github.com/microsoft/durabletask-java/pull/183))
* Fix the issue "Deserialize Exception got swallowed when use anyOf with external event." ([#185](https://github.com/microsoft/durabletask-java/pull/185))

## v1.5.0

### Updates

* Fix exception type issue when using `RetriableTask` in fan in/out pattern ([#174](https://github.com/microsoft/durabletask-java/pull/174))
* Add implementation to generate name-based deterministic UUID ([#176](https://github.com/microsoft/durabletask-java/pull/176))
* Update dependencies to resolve CVEs ([#177](https://github.com/microsoft/durabletask-java/pull/177))


## v1.4.0

### Updates

* Refactor `createTimer` to be non-blocking ([#161](https://github.com/microsoft/durabletask-java/pull/161))

## v1.3.0

### Updates

* Refactor `RetriableTask` and add new `CompoundTask`, fixing Fan-out/Fan-in stuck when using `RetriableTask` ([#157](https://github.com/microsoft/durabletask-java/pull/157))

## v1.2.0

### Updates

* Add `thenAccept` and `thenApply` to `Task` interface ([#148](https://github.com/microsoft/durabletask-java/pull/148))
* Support Suspend and Resume Client APIs ([#151](https://github.com/microsoft/durabletask-java/pull/151))
* Support restartInstance and pass restartPostUri in HttpManagementPayload ([#108](https://github.com/microsoft/durabletask-java/issues/108))
Expand All @@ -27,11 +48,13 @@
## v1.1.1

### Updates

* Fix exception occurring when invoking the `TaskOrchestrationContext#continueAsNew` method ([#118](https://github.com/microsoft/durabletask-java/issues/118))

## v1.1.0

### Updates

* Fix the potential NPE issue of `DurableTaskClient#terminate` method ([#104](https://github.com/microsoft/durabletask-java/issues/104))
* Add waitForCompletionOrCreateCheckStatusResponse client API ([#115](https://github.com/microsoft/durabletask-java/pull/115))
* Support long timers by breaking up into smaller timers ([#114](https://github.com/microsoft/durabletask-java/issues/114))
Expand Down
11 changes: 11 additions & 0 deletions client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ test {
// Skip tests tagged as "integration" since those are slower
// and require external dependencies.
excludeTags "integration"
excludeTags "integration-go"
}
}

Expand All @@ -96,6 +97,16 @@ task integrationTest(type: Test) {
testLogging.showStandardStreams = true
}

// integration-go runs against sidecar durabletask-go
task integrationGoTest(type: Test) {
useJUnitPlatform {
includeTags 'integration-go'
}
dependsOn build
shouldRunAfter test
testLogging.showStandardStreams = true
}

publishing {
repositories {
maven {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.google.protobuf.StringValue;
import com.google.protobuf.Timestamp;
import com.microsoft.durabletask.client.InstanceIdReuseAction;
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*;
import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;
Expand Down Expand Up @@ -87,6 +88,17 @@ public String scheduleNewOrchestrationInstance(
CreateInstanceRequest.Builder builder = CreateInstanceRequest.newBuilder();
builder.setName(orchestratorName);

// build orchestration ID reuse policy
OrchestrationIdReusePolicy.Builder reuseIdPolicyBuilder = OrchestrationIdReusePolicy.newBuilder();
// if options.getInstanceIdReuseAction() is null, default value will be ERROR
if (options.getInstanceIdReuseAction() != null) {
reuseIdPolicyBuilder.setAction(InstanceIdReuseAction.toProtobuf(options.getInstanceIdReuseAction()));
}
for (OrchestrationRuntimeStatus targetStatus : options.getTargetStatuses()) {
reuseIdPolicyBuilder.addOperationStatus(OrchestrationRuntimeStatus.toProtobuf(targetStatus));
}
builder.setOrchestrationIdReusePolicy(reuseIdPolicyBuilder);

String instanceId = options.getInstanceId();
if (instanceId == null) {
instanceId = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
// Licensed under the MIT License.
package com.microsoft.durabletask;

import com.microsoft.durabletask.client.InstanceIdReuseAction;

import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Options for starting a new instance of an orchestration.
Expand All @@ -12,6 +17,8 @@ public final class NewOrchestrationInstanceOptions {
private String instanceId;
private Object input;
private Instant startTime;
private final Set<OrchestrationRuntimeStatus> targetStatuses = new HashSet<>();
private InstanceIdReuseAction instanceIdReuseAction;

/**
* Default constructor for the {@link NewOrchestrationInstanceOptions} class.
Expand Down Expand Up @@ -71,6 +78,112 @@ public NewOrchestrationInstanceOptions setStartTime(Instant startTime) {
return this;
}

/**
* Sets the target statuses for the reuse orchestration ID policy of the new orchestration instance.
* This method allows specifying the desired statuses for orchestrations with the same ID
* when configuring the orchestration ID reuse policy.
*
* <p>
* By default, the {@code targetStatuses} is empty. If an orchestration with the same instance ID
* already exists, an error will be thrown, indicating a duplicate orchestration instance.
* You can customize the orchestration ID reuse policy by setting the {@code targetStatuses}
* and {@code instanceIdReuseAction}.
*
* <p>
* For example, the following options will terminate an existing orchestration instance with the same instance ID
* if it's in RUNNING, FAILED, or COMPLETED runtime status:
* <pre>{@code
* NewOrchestrationInstanceOptions options = new NewOrchestrationInstanceOptions();
* options.addTargetStatus(OrchestrationRuntimeStatus.RUNNING, OrchestrationRuntimeStatus.FAILED,
* OrchestrationRuntimeStatus.COMPLETED);
* options.setInstanceIdReuseAction(InstanceIdReuseAction.TERMINATE);
* }</pre>
*
* @param statuses The target statuses for the reuse orchestration ID policy when creating the new orchestration instance.
* @return This {@link NewOrchestrationInstanceOptions} object.
*/
public NewOrchestrationInstanceOptions addTargetStatus(OrchestrationRuntimeStatus... statuses) {
for (OrchestrationRuntimeStatus status : statuses) {
this.addTargetStatus(status);
}
return this;
}

/**
* Sets the target statuses for the reuse orchestration ID policy of the new orchestration instance.
* This method allows specifying the desired statuses for orchestrations with the same ID
* when configuring the orchestration ID reuse policy.
*
* <p>
* By default, the {@code targetStatuses} is empty. If an orchestration with the same instance ID
* already exists, an error will be thrown, indicating a duplicate orchestration instance.
* You can customize the orchestration ID reuse policy by setting the {@code targetStatuses}
* and {@code instanceIdReuseAction}.
*
* <p>
* For example, the following options will terminate an existing orchestration instance with the same instance ID
* if it's in RUNNING, FAILED, or COMPLETED runtime status:
*<pre>{@code
* NewOrchestrationInstanceOptions option = new NewOrchestrationInstanceOptions();
* List<OrchestrationRuntimeStatus> statuses = new ArrayList<>();
* statuses.add(RUNNING);
* statuses.add(FAILED);
* statuses.add(COMPLETED);
* option.setTargetStatus(statuses);
* option.setInstanceIdReuseAction(TERMINATE);
* }
*</pre>
* @param statuses A list of target statuses for the reuse orchestration ID policy of creating the new orchestration instance.
* @return this {@link NewOrchestrationInstanceOptions} object
*/
public NewOrchestrationInstanceOptions setTargetStatus(List<OrchestrationRuntimeStatus> statuses) {
for (OrchestrationRuntimeStatus status : statuses) {
this.addTargetStatus(status);
}
return this;
}

private void addTargetStatus(OrchestrationRuntimeStatus status) {
this.targetStatuses.add(status);
}

/**
* Sets the target action for the reuse orchestration ID policy of the new orchestration instance.
* This method allows specifying the desired action for orchestrations with the same ID
* when configuring the orchestration ID reuse policy.
*
* <p>
* By default, the {@code instanceIdReuseAction} is {@code InstanceIdReuseAction.ERROR}. If an orchestration with the same instance ID
* already exists, an error will be thrown, indicating a duplicate orchestration instance.
* You can customize the orchestration ID reuse policy by setting the {@code targetStatuses}
* and {@code instanceIdReuseAction}.
*
* <ul>
* <li>{@code InstanceIdReuseAction.ERROR}: Throws an error if an orchestration with the same instance ID already exists.</li>
* <li>{@code InstanceIdReuseAction.IGNORE}: Returns directly if an orchestration with the same instance ID already exists
* and is in the specified target statuses.</li>
* <li>{@code InstanceIdReuseAction.Terminate}: Terminates the existing orchestration instance with the same instance ID
* if it is in the specified target statuses, and creates a new instance with the same instance ID.</li>
* </ul>
*
* <p>
* For example, the following options will terminate an existing orchestration instance with the same instance ID
* if it's in RUNNING, FAILED, or COMPLETED runtime status:
* <pre>{@code
* NewOrchestrationInstanceOptions options = new NewOrchestrationInstanceOptions();
* options.addTargetStatus(OrchestrationRuntimeStatus.RUNNING, OrchestrationRuntimeStatus.FAILED,
* OrchestrationRuntimeStatus.COMPLETED);
* options.setInstanceIdReuseAction(InstanceIdReuseAction.TERMINATE);
* }</pre>
*
* @param instanceIdReuseAction The target action for the reuse orchestration ID policy when creating the new orchestration instance.
* @return This {@link NewOrchestrationInstanceOptions} object.
*/
public NewOrchestrationInstanceOptions setInstanceIdReuseAction(InstanceIdReuseAction instanceIdReuseAction) {
this.instanceIdReuseAction = instanceIdReuseAction;
return this;
}

/**
* Gets the user-specified version of the new orchestration.
*
Expand Down Expand Up @@ -106,4 +219,22 @@ public Object getInput() {
public Instant getStartTime() {
return this.startTime;
}

/**
* Gets the target statuses for the reuse orchestration ID policy of the new orchestration instance.
*
* @return The target statuses for the reuse orchestration ID policy when creating the new orchestration instance.
*/
public Set<OrchestrationRuntimeStatus> getTargetStatuses() {
return this.targetStatuses;
}

/**
* Gets the target action for the reuse orchestration ID policy of the new orchestration instance.
*
* @return The target action for the reuse orchestration ID policy when creating the new orchestration instance.
*/
public InstanceIdReuseAction getInstanceIdReuseAction() {
return this.instanceIdReuseAction;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.microsoft.durabletask.client;

import com.microsoft.durabletask.implementation.protobuf.OrchestratorService;

public enum InstanceIdReuseAction {
ERROR,
IGNORE,
TERMINATE;

public static OrchestratorService.CreateOrchestrationAction toProtobuf(
InstanceIdReuseAction action) {
switch (action) {
case ERROR:
return OrchestratorService.CreateOrchestrationAction.ERROR;
case IGNORE:
return OrchestratorService.CreateOrchestrationAction.IGNORE;
case TERMINATE:
return OrchestratorService.CreateOrchestrationAction.TERMINATE;
default:
throw new IllegalArgumentException(String.format("Unknown action value: %s", action));
}
}
}
Loading
Loading