Skip to content

Commit 8dd5a0d

Browse files
Avoid replaying entire chain for zero or explicit block catchup
Signed-off-by: Peter Broadhurst <[email protected]>
1 parent 752c510 commit 8dd5a0d

File tree

5 files changed

+221
-27
lines changed

5 files changed

+221
-27
lines changed

internal/blockchain/ethereum/eventstream.go

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -203,31 +203,57 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (
203203
return sub.Name, nil
204204
}
205205

206-
func latestOrLastBlock(protocolID string) string {
207-
if len(protocolID) > 0 {
208-
blockStr := strings.Split(protocolID, "/")[0]
209-
blockNumber, err := strconv.ParseUint(blockStr, 10, 64)
210-
if err == nil {
206+
func resolveFromBlock(ctx context.Context, firstEvent, lastProtocolID string) (string, error) {
207+
// Parse the lastProtocolID if supplied
208+
var blockBeforeNewestEvent *uint64
209+
if len(lastProtocolID) > 0 {
210+
blockStr := strings.Split(lastProtocolID, "/")[0]
211+
parsedUint, err := strconv.ParseUint(blockStr, 10, 64)
212+
if err != nil {
213+
return "", i18n.NewError(ctx, coremsgs.MsgInvalidLastEventProtocolID, lastProtocolID)
214+
}
215+
if parsedUint > 0 {
211216
// We jump back on block from the last event, to minimize re-delivery while ensuring
212217
// we get all events since the last delivered (including subsequent events in the same block)
213-
return strconv.FormatUint(blockNumber-1, 10)
218+
parsedUint--
219+
blockBeforeNewestEvent = &parsedUint
220+
}
221+
}
222+
223+
// If the user requested newest, then we use the last block number if we have one,
224+
// or we pass the request for newest down to the connector
225+
if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) || firstEvent == "latest" {
226+
if blockBeforeNewestEvent != nil {
227+
return strconv.FormatUint(*blockBeforeNewestEvent, 10), nil
214228
}
229+
return "latest", nil
230+
}
231+
232+
// Otherwise we expect to be able to parse the block, with "oldest" being the same as "0"
233+
if firstEvent == string(core.SubOptsFirstEventOldest) {
234+
firstEvent = "0"
215235
}
216-
return "latest"
236+
blockNumber, err := strconv.ParseUint(firstEvent, 10, 64)
237+
if err != nil {
238+
return "", i18n.NewError(ctx, coremsgs.MsgInvalidFromBlockNumber, firstEvent)
239+
}
240+
// If the last event is already dispatched after this block, recreate the listener from that block
241+
if blockBeforeNewestEvent != nil && *blockBeforeNewestEvent > blockNumber {
242+
blockNumber = *blockBeforeNewestEvent
243+
}
244+
return strconv.FormatUint(blockNumber, 10), nil
217245
}
218246

219247
func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry, lastProtocolID string) (*subscription, error) {
220-
// Map FireFly "firstEvent" values to Ethereum "fromBlock" values
221-
switch firstEvent {
222-
case string(core.SubOptsFirstEventOldest):
223-
firstEvent = "0"
224-
case string(core.SubOptsFirstEventNewest):
225-
firstEvent = latestOrLastBlock(lastProtocolID)
248+
fromBlock, err := resolveFromBlock(ctx, firstEvent, lastProtocolID)
249+
if err != nil {
250+
return nil, err
226251
}
252+
227253
sub := subscription{
228254
Name: subName,
229255
Stream: stream,
230-
FromBlock: firstEvent,
256+
FromBlock: fromBlock,
231257
EthCompatEvent: abi,
232258
}
233259

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright © 2024 Kaleido, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package ethereum
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
)
25+
26+
func TestCreateSubscriptionBadBlock(t *testing.T) {
27+
e, cancel := newTestEthereum()
28+
defer cancel()
29+
30+
_, err := e.streams.createSubscription(context.Background(), nil, "", "", "wrongness", nil, "")
31+
assert.Regexp(t, "FF10473", err)
32+
}
33+
34+
func TestResolveFromBlockCombinations(t *testing.T) {
35+
36+
ctx := context.Background()
37+
38+
fromBlock, err := resolveFromBlock(ctx, "", "")
39+
assert.Equal(t, "latest", fromBlock)
40+
assert.NoError(t, err)
41+
42+
fromBlock, err = resolveFromBlock(ctx, "latest", "")
43+
assert.Equal(t, "latest", fromBlock)
44+
assert.NoError(t, err)
45+
46+
fromBlock, err = resolveFromBlock(ctx, "newest", "")
47+
assert.Equal(t, "latest", fromBlock)
48+
assert.NoError(t, err)
49+
50+
fromBlock, err = resolveFromBlock(ctx, "0", "")
51+
assert.Equal(t, "0", fromBlock)
52+
assert.NoError(t, err)
53+
54+
fromBlock, err = resolveFromBlock(ctx, "0", "000000000010/000000/000050")
55+
assert.Equal(t, "9", fromBlock)
56+
assert.NoError(t, err)
57+
58+
fromBlock, err = resolveFromBlock(ctx, "20", "000000000010/000000/000050")
59+
assert.Equal(t, "20", fromBlock)
60+
assert.NoError(t, err)
61+
62+
fromBlock, err = resolveFromBlock(ctx, "", "000000000010/000000/000050")
63+
assert.Equal(t, "9", fromBlock)
64+
assert.NoError(t, err)
65+
66+
_, err = resolveFromBlock(ctx, "", "wrong")
67+
assert.Regexp(t, "FF10472", err)
68+
69+
}

internal/blockchain/fabric/eventstream.go

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -179,26 +179,54 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (
179179
return sub.Name, nil
180180
}
181181

182-
func newestOrLastBlock(protocolID string) string {
183-
if len(protocolID) > 0 {
184-
blockStr := strings.Split(protocolID, "/")[0]
185-
blockNumber, err := strconv.ParseUint(blockStr, 10, 64)
186-
if err == nil {
182+
func resolveFromBlock(ctx context.Context, firstEvent, lastProtocolID string) (string, error) {
183+
// Parse the lastProtocolID if supplied
184+
var blockBeforeNewestEvent *uint64
185+
if len(lastProtocolID) > 0 {
186+
blockStr := strings.Split(lastProtocolID, "/")[0]
187+
parsedUint, err := strconv.ParseUint(blockStr, 10, 64)
188+
if err != nil {
189+
return "", i18n.NewError(ctx, coremsgs.MsgInvalidLastEventProtocolID, lastProtocolID)
190+
}
191+
if parsedUint > 0 {
187192
// We jump back on block from the last event, to minimize re-delivery while ensuring
188193
// we get all events since the last delivered (including subsequent events in the same block)
189-
return strconv.FormatUint(blockNumber-1, 10)
194+
parsedUint--
195+
blockBeforeNewestEvent = &parsedUint
190196
}
191197
}
192-
return "newest"
193-
}
194198

195-
func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, firstEvent, lastProtocolID string) (*subscription, error) {
196-
// Map FireFly "firstEvent" values to Fabric "fromBlock" values
199+
// If the user requested newest, then we use the last block number if we have one,
200+
// or we pass the request for newest down to the connector
201+
if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) || firstEvent == "latest" {
202+
if blockBeforeNewestEvent != nil {
203+
return strconv.FormatUint(*blockBeforeNewestEvent, 10), nil
204+
}
205+
return "newest", nil
206+
}
207+
208+
// Otherwise we expect to be able to parse the block, with "oldest" being the same as "0"
197209
if firstEvent == string(core.SubOptsFirstEventOldest) {
198210
firstEvent = "0"
199-
} else if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) {
200-
firstEvent = newestOrLastBlock(lastProtocolID)
201211
}
212+
blockNumber, err := strconv.ParseUint(firstEvent, 10, 64)
213+
if err != nil {
214+
return "", i18n.NewError(ctx, coremsgs.MsgInvalidFromBlockNumber, firstEvent)
215+
}
216+
// If the last event is already dispatched after this block, recreate the listener from that block
217+
if blockBeforeNewestEvent != nil && *blockBeforeNewestEvent > blockNumber {
218+
blockNumber = *blockBeforeNewestEvent
219+
}
220+
return strconv.FormatUint(blockNumber, 10), nil
221+
}
222+
223+
func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, firstEvent, lastProtocolID string) (*subscription, error) {
224+
225+
fromBlock, err := resolveFromBlock(ctx, firstEvent, lastProtocolID)
226+
if err != nil {
227+
return nil, err
228+
}
229+
202230
sub := subscription{
203231
Name: name,
204232
Channel: location.Channel,
@@ -207,7 +235,7 @@ func (s *streamManager) createSubscription(ctx context.Context, location *Locati
207235
Filter: eventFilter{
208236
EventFilter: event,
209237
},
210-
FromBlock: firstEvent,
238+
FromBlock: fromBlock,
211239
}
212240

213241
if location.Chaincode != "" {
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright © 2024 Kaleido, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package fabric
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
)
25+
26+
func TestCreateSubscriptionBadBlock(t *testing.T) {
27+
e, cancel := newTestFabric()
28+
defer cancel()
29+
30+
_, err := e.streams.createSubscription(context.Background(), nil, "", "", "", "wrongness", "")
31+
assert.Regexp(t, "FF10473", err)
32+
}
33+
34+
func TestResolveFromBlockCombinations(t *testing.T) {
35+
36+
ctx := context.Background()
37+
38+
fromBlock, err := resolveFromBlock(ctx, "", "")
39+
assert.Equal(t, "newest", fromBlock)
40+
assert.NoError(t, err)
41+
42+
fromBlock, err = resolveFromBlock(ctx, "latest", "")
43+
assert.Equal(t, "newest", fromBlock)
44+
assert.NoError(t, err)
45+
46+
fromBlock, err = resolveFromBlock(ctx, "newest", "")
47+
assert.Equal(t, "newest", fromBlock)
48+
assert.NoError(t, err)
49+
50+
fromBlock, err = resolveFromBlock(ctx, "0", "")
51+
assert.Equal(t, "0", fromBlock)
52+
assert.NoError(t, err)
53+
54+
fromBlock, err = resolveFromBlock(ctx, "0", "000000000010/4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d")
55+
assert.Equal(t, "9", fromBlock)
56+
assert.NoError(t, err)
57+
58+
fromBlock, err = resolveFromBlock(ctx, "20", "000000000010/4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d")
59+
assert.Equal(t, "20", fromBlock)
60+
assert.NoError(t, err)
61+
62+
fromBlock, err = resolveFromBlock(ctx, "", "000000000010/4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d")
63+
assert.Equal(t, "9", fromBlock)
64+
assert.NoError(t, err)
65+
66+
_, err = resolveFromBlock(ctx, "", "wrong")
67+
assert.Regexp(t, "FF10472", err)
68+
69+
}

internal/coremsgs/en_error_messages.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,4 +310,6 @@ var (
310310
MsgNoRegistrationMessageData = ffe("FF10469", "Unable to check message registration data for org %s", 500)
311311
MsgUnexpectedRegistrationType = ffe("FF10470", "Unexpected type checking registration status: %s", 500)
312312
MsgUnableToParseRegistrationData = ffe("FF10471", "Unable to parse registration message data: %s", 500)
313+
MsgInvalidLastEventProtocolID = ffe("FF10472", "Unable to parse protocol ID of previous event: %s", 500)
314+
MsgInvalidFromBlockNumber = ffe("FF10473", "Unable to parse block number: %s", 500)
313315
)

0 commit comments

Comments
 (0)