Skip to content

Commit

Permalink
Merge pull request #1418 from kaleido-io/filters
Browse files Browse the repository at this point in the history
Enable contract listeners with multiple filters
  • Loading branch information
peterbroadhurst authored Jul 15, 2024
2 parents 07cc54a + fc18b63 commit a56d9e8
Show file tree
Hide file tree
Showing 31 changed files with 5,169 additions and 639 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
BEGIN;
ALTER TABLE contractlisteners DROP COLUMN filters;
-- no down for the VARCHAR change
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;
ALTER TABLE contractlisteners ADD COLUMN filters TEXT;
-- changing the length of varchar does not affect the index
ALTER TABLE contractlisteners ALTER COLUMN signature TYPE VARCHAR;
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE contractlisteners DROP COLUMN filters;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE contractlisteners ADD COLUMN filters TEXT;
-- in SQLITE VARCHAR is equivalent to TEXT so no migration for signature length
19 changes: 15 additions & 4 deletions doc-site/docs/reference/types/contractlistener.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,17 @@ title: ContractListener
| Field Name | Description | Type |
|------------|-------------|------|
| `id` | The UUID of the smart contract listener | [`UUID`](simpletypes.md#uuid) |
| `interface` | A reference to an existing FFI, containing pre-registered type information for the event | [`FFIReference`](#ffireference) |
| `interface` | Deprecated: Please use 'interface' in the array of 'filters' instead | [`FFIReference`](#ffireference) |
| `namespace` | The namespace of the listener, which defines the namespace of all blockchain events detected by this listener | `string` |
| `name` | A descriptive name for the listener | `string` |
| `backendId` | An ID assigned by the blockchain connector to this listener | `string` |
| `location` | A blockchain specific contract identifier. For example an Ethereum contract address, or a Fabric chaincode name and channel | [`JSONAny`](simpletypes.md#jsonany) |
| `location` | Deprecated: Please use 'location' in the array of 'filters' instead | [`JSONAny`](simpletypes.md#jsonany) |
| `created` | The creation time of the listener | [`FFTime`](simpletypes.md#fftime) |
| `event` | The definition of the event, either provided in-line when creating the listener, or extracted from the referenced FFI | [`FFISerializedEvent`](#ffiserializedevent) |
| `signature` | The stringified signature of the event, as computed by the blockchain plugin | `string` |
| `event` | Deprecated: Please use 'event' in the array of 'filters' instead | [`FFISerializedEvent`](#ffiserializedevent) |
| `signature` | A concatenation of all the stringified signature of the event and location, as computed by the blockchain plugin | `string` |
| `topic` | A topic to set on the FireFly event that is emitted each time a blockchain event is detected from the blockchain. Setting this topic on a number of listeners allows applications to easily subscribe to all events they need | `string` |
| `options` | Options that control how the listener subscribes to events from the underlying blockchain | [`ContractListenerOptions`](#contractlisteneroptions) |
| `filters` | A list of filters for the contract listener. Each filter is made up of an Event and an optional Location. Events matching these filters will always be emitted in the order determined by the blockchain. | [`ListenerFilter[]`](#listenerfilter) |

## FFIReference

Expand Down Expand Up @@ -92,3 +93,13 @@ title: ContractListener
| `firstEvent` | A blockchain specific string, such as a block number, to start listening from. The special strings 'oldest' and 'newest' are supported by all blockchain connectors. Default is 'newest' | `string` |


## ListenerFilter

| Field Name | Description | Type |
|------------|-------------|------|
| `event` | The definition of the event, either provided in-line when creating the listener, or extracted from the referenced FFI | [`FFISerializedEvent`](#ffiserializedevent) |
| `location` | A blockchain specific contract identifier. For example an Ethereum contract address, or a Fabric chaincode name and channel | [`JSONAny`](simpletypes.md#jsonany) |
| `interface` | A reference to an existing FFI, containing pre-registered type information for the event | [`FFIReference`](#ffireference) |
| `signature` | The stringified signature of the event and location, as computed by the blockchain plugin | `string` |


1,742 changes: 1,598 additions & 144 deletions doc-site/docs/swagger/swagger.yaml

Large diffs are not rendered by default.

53 changes: 53 additions & 0 deletions internal/apiserver/route_post_contract_listeners_hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"net/http"

"github.com/hyperledger/firefly-common/pkg/ffapi"
"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/internal/orchestrator"
"github.com/hyperledger/firefly/pkg/core"
)

/*
*
This API provides the ability to retrieve the signature for the filters of a contract listener
*
*/
var postContractListenerSignature = &ffapi.Route{
Name: "postContractListenerSignature",
Path: "contracts/listeners/signature",
Method: http.MethodPost,
PathParams: nil,
QueryParams: nil,
Description: coremsgs.APIEndpointsPostContractListenerHash,
JSONInputValue: func() interface{} { return &core.ContractListenerInput{} },
JSONOutputValue: func() interface{} { return &core.ContractListenerSignatureOutput{} },
JSONOutputCodes: []int{http.StatusOK},
Extensions: &coreExtensions{
EnabledIf: func(or orchestrator.Orchestrator) bool {
return or.Contracts() != nil
},
CoreJSONHandler: func(r *ffapi.APIRequest, cr *coreRequest) (output interface{}, err error) {
return cr.or.Contracts().ConstructContractListenerSignature(cr.ctx, r.Input.(*core.ContractListenerInput))
},
},
}
48 changes: 48 additions & 0 deletions internal/apiserver/route_post_contract_listeners_hash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"bytes"
"encoding/json"
"net/http/httptest"
"testing"

"github.com/hyperledger/firefly/mocks/contractmocks"
"github.com/hyperledger/firefly/pkg/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestNewContractListenerSignature(t *testing.T) {
o, r := newTestAPIServer()
o.On("Authorize", mock.Anything, mock.Anything).Return(nil)
mcm := &contractmocks.Manager{}
o.On("Contracts").Return(mcm)
input := core.ContractListenerInput{}
var buf bytes.Buffer
json.NewEncoder(&buf).Encode(&input)
req := httptest.NewRequest("POST", "/api/v1/namespaces/mynamespace/contracts/listeners/signature", &buf)
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

mcm.On("ConstructContractListenerSignature", mock.Anything, mock.AnythingOfType("*core.ContractListenerInput")).
Return(&core.ContractListenerSignatureOutput{}, nil, nil)
r.ServeHTTP(res, req)

assert.Equal(t, 200, res.Result().StatusCode)
}
1 change: 1 addition & 0 deletions internal/apiserver/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ var routes = append(
postContractAPIPublish,
postContractAPIQuery,
postContractAPIListeners,
postContractListenerSignature,
postContractInterfaceGenerate,
postContractInterfacePublish,
postContractDeploy,
Expand Down
113 changes: 104 additions & 9 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,26 @@ func (e *Ethereum) QueryContract(ctx context.Context, signingKey string, locatio
return output, nil // note UNLIKE fabric this is just `output`, not `output.Result` - but either way the top level of what we return to the end user, is whatever the Connector sent us
}

func (e *Ethereum) CheckOverlappingLocations(ctx context.Context, left *fftypes.JSONAny, right *fftypes.JSONAny) (bool, error) {
if left == nil || right == nil {
// No location on either side so overlapping
return true, nil
}

parsedLeft, err := e.parseContractLocation(ctx, left)
if err != nil {
return false, err
}

parsedRight, err := e.parseContractLocation(ctx, right)
if err != nil {
return false, err
}

// For Ethereum just compared addresses
return strings.EqualFold(parsedLeft.Address, parsedRight.Address), nil
}

func (e *Ethereum) NormalizeContractLocation(ctx context.Context, ntype blockchain.NormalizeType, location *fftypes.JSONAny) (result *fftypes.JSONAny, err error) {
parsed, err := e.parseContractLocation(ctx, location)
if err != nil {
Expand Down Expand Up @@ -875,25 +895,56 @@ func (e *Ethereum) encodeContractLocation(ctx context.Context, location *Locatio
}

func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener, lastProtocolID string) (err error) {
var location *Location
namespace := listener.Namespace
if listener.Location != nil {
location, err = e.parseContractLocation(ctx, listener.Location)
filters := make([]*filter, 0)

if len(listener.Filters) == 0 {
return i18n.NewError(ctx, coremsgs.MsgFiltersEmpty, listener.Name)
}

// For ethconnect we need to use one event and one location as it does not support filters
// Note: the first filter event gets copied to the root of the listener for backwards
// compatibility so available here
// it will be ignored by evmconnect
var firstEventABI *abi.Entry
firstEventABI, err = ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &listener.Filters[0].Event.FFIEventDefinition)
if err != nil {
return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid)
}

// First filter location is copied over to the root
var location *Location
if listener.Filters[0].Location != nil {
location, err = e.parseContractLocation(ctx, listener.Filters[0].Location)
if err != nil {
return err
}
}
abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &listener.Event.FFIEventDefinition)
if err != nil {
return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid)

for _, f := range listener.Filters {
abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, &f.Event.FFIEventDefinition)
if err != nil {
return i18n.WrapError(ctx, err, coremsgs.MsgContractParamInvalid)
}
evmFilter := &filter{
Event: abi,
}
if f.Location != nil {
location, err := e.parseContractLocation(ctx, f.Location)
if err != nil {
return err
}
evmFilter.Address = location.Address
}
filters = append(filters, evmFilter)
}

subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID)
firstEvent := string(core.SubOptsFirstEventNewest)
if listener.Options != nil {
firstEvent = listener.Options.FirstEvent
}
result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi, lastProtocolID)
result, err := e.streams.createSubscription(ctx, e.streamID[namespace], subName, firstEvent, location, firstEventABI, filters, lastProtocolID)
if err != nil {
return err
}
Expand Down Expand Up @@ -934,12 +985,56 @@ func (e *Ethereum) GetFFIParamValidator(ctx context.Context) (fftypes.FFIParamVa
return &ffi2abi.ParamValidator{}, nil
}

func (e *Ethereum) GenerateEventSignature(ctx context.Context, event *fftypes.FFIEventDefinition) string {
func (e *Ethereum) GenerateEventSignature(ctx context.Context, event *fftypes.FFIEventDefinition) (string, error) {
abi, err := ffi2abi.ConvertFFIEventDefinitionToABI(ctx, event)
if err != nil {
return "", err
}
signature := ffi2abi.ABIMethodToSignature(abi)
indexedSignature := ABIMethodToIndexedSignature(abi)
if indexedSignature != "" {
signature = fmt.Sprintf("%s %s", signature, indexedSignature)
}
return signature, nil
}

func (e *Ethereum) GenerateEventSignatureWithLocation(ctx context.Context, event *fftypes.FFIEventDefinition, location *fftypes.JSONAny) (string, error) {
eventSignature, err := e.GenerateEventSignature(ctx, event)
if err != nil {
// new error here needed
return "", err
}

// No location set
if location == nil {
return fmt.Sprintf("*:%s", eventSignature), nil
}

parsed, err := e.parseContractLocation(ctx, location)
if err != nil {
return "", err
}

return fmt.Sprintf("%s:%s", parsed.Address, eventSignature), nil
}

func ABIMethodToIndexedSignature(abi *abi.Entry) string {
if len(abi.Inputs) == 0 {
return ""
}
return ffi2abi.ABIMethodToSignature(abi)
positions := []string{}
for i, param := range abi.Inputs {
if param.Indexed {
positions = append(positions, fmt.Sprint(i))
}
}

// No indexed fields
if len(positions) == 0 {
return ""
}

return "[i=" + strings.Join(positions, ",") + "]"
}

func (e *Ethereum) GenerateErrorSignature(ctx context.Context, errorDef *fftypes.FFIErrorDefinition) string {
Expand Down
Loading

0 comments on commit a56d9e8

Please sign in to comment.