Skip to content

Commit

Permalink
Re-activate token pool on startup
Browse files Browse the repository at this point in the history
Signed-off-by: Nicko Guyer <[email protected]>
  • Loading branch information
nguyer committed Nov 10, 2023
1 parent 0559710 commit cecee27
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 32 deletions.
2 changes: 1 addition & 1 deletion db/migrations/sqlite/000116_tx_type_not_null.up.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
UPDATE messages SET tx_parent_type = '' WHERE tx_parent_type IS NULL;
ALTER TABLE messages RENAME COLUMN tx_parent_type TO tx_parent_type_temp;
ALTER TABLE messages ADD COLUMN tx_parent_type VARCHAR(64) NOT NULL;
ALTER TABLE messages ADD COLUMN tx_parent_type VARCHAR(64) DEFAULT '' NOT NULL;
UPDATE messages SET tx_parent_type = tx_parent_type_temp;
ALTER TABLE messages DROP COLUMN tx_parent_type_temp;
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2C
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
Expand Down
25 changes: 25 additions & 0 deletions internal/assets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type Manager interface {
// From operations.OperationHandler
PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error)
RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, phase core.OpPhase, err error)

// Starts the namespace on each of the configured token plugins
Start(ctx context.Context) error
}

type assetManager struct {
Expand Down Expand Up @@ -170,6 +173,28 @@ func (am *assetManager) GetTokenConnectors(ctx context.Context) []*core.TokenCon
return connectors
}

func (am *assetManager) Start(ctx context.Context) error {
f := database.TokenPoolQueryFactory.NewFilter(ctx).And()
pools, _, err := am.database.GetTokenPools(ctx, am.namespace, f)
if err != nil {
return err
}

for _, plugin := range am.tokens {
activePools := []*core.TokenPool{}
for _, pool := range pools {
if pool.Connector == plugin.ConnectorName() && pool.Active {
activePools = append(activePools, pool)
}
}
err := plugin.StartNamespace(ctx, am.namespace, activePools)
if err != nil {
return err
}
}
return nil
}

func (am *assetManager) getDefaultTokenConnector(ctx context.Context) (string, error) {
tokenConnectors := am.GetTokenConnectors(ctx)
if len(tokenConnectors) != 1 {
Expand Down
10 changes: 3 additions & 7 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,9 @@ func (or *orchestrator) initManagers(ctx context.Context) (err error) {
if err != nil {
return err
}
if err := or.assets.Start(ctx); err != nil {
return err
}
}

if or.defsender == nil {
Expand Down Expand Up @@ -569,13 +572,6 @@ func (or *orchestrator) initComponents(ctx context.Context) (err error) {
}
}

for _, t := range or.tokens() {
err := t.StartNamespace(ctx, or.namespace.Name)
if err != nil {
return err
}
}

if or.data == nil {
or.data, err = data.NewDataManager(ctx, or.namespace, or.database(), or.dataexchange(), or.cacheManager)
if err != nil {
Expand Down
58 changes: 48 additions & 10 deletions internal/tokens/fftokens/fftokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,16 @@ func (ie *ConflictError) IsConflictError() bool {
}

type FFTokens struct {
ctx context.Context
cancelCtx context.CancelFunc
capabilities *tokens.Capabilities
callbacks callbacks
configuredName string
client *resty.Client
wsconn map[string]wsclient.WSClient
wsConfig *wsclient.WSConfig
retry *retry.Retry
ctx context.Context
cancelCtx context.CancelFunc
capabilities *tokens.Capabilities
callbacks callbacks
configuredName string
client *resty.Client
wsconn map[string]wsclient.WSClient
wsConfig *wsclient.WSConfig
retry *retry.Retry
poolsToActivate map[string][]*core.TokenPool
}

type callbacks struct {
Expand Down Expand Up @@ -153,6 +154,8 @@ const (
messageTokenBurn msgType = "token-burn"
messageTokenTransfer msgType = "token-transfer"
messageTokenApproval msgType = "token-approval"
messageStarted msgType = "started"
messageActivated msgType = "activated"
)

type tokenData struct {
Expand Down Expand Up @@ -276,6 +279,10 @@ func (ft *FFTokens) Name() string {
return "fftokens"
}

func (ft *FFTokens) ConnectorName() string {
return ft.configuredName
}

func (ft *FFTokens) Init(ctx context.Context, cancelCtx context.CancelFunc, name string, config config.Section) (err error) {
ft.ctx = log.WithLogField(ctx, "proto", "fftokens")
ft.cancelCtx = cancelCtx
Expand Down Expand Up @@ -315,14 +322,21 @@ func (ft *FFTokens) Init(ctx context.Context, cancelCtx context.CancelFunc, name
return nil
}

func (ft *FFTokens) StartNamespace(ctx context.Context, namespace string) (err error) {
func (ft *FFTokens) StartNamespace(ctx context.Context, namespace string, activePools []*core.TokenPool) (err error) {
if ft.wsconn[namespace] == nil {
ft.wsconn[namespace], err = wsclient.New(ctx, ft.wsConfig, nil, nil)
if err != nil {
return err
}
}

// Keep the list of pools we need to ensure are active
// The handleNamespaceStarted function will ensure pools are active after the namespace has finished starting
if ft.poolsToActivate == nil {
ft.poolsToActivate = make(map[string][]*core.TokenPool)
}
ft.poolsToActivate[namespace] = activePools

err = ft.wsconn[namespace].Connect()
if err != nil {
return err
Expand Down Expand Up @@ -649,6 +663,10 @@ func (ft *FFTokens) handleMessage(ctx context.Context, namespace string, msgByte
err = ft.handleTokenTransfer(ctx, core.TokenTransferTypeTransfer, msg.Data)
case messageTokenApproval:
err = ft.handleTokenApproval(ctx, msg.Data)
case messageStarted:
err = ft.handleNamespaceStarted(ctx, msg.Data)
case messageActivated:
err = ft.handlePoolActivated(ctx, msg.Data)
default:
log.L(ctx).Errorf("Message unexpected: %s", msg.Event)
// do not set error here - we will never be able to process this message so log+swallow it.
Expand Down Expand Up @@ -679,6 +697,26 @@ func (ft *FFTokens) handleMessageRetry(ctx context.Context, namespace string, ms
})
}

func (ft *FFTokens) handleNamespaceStarted(ctx context.Context, data fftypes.JSONObject) error {
// Make sure any pools that are marked as active in our DB are indeed active
namespace := data.GetString("namespace")
log.L(ctx).Debugf("Token connector '%s' started namespace '%s'. Ensuring all token pools active.", ft.Name(), namespace)
for _, pool := range ft.poolsToActivate[namespace] {
if _, err := ft.ActivateTokenPool(ctx, pool); err != nil {
// Log the error and continue trying to activate pools
// At this point we've already started
log.L(ctx).Errorf("Error auto re-activating token pool '%s': %s", pool.ID, err.Error())
}
log.L(ctx).Debugf("Activated token pool '%s'", pool.ID)
}
return nil
}

func (ft *FFTokens) handlePoolActivated(ctx context.Context, data fftypes.JSONObject) error {
// NOOP
return nil
}

func (ft *FFTokens) eventLoop(namespace string) {
wsconn := ft.wsconn[namespace]
defer wsconn.Close()
Expand Down
16 changes: 8 additions & 8 deletions internal/tokens/fftokens/fftokens_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestInitBadURL(t *testing.T) {
err := h.Init(ctx, cancelCtx, "testtokens", ffTokensConfig)
assert.NoError(t, err)

err = h.StartNamespace(ctx, "ns1")
err = h.StartNamespace(ctx, "ns1", []*core.TokenPool{})
assert.Regexp(t, "FF00149", err)
}

Expand All @@ -121,7 +121,7 @@ func TestStartNamespaceConnectFail(t *testing.T) {
err := h.Init(ctx, cancelCtx, "testtokens", ffTokensConfig)
assert.NoError(t, err)

err = h.StartNamespace(ctx, "ns1")
err = h.StartNamespace(ctx, "ns1", []*core.TokenPool{})
assert.Error(t, err)
}

Expand Down Expand Up @@ -987,7 +987,7 @@ func TestIgnoredEvents(t *testing.T) {
h, toServer, fromServer, _, done := newTestFFTokens(t)
defer done()

err := h.StartNamespace(context.Background(), "ns1")
err := h.StartNamespace(context.Background(), "ns1", []*core.TokenPool{})
assert.NoError(t, err)

fromServer <- `!}` // ignored
Expand All @@ -1012,7 +1012,7 @@ func TestReceiptEvents(t *testing.T) {
h, _, fromServer, _, done := newTestFFTokens(t)
defer done()

err := h.StartNamespace(context.Background(), "ns1")
err := h.StartNamespace(context.Background(), "ns1", []*core.TokenPool{})
assert.NoError(t, err)

mcb := &coremocks.OperationCallbacks{}
Expand Down Expand Up @@ -1104,7 +1104,7 @@ func TestPoolEvents(t *testing.T) {
h, toServer, fromServer, _, done := newTestFFTokens(t)
defer done()

err := h.StartNamespace(context.Background(), "ns1")
err := h.StartNamespace(context.Background(), "ns1", []*core.TokenPool{})
assert.NoError(t, err)

mcb := &tokenmocks.Callbacks{}
Expand Down Expand Up @@ -1225,7 +1225,7 @@ func TestTransferEvents(t *testing.T) {
h, toServer, fromServer, _, done := newTestFFTokens(t)
defer done()

err := h.StartNamespace(context.Background(), "ns1")
err := h.StartNamespace(context.Background(), "ns1", []*core.TokenPool{})
assert.NoError(t, err)

mcb := &tokenmocks.Callbacks{}
Expand Down Expand Up @@ -1449,7 +1449,7 @@ func TestApprovalEvents(t *testing.T) {
h, toServer, fromServer, _, done := newTestFFTokens(t)
defer done()

err := h.StartNamespace(context.Background(), "ns1")
err := h.StartNamespace(context.Background(), "ns1", []*core.TokenPool{})
assert.NoError(t, err)

mcb := &tokenmocks.Callbacks{}
Expand Down Expand Up @@ -1601,7 +1601,7 @@ func TestStartNamespaceSendClosed(t *testing.T) {
}
wsm.On("Connect").Return(nil)
wsm.On("Send", mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
err := h.StartNamespace(context.Background(), "ns1")
err := h.StartNamespace(context.Background(), "ns1", []*core.TokenPool{})
assert.Regexp(t, "pop", err)
}

Expand Down
14 changes: 14 additions & 0 deletions mocks/assetmocks/manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 19 additions & 5 deletions mocks/tokenmocks/plugin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion pkg/tokens/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,17 @@ type Plugin interface {
SetOperationHandler(namespace string, handler core.OperationCallbacks)

// StartNamespace starts a specific namespace within the plugin
StartNamespace(ctx context.Context, namespace string) error
StartNamespace(ctx context.Context, namespace string, tokenPools []*core.TokenPool) error

// StopNamespace removes a namespace from use within the plugin
StopNamespace(ctx context.Context, namespace string) error

// Capabilities returns capabilities - not called until after Init
Capabilities() *Capabilities

// ConnectorName returns the configured connector name (plugin instance)
ConnectorName() string

// CreateTokenPool creates a new (fungible or non-fungible) pool of tokens
CreateTokenPool(ctx context.Context, nsOpID string, pool *core.TokenPool) (phase core.OpPhase, err error)

Expand Down

0 comments on commit cecee27

Please sign in to comment.