Skip to content

Commit

Permalink
WIP: More work on multiple filters
Browse files Browse the repository at this point in the history
Signed-off-by: Nicko Guyer <[email protected]>
  • Loading branch information
nguyer committed Feb 28, 2024
1 parent 62f55ef commit 9d14273
Show file tree
Hide file tree
Showing 12 changed files with 182 additions and 29 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;
ALTER TABLE contractlisteners DROP COLUMN filters;
ALTER TABLE contractlisteners DROP COLUMN filter_hash;
DROP INDEX contractlisteners_filter_hash;
COMMIT;
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ BEGIN;
ALTER TABLE contractlisteners ADD COLUMN filters TEXT;
ALTER TABLE contractlisteners ALTER COLUMN event DROP NOT NULL;
ALTER TABLE contractlisteners ALTER COLUMN location DROP NOT NULL;
ALTER TABLE contractlisteners ADD COLUMN filter_hash CHAR(64);
CREATE INDEX contractlisteners_filter_hash ON contractlisteners(filter_hash);
COMMIT:

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE contractlisteners DROP COLUMN filters;
ALTER TABLE contractlisteners DROP COLUMN filter_hash;
DROP INDEX contractlisteners_filter_hash;
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ ALTER TABLE contractlisteners ADD LOCATION event TEXT;
UPDATE contractlisteners SET event = event_tmp;
UPDATE contractlisteners SET location = location_tmp;
ALTER TABLE contractlisteners DROP COLUMN event_tmp;
ALTER TABLE contractlisteners DROP COLUMN location_tmp;
ALTER TABLE contractlisteners DROP COLUMN location_tmp;

ALTER TABLE contractlisteners ADD COLUMN filter_hash CHAR(64);
CREATE INDEX contractlisteners_filter_hash ON contractlisteners(filter_hash);
99 changes: 90 additions & 9 deletions internal/contracts/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package contracts
import (
"context"
"crypto/sha256"
"database/sql/driver"
"encoding/hex"
"fmt"
"hash"
"sort"
"strings"

"github.com/hyperledger/firefly-common/pkg/ffapi"
Expand Down Expand Up @@ -809,6 +811,9 @@ func (cm *contractManager) checkContractListenerExists(ctx context.Context, list
database.ContractListenerQueryFactory.NewUpdate(ctx).Set("backendid", listener.BackendID))
}

// TODO: clean this up to make it less complex
//
//nolint:gocyclo
func (cm *contractManager) AddContractListener(ctx context.Context, listener *core.ContractListenerInput) (output *core.ContractListener, err error) {
listener.ID = fftypes.NewUUID()
listener.Namespace = cm.namespace
Expand Down Expand Up @@ -846,15 +851,6 @@ func (cm *contractManager) AddContractListener(ctx context.Context, listener *co
})
}

// Namespace + Name must be unique
if listener.Name != "" {
if existing, err := cm.database.GetContractListener(ctx, cm.namespace, listener.Name); err != nil {
return nil, err
} else if existing != nil {
return nil, i18n.NewError(ctx, coremsgs.MsgContractListenerNameExists, cm.namespace, listener.Name)
}
}

for _, filter := range listener.Filters {
if filter.Event == nil {
if filter.EventPath == "" || filter.Interface == nil {
Expand All @@ -880,6 +876,72 @@ func (cm *contractManager) AddContractListener(ctx context.Context, listener *co
Signature: filter.Signature,
})
}

filterHash, err := cm.generateFilterHash(listener.ContractListener.Filters)
if err != nil {
return nil, err
}
listener.ContractListener.FilterHash = filterHash

err = cm.database.RunAsGroup(ctx, func(ctx context.Context) (err error) {
// Namespace + Name must be unique
if listener.Name != "" {
if existing, err := cm.database.GetContractListener(ctx, cm.namespace, listener.Name); err != nil {
return err
} else if existing != nil {
return i18n.NewError(ctx, coremsgs.MsgContractListenerNameExists, cm.namespace, listener.Name)
}
}

if listener.Event == nil {
if listener.EventPath == "" || listener.Interface == nil {
return i18n.NewError(ctx, coremsgs.MsgListenerNoEvent)
}
// Copy the event definition into the listener
if listener.Event, err = cm.resolveEvent(ctx, listener.Interface, listener.EventPath); err != nil {
return err
}
} else {
listener.Interface = nil
}

// Namespace + Topic + Location + Signature must be unique
listener.Signature = cm.blockchain.GenerateEventSignature(ctx, &listener.Event.FFIEventDefinition)
// Above we only call NormalizeContractLocation if the listener is non-nil, and that means
// for an unset location we will have a nil value. Using an fftypes.JSONAny in a query
// of nil does not yield the right result, so we need to do an explicit nil query.
var locationLookup driver.Value = nil
if !listener.Location.IsNil() {
locationLookup = listener.Location.String()
}
fb := database.ContractListenerQueryFactory.NewFilter(ctx)
if existing, _, err := cm.database.GetContractListeners(ctx, cm.namespace, fb.Or(fb.And(
fb.Eq("topic", listener.Topic),
fb.Eq("location", locationLookup),
fb.Eq("signature", listener.Signature),
), fb.And(
fb.Eq("topic", listener.Topic),
fb.Eq("filterhash", listener.FilterHash),
))); err != nil {
return err
} else if len(existing) > 0 {
return i18n.NewError(ctx, coremsgs.MsgContractListenerExists)
}
return nil
})
if err != nil {
return nil, err
}

// Namespace + Name must be unique
if listener.Name != "" {
if existing, err := cm.database.GetContractListener(ctx, cm.namespace, listener.Name); err != nil {
return nil, err
} else if existing != nil {
return nil, i18n.NewError(ctx, coremsgs.MsgContractListenerNameExists, cm.namespace, listener.Name)
}
}

if err = cm.blockchain.AddContractListener(ctx, &listener.ContractListener); err != nil {
return nil, err
}
Expand All @@ -893,6 +955,25 @@ func (cm *contractManager) AddContractListener(ctx context.Context, listener *co
return &listener.ContractListener, err
}

// Sort the filters by signature then location and generate a hash
func (cm *contractManager) generateFilterHash(filters core.ListenerFilters) (*fftypes.Bytes32, error) {
sort.Slice(filters, func(i, j int) bool {
if filters[i].Signature == filters[j].Signature {
return filters[i].Location.String() < filters[j].Location.String()
} else {
return filters[i].Signature < filters[j].Signature
}
})
var sb strings.Builder
for _, filter := range filters {
sb.WriteString(filter.Location.String())
sb.WriteString(filter.Signature)
}
hash := sha256.New()
hash.Write([]byte(sb.String()))
return fftypes.HashResult(hash), nil
}

func (cm *contractManager) AddContractAPIListener(ctx context.Context, apiName, eventPath string, listener *core.ContractListener) (output *core.ContractListener, err error) {
api, err := cm.database.GetContractAPIByName(ctx, cm.namespace, apiName)
if err != nil {
Expand Down
59 changes: 59 additions & 0 deletions internal/contracts/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3800,3 +3800,62 @@ func TestEnsureParamNamesIncludedInCacheKeys(t *testing.T) {
assert.NotEqual(t, hex.EncodeToString(paramUniqueHash1.Sum(nil)), hex.EncodeToString(paramUniqueHash2.Sum(nil)))

}

func TestFilterHash(t *testing.T) {
cm := newTestContractManager()

filterSet1 := core.ListenerFilters{
&core.ListenerFilter{
Signature: "Changed(uint256)",
Location: fftypes.JSONAnyPtr(`{"address":"0x1fa04bd8ca1b9ce9f19794faf790961134518434"}`),
},
}

filterSet2 := core.ListenerFilters{
&core.ListenerFilter{
Signature: "Changed(uint256)",
Location: fftypes.JSONAnyPtr(`{"address":"0x1fa04bd8ca1b9ce9f19794faf790961134518434"}`),
},
}

hash1, err := cm.generateFilterHash(filterSet1)
assert.NoError(t, err)
hash2, err := cm.generateFilterHash(filterSet2)
assert.NoError(t, err)

assert.Equal(t, "5b4b14f9da842c8db443b9e4542f84baf0e1a216c3d06b17383b68389db82df2", hash1.String())
assert.Equal(t, hash1.String(), hash2.String())

filterSet1 = core.ListenerFilters{
&core.ListenerFilter{
Signature: "Changed(uint256)",
Location: fftypes.JSONAnyPtr(`{"address":"0x1fa04bd8ca1b9ce9f19794faf790961134518434"}`),
},
&core.ListenerFilter{
Signature: "Changed(uint256)",
Location: fftypes.JSONAnyPtr(`{"address":"0x217e63be04ddac2a6e28eb653131aeb00a3fd0f4"}`),
},
}

filterSet2 = core.ListenerFilters{
&core.ListenerFilter{
Signature: "Changed(uint256)",
Location: fftypes.JSONAnyPtr(`{"address":"0x217e63be04ddac2a6e28eb653131aeb00a3fd0f4"}`),
},
&core.ListenerFilter{
Signature: "Changed(uint256)",
Interface: &fftypes.FFIReference{
ID: fftypes.NewUUID(),
},
Location: fftypes.JSONAnyPtr(`{"address":"0x1fa04bd8ca1b9ce9f19794faf790961134518434"}`),
},
}

hash1, err = cm.generateFilterHash(filterSet1)
assert.NoError(t, err)
hash2, err = cm.generateFilterHash(filterSet2)
assert.NoError(t, err)

assert.Equal(t, "ecf24a607244d2dcdc245f694665ce8acd21391a2291978a27a5fbe82c0d4689", hash1.String())
assert.Equal(t, hash1.String(), hash2.String())
}
2 changes: 1 addition & 1 deletion internal/coremsgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ var (
MsgOperationDataIncorrect = ffe("FF10378", "Operation data type incorrect: %T", 400)
MsgDataMissingBlobHash = ffe("FF10379", "Blob for data %s cannot be transferred as it is missing a hash", 500)
MsgUnexpectedDXMessageType = ffe("FF10380", "Unexpected websocket event type from DX plugin: %s", 500)
MsgContractListenerExists = ffe("FF10383", "A contract listener already exists for this combination of topic + location + event", 409)
MsgContractListenerExists = ffe("FF10383", "A contract listener already exists for this combination of topic + filters (location + event)", 409)
MsgInvalidOutputOption = ffe("FF10385", "invalid output option '%s'")
MsgInvalidPluginConfiguration = ffe("FF10386", "Invalid %s plugin configuration - name and type are required")
MsgReferenceMarkdownMissing = ffe("FF10387", "Reference markdown file missing: '%s'")
Expand Down
3 changes: 3 additions & 0 deletions internal/database/sqlcommon/contractlisteners_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
"options",
"created",
"filters",
"filter_hash",
}
contractListenerFilterFieldMap = map[string]string{
"interface": "interface_id",
Expand Down Expand Up @@ -83,6 +84,7 @@ func (s *SQLCommon) InsertContractListener(ctx context.Context, listener *core.C
listener.Options,
listener.Created,
listener.Filters,
listener.FilterHash,
),
func() {
s.callbacks.UUIDCollectionNSEvent(database.CollectionContractListeners, core.ChangeEventTypeCreated, listener.Namespace, listener.ID)
Expand Down Expand Up @@ -111,6 +113,7 @@ func (s *SQLCommon) contractListenerResult(ctx context.Context, row *sql.Rows) (
&listener.Options,
&listener.Created,
&listener.Filters,
&listener.FilterHash,
)
if err != nil {
return nil, i18n.WrapError(ctx, err, coremsgs.MsgDBReadErr, contractlistenersTable)
Expand Down
2 changes: 1 addition & 1 deletion internal/database/sqlcommon/contractlisteners_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestContractListenerDeleteFail(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin()
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows(contractListenerColumns).AddRow(
fftypes.NewUUID(), nil, []byte("{}"), "ns1", "sub1", "123", "{}", "sig", "topic1", nil, fftypes.Now(), "[]"),
fftypes.NewUUID(), nil, []byte("{}"), "ns1", "sub1", "123", "{}", "sig", "topic1", nil, fftypes.Now(), "[]", ""),
)
mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop"))
err := s.DeleteContractListenerByID(context.Background(), "ns", fftypes.NewUUID())
Expand Down
27 changes: 14 additions & 13 deletions pkg/core/contract_listener.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -26,18 +26,19 @@ import (
)

type ContractListener struct {
ID *fftypes.UUID `ffstruct:"ContractListener" json:"id,omitempty" ffexcludeinput:"true"`
Interface *fftypes.FFIReference `ffstruct:"ContractListener" json:"interface,omitempty" ffexcludeinput:"postContractAPIListeners"`
Namespace string `ffstruct:"ContractListener" json:"namespace,omitempty" ffexcludeinput:"true"`
Name string `ffstruct:"ContractListener" json:"name,omitempty"`
BackendID string `ffstruct:"ContractListener" json:"backendId,omitempty" ffexcludeinput:"true"`
Location *fftypes.JSONAny `ffstruct:"ContractListener" json:"location,omitempty" ffexcludeinput:"true"`
Created *fftypes.FFTime `ffstruct:"ContractListener" json:"created,omitempty" ffexcludeinput:"true"`
Event *FFISerializedEvent `ffstruct:"ContractListener" json:"event,omitempty" ffexcludeinput:"true"`
Filters ListenerFilters `ffstruct:"ContractListener" json:"filters,omitempty" ffexcludeinput:"postContractAPIListeners"`
Signature string `ffstruct:"ContractListener" json:"signature,omitempty" ffexcludeinput:"true"`
Topic string `ffstruct:"ContractListener" json:"topic,omitempty"`
Options *ContractListenerOptions `ffstruct:"ContractListener" json:"options,omitempty"`
ID *fftypes.UUID `ffstruct:"ContractListener" json:"id,omitempty" ffexcludeinput:"true"`
Interface *fftypes.FFIReference `ffstruct:"ContractListener" json:"interface,omitempty" ffexcludeinput:"postContractAPIListeners"`
Namespace string `ffstruct:"ContractListener" json:"namespace,omitempty" ffexcludeinput:"true"`
Name string `ffstruct:"ContractListener" json:"name,omitempty"`
BackendID string `ffstruct:"ContractListener" json:"backendId,omitempty" ffexcludeinput:"true"`
Location *fftypes.JSONAny `ffstruct:"ContractListener" json:"location,omitempty" ffexcludeinput:"true"`
Created *fftypes.FFTime `ffstruct:"ContractListener" json:"created,omitempty" ffexcludeinput:"true"`
Event *FFISerializedEvent `ffstruct:"ContractListener" json:"event,omitempty" ffexcludeinput:"true"`
Filters ListenerFilters `ffstruct:"ContractListener" json:"filters,omitempty" ffexcludeinput:"postContractAPIListeners"`
Signature string `ffstruct:"ContractListener" json:"signature,omitempty" ffexcludeinput:"true"`
Topic string `ffstruct:"ContractListener" json:"topic,omitempty"`
Options *ContractListenerOptions `ffstruct:"ContractListener" json:"options,omitempty"`
FilterHash *fftypes.Bytes32 `json:"-"` // For internal use
}

type ContractListenerWithStatus struct {
Expand Down

0 comments on commit 9d14273

Please sign in to comment.