Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reusing orchestration id #46

Merged
merged 17 commits into from
Dec 11, 2023
13 changes: 12 additions & 1 deletion .github/workflows/pr-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ jobs:

- name: Install dependencies
run: go get .


- name: Install Protoc
uses: arduino/setup-protoc@v2
kaibocai marked this conversation as resolved.
Show resolved Hide resolved

- name: Installing protoc-gen-go
run: |
go install google.golang.org/protobuf/cmd/[email protected]
go install google.golang.org/grpc/cmd/[email protected]
kaibocai marked this conversation as resolved.
Show resolved Hide resolved

- name: Generate grpc code
run: protoc --go_out=. --go-grpc_out=. -I ./submodules/durabletask-protobuf/protos orchestrator_service.proto

- name: Run integration tests
run: go test ./tests/... -coverpkg ./api,./task,./client,./backend/...,./internal/helpers
74 changes: 70 additions & 4 deletions api/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (
)

var (
ErrInstanceNotFound = errors.New("no such instance exists")
ErrNotStarted = errors.New("orchestration has not started")
ErrNotCompleted = errors.New("orchestration has not yet completed")
ErrNoFailures = errors.New("orchestration did not report failure details")
ErrInstanceNotFound = errors.New("no such instance exists")
ErrNotStarted = errors.New("orchestration has not started")
ErrNotCompleted = errors.New("orchestration has not yet completed")
ErrNoFailures = errors.New("orchestration did not report failure details")
ErrDuplicateInstance = errors.New("orchestration instance already exists")

EmptyInstanceID = InstanceID("")
)
Expand All @@ -36,6 +37,32 @@ type OrchestrationMetadata struct {
FailureDetails *protos.TaskFailureDetails
}

type CreateOrchestrationAction int

const (
THROW CreateOrchestrationAction = iota
SKIP
TERMINATE
kaibocai marked this conversation as resolved.
Show resolved Hide resolved
)

type OrchestrationStatus int

const (
RUNNING OrchestrationStatus = iota
COMPLETED
// CONTINUED_AS_NEW
FAILED
// CANCELED
TERMINATED
PENDING
kaibocai marked this conversation as resolved.
Show resolved Hide resolved
// SUSPENDED
)

type OrchestrationIDReuseOption struct {
CreateOrchestrationAction CreateOrchestrationAction
OrchestrationStatuses []OrchestrationStatus
}

// NewOrchestrationOptions configures options for starting a new orchestration.
type NewOrchestrationOptions func(*protos.CreateInstanceRequest) error

Expand All @@ -57,6 +84,45 @@ func WithInstanceID(id InstanceID) NewOrchestrationOptions {
}
}

// WithOrchestrationReuseOption configures Orchestration ID reuse policy.
cgillum marked this conversation as resolved.
Show resolved Hide resolved
func WithOrchestrationReuseOption(option *OrchestrationIDReuseOption) NewOrchestrationOptions {
kaibocai marked this conversation as resolved.
Show resolved Hide resolved
return func(req *protos.CreateInstanceRequest) error {
req.CreateInstanceOption = &protos.CreateInstanceOption{}
// set action
switch option.CreateOrchestrationAction {
case SKIP:
req.CreateInstanceOption.Action = protos.CreateOrchestrationAction_SKIP
case TERMINATE:
req.CreateInstanceOption.Action = protos.CreateOrchestrationAction_TERMINATE
case THROW:
req.CreateInstanceOption.Action = protos.CreateOrchestrationAction_THROW
}

// set status
for _, status := range option.OrchestrationStatuses {
switch status {
case RUNNING:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_RUNNING)
case COMPLETED:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED)
// case CONTINUED_AS_NEW:
// req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_CONTINUED_AS_NEW)
case FAILED:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED)
// case CANCELED:
// req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_CANCELED)
case TERMINATED:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_TERMINATED)
case PENDING:
req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_PENDING)
// case SUSPENDED:
// req.CreateInstanceOption.OperationStatus = append(req.CreateInstanceOption.OperationStatus, protos.OrchestrationStatus_ORCHESTRATION_STATUS_SUSPENDED)
}
}
return nil
}
}

// WithInput configures an input for the orchestration. The specified input must be serializable.
func WithInput(input any) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) error {
Expand Down
5 changes: 5 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ type Backend interface {
// [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist.
// [api.ErrNotCompleted] is returned if the specified orchestration instance is still running.
PurgeOrchestrationState(context.Context, api.InstanceID) error

// CleanupOrchestration clean up all records for the specified orchestration instance in the entire task hub.
//
// [api.ErrInstanceNotFound] is returned if the specified orchestration instance doesn't exist.
CleanupOrchestration(context.Context, api.InstanceID) error
kaibocai marked this conversation as resolved.
Show resolved Hide resolved
}

// MarshalHistoryEvent serializes the [HistoryEvent] into a protobuf byte array.
Expand Down
49 changes: 47 additions & 2 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -310,14 +311,58 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst
ctx, span := helpers.StartNewCreateOrchestrationSpan(ctx, req.Name, req.Version.GetValue(), instanceID)
defer span.End()

// retreive instance with instanceID
metadata, err := g.backend.GetOrchestrationMetadata(ctx, api.InstanceID(instanceID))
kaibocai marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
// if the instance doesn't exist, create instance directly.
if errors.Is(err, api.ErrInstanceNotFound) {
return createInstance(ctx, g.backend, instanceID, req, span)
} else {
return nil, err
}
}

// build target status set
statusSet := convertStatusToSet(req.CreateInstanceOption.OperationStatus)

// if current status is not one of the target status, create instance directly
if !statusSet[metadata.RuntimeStatus] {
return createInstance(ctx, g.backend, instanceID, req, span)
} else {
if req.CreateInstanceOption.Action == protos.CreateOrchestrationAction_THROW {
// throw ErrDuplicateEvent since instance already exists and the status is in target status set
return nil, api.ErrDuplicateInstance
} else if req.CreateInstanceOption.Action == protos.CreateOrchestrationAction_SKIP {
// skip creating new instance
g.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", instanceID)
return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil
} else {
// CreateInstanceAction_TERMINATE
// terminate existing instance and create a new one
if err := g.backend.CleanupOrchestration(ctx, api.InstanceID(instanceID)); err != nil {
return nil, err
}
return createInstance(ctx, g.backend, instanceID, req, span)
}
}
}

func createInstance(ctx context.Context, be Backend, instanceID string, req *protos.CreateInstanceRequest, span trace.Span) (*protos.CreateInstanceResponse, error) {
e := helpers.NewExecutionStartedEvent(req.Name, instanceID, req.Input, nil, helpers.TraceContextFromSpan(span))
if err := g.backend.CreateOrchestrationInstance(ctx, e); err != nil {
if err := be.CreateOrchestrationInstance(ctx, e); err != nil {
return nil, err
}

return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil
}

func convertStatusToSet(statuses []protos.OrchestrationStatus) map[protos.OrchestrationStatus]bool {
statusSet := make(map[protos.OrchestrationStatus]bool)
for _, status := range statuses {
statusSet[status] = true
}
return statusSet
}

// TerminateInstance implements protos.TaskHubSidecarServiceServer
func (g *grpcExecutor) TerminateInstance(ctx context.Context, req *protos.TerminateRequest) (*protos.TerminateResponse, error) {
e := helpers.NewExecutionTerminatedEvent(req.Output, req.Recursive)
Expand Down
51 changes: 51 additions & 0 deletions backend/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,57 @@ func (be *sqliteBackend) PurgeOrchestrationState(ctx context.Context, id api.Ins
return nil
}

func (be *sqliteBackend) CleanupOrchestration(ctx context.Context, id api.InstanceID) error {
if err := be.ensureDB(); err != nil {
return err
}

tx, err := be.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()

row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id))
if err := row.Err(); err != nil {
return fmt.Errorf("failed to query for instance existence: %w", err)
}

var unused int
if err := row.Scan(&unused); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return api.ErrInstanceNotFound
} else {
return fmt.Errorf("failed to scan instance existence: %w", err)
}
}

_, err = tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from the Instances table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM History WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from History table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM NewEvents WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from NewEvents table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM NewTasks WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from NewTasks table: %w", err)
}

if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
}

// Start implements backend.Backend
func (*sqliteBackend) Start(context.Context) error {
return nil
Expand Down
Loading
Loading