Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use separate event stream per namespace #1388

Merged
merged 27 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7ad5304
Use separate event stream per namespace
nguyer Aug 10, 2023
3f04c1e
Use separate event stream per namespace in tokens plugin
nguyer Aug 29, 2023
a37304f
Implement StopNamespace in fftokens. Fix unit tests.
nguyer Aug 30, 2023
596caed
Only init bc if not nil
nguyer Aug 30, 2023
66173b2
Recreate custom contract subscriptions after evenstream migration
nguyer Sep 1, 2023
e5f1ccc
Merge branch 'main' into eventstreams
nguyer Oct 13, 2023
9655d6f
Update Tezos plugin and fix Ethereum/Fabric EventStream migration
nguyer Oct 13, 2023
91a18fc
Add namespace to token connector API calls
nguyer Nov 1, 2023
94bfc75
Fix migration issues and increase test coverage
nguyer Nov 8, 2023
0559710
Test coverage back to 100%
nguyer Nov 9, 2023
cecee27
Re-activate token pool on startup
nguyer Nov 10, 2023
aa64620
Fix unit tests
nguyer Nov 14, 2023
e0b34a5
Address PR feedback
nguyer Nov 15, 2023
3fe2b08
Merge branch 'dependencies' into eventstreams
nguyer Jan 24, 2024
6b4fd88
Add AlternateLocators
nguyer Feb 1, 2024
bf4656b
Fixes for updating pool locators
nguyer Feb 2, 2024
a52d3bc
Fixes for updating pool locators
nguyer Feb 2, 2024
3ad8416
Update copyright year
nguyer Feb 13, 2024
65283ec
Fix unit test
nguyer Feb 13, 2024
813e29b
Add unit tests for updating pool locator
nguyer Feb 13, 2024
7972561
Merge branch 'main' into eventstreams
nguyer Feb 13, 2024
ba212a9
Address PR feedback
nguyer Feb 21, 2024
99c7bc1
Merge branch 'main' into eventstreams
nguyer Feb 21, 2024
ee0770d
PR feedback
nguyer Feb 26, 2024
b9b7a29
Update fftokens unit tests
nguyer Feb 26, 2024
716efcd
Merge branch 'main' into eventstreams
nguyer Mar 4, 2024
dd281b8
Use updated token connectors
nguyer Mar 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,879 changes: 28 additions & 1,851 deletions go.work.sum

Large diffs are not rendered by default.

160 changes: 75 additions & 85 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-common/pkg/retry"
"github.com/hyperledger/firefly-common/pkg/wsclient"
"github.com/hyperledger/firefly-signer/pkg/abi"
"github.com/hyperledger/firefly-signer/pkg/ffi2abi"
Expand Down Expand Up @@ -61,24 +60,23 @@ const (
type Ethereum struct {
ctx context.Context
cancelCtx context.CancelFunc
topic string
pluginTopic string
prefixShort string
prefixLong string
capabilities *blockchain.Capabilities
callbacks common.BlockchainCallbacks
client *resty.Client
streams *streamManager
streamID string
wsconn wsclient.WSClient
closed chan struct{}
streamID map[string]string
wsconn map[string]wsclient.WSClient
wsConfig *wsclient.WSConfig
closed map[string]chan struct{}
addressResolveAlways bool
addressResolver *addressResolver
metrics metrics.Manager
ethconnectConf config.Section
subs common.FireflySubscriptions
cache cache.CInterface
backgroundRetry *retry.Retry
backgroundStart bool
}

type eventStreamWebsocket struct {
Expand Down Expand Up @@ -162,7 +160,7 @@ func (e *Ethereum) Init(ctx context.Context, cancelCtx context.CancelFunc, conf
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", ethconnectConf)
}

wsConfig, err := wsclient.GenerateConfig(ctx, ethconnectConf)
e.wsConfig, err = wsclient.GenerateConfig(ctx, ethconnectConf)
if err == nil {
e.client, err = ffresty.New(e.ctx, ethconnectConf)
}
Expand All @@ -171,19 +169,15 @@ func (e *Ethereum) Init(ctx context.Context, cancelCtx context.CancelFunc, conf
return err
}

e.topic = ethconnectConf.GetString(EthconnectConfigTopic)
if e.topic == "" {
e.pluginTopic = ethconnectConf.GetString(EthconnectConfigTopic)
if e.pluginTopic == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "topic", ethconnectConf)
}
e.prefixShort = ethconnectConf.GetString(EthconnectPrefixShort)
e.prefixLong = ethconnectConf.GetString(EthconnectPrefixLong)

if wsConfig.WSKeyPath == "" {
wsConfig.WSKeyPath = "/ws"
}
e.wsconn, err = wsclient.New(ctx, wsConfig, nil, e.afterConnect)
if err != nil {
return err
if e.wsConfig.WSKeyPath == "" {
e.wsConfig.WSKeyPath = "/ws"
}

cache, err := cacheManager.GetCache(
Expand All @@ -199,29 +193,68 @@ func (e *Ethereum) Init(ctx context.Context, cancelCtx context.CancelFunc, conf
}
e.cache = cache

e.streamID = make(map[string]string)
e.closed = make(map[string]chan struct{})
e.wsconn = make(map[string]wsclient.WSClient)
e.streams = newStreamManager(e.client, e.cache, e.ethconnectConf.GetUint(EthconnectConfigBatchSize), uint(e.ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds()))

e.backgroundStart = e.ethconnectConf.GetBool(EthconnectBackgroundStart)
if e.backgroundStart {
e.backgroundRetry = &retry.Retry{
InitialDelay: e.ethconnectConf.GetDuration(EthconnectBackgroundStartInitialDelay),
MaximumDelay: e.ethconnectConf.GetDuration(EthconnectBackgroundStartMaxDelay),
Factor: e.ethconnectConf.GetFloat64(EthconnectBackgroundStartFactor),
}
return nil
}

return nil
func (e *Ethereum) getTopic(namespace string) string {
return fmt.Sprintf("%s/%s", e.pluginTopic, namespace)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think it's important we document this, in the docs for the configuration of the EthconnectConfigTopic config entry

}

func (e *Ethereum) StartNamespace(ctx context.Context, namespace string) (err error) {
log.L(e.ctx).Debugf("Starting namespace: %s", namespace)
topic := e.getTopic(namespace)

e.wsconn[namespace], err = wsclient.New(ctx, e.wsConfig, nil, func(ctx context.Context, w wsclient.WSClient) error {
// Send a subscribe to our topic after each connect/reconnect
b, _ := json.Marshal(&ethWSCommandPayload{
Type: "listen",
Topic: topic,
})
err := w.Send(ctx, b)
if err == nil {
b, _ = json.Marshal(&ethWSCommandPayload{
Type: "listenreplies",
})
err = w.Send(ctx, b)
}
return err
})
if err != nil {
return err
}
// Otherwise, make sure that our event stream is in place
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise feels superfluous here

stream, err := e.streams.ensureEventStream(ctx, topic)
if err != nil {
return err
}
log.L(e.ctx).Infof("Event stream: %s (topic=%s)", stream.ID, topic)
e.streamID[namespace] = stream.ID

stream, err := e.streams.ensureEventStream(e.ctx, e.topic)
err = e.wsconn[namespace].Connect()
if err != nil {
return err
}

e.streamID = stream.ID
log.L(e.ctx).Infof("Event stream: %s (topic=%s)", e.streamID, e.topic)
e.closed[namespace] = make(chan struct{})

go e.eventLoop(namespace)

return nil
}

e.closed = make(chan struct{})
go e.eventLoop()
func (e *Ethereum) StopNamespace(ctx context.Context, namespace string) (err error) {
wsconn, ok := e.wsconn[namespace]
if ok {
wsconn.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like we should wait for the eventLoop to close

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we have e.closed[namespace] for this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I added a wait for this here, but it then gets into a deadlock trying to stop the namespace

if ok {
    <-e.closed[namespace]
    wsconn.Close()
}

}
delete(e.wsconn, namespace)
delete(e.streamID, namespace)
delete(e.closed, namespace)

return nil
}
Expand All @@ -234,36 +267,6 @@ func (e *Ethereum) SetOperationHandler(namespace string, handler core.OperationC
e.callbacks.SetOperationalHandler(namespace, handler)
}

func (e *Ethereum) startBackgroundLoop() {
_ = e.backgroundRetry.Do(e.ctx, fmt.Sprintf("ethereum connector %s", e.Name()), func(attempt int) (retry bool, err error) {
stream, err := e.streams.ensureEventStream(e.ctx, e.topic)
if err != nil {
return true, err
}

e.streamID = stream.ID
log.L(e.ctx).Infof("Event stream: %s (topic=%s)", e.streamID, e.topic)
err = e.wsconn.Connect()
if err != nil {
return true, err
}

e.closed = make(chan struct{})
go e.eventLoop()

return false, nil
})
}

func (e *Ethereum) Start() (err error) {
if e.backgroundStart {
go e.startBackgroundLoop()
return nil
}

return e.wsconn.Connect()
}

func (e *Ethereum) Capabilities() *blockchain.Capabilities {
return e.capabilities
}
Expand All @@ -279,7 +282,7 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.N
return "", err
}

sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, e.streamID, batchPinEventABI)
sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, e.streamID[namespace.Name], batchPinEventABI)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a check that the map entry is set (to avoid a nil panic)?

Or is the dynamic config loading threading model such that we know it's impossible for an API call to come down to this layer to create a subscription for a new listener, after the stream has been cleaned up?

if err != nil {
return "", err
}
Expand All @@ -295,22 +298,6 @@ func (e *Ethereum) RemoveFireflySubscription(ctx context.Context, subID string)
e.subs.RemoveSubscription(ctx, subID)
}

func (e *Ethereum) afterConnect(ctx context.Context, w wsclient.WSClient) error {
// Send a subscribe to our topic after each connect/reconnect
b, _ := json.Marshal(&ethWSCommandPayload{
Type: "listen",
Topic: e.topic,
})
err := w.Send(ctx, b)
if err == nil {
b, _ = json.Marshal(&ethWSCommandPayload{
Type: "listenreplies",
})
err = w.Send(ctx, b)
}
return err
}

func ethHexFormatB32(b *fftypes.Bytes32) string {
if b == nil {
return "0x0000000000000000000000000000000000000000000000000000000000000000"
Expand Down Expand Up @@ -460,17 +447,19 @@ func (e *Ethereum) handleMessageBatch(ctx context.Context, batchID int64, messag
return e.callbacks.DispatchBlockchainEvents(ctx, events)
}

func (e *Ethereum) eventLoop() {
defer e.wsconn.Close()
defer close(e.closed)
func (e *Ethereum) eventLoop(namespace string) {
topic := e.getTopic(namespace)
wsconn := e.wsconn[namespace]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost certain not to be a problem, but technically I think this should be passed in on the creation of the go-routine, as there's no locking around the map. So the starting routine sets this before kicking off the eventLoop and then we're querying the map in that newly kicked off routine.

defer wsconn.Close()
defer close(e.closed[namespace])
l := log.L(e.ctx).WithField("role", "event-loop")
ctx := log.WithLogger(e.ctx, l)
for {
select {
case <-ctx.Done():
l.Debugf("Event loop exiting (context cancelled)")
return
case msgBytes, ok := <-e.wsconn.Receive():
case msgBytes, ok := <-wsconn.Receive():
if !ok {
l.Debugf("Event loop exiting (receive channel closed). Terminating server!")
e.cancelCtx()
Expand All @@ -489,9 +478,9 @@ func (e *Ethereum) eventLoop() {
if err == nil {
ack, _ := json.Marshal(&ethWSCommandPayload{
Type: "ack",
Topic: e.topic,
Topic: topic,
})
err = e.wsconn.Send(ctx, ack)
err = wsconn.Send(ctx, ack)
}
case map[string]interface{}:
isBatch := false
Expand All @@ -502,7 +491,7 @@ func (e *Ethereum) eventLoop() {
err = e.handleMessageBatch(ctx, (int64)(batchNumber), events)
// Errors processing messages are converted into nacks
ackOrNack := &ethWSCommandPayload{
Topic: e.topic,
Topic: topic,
BatchNumber: int64(batchNumber),
}
if err == nil {
Expand All @@ -513,7 +502,7 @@ func (e *Ethereum) eventLoop() {
ackOrNack.Message = err.Error()
}
b, _ := json.Marshal(&ackOrNack)
err = e.wsconn.Send(ctx, b)
err = wsconn.Send(ctx, b)
}
}
if !isBatch {
Expand Down Expand Up @@ -875,6 +864,7 @@ func (e *Ethereum) encodeContractLocation(ctx context.Context, location *Locatio

func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener) (err error) {
var location *Location
namespace := listener.Namespace
if listener.Location != nil {
location, err = e.parseContractLocation(ctx, listener.Location)
if err != nil {
Expand All @@ -891,7 +881,7 @@ func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.Contr
if listener.Options != nil {
firstEvent = listener.Options.FirstEvent
}
result, err := e.streams.createSubscription(ctx, location, e.streamID, subName, firstEvent, abi)
result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi)
if err != nil {
return err
}
Expand Down
Loading