Skip to content

Commit

Permalink
Work through validation of batch logic
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst committed Jan 16, 2024
1 parent 011dda3 commit 93894ff
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 2 deletions.
4 changes: 2 additions & 2 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ nav_order: 2
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchSize|The maximum number of records to read from the DB before performing an aggregation run|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`200`
|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
|firstEvent|The first event the aggregator should process, if no previous offest is stored in the DB. Valid options are `oldest` or `newest`|`string`|`oldest`
|pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|rewindQueryLimit|Safety limit on the maximum number of records to search when performing queries to search for rewinds|`int`|`1000`
Expand All @@ -249,7 +249,7 @@ nav_order: 2

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms`
|bufferLength|The number of events + attachments an individual dispatcher should hold in memory ready for delivery to the subscription|`int`|`5`
|pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`

Expand Down
165 changes: 165 additions & 0 deletions internal/events/event_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,171 @@ func TestEventDispatcherNoReadAheadInOrder(t *testing.T) {
mdm.AssertExpectations(t)
}

func TestEventDispatcherBatchBased(t *testing.T) {
log.SetLevel("debug")
three := uint16(3)
longTime := "1m"
subID := fftypes.NewUUID()
truthy := true
sub := &subscription{
dispatcherElection: make(chan bool, 1),
definition: &core.Subscription{
SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"},
Options: core.SubscriptionOptions{
SubscriptionCoreOptions: core.SubscriptionCoreOptions{
Batch: &truthy,
ReadAhead: &three,
BatchTimeout: &longTime, // because the batch should fill
},
},
},
eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)),
}

ed, cancel := newTestEventDispatcher(sub)
defer cancel()
go ed.deliverEvents()
ed.eventPoller.offsetCommitted = make(chan int64, 3)
mdi := ed.database.(*databasemocks.Plugin)
mei := ed.transport.(*eventsmocks.Plugin)
mdm := ed.data.(*datamocks.Manager)

eventDeliveries := make(chan []*core.CombinedEventDataDelivery)
deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
deliveryRequestMock.RunFn = func(a mock.Arguments) {
eventDeliveries <- a.Get(3).([]*core.CombinedEventDataDelivery)
}

// Setup the IDs
ref1 := fftypes.NewUUID()
ev1 := fftypes.NewUUID()
ref2 := fftypes.NewUUID()
ev2 := fftypes.NewUUID()
ref3 := fftypes.NewUUID()
ev3 := fftypes.NewUUID()
ref4 := fftypes.NewUUID()
ev4 := fftypes.NewUUID()

// Setup enrichment
mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&core.Message{
Header: core.MessageHeader{ID: ref1},
}, nil, true, nil)
mdm.On("GetMessageWithDataCached", mock.Anything, ref2).Return(&core.Message{
Header: core.MessageHeader{ID: ref2},
}, nil, true, nil)
mdm.On("GetMessageWithDataCached", mock.Anything, ref3).Return(&core.Message{
Header: core.MessageHeader{ID: ref3},
}, nil, true, nil)
mdm.On("GetMessageWithDataCached", mock.Anything, ref4).Return(&core.Message{
Header: core.MessageHeader{ID: ref4},
}, nil, true, nil)

// Deliver a batch of messages
batch1Done := make(chan struct{})
go func() {
repoll, err := ed.bufferedDelivery([]core.LocallySequenced{
&core.Event{ID: ev1, Sequence: 10000001, Reference: ref1, Type: core.EventTypeMessageConfirmed}, // match
&core.Event{ID: ev2, Sequence: 10000002, Reference: ref2, Type: core.EventTypeMessageRejected},
&core.Event{ID: ev3, Sequence: 10000003, Reference: ref3, Type: core.EventTypeMessageConfirmed}, // match
&core.Event{ID: ev4, Sequence: 10000004, Reference: ref4, Type: core.EventTypeMessageConfirmed}, // match
})
assert.NoError(t, err)
assert.True(t, repoll)
close(batch1Done)
}()

// Expect to get the batch dispatched - with the three matching events
events := <-eventDeliveries
assert.Len(t, events, 3)
assert.Equal(t, *ev1, *events[0].Event.ID)
assert.Equal(t, *ref1, *events[0].Event.Message.Header.ID)
assert.Equal(t, *ev3, *events[1].Event.ID)
assert.Equal(t, *ref3, *events[1].Event.Message.Header.ID)
assert.Equal(t, *ev4, *events[2].Event.ID)
assert.Equal(t, *ref4, *events[2].Event.Message.Header.ID)

// Ack the batch
go func() {
ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[0].Event.ID})
ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[1].Event.ID})
ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[2].Event.ID})
}()

assert.Equal(t, int64(10000001), <-ed.eventPoller.offsetCommitted)
assert.Equal(t, int64(10000003), <-ed.eventPoller.offsetCommitted)
assert.Equal(t, int64(10000004), <-ed.eventPoller.offsetCommitted)

// This should complete the batch
<-batch1Done

mdi.AssertExpectations(t)
mei.AssertExpectations(t)
mdm.AssertExpectations(t)
}

func TestEventDispatcherBatchDispatchFail(t *testing.T) {
log.SetLevel("debug")
two := uint16(2)
longTime := "1m"
subID := fftypes.NewUUID()
truthy := true
sub := &subscription{
dispatcherElection: make(chan bool, 1),
definition: &core.Subscription{
SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"},
Options: core.SubscriptionOptions{
SubscriptionCoreOptions: core.SubscriptionCoreOptions{
Batch: &truthy,
ReadAhead: &two,
BatchTimeout: &longTime, // because the batch should fill
},
},
},
eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)),
}

ed, cancel := newTestEventDispatcher(sub)
defer cancel()
go ed.deliverEvents()
ed.eventPoller.offsetCommitted = make(chan int64, 3)
mdi := ed.database.(*databasemocks.Plugin)
mei := ed.transport.(*eventsmocks.Plugin)
mdm := ed.data.(*datamocks.Manager)

eventDeliveries := make(chan []*core.CombinedEventDataDelivery)
deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
deliveryRequestMock.RunFn = func(a mock.Arguments) {
eventDeliveries <- a.Get(3).([]*core.CombinedEventDataDelivery)
}

// Deliver a batch of messages
batch1Done := make(chan struct{})
go func() {
repoll, err := ed.bufferedDelivery([]core.LocallySequenced{
&core.Event{ID: fftypes.NewUUID(), Sequence: 10000001, Type: core.EventTypeMessageConfirmed},
&core.Event{ID: fftypes.NewUUID(), Sequence: 10000002, Type: core.EventTypeMessageConfirmed},
})
assert.NoError(t, err)
assert.True(t, repoll)
close(batch1Done)
}()

mdm.On("GetMessageWithDataCached", mock.Anything, mock.Anything).Return(&core.Message{
Header: core.MessageHeader{ID: fftypes.NewUUID()},
}, nil, true, nil)

// Expect to get the batch dispatched - with the three matching events
events := <-eventDeliveries
assert.Len(t, events, 2)

// This should complete the batch
<-batch1Done

mdi.AssertExpectations(t)
mei.AssertExpectations(t)
mdm.AssertExpectations(t)
}

func TestEnrichEventsFailGetMessages(t *testing.T) {

sub := &subscription{
Expand Down
59 changes: 59 additions & 0 deletions internal/events/event_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,57 @@ func TestReadPageSingleCommitEvent(t *testing.T) {
mdi.AssertExpectations(t)
}

func TestReadPageBatchTimeoutNotFull(t *testing.T) {
mdi := &databasemocks.Plugin{}
processEventCalled := make(chan []core.LocallySequenced, 1)
ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) {
processEventCalled <- events
return false, nil
}, nil)
ep.conf.eventBatchTimeout = 1 * time.Microsecond
ep.conf.eventBatchSize = 3
ev1 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
ev2 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1}, nil, nil).Once() // half batch
mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1, ev2}, nil, nil).Run(func(args mock.Arguments) {
ep.shoulderTap()
}).Once()
mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("context done")).Run(func(args mock.Arguments) {
cancel()
})
ep.eventLoop()

events := <-processEventCalled
assert.Equal(t, *ev1.ID, *events[0].(*core.Event).ID)
assert.Equal(t, *ev2.ID, *events[1].(*core.Event).ID)
mdi.AssertExpectations(t)
}

func TestReadPageBatchFull(t *testing.T) {
mdi := &databasemocks.Plugin{}
processEventCalled := make(chan []core.LocallySequenced, 1)
ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) {
processEventCalled <- events
return false, nil
}, nil)
ep.conf.eventBatchTimeout = 1 * time.Microsecond
ep.conf.eventBatchSize = 2
ev1 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
ev2 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "")
mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1, ev2}, nil, nil).Run(func(args mock.Arguments) {
ep.shoulderTap()
}).Once()
mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("context done")).Run(func(args mock.Arguments) {
cancel()
})
ep.eventLoop()

events := <-processEventCalled
assert.Equal(t, *ev1.ID, *events[0].(*core.Event).ID)
assert.Equal(t, *ev2.ID, *events[1].(*core.Event).ID)
mdi.AssertExpectations(t)
}

func TestReadPageRewind(t *testing.T) {
mdi := &databasemocks.Plugin{}
processEventCalled := make(chan core.LocallySequenced, 1)
Expand Down Expand Up @@ -325,6 +376,14 @@ func TestDoubleTap(t *testing.T) {
ep.shoulderTap() // this should not block
}

func TestWaitForBatchTimeoutClosedContext(t *testing.T) {
mdi := &databasemocks.Plugin{}
ep, cancel := newTestEventPoller(t, mdi, nil, nil)
ep.conf.eventBatchTimeout = 1 * time.Minute
cancel()
ep.waitForBatchTimeout()
}

func TestDoubleConfirm(t *testing.T) {
mdi := &databasemocks.Plugin{}
ep, cancel := newTestEventPoller(t, mdi, nil, nil)
Expand Down

0 comments on commit 93894ff

Please sign in to comment.