diff --git a/CHANGELOG.md b/CHANGELOG.md index f2f399b..b3903ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Support reusing orchestration id ([#46](https://github.com/microsoft/durabletask-go/pull/46)) - contributed by [@kaibocai](https://github.com/kaibocai) +### Fixed + +- Fix nil pointer dereference when consuming events ([#48](https://github.com/microsoft/durabletask-go/pull/48)) - contributed by [@impl](https://github.com/impl) + ## [v0.3.1] - 2023-09-08 ### Fixed diff --git a/task/orchestrator.go b/task/orchestrator.go index 9c3a75a..d51041a 100644 --- a/task/orchestrator.go +++ b/task/orchestrator.go @@ -323,7 +323,11 @@ func (ctx *OrchestrationContext) WaitForSingleEvent(eventName string, timeout ti if timeout > 0 { ctx.createTimerInternal(timeout).onCompleted(func() { task.cancel() - taskList.Remove(taskElement) + if taskList.Len() > 1 { + taskList.Remove(taskElement) + } else { + delete(ctx.pendingExternalEventTasks, key) + } }) } } diff --git a/tests/orchestrations_test.go b/tests/orchestrations_test.go index 87da470..318c7e4 100644 --- a/tests/orchestrations_test.go +++ b/tests/orchestrations_test.go @@ -473,6 +473,55 @@ func Test_ContinueAsNew_Events(t *testing.T) { assert.Equal(t, `10`, metadata.SerializedOutput) } +func Test_ExternalEventContention(t *testing.T) { + // Registration + r := task.NewTaskRegistry() + r.AddOrchestratorN("ContinueAsNewTest", func(ctx *task.OrchestrationContext) (any, error) { + var data int32 + if err := ctx.WaitForSingleEvent("MyEventData", 1*time.Second).Await(&data); err != nil && !errors.Is(err, task.ErrTaskCanceled) { + return nil, err + } + + var complete bool + if err := ctx.WaitForSingleEvent("MyEventSignal", -1).Await(&complete); err != nil { + return nil, err + } + + if complete { + return data, nil + } + + ctx.ContinueAsNew(nil, task.WithKeepUnprocessedEvents()) + return nil, nil + }) + + // Initialization + ctx := context.Background() + client, worker := initTaskHubWorker(ctx, r) + defer worker.Shutdown(ctx) + + // Run the orchestration + id, err := client.ScheduleNewOrchestration(ctx, "ContinueAsNewTest") + require.NoError(t, err) + + // Wait for the timer to elapse + timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + _, err = client.WaitForOrchestrationCompletion(timeoutCtx, id) + require.ErrorIs(t, err, timeoutCtx.Err()) + + // Now raise the event, which should queue correctly for the next time + // around + require.NoError(t, client.RaiseEvent(ctx, id, "MyEventData", api.WithEventPayload(42))) + require.NoError(t, client.RaiseEvent(ctx, id, "MyEventSignal", api.WithEventPayload(false))) + require.NoError(t, client.RaiseEvent(ctx, id, "MyEventSignal", api.WithEventPayload(true))) + + metadata, err := client.WaitForOrchestrationCompletion(ctx, id) + require.NoError(t, err) + assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus) + assert.Equal(t, `42`, metadata.SerializedOutput) +} + func Test_ExternalEventOrchestration(t *testing.T) { const eventCount = 10