From 8dd5a0d2899a0b002d080a60f15ab64770ef1be0 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Wed, 26 Jun 2024 21:43:15 -0400 Subject: [PATCH] Avoid replaying entire chain for zero or explicit block catchup Signed-off-by: Peter Broadhurst --- internal/blockchain/ethereum/eventstream.go | 54 +++++++++++---- .../blockchain/ethereum/eventstream_test.go | 69 +++++++++++++++++++ internal/blockchain/fabric/eventstream.go | 54 +++++++++++---- .../blockchain/fabric/eventstream_test.go | 69 +++++++++++++++++++ internal/coremsgs/en_error_messages.go | 2 + 5 files changed, 221 insertions(+), 27 deletions(-) create mode 100644 internal/blockchain/ethereum/eventstream_test.go create mode 100644 internal/blockchain/fabric/eventstream_test.go diff --git a/internal/blockchain/ethereum/eventstream.go b/internal/blockchain/ethereum/eventstream.go index f3853e5ea..e913139f4 100644 --- a/internal/blockchain/ethereum/eventstream.go +++ b/internal/blockchain/ethereum/eventstream.go @@ -203,31 +203,57 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) ( return sub.Name, nil } -func latestOrLastBlock(protocolID string) string { - if len(protocolID) > 0 { - blockStr := strings.Split(protocolID, "/")[0] - blockNumber, err := strconv.ParseUint(blockStr, 10, 64) - if err == nil { +func resolveFromBlock(ctx context.Context, firstEvent, lastProtocolID string) (string, error) { + // Parse the lastProtocolID if supplied + var blockBeforeNewestEvent *uint64 + if len(lastProtocolID) > 0 { + blockStr := strings.Split(lastProtocolID, "/")[0] + parsedUint, err := strconv.ParseUint(blockStr, 10, 64) + if err != nil { + return "", i18n.NewError(ctx, coremsgs.MsgInvalidLastEventProtocolID, lastProtocolID) + } + if parsedUint > 0 { // We jump back on block from the last event, to minimize re-delivery while ensuring // we get all events since the last delivered (including subsequent events in the same block) - return strconv.FormatUint(blockNumber-1, 10) + parsedUint-- + blockBeforeNewestEvent = &parsedUint + } + } + + // If the user requested newest, then we use the last block number if we have one, + // or we pass the request for newest down to the connector + if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) || firstEvent == "latest" { + if blockBeforeNewestEvent != nil { + return strconv.FormatUint(*blockBeforeNewestEvent, 10), nil } + return "latest", nil + } + + // Otherwise we expect to be able to parse the block, with "oldest" being the same as "0" + if firstEvent == string(core.SubOptsFirstEventOldest) { + firstEvent = "0" } - return "latest" + blockNumber, err := strconv.ParseUint(firstEvent, 10, 64) + if err != nil { + return "", i18n.NewError(ctx, coremsgs.MsgInvalidFromBlockNumber, firstEvent) + } + // If the last event is already dispatched after this block, recreate the listener from that block + if blockBeforeNewestEvent != nil && *blockBeforeNewestEvent > blockNumber { + blockNumber = *blockBeforeNewestEvent + } + return strconv.FormatUint(blockNumber, 10), nil } func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry, lastProtocolID string) (*subscription, error) { - // Map FireFly "firstEvent" values to Ethereum "fromBlock" values - switch firstEvent { - case string(core.SubOptsFirstEventOldest): - firstEvent = "0" - case string(core.SubOptsFirstEventNewest): - firstEvent = latestOrLastBlock(lastProtocolID) + fromBlock, err := resolveFromBlock(ctx, firstEvent, lastProtocolID) + if err != nil { + return nil, err } + sub := subscription{ Name: subName, Stream: stream, - FromBlock: firstEvent, + FromBlock: fromBlock, EthCompatEvent: abi, } diff --git a/internal/blockchain/ethereum/eventstream_test.go b/internal/blockchain/ethereum/eventstream_test.go new file mode 100644 index 000000000..dfb28f872 --- /dev/null +++ b/internal/blockchain/ethereum/eventstream_test.go @@ -0,0 +1,69 @@ +// Copyright © 2024 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ethereum + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCreateSubscriptionBadBlock(t *testing.T) { + e, cancel := newTestEthereum() + defer cancel() + + _, err := e.streams.createSubscription(context.Background(), nil, "", "", "wrongness", nil, "") + assert.Regexp(t, "FF10473", err) +} + +func TestResolveFromBlockCombinations(t *testing.T) { + + ctx := context.Background() + + fromBlock, err := resolveFromBlock(ctx, "", "") + assert.Equal(t, "latest", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "latest", "") + assert.Equal(t, "latest", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "newest", "") + assert.Equal(t, "latest", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "0", "") + assert.Equal(t, "0", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "0", "000000000010/000000/000050") + assert.Equal(t, "9", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "20", "000000000010/000000/000050") + assert.Equal(t, "20", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "", "000000000010/000000/000050") + assert.Equal(t, "9", fromBlock) + assert.NoError(t, err) + + _, err = resolveFromBlock(ctx, "", "wrong") + assert.Regexp(t, "FF10472", err) + +} diff --git a/internal/blockchain/fabric/eventstream.go b/internal/blockchain/fabric/eventstream.go index 04d15a680..3b38142b4 100644 --- a/internal/blockchain/fabric/eventstream.go +++ b/internal/blockchain/fabric/eventstream.go @@ -179,26 +179,54 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) ( return sub.Name, nil } -func newestOrLastBlock(protocolID string) string { - if len(protocolID) > 0 { - blockStr := strings.Split(protocolID, "/")[0] - blockNumber, err := strconv.ParseUint(blockStr, 10, 64) - if err == nil { +func resolveFromBlock(ctx context.Context, firstEvent, lastProtocolID string) (string, error) { + // Parse the lastProtocolID if supplied + var blockBeforeNewestEvent *uint64 + if len(lastProtocolID) > 0 { + blockStr := strings.Split(lastProtocolID, "/")[0] + parsedUint, err := strconv.ParseUint(blockStr, 10, 64) + if err != nil { + return "", i18n.NewError(ctx, coremsgs.MsgInvalidLastEventProtocolID, lastProtocolID) + } + if parsedUint > 0 { // We jump back on block from the last event, to minimize re-delivery while ensuring // we get all events since the last delivered (including subsequent events in the same block) - return strconv.FormatUint(blockNumber-1, 10) + parsedUint-- + blockBeforeNewestEvent = &parsedUint } } - return "newest" -} -func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, firstEvent, lastProtocolID string) (*subscription, error) { - // Map FireFly "firstEvent" values to Fabric "fromBlock" values + // If the user requested newest, then we use the last block number if we have one, + // or we pass the request for newest down to the connector + if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) || firstEvent == "latest" { + if blockBeforeNewestEvent != nil { + return strconv.FormatUint(*blockBeforeNewestEvent, 10), nil + } + return "newest", nil + } + + // Otherwise we expect to be able to parse the block, with "oldest" being the same as "0" if firstEvent == string(core.SubOptsFirstEventOldest) { firstEvent = "0" - } else if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) { - firstEvent = newestOrLastBlock(lastProtocolID) } + blockNumber, err := strconv.ParseUint(firstEvent, 10, 64) + if err != nil { + return "", i18n.NewError(ctx, coremsgs.MsgInvalidFromBlockNumber, firstEvent) + } + // If the last event is already dispatched after this block, recreate the listener from that block + if blockBeforeNewestEvent != nil && *blockBeforeNewestEvent > blockNumber { + blockNumber = *blockBeforeNewestEvent + } + return strconv.FormatUint(blockNumber, 10), nil +} + +func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, firstEvent, lastProtocolID string) (*subscription, error) { + + fromBlock, err := resolveFromBlock(ctx, firstEvent, lastProtocolID) + if err != nil { + return nil, err + } + sub := subscription{ Name: name, Channel: location.Channel, @@ -207,7 +235,7 @@ func (s *streamManager) createSubscription(ctx context.Context, location *Locati Filter: eventFilter{ EventFilter: event, }, - FromBlock: firstEvent, + FromBlock: fromBlock, } if location.Chaincode != "" { diff --git a/internal/blockchain/fabric/eventstream_test.go b/internal/blockchain/fabric/eventstream_test.go new file mode 100644 index 000000000..b5a15eda4 --- /dev/null +++ b/internal/blockchain/fabric/eventstream_test.go @@ -0,0 +1,69 @@ +// Copyright © 2024 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fabric + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCreateSubscriptionBadBlock(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + + _, err := e.streams.createSubscription(context.Background(), nil, "", "", "", "wrongness", "") + assert.Regexp(t, "FF10473", err) +} + +func TestResolveFromBlockCombinations(t *testing.T) { + + ctx := context.Background() + + fromBlock, err := resolveFromBlock(ctx, "", "") + assert.Equal(t, "newest", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "latest", "") + assert.Equal(t, "newest", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "newest", "") + assert.Equal(t, "newest", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "0", "") + assert.Equal(t, "0", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "0", "000000000010/4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d") + assert.Equal(t, "9", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "20", "000000000010/4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d") + assert.Equal(t, "20", fromBlock) + assert.NoError(t, err) + + fromBlock, err = resolveFromBlock(ctx, "", "000000000010/4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d") + assert.Equal(t, "9", fromBlock) + assert.NoError(t, err) + + _, err = resolveFromBlock(ctx, "", "wrong") + assert.Regexp(t, "FF10472", err) + +} diff --git a/internal/coremsgs/en_error_messages.go b/internal/coremsgs/en_error_messages.go index efb2e9db2..aeaea5b0d 100644 --- a/internal/coremsgs/en_error_messages.go +++ b/internal/coremsgs/en_error_messages.go @@ -310,4 +310,6 @@ var ( MsgNoRegistrationMessageData = ffe("FF10469", "Unable to check message registration data for org %s", 500) MsgUnexpectedRegistrationType = ffe("FF10470", "Unexpected type checking registration status: %s", 500) MsgUnableToParseRegistrationData = ffe("FF10471", "Unable to parse registration message data: %s", 500) + MsgInvalidLastEventProtocolID = ffe("FF10472", "Unable to parse protocol ID of previous event: %s", 500) + MsgInvalidFromBlockNumber = ffe("FF10473", "Unable to parse block number: %s", 500) )