diff --git a/db/migrations/postgres/000116_update_contractlisteners_add_events_column.down.sql b/db/migrations/postgres/000116_update_contractlisteners_add_events_column.down.sql deleted file mode 100644 index 5340286b5..000000000 --- a/db/migrations/postgres/000116_update_contractlisteners_add_events_column.down.sql +++ /dev/null @@ -1,3 +0,0 @@ -BEGIN; -ALTER TABLE contractlisteners DROP COLUMN filters; -COMMIT; \ No newline at end of file diff --git a/db/migrations/postgres/000116_update_contractlisteners_add_filters_column.down.sql b/db/migrations/postgres/000116_update_contractlisteners_add_filters_column.down.sql new file mode 100644 index 000000000..ee65b3b99 --- /dev/null +++ b/db/migrations/postgres/000116_update_contractlisteners_add_filters_column.down.sql @@ -0,0 +1,5 @@ +BEGIN; +ALTER TABLE contractlisteners DROP COLUMN filters; +ALTER TABLE contractlisteners DROP COLUMN filter_hash; +DROP INDEX contractlisteners_filter_hash; +COMMIT; \ No newline at end of file diff --git a/db/migrations/postgres/000116_update_contractlisteners_add_events_column.up.sql b/db/migrations/postgres/000116_update_contractlisteners_add_filters_column.up.sql similarity index 58% rename from db/migrations/postgres/000116_update_contractlisteners_add_events_column.up.sql rename to db/migrations/postgres/000116_update_contractlisteners_add_filters_column.up.sql index 6ff35a34f..c1d97078e 100644 --- a/db/migrations/postgres/000116_update_contractlisteners_add_events_column.up.sql +++ b/db/migrations/postgres/000116_update_contractlisteners_add_filters_column.up.sql @@ -2,4 +2,6 @@ BEGIN; ALTER TABLE contractlisteners ADD COLUMN filters TEXT; ALTER TABLE contractlisteners ALTER COLUMN event DROP NOT NULL; ALTER TABLE contractlisteners ALTER COLUMN location DROP NOT NULL; +ALTER TABLE contractlisteners ADD COLUMN filter_hash CHAR(64); +CREATE INDEX contractlisteners_filter_hash ON contractlisteners(filter_hash); COMMIT: \ No newline at end of file diff --git a/db/migrations/sqlite/000116_update_contractlisteners_add_events_column.down.sql b/db/migrations/sqlite/000116_update_contractlisteners_add_events_column.down.sql deleted file mode 100644 index 234055a4f..000000000 --- a/db/migrations/sqlite/000116_update_contractlisteners_add_events_column.down.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE contractlisteners DROP COLUMN filters; \ No newline at end of file diff --git a/db/migrations/sqlite/000116_update_contractlisteners_add_filters_column.down.sql b/db/migrations/sqlite/000116_update_contractlisteners_add_filters_column.down.sql new file mode 100644 index 000000000..ce3979784 --- /dev/null +++ b/db/migrations/sqlite/000116_update_contractlisteners_add_filters_column.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE contractlisteners DROP COLUMN filters; +ALTER TABLE contractlisteners DROP COLUMN filter_hash; +DROP INDEX contractlisteners_filter_hash; diff --git a/db/migrations/sqlite/000116_update_contractlisteners_add_events_column.up.sql b/db/migrations/sqlite/000116_update_contractlisteners_add_filters_column.up.sql similarity index 75% rename from db/migrations/sqlite/000116_update_contractlisteners_add_events_column.up.sql rename to db/migrations/sqlite/000116_update_contractlisteners_add_filters_column.up.sql index 3a7875e26..2abe16455 100644 --- a/db/migrations/sqlite/000116_update_contractlisteners_add_events_column.up.sql +++ b/db/migrations/sqlite/000116_update_contractlisteners_add_filters_column.up.sql @@ -9,4 +9,7 @@ ALTER TABLE contractlisteners ADD LOCATION event TEXT; UPDATE contractlisteners SET event = event_tmp; UPDATE contractlisteners SET location = location_tmp; ALTER TABLE contractlisteners DROP COLUMN event_tmp; -ALTER TABLE contractlisteners DROP COLUMN location_tmp; \ No newline at end of file +ALTER TABLE contractlisteners DROP COLUMN location_tmp; + +ALTER TABLE contractlisteners ADD COLUMN filter_hash CHAR(64); +CREATE INDEX contractlisteners_filter_hash ON contractlisteners(filter_hash); diff --git a/internal/contracts/manager.go b/internal/contracts/manager.go index ce179c084..23c57c80b 100644 --- a/internal/contracts/manager.go +++ b/internal/contracts/manager.go @@ -19,9 +19,11 @@ package contracts import ( "context" "crypto/sha256" + "database/sql/driver" "encoding/hex" "fmt" "hash" + "sort" "strings" "github.com/hyperledger/firefly-common/pkg/ffapi" @@ -809,6 +811,9 @@ func (cm *contractManager) checkContractListenerExists(ctx context.Context, list database.ContractListenerQueryFactory.NewUpdate(ctx).Set("backendid", listener.BackendID)) } +// TODO: clean this up to make it less complex +// +//nolint:gocyclo func (cm *contractManager) AddContractListener(ctx context.Context, listener *core.ContractListenerInput) (output *core.ContractListener, err error) { listener.ID = fftypes.NewUUID() listener.Namespace = cm.namespace @@ -846,15 +851,6 @@ func (cm *contractManager) AddContractListener(ctx context.Context, listener *co }) } - // Namespace + Name must be unique - if listener.Name != "" { - if existing, err := cm.database.GetContractListener(ctx, cm.namespace, listener.Name); err != nil { - return nil, err - } else if existing != nil { - return nil, i18n.NewError(ctx, coremsgs.MsgContractListenerNameExists, cm.namespace, listener.Name) - } - } - for _, filter := range listener.Filters { if filter.Event == nil { if filter.EventPath == "" || filter.Interface == nil { @@ -880,6 +876,72 @@ func (cm *contractManager) AddContractListener(ctx context.Context, listener *co Signature: filter.Signature, }) } + + filterHash, err := cm.generateFilterHash(listener.ContractListener.Filters) + if err != nil { + return nil, err + } + listener.ContractListener.FilterHash = filterHash + + err = cm.database.RunAsGroup(ctx, func(ctx context.Context) (err error) { + // Namespace + Name must be unique + if listener.Name != "" { + if existing, err := cm.database.GetContractListener(ctx, cm.namespace, listener.Name); err != nil { + return err + } else if existing != nil { + return i18n.NewError(ctx, coremsgs.MsgContractListenerNameExists, cm.namespace, listener.Name) + } + } + + if listener.Event == nil { + if listener.EventPath == "" || listener.Interface == nil { + return i18n.NewError(ctx, coremsgs.MsgListenerNoEvent) + } + // Copy the event definition into the listener + if listener.Event, err = cm.resolveEvent(ctx, listener.Interface, listener.EventPath); err != nil { + return err + } + } else { + listener.Interface = nil + } + + // Namespace + Topic + Location + Signature must be unique + listener.Signature = cm.blockchain.GenerateEventSignature(ctx, &listener.Event.FFIEventDefinition) + // Above we only call NormalizeContractLocation if the listener is non-nil, and that means + // for an unset location we will have a nil value. Using an fftypes.JSONAny in a query + // of nil does not yield the right result, so we need to do an explicit nil query. + var locationLookup driver.Value = nil + if !listener.Location.IsNil() { + locationLookup = listener.Location.String() + } + fb := database.ContractListenerQueryFactory.NewFilter(ctx) + if existing, _, err := cm.database.GetContractListeners(ctx, cm.namespace, fb.Or(fb.And( + fb.Eq("topic", listener.Topic), + fb.Eq("location", locationLookup), + fb.Eq("signature", listener.Signature), + ), fb.And( + fb.Eq("topic", listener.Topic), + fb.Eq("filterhash", listener.FilterHash), + ))); err != nil { + return err + } else if len(existing) > 0 { + return i18n.NewError(ctx, coremsgs.MsgContractListenerExists) + } + return nil + }) + if err != nil { + return nil, err + } + + // Namespace + Name must be unique + if listener.Name != "" { + if existing, err := cm.database.GetContractListener(ctx, cm.namespace, listener.Name); err != nil { + return nil, err + } else if existing != nil { + return nil, i18n.NewError(ctx, coremsgs.MsgContractListenerNameExists, cm.namespace, listener.Name) + } + } + if err = cm.blockchain.AddContractListener(ctx, &listener.ContractListener); err != nil { return nil, err } @@ -893,6 +955,25 @@ func (cm *contractManager) AddContractListener(ctx context.Context, listener *co return &listener.ContractListener, err } +// Sort the filters by signature then location and generate a hash +func (cm *contractManager) generateFilterHash(filters core.ListenerFilters) (*fftypes.Bytes32, error) { + sort.Slice(filters, func(i, j int) bool { + if filters[i].Signature == filters[j].Signature { + return filters[i].Location.String() < filters[j].Location.String() + } else { + return filters[i].Signature < filters[j].Signature + } + }) + var sb strings.Builder + for _, filter := range filters { + sb.WriteString(filter.Location.String()) + sb.WriteString(filter.Signature) + } + hash := sha256.New() + hash.Write([]byte(sb.String())) + return fftypes.HashResult(hash), nil +} + func (cm *contractManager) AddContractAPIListener(ctx context.Context, apiName, eventPath string, listener *core.ContractListener) (output *core.ContractListener, err error) { api, err := cm.database.GetContractAPIByName(ctx, cm.namespace, apiName) if err != nil { diff --git a/internal/contracts/manager_test.go b/internal/contracts/manager_test.go index 5a8e7ddb6..d728cea62 100644 --- a/internal/contracts/manager_test.go +++ b/internal/contracts/manager_test.go @@ -3800,3 +3800,62 @@ func TestEnsureParamNamesIncludedInCacheKeys(t *testing.T) { assert.NotEqual(t, hex.EncodeToString(paramUniqueHash1.Sum(nil)), hex.EncodeToString(paramUniqueHash2.Sum(nil))) } + +func TestFilterHash(t *testing.T) { + cm := newTestContractManager() + + filterSet1 := core.ListenerFilters{ + &core.ListenerFilter{ + Signature: "Changed(uint256)", + Location: fftypes.JSONAnyPtr(`{"address":"0x1fa04bd8ca1b9ce9f19794faf790961134518434"}`), + }, + } + + filterSet2 := core.ListenerFilters{ + &core.ListenerFilter{ + Signature: "Changed(uint256)", + Location: fftypes.JSONAnyPtr(`{"address":"0x1fa04bd8ca1b9ce9f19794faf790961134518434"}`), + }, + } + + hash1, err := cm.generateFilterHash(filterSet1) + assert.NoError(t, err) + hash2, err := cm.generateFilterHash(filterSet2) + assert.NoError(t, err) + + assert.Equal(t, "5b4b14f9da842c8db443b9e4542f84baf0e1a216c3d06b17383b68389db82df2", hash1.String()) + assert.Equal(t, hash1.String(), hash2.String()) + + filterSet1 = core.ListenerFilters{ + &core.ListenerFilter{ + Signature: "Changed(uint256)", + Location: fftypes.JSONAnyPtr(`{"address":"0x1fa04bd8ca1b9ce9f19794faf790961134518434"}`), + }, + &core.ListenerFilter{ + Signature: "Changed(uint256)", + Location: fftypes.JSONAnyPtr(`{"address":"0x217e63be04ddac2a6e28eb653131aeb00a3fd0f4"}`), + }, + } + + filterSet2 = core.ListenerFilters{ + &core.ListenerFilter{ + Signature: "Changed(uint256)", + Location: fftypes.JSONAnyPtr(`{"address":"0x217e63be04ddac2a6e28eb653131aeb00a3fd0f4"}`), + }, + &core.ListenerFilter{ + Signature: "Changed(uint256)", + Interface: &fftypes.FFIReference{ + ID: fftypes.NewUUID(), + }, + Location: fftypes.JSONAnyPtr(`{"address":"0x1fa04bd8ca1b9ce9f19794faf790961134518434"}`), + }, + } + + hash1, err = cm.generateFilterHash(filterSet1) + assert.NoError(t, err) + hash2, err = cm.generateFilterHash(filterSet2) + assert.NoError(t, err) + + assert.Equal(t, "ecf24a607244d2dcdc245f694665ce8acd21391a2291978a27a5fbe82c0d4689", hash1.String()) + assert.Equal(t, hash1.String(), hash2.String()) +} diff --git a/internal/coremsgs/en_error_messages.go b/internal/coremsgs/en_error_messages.go index 7ad5cec26..02a174496 100644 --- a/internal/coremsgs/en_error_messages.go +++ b/internal/coremsgs/en_error_messages.go @@ -223,7 +223,7 @@ var ( MsgOperationDataIncorrect = ffe("FF10378", "Operation data type incorrect: %T", 400) MsgDataMissingBlobHash = ffe("FF10379", "Blob for data %s cannot be transferred as it is missing a hash", 500) MsgUnexpectedDXMessageType = ffe("FF10380", "Unexpected websocket event type from DX plugin: %s", 500) - MsgContractListenerExists = ffe("FF10383", "A contract listener already exists for this combination of topic + location + event", 409) + MsgContractListenerExists = ffe("FF10383", "A contract listener already exists for this combination of topic + filters (location + event)", 409) MsgInvalidOutputOption = ffe("FF10385", "invalid output option '%s'") MsgInvalidPluginConfiguration = ffe("FF10386", "Invalid %s plugin configuration - name and type are required") MsgReferenceMarkdownMissing = ffe("FF10387", "Reference markdown file missing: '%s'") diff --git a/internal/database/sqlcommon/contractlisteners_sql.go b/internal/database/sqlcommon/contractlisteners_sql.go index 1f9ccfc63..10b76671b 100644 --- a/internal/database/sqlcommon/contractlisteners_sql.go +++ b/internal/database/sqlcommon/contractlisteners_sql.go @@ -45,6 +45,7 @@ var ( "options", "created", "filters", + "filter_hash", } contractListenerFilterFieldMap = map[string]string{ "interface": "interface_id", @@ -83,6 +84,7 @@ func (s *SQLCommon) InsertContractListener(ctx context.Context, listener *core.C listener.Options, listener.Created, listener.Filters, + listener.FilterHash, ), func() { s.callbacks.UUIDCollectionNSEvent(database.CollectionContractListeners, core.ChangeEventTypeCreated, listener.Namespace, listener.ID) @@ -111,6 +113,7 @@ func (s *SQLCommon) contractListenerResult(ctx context.Context, row *sql.Rows) ( &listener.Options, &listener.Created, &listener.Filters, + &listener.FilterHash, ) if err != nil { return nil, i18n.WrapError(ctx, err, coremsgs.MsgDBReadErr, contractlistenersTable) diff --git a/internal/database/sqlcommon/contractlisteners_sql_test.go b/internal/database/sqlcommon/contractlisteners_sql_test.go index 6902148b1..dbe448556 100644 --- a/internal/database/sqlcommon/contractlisteners_sql_test.go +++ b/internal/database/sqlcommon/contractlisteners_sql_test.go @@ -237,7 +237,7 @@ func TestContractListenerDeleteFail(t *testing.T) { s, mock := newMockProvider().init() mock.ExpectBegin() mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows(contractListenerColumns).AddRow( - fftypes.NewUUID(), nil, []byte("{}"), "ns1", "sub1", "123", "{}", "sig", "topic1", nil, fftypes.Now(), "[]"), + fftypes.NewUUID(), nil, []byte("{}"), "ns1", "sub1", "123", "{}", "sig", "topic1", nil, fftypes.Now(), "[]", ""), ) mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop")) err := s.DeleteContractListenerByID(context.Background(), "ns", fftypes.NewUUID()) diff --git a/pkg/core/contract_listener.go b/pkg/core/contract_listener.go index 6db9d2931..8a2347bb8 100644 --- a/pkg/core/contract_listener.go +++ b/pkg/core/contract_listener.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -26,18 +26,19 @@ import ( ) type ContractListener struct { - ID *fftypes.UUID `ffstruct:"ContractListener" json:"id,omitempty" ffexcludeinput:"true"` - Interface *fftypes.FFIReference `ffstruct:"ContractListener" json:"interface,omitempty" ffexcludeinput:"postContractAPIListeners"` - Namespace string `ffstruct:"ContractListener" json:"namespace,omitempty" ffexcludeinput:"true"` - Name string `ffstruct:"ContractListener" json:"name,omitempty"` - BackendID string `ffstruct:"ContractListener" json:"backendId,omitempty" ffexcludeinput:"true"` - Location *fftypes.JSONAny `ffstruct:"ContractListener" json:"location,omitempty" ffexcludeinput:"true"` - Created *fftypes.FFTime `ffstruct:"ContractListener" json:"created,omitempty" ffexcludeinput:"true"` - Event *FFISerializedEvent `ffstruct:"ContractListener" json:"event,omitempty" ffexcludeinput:"true"` - Filters ListenerFilters `ffstruct:"ContractListener" json:"filters,omitempty" ffexcludeinput:"postContractAPIListeners"` - Signature string `ffstruct:"ContractListener" json:"signature,omitempty" ffexcludeinput:"true"` - Topic string `ffstruct:"ContractListener" json:"topic,omitempty"` - Options *ContractListenerOptions `ffstruct:"ContractListener" json:"options,omitempty"` + ID *fftypes.UUID `ffstruct:"ContractListener" json:"id,omitempty" ffexcludeinput:"true"` + Interface *fftypes.FFIReference `ffstruct:"ContractListener" json:"interface,omitempty" ffexcludeinput:"postContractAPIListeners"` + Namespace string `ffstruct:"ContractListener" json:"namespace,omitempty" ffexcludeinput:"true"` + Name string `ffstruct:"ContractListener" json:"name,omitempty"` + BackendID string `ffstruct:"ContractListener" json:"backendId,omitempty" ffexcludeinput:"true"` + Location *fftypes.JSONAny `ffstruct:"ContractListener" json:"location,omitempty" ffexcludeinput:"true"` + Created *fftypes.FFTime `ffstruct:"ContractListener" json:"created,omitempty" ffexcludeinput:"true"` + Event *FFISerializedEvent `ffstruct:"ContractListener" json:"event,omitempty" ffexcludeinput:"true"` + Filters ListenerFilters `ffstruct:"ContractListener" json:"filters,omitempty" ffexcludeinput:"postContractAPIListeners"` + Signature string `ffstruct:"ContractListener" json:"signature,omitempty" ffexcludeinput:"true"` + Topic string `ffstruct:"ContractListener" json:"topic,omitempty"` + Options *ContractListenerOptions `ffstruct:"ContractListener" json:"options,omitempty"` + FilterHash *fftypes.Bytes32 `json:"-"` // For internal use } type ContractListenerWithStatus struct {