Skip to content

Commit

Permalink
Fix nil pointer dereference when consuming events (#48)
Browse files Browse the repository at this point in the history
If an event is received after the internal timer waiting for it has
elapsed, it should be placed into the pending events queue to be handled
by the next WaitForSingleEvent invocation. However, the internal timer
expiration callback does not reset the pending task list correctly,
causing the event consumer to attempt to dereference an element of an
empty list.

This change adjusts the logic of the timer to completely eradicate the
list from the pending event key map when it's empty, which matches the
behavior of other orchestrator cleanup routines.
  • Loading branch information
impl authored Dec 14, 2023
1 parent 86338c4 commit aa335e2
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Support reusing orchestration id ([#46](https://github.com/microsoft/durabletask-go/pull/46)) - contributed by [@kaibocai](https://github.com/kaibocai)

### Fixed

- Fix nil pointer dereference when consuming events ([#48](https://github.com/microsoft/durabletask-go/pull/48)) - contributed by [@impl](https://github.com/impl)

## [v0.3.1] - 2023-09-08

### Fixed
Expand Down
6 changes: 5 additions & 1 deletion task/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,11 @@ func (ctx *OrchestrationContext) WaitForSingleEvent(eventName string, timeout ti
if timeout > 0 {
ctx.createTimerInternal(timeout).onCompleted(func() {
task.cancel()
taskList.Remove(taskElement)
if taskList.Len() > 1 {
taskList.Remove(taskElement)
} else {
delete(ctx.pendingExternalEventTasks, key)
}
})
}
}
Expand Down
49 changes: 49 additions & 0 deletions tests/orchestrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,55 @@ func Test_ContinueAsNew_Events(t *testing.T) {
assert.Equal(t, `10`, metadata.SerializedOutput)
}

func Test_ExternalEventContention(t *testing.T) {
// Registration
r := task.NewTaskRegistry()
r.AddOrchestratorN("ContinueAsNewTest", func(ctx *task.OrchestrationContext) (any, error) {
var data int32
if err := ctx.WaitForSingleEvent("MyEventData", 1*time.Second).Await(&data); err != nil && !errors.Is(err, task.ErrTaskCanceled) {
return nil, err
}

var complete bool
if err := ctx.WaitForSingleEvent("MyEventSignal", -1).Await(&complete); err != nil {
return nil, err
}

if complete {
return data, nil
}

ctx.ContinueAsNew(nil, task.WithKeepUnprocessedEvents())
return nil, nil
})

// Initialization
ctx := context.Background()
client, worker := initTaskHubWorker(ctx, r)
defer worker.Shutdown(ctx)

// Run the orchestration
id, err := client.ScheduleNewOrchestration(ctx, "ContinueAsNewTest")
require.NoError(t, err)

// Wait for the timer to elapse
timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
_, err = client.WaitForOrchestrationCompletion(timeoutCtx, id)
require.ErrorIs(t, err, timeoutCtx.Err())

// Now raise the event, which should queue correctly for the next time
// around
require.NoError(t, client.RaiseEvent(ctx, id, "MyEventData", api.WithEventPayload(42)))
require.NoError(t, client.RaiseEvent(ctx, id, "MyEventSignal", api.WithEventPayload(false)))
require.NoError(t, client.RaiseEvent(ctx, id, "MyEventSignal", api.WithEventPayload(true)))

metadata, err := client.WaitForOrchestrationCompletion(ctx, id)
require.NoError(t, err)
assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus)
assert.Equal(t, `42`, metadata.SerializedOutput)
}

func Test_ExternalEventOrchestration(t *testing.T) {
const eventCount = 10

Expand Down

0 comments on commit aa335e2

Please sign in to comment.