Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reusing orchestration id #46

Merged
merged 17 commits into from
Dec 11, 2023
21 changes: 20 additions & 1 deletion .github/workflows/pr-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ on:
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:

env:
# Configure protoc and go grpc plugin
PROTOC_VERSION: "25.x"
PROTOC_GEN_GO: "v1.28"
PROTOC_GEN_GO_GRPC: "v1.2"

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
# This workflow contains a single job called "build"
Expand All @@ -36,6 +42,19 @@ jobs:

- name: Install dependencies
run: go get .


- name: Install Protoc
uses: arduino/setup-protoc@v2
kaibocai marked this conversation as resolved.
Show resolved Hide resolved
with:
version: ${{ env.PROTOC_VERSION }}

- name: Installing protoc-gen-go
run: |
go install google.golang.org/protobuf/cmd/protoc-gen-go@$PROTOC_GEN_GO
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@$PROTOC_GEN_GO_GRPC

- name: Generate grpc code
run: protoc --go_out=. --go-grpc_out=. -I ./submodules/durabletask-protobuf/protos orchestrator_service.proto

- name: Run integration tests
run: go test ./tests/... -coverpkg ./api,./task,./client,./backend/...,./internal/helpers
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed

- Support reusing orchestration id ([#46](https://github.com/microsoft/durabletask-go/pull/46)) - contributed by [@kaibocai](https://github.com/kaibocai)

## [v0.3.1] - 2023-09-08

### Fixed
Expand Down
22 changes: 18 additions & 4 deletions api/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
)

var (
ErrInstanceNotFound = errors.New("no such instance exists")
ErrNotStarted = errors.New("orchestration has not started")
ErrNotCompleted = errors.New("orchestration has not yet completed")
ErrNoFailures = errors.New("orchestration did not report failure details")
ErrInstanceNotFound = errors.New("no such instance exists")
ErrNotStarted = errors.New("orchestration has not started")
ErrNotCompleted = errors.New("orchestration has not yet completed")
ErrNoFailures = errors.New("orchestration did not report failure details")
ErrDuplicateInstance = errors.New("orchestration instance already exists")
ErrIgnoreInstance = errors.New("ignore creating orchestration instance")

EmptyInstanceID = InstanceID("")
)
Expand Down Expand Up @@ -57,6 +59,18 @@ func WithInstanceID(id InstanceID) NewOrchestrationOptions {
}
}

// WithOrchestrationIdReusePolicy configures the orchestration ID reuse policy
// for a new orchestration instance.
//
// The policy's Action and OperationStatus are copied into a freshly allocated
// OrchestrationIdReusePolicy on the request, so the request does not hold a
// pointer to the caller's struct (the OperationStatus slice is still shared).
// A nil policy leaves the request untouched instead of panicking.
func WithOrchestrationIdReusePolicy(policy *protos.OrchestrationIdReusePolicy) NewOrchestrationOptions {
	return func(req *protos.CreateInstanceRequest) error {
		if policy == nil {
			// Nothing to configure; keep whatever the request already carries.
			return nil
		}
		req.OrchestrationIdReusePolicy = &protos.OrchestrationIdReusePolicy{
			Action:          policy.Action,
			OperationStatus: policy.OperationStatus,
		}
		return nil
	}
}

// WithInput configures an input for the orchestration. The specified input must be serializable.
func WithInput(input any) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) error {
Expand Down
14 changes: 13 additions & 1 deletion backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ type (
TaskFailureDetails = protos.TaskFailureDetails
)

// OrchestrationIdReusePolicyOptions is a functional option that mutates the
// OrchestrationIdReusePolicy it is handed and may report an error.
type OrchestrationIdReusePolicyOptions func(*protos.OrchestrationIdReusePolicy) error

func WithOrchestrationIdReusePolicy(policy *protos.OrchestrationIdReusePolicy) OrchestrationIdReusePolicyOptions {
cgillum marked this conversation as resolved.
Show resolved Hide resolved
return func(po *protos.OrchestrationIdReusePolicy) error {
if policy != nil {
po.Action = policy.Action
po.OperationStatus = policy.OperationStatus
}
return nil
}
}

type Backend interface {
// CreateTaskHub creates a new task hub for the current backend. Task hub creation must be idempotent.
//
Expand All @@ -43,7 +55,7 @@ type Backend interface {

// CreateOrchestrationInstance creates a new orchestration instance with a history event that
// wraps an ExecutionStarted event.
CreateOrchestrationInstance(context.Context, *HistoryEvent) error
CreateOrchestrationInstance(context.Context, *HistoryEvent, ...OrchestrationIdReusePolicyOptions) error

// AddNewOrchestrationEvent adds a new orchestration event to the specified orchestration instance.
AddNewOrchestrationEvent(context.Context, api.InstanceID, *HistoryEvent) error
Expand Down
2 changes: 1 addition & 1 deletion backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst
defer span.End()

e := helpers.NewExecutionStartedEvent(req.Name, instanceID, req.Input, nil, helpers.TraceContextFromSpan(span))
if err := g.backend.CreateOrchestrationInstance(ctx, e); err != nil {
if err := g.backend.CreateOrchestrationInstance(ctx, e, WithOrchestrationIdReusePolicy(req.OrchestrationIdReusePolicy)); err != nil {
cgillum marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}

Expand Down
184 changes: 142 additions & 42 deletions backend/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func NewSqliteBackend(opts *SqliteOptions, logger backend.Logger) backend.Backen
be.dsn = opts.FilePath
}

// used for local debug
// be.dsn = "file:file.sqlite"

return be
}

Expand Down Expand Up @@ -333,8 +336,7 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *
for _, msg := range wi.State.PendingMessages() {
if es := msg.HistoryEvent.GetExecutionStarted(); es != nil {
// Need to insert a new row into the DB
var instanceID string
if err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx, &instanceID); err != nil {
if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx); err != nil {
if err == backend.ErrDuplicateEvent {
be.logger.Warnf(
"%v: dropping sub-orchestration creation event because an instance with the target ID (%v) already exists.",
Expand Down Expand Up @@ -390,7 +392,7 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *
}

// CreateOrchestrationInstance implements backend.Backend
func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent) error {
func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent, opts ...backend.OrchestrationIdReusePolicyOptions) error {
if err := be.ensureDB(); err != nil {
return err
}
Expand All @@ -402,7 +404,10 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac
defer tx.Rollback()

var instanceID string
if err := be.createOrchestrationInstanceInternal(ctx, e, tx, &instanceID); err != nil {
if instanceID, err = be.createOrchestrationInstanceInternal(ctx, e, tx, opts...); errors.Is(err, api.ErrIgnoreInstance) {
// choose to ignore, do nothing
return nil
} else if err != nil {
return err
}

Expand All @@ -429,19 +434,45 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac
return nil
}

func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context, e *backend.HistoryEvent, tx *sql.Tx, instanceID *string) error {
// buildStatusSet converts a list of orchestration statuses into a set for
// constant-time membership checks. Duplicate entries collapse into one key; a
// nil or empty slice yields an empty, non-nil map.
func buildStatusSet(statuses []protos.OrchestrationStatus) map[protos.OrchestrationStatus]struct{} {
	// Pre-size the map so it does not have to grow while being filled.
	statusSet := make(map[protos.OrchestrationStatus]struct{}, len(statuses))
	for _, status := range statuses {
		statusSet[status] = struct{}{}
	}
	return statusSet
}

// createOrchestrationInstanceInternal registers the orchestration described by
// the ExecutionStarted event e in the [Instances] table, using transaction tx,
// and returns the ID of the target instance.
//
// e must be non-nil, carry a timestamp and wrap an ExecutionStarted event;
// otherwise an error is returned and nothing is written. The opts are applied
// in order to an empty OrchestrationIdReusePolicy; the first option that fails
// aborts the call with a wrapped error.
//
// When the insert affects no rows (an instance with the same ID already
// exists), the outcome is delegated to handleInstanceExists and its error, if
// any, is returned together with the instance ID.
//
// NOTE(review): the duplicate case now surfaces whatever handleInstanceExists
// returns (api.ErrDuplicateInstance by default), while CompleteOrchestrationWorkItem
// compares the result against backend.ErrDuplicateEvent — confirm that caller
// still recognizes duplicates.
func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context, e *backend.HistoryEvent, tx *sql.Tx, opts ...backend.OrchestrationIdReusePolicyOptions) (string, error) {
	if e == nil {
		return "", errors.New("HistoryEvent must be non-nil")
	} else if e.Timestamp == nil {
		return "", errors.New("HistoryEvent must have a non-nil timestamp")
	}

	startEvent := e.GetExecutionStarted()
	if startEvent == nil {
		return "", errors.New("HistoryEvent must be an ExecutionStartedEvent")
	}
	instanceID := startEvent.OrchestrationInstance.InstanceId

	// Build the effective reuse policy; an option error must not be dropped.
	policy := &protos.OrchestrationIdReusePolicy{}
	for _, opt := range opts {
		if err := opt(policy); err != nil {
			return "", fmt.Errorf("failed to apply orchestration ID reuse policy option: %w", err)
		}
	}

	rows, err := insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent)
	if err != nil {
		return "", err
	}

	// Zero affected rows means the insert was ignored because the ID is taken.
	if rows <= 0 {
		return instanceID, be.handleInstanceExists(ctx, tx, startEvent, policy, e)
	}
	return instanceID, nil
}

func insertOrIgnoreInstanceTableInternal(ctx context.Context, tx *sql.Tx, e *backend.HistoryEvent, startEvent *protos.ExecutionStartedEvent) (int64, error) {
res, err := tx.ExecContext(
ctx,
`INSERT OR IGNORE INTO [Instances] (
Expand All @@ -462,19 +493,114 @@ func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context
e.Timestamp.AsTime(),
)
if err != nil {
return fmt.Errorf("failed to insert into [Instances] table: %w", err)
return -1, fmt.Errorf("failed to insert into [Instances] table: %w", err)
}

rows, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("failed to count the rows affected: %w", err)
return -1, fmt.Errorf("failed to count the rows affected: %w", err)
}
return rows, nil;
}

if rows <= 0 {
return backend.ErrDuplicateEvent
// handleInstanceExists decides what to do when an instance with the ID in
// startEvent is already present in the [Instances] table. It runs inside tx.
//
// The existing instance's RuntimeStatus is looked up first:
//   - if the row cannot be found, api.ErrInstanceNotFound is returned;
//   - if the status is not listed in policy.OperationStatus, the request is
//     rejected with api.ErrDuplicateInstance.
//
// When the status matches, policy.Action selects the outcome:
//   - IGNORE: a warning is logged and api.ErrIgnoreInstance is returned;
//   - TERMINATE: the existing instance's state is removed and a new row is
//     inserted for event e; nil is returned on success;
//   - any other action: api.ErrDuplicateInstance.
func (be *sqliteBackend) handleInstanceExists(ctx context.Context, tx *sql.Tx, startEvent *protos.ExecutionStartedEvent, policy *protos.OrchestrationIdReusePolicy, e *backend.HistoryEvent) error {
	instanceID := startEvent.OrchestrationInstance.InstanceId

	// Scan into a plain string: a NULL status then surfaces as a Scan error
	// instead of a nil-pointer dereference further down.
	var runtimeStatus string
	err := tx.QueryRowContext(
		ctx,
		`SELECT [RuntimeStatus] FROM Instances WHERE [InstanceID] = ?`,
		instanceID,
	).Scan(&runtimeStatus)
	if errors.Is(err, sql.ErrNoRows) {
		return api.ErrInstanceNotFound
	} else if err != nil {
		return fmt.Errorf("failed to scan the Instances table result: %w", err)
	}

	// The policy only applies when the current status is one of its targets.
	targetStatusValues := buildStatusSet(policy.OperationStatus)
	if _, ok := targetStatusValues[helpers.FromRuntimeStatusString(runtimeStatus)]; !ok {
		return api.ErrDuplicateInstance
	}

	switch policy.Action {
	case protos.CreateOrchestrationAction_IGNORE:
		// Log a warning and skip creating a new instance.
		be.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", instanceID)
		return api.ErrIgnoreInstance
	case protos.CreateOrchestrationAction_TERMINATE:
		// Remove the existing instance regardless of its runtime status...
		if err := be.cleanupOrchestrationStateInternal(ctx, tx, api.InstanceID(instanceID), false); err != nil {
			return err
		}
		// ...then create the replacement.
		rows, err := insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent)
		if err != nil {
			return err
		}
		// Should never happen: the old row was deleted just above.
		if rows <= 0 {
			return errors.New("failed to insert into [Instances] table because entry already exists.")
		}
		return nil
	}

	// Default behavior: an existing instance blocks the create request.
	return api.ErrDuplicateInstance
}

func (be *sqliteBackend) cleanupOrchestrationStateInternal(ctx context.Context, tx *sql.Tx, id api.InstanceID, onlyIfCompleted bool) error {
row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id))
if err := row.Err(); err != nil {
return fmt.Errorf("failed to query for instance existence: %w", err)
}

var unused int
if err := row.Scan(&unused); errors.Is(err, sql.ErrNoRows) {
return api.ErrInstanceNotFound
} else if err != nil {
return fmt.Errorf("failed to scan instance existence: %w", err)
}

*instanceID = startEvent.OrchestrationInstance.InstanceId
if onlyIfCompleted {
// purge orchestration in ['COMPLETED', 'FAILED', 'TERMINATED']
dbResult, err := tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ? AND [RuntimeStatus] IN ('COMPLETED', 'FAILED', 'TERMINATED')", string(id))
if err != nil {
return fmt.Errorf("failed to delete from the Instances table: %w", err)
}

rowsAffected, err := dbResult.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected in Instances delete operation: %w", err)
}
if rowsAffected == 0 {
return api.ErrNotCompleted
}
} else {
// clean up orchestration in all [RuntimeStatus]
_, err := tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from the Instances table: %w", err)
}
}

_, err := tx.ExecContext(ctx, "DELETE FROM History WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from History table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM NewEvents WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from NewEvents table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM NewTasks WHERE [InstanceID] = ?", string(id))
cgillum marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to delete from NewTasks table: %w", err)
}
return nil
}

Expand Down Expand Up @@ -837,34 +963,8 @@ func (be *sqliteBackend) PurgeOrchestrationState(ctx context.Context, id api.Ins
}
defer tx.Rollback()

row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id))
if err := row.Err(); err != nil {
return fmt.Errorf("failed to query for instance existence: %w", err)
}

var unused int
if err := row.Scan(&unused); err == sql.ErrNoRows {
return api.ErrInstanceNotFound
} else if err != nil {
return fmt.Errorf("failed to scan instance existence: %w", err)
}

dbResult, err := tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ? AND [RuntimeStatus] IN ('COMPLETED', 'FAILED', 'TERMINATED')", string(id))
if err != nil {
return fmt.Errorf("failed to delete from the Instances table: %w", err)
}

rowsAffected, err := dbResult.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected in Instances delete operation: %w", err)
}
if rowsAffected == 0 {
return api.ErrNotCompleted
}

_, err = tx.ExecContext(ctx, "DELETE FROM History WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from History table: %w", err)
if err := be.cleanupOrchestrationStateInternal(ctx, tx, id, true); err != nil {
return err
}

if err = tx.Commit(); err != nil {
Expand Down
Loading
Loading