Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable contract listeners with multiple filters #1418

Merged
merged 32 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
25c32e3
Enable contract listeners with multiple filters
nguyer Oct 12, 2023
135cfb0
WIP: More work on multiple filters
nguyer Feb 28, 2024
7f233a0
Fixes for multiple filters
nguyer Mar 6, 2024
5a73d6f
New contract listener Hash API
EnriqueL8 Jun 7, 2024
ad9cca2
Review comments
EnriqueL8 Jun 12, 2024
0ddb3d1
revert this
EnriqueL8 Jun 12, 2024
e5b91e7
More review comments
EnriqueL8 Jun 12, 2024
d004e15
check filters in blockchain plugins
EnriqueL8 Jun 12, 2024
230d707
more fixes and test
EnriqueL8 Jun 12, 2024
3cb7f78
Merge remote-tracking branch 'origin/main' into filters
EnriqueL8 Jun 12, 2024
1d37c4c
fix mocks and tests
EnriqueL8 Jun 12, 2024
165bfa1
New implementation of filters for contract listeners
EnriqueL8 Jun 21, 2024
d713123
Add indexed position to ABI event signature
EnriqueL8 Jun 21, 2024
0ea504f
more fixes and 100% test coverage
EnriqueL8 Jul 2, 2024
5ba07f8
fix filters in evmconnect
EnriqueL8 Jul 2, 2024
1d86f95
fix migration
EnriqueL8 Jul 3, 2024
1d6ae91
more fixes and migrations
EnriqueL8 Jul 3, 2024
c83c6be
Move migrations to startup
EnriqueL8 Jul 3, 2024
69fd561
do not migrate the signature
EnriqueL8 Jul 3, 2024
ef8edb9
fix collection event
EnriqueL8 Jul 3, 2024
0b61f86
fix previous signature lookup and location
EnriqueL8 Jul 4, 2024
8320273
fix PSQL migration
EnriqueL8 Jul 4, 2024
26efff4
fix ethconnect subscriptions
EnriqueL8 Jul 4, 2024
66436d3
a few more tweaks
EnriqueL8 Jul 8, 2024
4ac0235
finally got to 100% coverage
EnriqueL8 Jul 8, 2024
12e018b
Move the signature with location to the blockchain plugins
EnriqueL8 Jul 11, 2024
d6cfa8f
Merge remote-tracking branch 'origin/main' into filters
EnriqueL8 Jul 11, 2024
e43809c
fix test after conflicts
EnriqueL8 Jul 11, 2024
c85c247
fix coverage for sqlcommon
EnriqueL8 Jul 11, 2024
9777588
Update internal/blockchain/ethereum/ethereum.go
EnriqueL8 Jul 11, 2024
a468891
fix linting
EnriqueL8 Jul 11, 2024
fc18b63
move to VARCHAR for postgres
EnriqueL8 Jul 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
BEGIN;
ALTER TABLE contractlisteners DROP COLUMN filters;
-- no down for the VARCHAR change
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;
ALTER TABLE contractlisteners ADD COLUMN filters TEXT;
peterbroadhurst marked this conversation as resolved.
Show resolved Hide resolved
-- changing the length of varchar does not affect the index
ALTER TABLE contractlisteners ALTER COLUMN signature TYPE VARCHAR(MAX);
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE contractlisteners DROP COLUMN filters;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE contractlisteners ADD COLUMN filters TEXT;
-- in SQLITE VARCHAR is equivalent to TEXT so no migration for signature length
19 changes: 15 additions & 4 deletions doc-site/docs/reference/types/contractlistener.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,17 @@ title: ContractListener
| Field Name | Description | Type |
|------------|-------------|------|
| `id` | The UUID of the smart contract listener | [`UUID`](simpletypes.md#uuid) |
| `interface` | A reference to an existing FFI, containing pre-registered type information for the event | [`FFIReference`](#ffireference) |
| `interface` | Deprecated: Please use 'interface' in the array of 'filters' instead | [`FFIReference`](#ffireference) |
| `namespace` | The namespace of the listener, which defines the namespace of all blockchain events detected by this listener | `string` |
| `name` | A descriptive name for the listener | `string` |
| `backendId` | An ID assigned by the blockchain connector to this listener | `string` |
| `location` | A blockchain specific contract identifier. For example an Ethereum contract address, or a Fabric chaincode name and channel | [`JSONAny`](simpletypes.md#jsonany) |
| `location` | Deprecated: Please use 'location' in the array of 'filters' instead | [`JSONAny`](simpletypes.md#jsonany) |
| `created` | The creation time of the listener | [`FFTime`](simpletypes.md#fftime) |
| `event` | The definition of the event, either provided in-line when creating the listener, or extracted from the referenced FFI | [`FFISerializedEvent`](#ffiserializedevent) |
| `signature` | The stringified signature of the event, as computed by the blockchain plugin | `string` |
| `event` | Deprecated: Please use 'event' in the array of 'filters' instead | [`FFISerializedEvent`](#ffiserializedevent) |
| `signature` | A concatenation of all the stringified signature of the event and location, as computed by the blockchain plugin | `string` |
| `topic` | A topic to set on the FireFly event that is emitted each time a blockchain event is detected from the blockchain. Setting this topic on a number of listeners allows applications to easily subscribe to all events they need | `string` |
| `options` | Options that control how the listener subscribes to events from the underlying blockchain | [`ContractListenerOptions`](#contractlisteneroptions) |
| `filters` | A list of filters for the contract listener. Each filter is made up of an Event and an optional Location. Events matching these filters will always be emitted in the order determined by the blockchain. | [`ListenerFilter[]`](#listenerfilter) |

## FFIReference

Expand Down Expand Up @@ -92,3 +93,13 @@ title: ContractListener
| `firstEvent` | A blockchain specific string, such as a block number, to start listening from. The special strings 'oldest' and 'newest' are supported by all blockchain connectors. Default is 'newest' | `string` |


## ListenerFilter

| Field Name | Description | Type |
|------------|-------------|------|
| `event` | The definition of the event, either provided in-line when creating the listener, or extracted from the referenced FFI | [`FFISerializedEvent`](#ffiserializedevent) |
| `location` | A blockchain specific contract identifier. For example an Ethereum contract address, or a Fabric chaincode name and channel | [`JSONAny`](simpletypes.md#jsonany) |
| `interface` | A reference to an existing FFI, containing pre-registered type information for the event | [`FFIReference`](#ffireference) |
| `signature` | The stringified signature of the event and location, as computed by the blockchain plugin | `string` |


1,742 changes: 1,598 additions & 144 deletions doc-site/docs/swagger/swagger.yaml

Large diffs are not rendered by default.

53 changes: 53 additions & 0 deletions internal/apiserver/route_post_contract_listeners_hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"net/http"

"github.com/hyperledger/firefly-common/pkg/ffapi"
"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/internal/orchestrator"
"github.com/hyperledger/firefly/pkg/core"
)

/*
*

This API provides the ability to retrieve the signature for the filters of a contract listener

*
*/
var postContractListenerSignature = &ffapi.Route{
Name: "postContractListenerSignature",
Path: "contracts/listeners/signature",
Method: http.MethodPost,
PathParams: nil,
QueryParams: nil,
Description: coremsgs.APIEndpointsPostContractListenerHash,
JSONInputValue: func() interface{} { return &core.ContractListenerInput{} },
JSONOutputValue: func() interface{} { return &core.ContractListenerSignatureOutput{} },
JSONOutputCodes: []int{http.StatusOK},
Extensions: &coreExtensions{
EnabledIf: func(or orchestrator.Orchestrator) bool {
return or.Contracts() != nil
},
CoreJSONHandler: func(r *ffapi.APIRequest, cr *coreRequest) (output interface{}, err error) {
return cr.or.Contracts().ConstructContractListenerSignature(cr.ctx, r.Input.(*core.ContractListenerInput))
},
},
}
48 changes: 48 additions & 0 deletions internal/apiserver/route_post_contract_listeners_hash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"bytes"
"encoding/json"
"net/http/httptest"
"testing"

"github.com/hyperledger/firefly/mocks/contractmocks"
"github.com/hyperledger/firefly/pkg/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestNewContractListenerSignature(t *testing.T) {
o, r := newTestAPIServer()
o.On("Authorize", mock.Anything, mock.Anything).Return(nil)
mcm := &contractmocks.Manager{}
o.On("Contracts").Return(mcm)
input := core.ContractListenerInput{}
var buf bytes.Buffer
json.NewEncoder(&buf).Encode(&input)
req := httptest.NewRequest("POST", "/api/v1/namespaces/mynamespace/contracts/listeners/signature", &buf)
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

mcm.On("ConstructContractListenerSignature", mock.Anything, mock.AnythingOfType("*core.ContractListenerInput")).
Return(&core.ContractListenerSignatureOutput{}, nil, nil)
r.ServeHTTP(res, req)

assert.Equal(t, 200, res.Result().StatusCode)
}
1 change: 1 addition & 0 deletions internal/apiserver/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ var routes = append(
postContractAPIPublish,
postContractAPIQuery,
postContractAPIListeners,
postContractListenerSignature,
postContractInterfaceGenerate,
postContractInterfacePublish,
postContractDeploy,
Expand Down
113 changes: 104 additions & 9 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,26 @@ func (e *Ethereum) QueryContract(ctx context.Context, signingKey string, locatio
return output, nil // note UNLIKE fabric this is just `output`, not `output.Result` - but either way the top level of what we return to the end user, is whatever the Connector sent us
}

func (e *Ethereum) CheckOverlappingLocations(ctx context.Context, left *fftypes.JSONAny, right *fftypes.JSONAny) (bool, error) {
if left == nil || right == nil {
// No location on either side so overlapping
return true, nil
}

parsedLeft, err := e.parseContractLocation(ctx, left)
if err != nil {
return false, err
}

parsedRight, err := e.parseContractLocation(ctx, right)
if err != nil {
return false, err
}

// For Ethereum just compared addresses
return parsedLeft.Address == parsedRight.Address, nil
EnriqueL8 marked this conversation as resolved.
Show resolved Hide resolved
}

func (e *Ethereum) NormalizeContractLocation(ctx context.Context, ntype blockchain.NormalizeType, location *fftypes.JSONAny) (result *fftypes.JSONAny, err error) {
parsed, err := e.parseContractLocation(ctx, location)
if err != nil {
Expand Down Expand Up @@ -875,25 +895,56 @@ func (e *Ethereum) encodeContractLocation(ctx context.Context, location *Locatio
}

func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener, lastProtocolID string) (err error) {
var location *Location
namespace := listener.Namespace
if listener.Location != nil {
location, err = e.parseContractLocation(ctx, listener.Location)
filters := make([]*filter, 0)

if len(listener.Filters) == 0 {
return i18n.NewError(ctx, coremsgs.MsgFiltersEmpty, listener.Name)
}

// For ethconnect we need to use one event and one location as it does not support filters
// Note: the first filter event gets copied to the root of the listener for backwards
// compatibility so available here
// it will be ignored by evmconnect
var firstEventABI *abi.Entry
firstEventABI, err = ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &listener.Filters[0].Event.FFIEventDefinition)
if err != nil {
return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid)
}

// First filter location is copied over to the root
var location *Location
if listener.Filters[0].Location != nil {
location, err = e.parseContractLocation(ctx, listener.Filters[0].Location)
if err != nil {
return err
}
}
abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &listener.Event.FFIEventDefinition)
if err != nil {
return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid)

for _, f := range listener.Filters {
abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &f.Event.FFIEventDefinition)
if err != nil {
return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid)
}
evmFilter := &filter{
Event: abi,
}
if f.Location != nil {
location, err := e.parseContractLocation(ctx, f.Location)
if err != nil {
return err
}
evmFilter.Address = location.Address
}
filters = append(filters, evmFilter)
}

subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID)
firstEvent := string(core.SubOptsFirstEventNewest)
if listener.Options != nil {
firstEvent = listener.Options.FirstEvent
}
result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi, lastProtocolID)
result, err := e.streams.createSubscription(ctx, e.streamID[namespace], subName, firstEvent, location, firstEventABI, filters, lastProtocolID)
if err != nil {
return err
}
Expand Down Expand Up @@ -934,12 +985,56 @@ func (e *Ethereum) GetFFIParamValidator(ctx context.Context) (fftypes.FFIParamVa
return &ffi2abi.ParamValidator{}, nil
}

func (e *Ethereum) GenerateEventSignature(ctx context.Context, event *fftypes.FFIEventDefinition) string {
func (e *Ethereum) GenerateEventSignature(ctx context.Context, event *fftypes.FFIEventDefinition) (string, error) {
abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, event)
if err != nil {
return "", err
}
signature := ffi2abi.ABIMethodToSignature(abi)
indexedSignature := ABIMethodToIndexedSignature(abi)
if indexedSignature != "" {
signature = fmt.Sprintf("%s %s", signature, indexedSignature)
}
return signature, nil
}

func (e *Ethereum) GenerateEventSignatureWithLocation(ctx context.Context, event *fftypes.FFIEventDefinition, location *fftypes.JSONAny) (string, error) {
eventSignature, err := e.GenerateEventSignature(ctx, event)
if err != nil {
// new error here needed
return "", err
}

// No location set
if location == nil {
return fmt.Sprintf("*:%s", eventSignature), nil
}

parsed, err := e.parseContractLocation(ctx, location)
if err != nil {
return "", err
}

return fmt.Sprintf("%s:%s", parsed.Address, eventSignature), nil
}

func ABIMethodToIndexedSignature(abi *abi.Entry) string {
if len(abi.Inputs) == 0 {
return ""
}
return ffi2abi.ABIMethodToSignature(abi)
positions := []string{}
for i, param := range abi.Inputs {
if param.Indexed {
positions = append(positions, fmt.Sprint(i))
}
}

// No indexed fields
if len(positions) == 0 {
return ""
}

return "[i=" + strings.Join(positions, ",") + "]"
}

func (e *Ethereum) GenerateErrorSignature(ctx context.Context, errorDef *fftypes.FFIErrorDefinition) string {
Expand Down
Loading
Loading