Skip to content

Commit 011dda3

Browse files
Honor batch timeout correctly in poller, and set default to 0 as it was previously inert
Signed-off-by: Peter Broadhurst <[email protected]>
1 parent 1c5f7e6 commit 011dda3

File tree

3 files changed

+27
-18
lines changed

3 files changed

+27
-18
lines changed

internal/coreconfig/coreconfig.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ func setDefaults() {
406406
viper.SetDefault(string(DownloadRetryFactor), 2.0)
407407
viper.SetDefault(string(EventAggregatorFirstEvent), core.SubOptsFirstEventOldest)
408408
viper.SetDefault(string(EventAggregatorBatchSize), 200)
409-
viper.SetDefault(string(EventAggregatorBatchTimeout), "250ms")
409+
viper.SetDefault(string(EventAggregatorBatchTimeout), "0ms")
410410
viper.SetDefault(string(EventAggregatorPollTimeout), "30s")
411411
viper.SetDefault(string(EventAggregatorRewindTimeout), "50ms")
412412
viper.SetDefault(string(EventAggregatorRewindQueueLength), 10)
@@ -416,7 +416,7 @@ func setDefaults() {
416416
viper.SetDefault(string(EventAggregatorRetryMaxDelay), "30s")
417417
viper.SetDefault(string(EventDBEventsBufferSize), 100)
418418
viper.SetDefault(string(EventDispatcherBufferLength), 5)
419-
viper.SetDefault(string(EventDispatcherBatchTimeout), "250ms")
419+
viper.SetDefault(string(EventDispatcherBatchTimeout), "0ms")
420420
viper.SetDefault(string(EventDispatcherPollTimeout), "30s")
421421
viper.SetDefault(string(EventTransportsEnabled), []string{"websockets", "webhooks"})
422422
viper.SetDefault(string(EventTransportsDefault), "websockets")

internal/events/event_poller.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,12 @@ func (ep *eventPoller) eventLoop() {
196196
close(ep.offsetCommitted)
197197
}()
198198

199+
doBatchDelay := false
199200
for {
201+
if doBatchDelay {
202+
ep.waitForBatchTimeout()
203+
}
204+
200205
// Read messages from the DB - in an error condition we retry until success, or a closed context
201206
events, err := ep.readPage()
202207
if err != nil {
@@ -205,6 +210,15 @@ func (ep *eventPoller) eventLoop() {
205210
}
206211

207212
eventCount := len(events)
213+
214+
// We might want to wait for the batch to fill - so we delay and re-poll
215+
if ep.conf.eventBatchTimeout > 0 && !doBatchDelay && eventCount < ep.conf.eventBatchSize {
216+
doBatchDelay = true
217+
l.Tracef("Batch delay: detected=%d, batchSize=%d batchTimeout=%s", eventCount, ep.conf.eventBatchSize, ep.conf.eventBatchTimeout)
218+
continue
219+
}
220+
doBatchDelay = false // clear any batch delay for next iteration
221+
208222
repoll := false
209223
if eventCount > 0 {
210224
// We process all the events in the page in a single database run group, and
@@ -280,6 +294,16 @@ func (ep *eventPoller) shoulderTap() {
280294
}
281295
}
282296

297+
func (ep *eventPoller) waitForBatchTimeout() {
298+
// For throughput optimized environments, we can set an eventBatchingTimeout to allow
299+
// dispatching of incomplete batches at a shorter timeout than the
300+
// long timeout between polling cycles (at the cost of some dispatch latency)
301+
select {
302+
case <-time.After(ep.conf.eventBatchTimeout):
303+
case <-ep.ctx.Done():
304+
}
305+
}
306+
283307
func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool {
284308
l := log.L(ep.ctx)
285309
longTimeoutDuration := ep.conf.eventPollTimeout
@@ -289,21 +313,6 @@ func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool
289313
return true
290314
}
291315

292-
// For throughput optimized environments, we can set an eventBatchingTimeout to allow
293-
// dispatching of incomplete batches at a shorter timeout than the
294-
// long timeout between polling cycles (at the cost of some dispatch latency)
295-
if ep.conf.eventBatchTimeout > 0 && lastEventCount > 0 {
296-
shortTimeout := time.NewTimer(ep.conf.eventBatchTimeout)
297-
select {
298-
case <-shortTimeout.C:
299-
l.Tracef("Woken after batch timeout")
300-
return true
301-
case <-ep.ctx.Done():
302-
l.Debugf("Exiting due to cancelled context")
303-
return false
304-
}
305-
}
306-
307316
longTimeout := time.NewTimer(longTimeoutDuration)
308317
select {
309318
case <-longTimeout.C:

internal/events/event_poller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func newTestEventPoller(t *testing.T, mdi *databasemocks.Plugin, neh newEventsHa
3636
ctx, cancel := context.WithCancel(context.Background())
3737
ep = newEventPoller(ctx, mdi, newEventNotifier(ctx, "ut"), &eventPollerConf{
3838
eventBatchSize: 10,
39-
eventBatchTimeout: 1 * time.Millisecond,
39+
eventBatchTimeout: 0, // customized for individual tests that enable this
4040
eventPollTimeout: 10 * time.Second,
4141
startupOffsetRetryAttempts: 1,
4242
retry: retry.Retry{

0 commit comments

Comments
 (0)