From 011dda37a5f718fbd75c176b9fb8a4bad8bde7d6 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 16 Jan 2024 11:56:48 -0500 Subject: [PATCH] Honor batch timeout correctly in poller, and set default to 0 as it was previously inert Signed-off-by: Peter Broadhurst --- internal/coreconfig/coreconfig.go | 4 +-- internal/events/event_poller.go | 39 +++++++++++++++++----------- internal/events/event_poller_test.go | 2 +- 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/internal/coreconfig/coreconfig.go b/internal/coreconfig/coreconfig.go index 6c2f7812f..6ae2b8a78 100644 --- a/internal/coreconfig/coreconfig.go +++ b/internal/coreconfig/coreconfig.go @@ -406,7 +406,7 @@ func setDefaults() { viper.SetDefault(string(DownloadRetryFactor), 2.0) viper.SetDefault(string(EventAggregatorFirstEvent), core.SubOptsFirstEventOldest) viper.SetDefault(string(EventAggregatorBatchSize), 200) - viper.SetDefault(string(EventAggregatorBatchTimeout), "250ms") + viper.SetDefault(string(EventAggregatorBatchTimeout), "0ms") viper.SetDefault(string(EventAggregatorPollTimeout), "30s") viper.SetDefault(string(EventAggregatorRewindTimeout), "50ms") viper.SetDefault(string(EventAggregatorRewindQueueLength), 10) @@ -416,7 +416,7 @@ func setDefaults() { viper.SetDefault(string(EventAggregatorRetryMaxDelay), "30s") viper.SetDefault(string(EventDBEventsBufferSize), 100) viper.SetDefault(string(EventDispatcherBufferLength), 5) - viper.SetDefault(string(EventDispatcherBatchTimeout), "250ms") + viper.SetDefault(string(EventDispatcherBatchTimeout), "0ms") viper.SetDefault(string(EventDispatcherPollTimeout), "30s") viper.SetDefault(string(EventTransportsEnabled), []string{"websockets", "webhooks"}) viper.SetDefault(string(EventTransportsDefault), "websockets") diff --git a/internal/events/event_poller.go b/internal/events/event_poller.go index 065b4ed8c..cd509cc20 100644 --- a/internal/events/event_poller.go +++ b/internal/events/event_poller.go @@ -196,7 +196,12 @@ func (ep *eventPoller) eventLoop() { close(ep.offsetCommitted) }() + doBatchDelay := false for { + if doBatchDelay { + ep.waitForBatchTimeout() + } + // Read messages from the DB - in an error condition we retry until success, or a closed context events, err := ep.readPage() if err != nil { @@ -205,6 +210,15 @@ func (ep *eventPoller) eventLoop() { } eventCount := len(events) + + // We might want to wait for the batch to fill - so we delay and re-poll + if ep.conf.eventBatchTimeout > 0 && !doBatchDelay && eventCount < ep.conf.eventBatchSize { + doBatchDelay = true + l.Tracef("Batch delay: detected=%d, batchSize=%d batchTimeout=%s", eventCount, ep.conf.eventBatchSize, ep.conf.eventBatchTimeout) + continue + } + doBatchDelay = false // clear any batch delay for next iteration + repoll := false if eventCount > 0 { // We process all the events in the page in a single database run group, and @@ -280,6 +294,16 @@ func (ep *eventPoller) shoulderTap() { } } +func (ep *eventPoller) waitForBatchTimeout() { + // For throughput optimized environments, we can set an eventBatchingTimeout to allow + // dispatching of incomplete batches at a shorter timeout than the + // long timeout between polling cycles (at the cost of some dispatch latency) + select { + case <-time.After(ep.conf.eventBatchTimeout): + case <-ep.ctx.Done(): + } +} + func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool { l := log.L(ep.ctx) longTimeoutDuration := ep.conf.eventPollTimeout @@ -289,21 +313,6 @@ func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool return true } - // For throughput optimized environments, we can set an eventBatchingTimeout to allow - // dispatching of incomplete batches at a shorter timeout than the - // long timeout between polling cycles (at the cost of some dispatch latency) - if ep.conf.eventBatchTimeout > 0 && lastEventCount > 0 { - shortTimeout := time.NewTimer(ep.conf.eventBatchTimeout) - select { - case <-shortTimeout.C: - l.Tracef("Woken after batch timeout") - return true - case <-ep.ctx.Done(): - l.Debugf("Exiting due to cancelled context") - return false - } - } - longTimeout := time.NewTimer(longTimeoutDuration) select { case <-longTimeout.C: diff --git a/internal/events/event_poller_test.go b/internal/events/event_poller_test.go index 8215d2c8a..04f2ac8fc 100644 --- a/internal/events/event_poller_test.go +++ b/internal/events/event_poller_test.go @@ -36,7 +36,7 @@ func newTestEventPoller(t *testing.T, mdi *databasemocks.Plugin, neh newEventsHa ctx, cancel := context.WithCancel(context.Background()) ep = newEventPoller(ctx, mdi, newEventNotifier(ctx, "ut"), &eventPollerConf{ eventBatchSize: 10, - eventBatchTimeout: 1 * time.Millisecond, + eventBatchTimeout: 0, // customized for individual tests that enable this eventPollTimeout: 10 * time.Second, startupOffsetRetryAttempts: 1, retry: retry.Retry{