From cecee27a723005d4fe011858475b2830e780d533 Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Fri, 10 Nov 2023 16:07:52 -0500 Subject: [PATCH] Re-activate token pool on startup Signed-off-by: Nicko Guyer --- .../sqlite/000116_tx_type_not_null.up.sql | 2 +- go.sum | 1 + internal/assets/manager.go | 25 ++++++++ internal/orchestrator/orchestrator.go | 10 +--- internal/tokens/fftokens/fftokens.go | 58 +++++++++++++++---- internal/tokens/fftokens/fftokens_test.go | 16 ++--- mocks/assetmocks/manager.go | 14 +++++ mocks/tokenmocks/plugin.go | 24 ++++++-- pkg/tokens/plugin.go | 5 +- 9 files changed, 123 insertions(+), 32 deletions(-) diff --git a/db/migrations/sqlite/000116_tx_type_not_null.up.sql b/db/migrations/sqlite/000116_tx_type_not_null.up.sql index 1f18a8fa5..d76110881 100644 --- a/db/migrations/sqlite/000116_tx_type_not_null.up.sql +++ b/db/migrations/sqlite/000116_tx_type_not_null.up.sql @@ -1,5 +1,5 @@ UPDATE messages SET tx_parent_type = '' WHERE tx_parent_type IS NULL; ALTER TABLE messages RENAME COLUMN tx_parent_type TO tx_parent_type_temp; -ALTER TABLE messages ADD COLUMN tx_parent_type VARCHAR(64) NOT NULL; +ALTER TABLE messages ADD COLUMN tx_parent_type VARCHAR(64) DEFAULT '' NOT NULL; UPDATE messages SET tx_parent_type = tx_parent_type_temp; ALTER TABLE messages DROP COLUMN tx_parent_type_temp; \ No newline at end of file diff --git a/go.sum b/go.sum index c5c653a63..789faf2b3 100644 --- a/go.sum +++ b/go.sum @@ -457,6 +457,7 @@ github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2C github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= diff --git a/internal/assets/manager.go b/internal/assets/manager.go index 77ceaf824..e7f84669e 100644 --- a/internal/assets/manager.go +++ b/internal/assets/manager.go @@ -71,6 +71,9 @@ type Manager interface { // From operations.OperationHandler PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error) RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, phase core.OpPhase, err error) + + // Starts the namespace on each of the configured token plugins + Start(ctx context.Context) error } type assetManager struct { @@ -170,6 +173,28 @@ func (am *assetManager) GetTokenConnectors(ctx context.Context) []*core.TokenCon return connectors } +func (am *assetManager) Start(ctx context.Context) error { + f := database.TokenPoolQueryFactory.NewFilter(ctx).And() + pools, _, err := am.database.GetTokenPools(ctx, am.namespace, f) + if err != nil { + return err + } + + for _, plugin := range am.tokens { + activePools := []*core.TokenPool{} + for _, pool := range pools { + if pool.Connector == plugin.ConnectorName() && pool.Active { + activePools = append(activePools, pool) + } + } + err := plugin.StartNamespace(ctx, am.namespace, activePools) + if err != nil { + return err + } + } + return nil +} + func (am *assetManager) getDefaultTokenConnector(ctx context.Context) (string, error) { tokenConnectors := am.GetTokenConnectors(ctx) if len(tokenConnectors) != 1 { diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 3ecd40e88..1a5ac6591 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -542,6 +542,9 @@ func (or *orchestrator) initManagers(ctx context.Context) (err error) { if err != nil { return err } + if err := or.assets.Start(ctx); err != nil { + return err + } } if or.defsender == nil { @@ -569,13 +572,6 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) { } } - for _, t := range or.tokens() { - err := t.StartNamespace(ctx, or.namespace.Name) - if err != nil { - return err - } - } - if or.data == nil { or.data, err = data.NewDataManager(ctx, or.namespace, or.database(), or.dataexchange(), or.cacheManager) if err != nil { diff --git a/internal/tokens/fftokens/fftokens.go b/internal/tokens/fftokens/fftokens.go index 0864ffdcc..4700c3075 100644 --- a/internal/tokens/fftokens/fftokens.go +++ b/internal/tokens/fftokens/fftokens.go @@ -52,15 +52,16 @@ func (ie *ConflictError) IsConflictError() bool { } type FFTokens struct { - ctx context.Context - cancelCtx context.CancelFunc - capabilities *tokens.Capabilities - callbacks callbacks - configuredName string - client *resty.Client - wsconn map[string]wsclient.WSClient - wsConfig *wsclient.WSConfig - retry *retry.Retry + ctx context.Context + cancelCtx context.CancelFunc + capabilities *tokens.Capabilities + callbacks callbacks + configuredName string + client *resty.Client + wsconn map[string]wsclient.WSClient + wsConfig *wsclient.WSConfig + retry *retry.Retry + poolsToActivate map[string][]*core.TokenPool } type callbacks struct { @@ -153,6 +154,8 @@ const ( messageTokenBurn msgType = "token-burn" messageTokenTransfer msgType = "token-transfer" messageTokenApproval msgType = "token-approval" + messageStarted msgType = "started" + messageActivated msgType = "activated" ) type tokenData struct { @@ -276,6 +279,10 @@ func (ft *FFTokens) Name() string { return "fftokens" } +func (ft *FFTokens) ConnectorName() string { + return ft.configuredName +} + func (ft *FFTokens) Init(ctx context.Context, cancelCtx context.CancelFunc, name string, config config.Section) (err error) { ft.ctx = log.WithLogField(ctx, "proto", "fftokens") ft.cancelCtx = cancelCtx @@ -315,7 +322,7 @@ func (ft *FFTokens) Init(ctx context.Context, cancelCtx context.CancelFunc, name return nil } -func (ft *FFTokens) StartNamespace(ctx context.Context, namespace string) (err error) { +func (ft *FFTokens) StartNamespace(ctx context.Context, namespace string, activePools []*core.TokenPool) (err error) { if ft.wsconn[namespace] == nil { ft.wsconn[namespace], err = wsclient.New(ctx, ft.wsConfig, nil, nil) if err != nil { @@ -323,6 +330,13 @@ func (ft *FFTokens) StartNamespace(ctx context.Context, namespace string) (err e } } + // Keep the list of pools we need to ensure are active + // The handleNamespaceStarted function will ensure pools are active after the namespace has finished starting + if ft.poolsToActivate == nil { + ft.poolsToActivate = make(map[string][]*core.TokenPool) + } + ft.poolsToActivate[namespace] = activePools + err = ft.wsconn[namespace].Connect() if err != nil { return err @@ -649,6 +663,10 @@ func (ft *FFTokens) handleMessage(ctx context.Context, namespace string, msgByte err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeTransfer, msg.Data) case messageTokenApproval: err = ft.handleTokenApproval(ctx, msg.Data) + case messageStarted: + err = ft.handleNamespaceStarted(ctx, msg.Data) + case messageActivated: + err = ft.handlePoolActivated(ctx, msg.Data) default: log.L(ctx).Errorf("Message unexpected: %s", msg.Event) // do not set error here - we will never be able to process this message so log+swallow it. @@ -679,6 +697,26 @@ func (ft *FFTokens) handleMessageRetry(ctx context.Context, namespace string, ms }) } +func (ft *FFTokens) handleNamespaceStarted(ctx context.Context, data fftypes.JSONObject) error { + // Make sure any pools that are marked as active in our DB are indeed active + namespace := data.GetString("namespace") + log.L(ctx).Debugf("Token connector '%s' started namespace '%s'. Ensuring all token pools active.", ft.Name(), namespace) + for _, pool := range ft.poolsToActivate[namespace] { + if _, err := ft.ActivateTokenPool(ctx, pool); err != nil { + // Log the error and continue trying to activate pools + // At this point we've already started + log.L(ctx).Errorf("Error auto re-activating token pool '%s': %s", pool.ID, err.Error()) + } + log.L(ctx).Debugf("Activated token pool '%s'", pool.ID) + } + return nil +} + +func (ft *FFTokens) handlePoolActivated(ctx context.Context, data fftypes.JSONObject) error { + // NOOP + return nil +} + func (ft *FFTokens) eventLoop(namespace string) { wsconn := ft.wsconn[namespace] defer wsconn.Close() diff --git a/internal/tokens/fftokens/fftokens_test.go b/internal/tokens/fftokens/fftokens_test.go index 2c15b88d6..c90f76a8f 100644 --- a/internal/tokens/fftokens/fftokens_test.go +++ b/internal/tokens/fftokens/fftokens_test.go @@ -105,7 +105,7 @@ func TestInitBadURL(t *testing.T) { err := h.Init(ctx, cancelCtx, "testtokens", ffTokensConfig) assert.NoError(t, err) - err = h.StartNamespace(ctx, "ns1") + err = h.StartNamespace(ctx, "ns1", []*core.TokenPool{}) assert.Regexp(t, "FF00149", err) } @@ -121,7 +121,7 @@ func TestStartNamespaceConnectFail(t *testing.T) { err := h.Init(ctx, cancelCtx, "testtokens", ffTokensConfig) assert.NoError(t, err) - err = h.StartNamespace(ctx, "ns1") + err = h.StartNamespace(ctx, "ns1", []*core.TokenPool{}) assert.Error(t, err) } @@ -987,7 +987,7 @@ func TestIgnoredEvents(t *testing.T) { h, toServer, fromServer, _, done := newTestFFTokens(t) defer done() - err := h.StartNamespace(context.Background(), "ns1") + err := h.StartNamespace(context.Background(), "ns1", []*core.TokenPool{}) assert.NoError(t, err) fromServer <- `!}` // ignored @@ -1012,7 +1012,7 @@ func TestReceiptEvents(t *testing.T) { h, _, fromServer, _, done := newTestFFTokens(t) defer done() - err := h.StartNamespace(context.Background(), "ns1") + err := h.StartNamespace(context.Background(), "ns1", []*core.TokenPool{}) assert.NoError(t, err) mcb := &coremocks.OperationCallbacks{} @@ -1104,7 +1104,7 @@ func TestPoolEvents(t *testing.T) { h, toServer, fromServer, _, done := newTestFFTokens(t) defer done() - err := h.StartNamespace(context.Background(), "ns1") + err := h.StartNamespace(context.Background(), "ns1", []*core.TokenPool{}) assert.NoError(t, err) mcb := &tokenmocks.Callbacks{} @@ -1225,7 +1225,7 @@ func TestTransferEvents(t *testing.T) { h, toServer, fromServer, _, done := newTestFFTokens(t) defer done() - err := h.StartNamespace(context.Background(), "ns1") + err := h.StartNamespace(context.Background(), "ns1", []*core.TokenPool{}) assert.NoError(t, err) mcb := &tokenmocks.Callbacks{} @@ -1449,7 +1449,7 @@ func TestApprovalEvents(t *testing.T) { h, toServer, fromServer, _, done := newTestFFTokens(t) defer done() - err := h.StartNamespace(context.Background(), "ns1") + err := h.StartNamespace(context.Background(), "ns1", []*core.TokenPool{}) assert.NoError(t, err) mcb := &tokenmocks.Callbacks{} @@ -1601,7 +1601,7 @@ func TestStartNamespaceSendClosed(t *testing.T) { } wsm.On("Connect").Return(nil) wsm.On("Send", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) - err := h.StartNamespace(context.Background(), "ns1") + err := h.StartNamespace(context.Background(), "ns1", []*core.TokenPool{}) assert.Regexp(t, "pop", err) } diff --git a/mocks/assetmocks/manager.go b/mocks/assetmocks/manager.go index 437b2f127..50ca37cd3 100644 --- a/mocks/assetmocks/manager.go +++ b/mocks/assetmocks/manager.go @@ -575,6 +575,20 @@ func (_m *Manager) RunOperation(ctx context.Context, op *core.PreparedOperation) return r0, r1, r2 } +// Start provides a mock function with given fields: ctx +func (_m *Manager) Start(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // TokenApproval provides a mock function with given fields: ctx, approval, waitConfirm func (_m *Manager) TokenApproval(ctx context.Context, approval *core.TokenApprovalInput, waitConfirm bool) (*core.TokenApproval, error) { ret := _m.Called(ctx, approval, waitConfirm) diff --git a/mocks/tokenmocks/plugin.go b/mocks/tokenmocks/plugin.go index 696715de2..5ee88a966 100644 --- a/mocks/tokenmocks/plugin.go +++ b/mocks/tokenmocks/plugin.go @@ -101,6 +101,20 @@ func (_m *Plugin) CheckInterface(ctx context.Context, pool *core.TokenPool, meth return r0, r1 } +// ConnectorName provides a mock function with given fields: +func (_m *Plugin) ConnectorName() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + // CreateTokenPool provides a mock function with given fields: ctx, nsOpID, pool func (_m *Plugin) CreateTokenPool(ctx context.Context, nsOpID string, pool *core.TokenPool) (core.OpPhase, error) { ret := _m.Called(ctx, nsOpID, pool) @@ -196,13 +210,13 @@ func (_m *Plugin) SetOperationHandler(namespace string, handler core.OperationCa _m.Called(namespace, handler) } -// StartNamespace provides a mock function with given fields: ctx, namespace -func (_m *Plugin) StartNamespace(ctx context.Context, namespace string) error { - ret := _m.Called(ctx, namespace) +// StartNamespace provides a mock function with given fields: ctx, namespace, tokenPools +func (_m *Plugin) StartNamespace(ctx context.Context, namespace string, tokenPools []*core.TokenPool) error { + ret := _m.Called(ctx, namespace, tokenPools) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = rf(ctx, namespace) + if rf, ok := ret.Get(0).(func(context.Context, string, []*core.TokenPool) error); ok { + r0 = rf(ctx, namespace, tokenPools) } else { r0 = ret.Error(0) } diff --git a/pkg/tokens/plugin.go b/pkg/tokens/plugin.go index d6fdeb3b4..3a5ee1f15 100644 --- a/pkg/tokens/plugin.go +++ b/pkg/tokens/plugin.go @@ -44,7 +44,7 @@ type Plugin interface { SetOperationHandler(namespace string, handler core.OperationCallbacks) // StartNamespace starts a specific namespace within the plugin - StartNamespace(ctx context.Context, namespace string) error + StartNamespace(ctx context.Context, namespace string, tokenPools []*core.TokenPool) error // StopNamespace removes a namespace from use within the plugin StopNamespace(ctx context.Context, namespace string) error @@ -52,6 +52,9 @@ type Plugin interface { // Capabilities returns capabilities - not called until after Init Capabilities() *Capabilities + // ConnectorName returns the configured connector name (plugin instance) + ConnectorName() string + // CreateTokenPool creates a new (fungible or non-fungible) pool of tokens CreateTokenPool(ctx context.Context, nsOpID string, pool *core.TokenPool) (phase core.OpPhase, err error)