From 2758bee0f45d4c80f85a21848c4ecdb61e3ce513 Mon Sep 17 00:00:00 2001 From: Francois Hardrouyere Date: Thu, 12 Sep 2024 10:59:26 +0200 Subject: [PATCH 01/10] pass decode log function pointer for custom decode --- relayer/main/main.go | 2 ++ relayer/message_coordinator.go | 13 +++++++++++-- tests/contracts/lib/teleporter | 2 +- types/types.go | 23 +++++++++++++++-------- 4 files changed, 29 insertions(+), 11 deletions(-) diff --git a/relayer/main/main.go b/relayer/main/main.go index 06e4c6e5..f6bf3762 100644 --- a/relayer/main/main.go +++ b/relayer/main/main.go @@ -29,6 +29,7 @@ import ( "github.com/ava-labs/awm-relayer/relayer/config" "github.com/ava-labs/awm-relayer/signature-aggregator/aggregator" sigAggMetrics "github.com/ava-labs/awm-relayer/signature-aggregator/metrics" + relayerTypes "github.com/ava-labs/awm-relayer/types" "github.com/ava-labs/awm-relayer/utils" "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/subnet-evm/ethclient" @@ -255,6 +256,7 @@ func main() { messageHandlerFactories, applicationRelayers, sourceClients, + relayerTypes.DefaultNewWarpMessageInfo, ) // Each Listener goroutine will have an atomic bool that it can set to false to indicate an unrecoverable error diff --git a/relayer/message_coordinator.go b/relayer/message_coordinator.go index acf7317d..91cf5772 100644 --- a/relayer/message_coordinator.go +++ b/relayer/message_coordinator.go @@ -31,6 +31,7 @@ type MessageCoordinator struct { messageHandlerFactories map[ids.ID]map[common.Address]messages.MessageHandlerFactory applicationRelayers map[common.Hash]*ApplicationRelayer sourceClients map[ids.ID]ethclient.Client + warpMessageDecoder func(log types.Log) (*relayerTypes.WarpMessageInfo, bool, error) } func NewMessageCoordinator( @@ -38,12 +39,14 @@ func NewMessageCoordinator( messageHandlerFactories map[ids.ID]map[common.Address]messages.MessageHandlerFactory, applicationRelayers map[common.Hash]*ApplicationRelayer, sourceClients map[ids.ID]ethclient.Client, + warpMessageDecoder func(log types.Log) (*relayerTypes.WarpMessageInfo, bool, error), ) *MessageCoordinator { return &MessageCoordinator{ logger: logger, messageHandlerFactories: messageHandlerFactories, applicationRelayers: applicationRelayers, sourceClients: sourceClients, + warpMessageDecoder: warpMessageDecoder, } } @@ -229,7 +232,7 @@ func (mc *MessageCoordinator) ProcessBlock( errChan chan error, ) { // Parse the logs in the block, and group by application relayer - block, err := relayerTypes.NewWarpBlockInfo(blockHeader, ethClient) + block, err := relayerTypes.NewWarpBlockInfo(blockHeader, ethClient, mc.warpMessageDecoder) if err != nil { mc.logger.Error("Failed to create Warp block info", zap.Error(err)) errChan <- err @@ -286,5 +289,11 @@ func FetchWarpMessage( return nil, fmt.Errorf("found more than 1 log: %d", len(logs)) } - return relayerTypes.NewWarpMessageInfo(logs[0]) + message, ok, err := relayerTypes.DefaultNewWarpMessageInfo(logs[0]) + if err != nil { + return nil, err + } else if !ok { + return nil, errors.New("failed to decode log") + } + return message, nil } diff --git a/tests/contracts/lib/teleporter b/tests/contracts/lib/teleporter index f23da2da..d0431354 160000 --- a/tests/contracts/lib/teleporter +++ b/tests/contracts/lib/teleporter @@ -1 +1 @@ -Subproject commit f23da2da8fdd5ef4e3bad46358054c4b36dec78b +Subproject commit d04313542b05d87502433d6d72c6f4ca49d648dd diff --git a/types/types.go b/types/types.go index b71a6805..c9acae25 100644 --- a/types/types.go +++ b/types/types.go @@ -38,7 +38,11 @@ type WarpMessageInfo struct { } // Extract Warp logs from the block, if they exist -func NewWarpBlockInfo(header *types.Header, ethClient ethclient.Client) (*WarpBlockInfo, error) { +func NewWarpBlockInfo( + header *types.Header, + ethClient ethclient.Client, + warpMessageDecoder func(log types.Log) (*WarpMessageInfo, bool, error), +) (*WarpBlockInfo, error) { var ( logs []types.Log err error @@ -61,12 +65,15 @@ func NewWarpBlockInfo(header *types.Header, ethClient ethclient.Client) (*WarpBl return nil, err } } - messages := make([]*WarpMessageInfo, len(logs)) + messages := make([]*WarpMessageInfo, 0, len(logs)) for i, log := range logs { - warpLog, err := NewWarpMessageInfo(log) + warpLog, ok, err := warpMessageDecoder(log) if err != nil { return nil, err } + if !ok { + continue + } messages[i] = warpLog } @@ -77,22 +84,22 @@ func NewWarpBlockInfo(header *types.Header, ethClient ethclient.Client) (*WarpBl } // Extract the Warp message information from the raw log -func NewWarpMessageInfo(log types.Log) (*WarpMessageInfo, error) { +func DefaultNewWarpMessageInfo(log types.Log) (*WarpMessageInfo, bool, error) { if len(log.Topics) != 3 { - return nil, ErrInvalidLog + return nil, false, nil } if log.Topics[0] != WarpPrecompileLogFilter { - return nil, ErrInvalidLog + return nil, false, nil } unsignedMsg, err := UnpackWarpMessage(log.Data) if err != nil { - return nil, err + return nil, false, err } return &WarpMessageInfo{ SourceAddress: common.BytesToAddress(log.Topics[1][:]), UnsignedMessage: unsignedMsg, - }, nil + }, true, nil } func UnpackWarpMessage(unsignedMsgBytes []byte) (*avalancheWarp.UnsignedMessage, error) { From 69d1e75dea14a370312f2721cecbc510f1e2736f Mon Sep 17 00:00:00 2001 From: Francois Hardrouyere Date: Fri, 13 Sep 2024 19:58:24 +0200 Subject: [PATCH 02/10] add importEvent message handling --- abi-bindings/eventimporter/EventImporter.go | 397 ++++++++++++++++++++ abi-bindings/eventimporter/pack.go | 26 ++ messages/chainlink/message_handler.go | 171 +++++++++ 3 files changed, 594 insertions(+) create mode 100644 abi-bindings/eventimporter/EventImporter.go create mode 100644 abi-bindings/eventimporter/pack.go create mode 100644 messages/chainlink/message_handler.go diff --git a/abi-bindings/eventimporter/EventImporter.go b/abi-bindings/eventimporter/EventImporter.go new file mode 100644 index 00000000..7438ef3c --- /dev/null +++ b/abi-bindings/eventimporter/EventImporter.go @@ -0,0 +1,397 @@ +// Code generated - DO NOT EDIT. +// This file is a generated binding and any manual changes will be lost. + +package eventimporter + +import ( + "errors" + "math/big" + "strings" + + "github.com/ava-labs/subnet-evm/accounts/abi" + "github.com/ava-labs/subnet-evm/accounts/abi/bind" + "github.com/ava-labs/subnet-evm/core/types" + "github.com/ava-labs/subnet-evm/interfaces" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" +) + +// Reference imports to suppress errors if they are not otherwise used. +var ( + _ = errors.New + _ = big.NewInt + _ = strings.NewReader + _ = interfaces.NotFound + _ = bind.Bind + _ = common.Big1 + _ = types.BloomLookup + _ = event.NewSubscription + _ = abi.ConvertType +) + +// EventImporterMetaData contains all meta data concerning the EventImporter contract. +var EventImporterMetaData = &bind.MetaData{ + ABI: "[{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"internalType\":\"bytes32\",\"name\":\"sourceBlockchainID\",\"type\":\"bytes32\"},{\"indexed\":true,\"internalType\":\"bytes32\",\"name\":\"sourceBlockHash\",\"type\":\"bytes32\"},{\"indexed\":true,\"internalType\":\"address\",\"name\":\"loggerAddress\",\"type\":\"address\"},{\"indexed\":false,\"internalType\":\"uint256\",\"name\":\"txIndex\",\"type\":\"uint256\"},{\"indexed\":false,\"internalType\":\"uint256\",\"name\":\"logIndex\",\"type\":\"uint256\"}],\"name\":\"EventImported\",\"type\":\"event\"},{\"inputs\":[{\"internalType\":\"bytes32\",\"name\":\"\",\"type\":\"bytes32\"},{\"internalType\":\"bytes\",\"name\":\"blockHeader\",\"type\":\"bytes\"},{\"internalType\":\"uint256\",\"name\":\"txIndex\",\"type\":\"uint256\"},{\"internalType\":\"bytes[]\",\"name\":\"receiptProof\",\"type\":\"bytes[]\"},{\"internalType\":\"uint256\",\"name\":\"logIndex\",\"type\":\"uint256\"}],\"name\":\"importEvent\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[],\"name\":\"warpMessenger\",\"outputs\":[{\"internalType\":\"contractIWarpMessenger\",\"name\":\"\",\"type\":\"address\"}],\"stateMutability\":\"view\",\"type\":\"function\"}]", +} + +// EventImporterABI is the input ABI used to generate the binding from. +// Deprecated: Use EventImporterMetaData.ABI instead. +var EventImporterABI = EventImporterMetaData.ABI + +// EventImporter is an auto generated Go binding around an Ethereum contract. +type EventImporter struct { + EventImporterCaller // Read-only binding to the contract + EventImporterTransactor // Write-only binding to the contract + EventImporterFilterer // Log filterer for contract events +} + +// EventImporterCaller is an auto generated read-only Go binding around an Ethereum contract. +type EventImporterCaller struct { + contract *bind.BoundContract // Generic contract wrapper for the low level calls +} + +// EventImporterTransactor is an auto generated write-only Go binding around an Ethereum contract. +type EventImporterTransactor struct { + contract *bind.BoundContract // Generic contract wrapper for the low level calls +} + +// EventImporterFilterer is an auto generated log filtering Go binding around an Ethereum contract events. +type EventImporterFilterer struct { + contract *bind.BoundContract // Generic contract wrapper for the low level calls +} + +// EventImporterSession is an auto generated Go binding around an Ethereum contract, +// with pre-set call and transact options. +type EventImporterSession struct { + Contract *EventImporter // Generic contract binding to set the session for + CallOpts bind.CallOpts // Call options to use throughout this session + TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session +} + +// EventImporterCallerSession is an auto generated read-only Go binding around an Ethereum contract, +// with pre-set call options. +type EventImporterCallerSession struct { + Contract *EventImporterCaller // Generic contract caller binding to set the session for + CallOpts bind.CallOpts // Call options to use throughout this session +} + +// EventImporterTransactorSession is an auto generated write-only Go binding around an Ethereum contract, +// with pre-set transact options. +type EventImporterTransactorSession struct { + Contract *EventImporterTransactor // Generic contract transactor binding to set the session for + TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session +} + +// EventImporterRaw is an auto generated low-level Go binding around an Ethereum contract. +type EventImporterRaw struct { + Contract *EventImporter // Generic contract binding to access the raw methods on +} + +// EventImporterCallerRaw is an auto generated low-level read-only Go binding around an Ethereum contract. +type EventImporterCallerRaw struct { + Contract *EventImporterCaller // Generic read-only contract binding to access the raw methods on +} + +// EventImporterTransactorRaw is an auto generated low-level write-only Go binding around an Ethereum contract. +type EventImporterTransactorRaw struct { + Contract *EventImporterTransactor // Generic write-only contract binding to access the raw methods on +} + +// NewEventImporter creates a new instance of EventImporter, bound to a specific deployed contract. +func NewEventImporter(address common.Address, backend bind.ContractBackend) (*EventImporter, error) { + contract, err := bindEventImporter(address, backend, backend, backend) + if err != nil { + return nil, err + } + return &EventImporter{EventImporterCaller: EventImporterCaller{contract: contract}, EventImporterTransactor: EventImporterTransactor{contract: contract}, EventImporterFilterer: EventImporterFilterer{contract: contract}}, nil +} + +// NewEventImporterCaller creates a new read-only instance of EventImporter, bound to a specific deployed contract. +func NewEventImporterCaller(address common.Address, caller bind.ContractCaller) (*EventImporterCaller, error) { + contract, err := bindEventImporter(address, caller, nil, nil) + if err != nil { + return nil, err + } + return &EventImporterCaller{contract: contract}, nil +} + +// NewEventImporterTransactor creates a new write-only instance of EventImporter, bound to a specific deployed contract. +func NewEventImporterTransactor(address common.Address, transactor bind.ContractTransactor) (*EventImporterTransactor, error) { + contract, err := bindEventImporter(address, nil, transactor, nil) + if err != nil { + return nil, err + } + return &EventImporterTransactor{contract: contract}, nil +} + +// NewEventImporterFilterer creates a new log filterer instance of EventImporter, bound to a specific deployed contract. +func NewEventImporterFilterer(address common.Address, filterer bind.ContractFilterer) (*EventImporterFilterer, error) { + contract, err := bindEventImporter(address, nil, nil, filterer) + if err != nil { + return nil, err + } + return &EventImporterFilterer{contract: contract}, nil +} + +// bindEventImporter binds a generic wrapper to an already deployed contract. +func bindEventImporter(address common.Address, caller bind.ContractCaller, transactor bind.ContractTransactor, filterer bind.ContractFilterer) (*bind.BoundContract, error) { + parsed, err := EventImporterMetaData.GetAbi() + if err != nil { + return nil, err + } + return bind.NewBoundContract(address, *parsed, caller, transactor, filterer), nil +} + +// Call invokes the (constant) contract method with params as input values and +// sets the output to result. The result type might be a single field for simple +// returns, a slice of interfaces for anonymous returns and a struct for named +// returns. +func (_EventImporter *EventImporterRaw) Call(opts *bind.CallOpts, result *[]interface{}, method string, params ...interface{}) error { + return _EventImporter.Contract.EventImporterCaller.contract.Call(opts, result, method, params...) +} + +// Transfer initiates a plain transaction to move funds to the contract, calling +// its default method if one is available. +func (_EventImporter *EventImporterRaw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) { + return _EventImporter.Contract.EventImporterTransactor.contract.Transfer(opts) +} + +// Transact invokes the (paid) contract method with params as input values. +func (_EventImporter *EventImporterRaw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) { + return _EventImporter.Contract.EventImporterTransactor.contract.Transact(opts, method, params...) +} + +// Call invokes the (constant) contract method with params as input values and +// sets the output to result. The result type might be a single field for simple +// returns, a slice of interfaces for anonymous returns and a struct for named +// returns. +func (_EventImporter *EventImporterCallerRaw) Call(opts *bind.CallOpts, result *[]interface{}, method string, params ...interface{}) error { + return _EventImporter.Contract.contract.Call(opts, result, method, params...) +} + +// Transfer initiates a plain transaction to move funds to the contract, calling +// its default method if one is available. +func (_EventImporter *EventImporterTransactorRaw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) { + return _EventImporter.Contract.contract.Transfer(opts) +} + +// Transact invokes the (paid) contract method with params as input values. +func (_EventImporter *EventImporterTransactorRaw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) { + return _EventImporter.Contract.contract.Transact(opts, method, params...) +} + +// WarpMessenger is a free data retrieval call binding the contract method 0xc9572e14. +// +// Solidity: function warpMessenger() view returns(address) +func (_EventImporter *EventImporterCaller) WarpMessenger(opts *bind.CallOpts) (common.Address, error) { + var out []interface{} + err := _EventImporter.contract.Call(opts, &out, "warpMessenger") + + if err != nil { + return *new(common.Address), err + } + + out0 := *abi.ConvertType(out[0], new(common.Address)).(*common.Address) + + return out0, err + +} + +// WarpMessenger is a free data retrieval call binding the contract method 0xc9572e14. +// +// Solidity: function warpMessenger() view returns(address) +func (_EventImporter *EventImporterSession) WarpMessenger() (common.Address, error) { + return _EventImporter.Contract.WarpMessenger(&_EventImporter.CallOpts) +} + +// WarpMessenger is a free data retrieval call binding the contract method 0xc9572e14. +// +// Solidity: function warpMessenger() view returns(address) +func (_EventImporter *EventImporterCallerSession) WarpMessenger() (common.Address, error) { + return _EventImporter.Contract.WarpMessenger(&_EventImporter.CallOpts) +} + +// ImportEvent is a paid mutator transaction binding the contract method 0x0a8bfac9. +// +// Solidity: function importEvent(bytes32 , bytes blockHeader, uint256 txIndex, bytes[] receiptProof, uint256 logIndex) returns() +func (_EventImporter *EventImporterTransactor) ImportEvent(opts *bind.TransactOpts, arg0 [32]byte, blockHeader []byte, txIndex *big.Int, receiptProof [][]byte, logIndex *big.Int) (*types.Transaction, error) { + return _EventImporter.contract.Transact(opts, "importEvent", arg0, blockHeader, txIndex, receiptProof, logIndex) +} + +// ImportEvent is a paid mutator transaction binding the contract method 0x0a8bfac9. +// +// Solidity: function importEvent(bytes32 , bytes blockHeader, uint256 txIndex, bytes[] receiptProof, uint256 logIndex) returns() +func (_EventImporter *EventImporterSession) ImportEvent(arg0 [32]byte, blockHeader []byte, txIndex *big.Int, receiptProof [][]byte, logIndex *big.Int) (*types.Transaction, error) { + return _EventImporter.Contract.ImportEvent(&_EventImporter.TransactOpts, arg0, blockHeader, txIndex, receiptProof, logIndex) +} + +// ImportEvent is a paid mutator transaction binding the contract method 0x0a8bfac9. +// +// Solidity: function importEvent(bytes32 , bytes blockHeader, uint256 txIndex, bytes[] receiptProof, uint256 logIndex) returns() +func (_EventImporter *EventImporterTransactorSession) ImportEvent(arg0 [32]byte, blockHeader []byte, txIndex *big.Int, receiptProof [][]byte, logIndex *big.Int) (*types.Transaction, error) { + return _EventImporter.Contract.ImportEvent(&_EventImporter.TransactOpts, arg0, blockHeader, txIndex, receiptProof, logIndex) +} + +// EventImporterEventImportedIterator is returned from FilterEventImported and is used to iterate over the raw logs and unpacked data for EventImported events raised by the EventImporter contract. +type EventImporterEventImportedIterator struct { + Event *EventImporterEventImported // Event containing the contract specifics and raw log + + contract *bind.BoundContract // Generic contract to use for unpacking event data + event string // Event name to use for unpacking event data + + logs chan types.Log // Log channel receiving the found contract events + sub interfaces.Subscription // Subscription for errors, completion and termination + done bool // Whether the subscription completed delivering logs + fail error // Occurred error to stop iteration +} + +// Next advances the iterator to the subsequent event, returning whether there +// are any more events found. In case of a retrieval or parsing error, false is +// returned and Error() can be queried for the exact failure. +func (it *EventImporterEventImportedIterator) Next() bool { + // If the iterator failed, stop iterating + if it.fail != nil { + return false + } + // If the iterator completed, deliver directly whatever's available + if it.done { + select { + case log := <-it.logs: + it.Event = new(EventImporterEventImported) + if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { + it.fail = err + return false + } + it.Event.Raw = log + return true + + default: + return false + } + } + // Iterator still in progress, wait for either a data or an error event + select { + case log := <-it.logs: + it.Event = new(EventImporterEventImported) + if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { + it.fail = err + return false + } + it.Event.Raw = log + return true + + case err := <-it.sub.Err(): + it.done = true + it.fail = err + return it.Next() + } +} + +// Error returns any retrieval or parsing error occurred during filtering. +func (it *EventImporterEventImportedIterator) Error() error { + return it.fail +} + +// Close terminates the iteration process, releasing any pending underlying +// resources. +func (it *EventImporterEventImportedIterator) Close() error { + it.sub.Unsubscribe() + return nil +} + +// EventImporterEventImported represents a EventImported event raised by the EventImporter contract. +type EventImporterEventImported struct { + SourceBlockchainID [32]byte + SourceBlockHash [32]byte + LoggerAddress common.Address + TxIndex *big.Int + LogIndex *big.Int + Raw types.Log // Blockchain specific contextual infos +} + +// FilterEventImported is a free log retrieval operation binding the contract event 0xfdb2f8239033f2b8c1122b2a4b6af55bb0b0b05e4050b5ecd9eafa153d3cd41d. +// +// Solidity: event EventImported(bytes32 indexed sourceBlockchainID, bytes32 indexed sourceBlockHash, address indexed loggerAddress, uint256 txIndex, uint256 logIndex) +func (_EventImporter *EventImporterFilterer) FilterEventImported(opts *bind.FilterOpts, sourceBlockchainID [][32]byte, sourceBlockHash [][32]byte, loggerAddress []common.Address) (*EventImporterEventImportedIterator, error) { + + var sourceBlockchainIDRule []interface{} + for _, sourceBlockchainIDItem := range sourceBlockchainID { + sourceBlockchainIDRule = append(sourceBlockchainIDRule, sourceBlockchainIDItem) + } + var sourceBlockHashRule []interface{} + for _, sourceBlockHashItem := range sourceBlockHash { + sourceBlockHashRule = append(sourceBlockHashRule, sourceBlockHashItem) + } + var loggerAddressRule []interface{} + for _, loggerAddressItem := range loggerAddress { + loggerAddressRule = append(loggerAddressRule, loggerAddressItem) + } + + logs, sub, err := _EventImporter.contract.FilterLogs(opts, "EventImported", sourceBlockchainIDRule, sourceBlockHashRule, loggerAddressRule) + if err != nil { + return nil, err + } + return &EventImporterEventImportedIterator{contract: _EventImporter.contract, event: "EventImported", logs: logs, sub: sub}, nil +} + +// WatchEventImported is a free log subscription operation binding the contract event 0xfdb2f8239033f2b8c1122b2a4b6af55bb0b0b05e4050b5ecd9eafa153d3cd41d. +// +// Solidity: event EventImported(bytes32 indexed sourceBlockchainID, bytes32 indexed sourceBlockHash, address indexed loggerAddress, uint256 txIndex, uint256 logIndex) +func (_EventImporter *EventImporterFilterer) WatchEventImported(opts *bind.WatchOpts, sink chan<- *EventImporterEventImported, sourceBlockchainID [][32]byte, sourceBlockHash [][32]byte, loggerAddress []common.Address) (event.Subscription, error) { + + var sourceBlockchainIDRule []interface{} + for _, sourceBlockchainIDItem := range sourceBlockchainID { + sourceBlockchainIDRule = append(sourceBlockchainIDRule, sourceBlockchainIDItem) + } + var sourceBlockHashRule []interface{} + for _, sourceBlockHashItem := range sourceBlockHash { + sourceBlockHashRule = append(sourceBlockHashRule, sourceBlockHashItem) + } + var loggerAddressRule []interface{} + for _, loggerAddressItem := range loggerAddress { + loggerAddressRule = append(loggerAddressRule, loggerAddressItem) + } + + logs, sub, err := _EventImporter.contract.WatchLogs(opts, "EventImported", sourceBlockchainIDRule, sourceBlockHashRule, loggerAddressRule) + if err != nil { + return nil, err + } + return event.NewSubscription(func(quit <-chan struct{}) error { + defer sub.Unsubscribe() + for { + select { + case log := <-logs: + // New log arrived, parse the event and forward to the user + event := new(EventImporterEventImported) + if err := _EventImporter.contract.UnpackLog(event, "EventImported", log); err != nil { + return err + } + event.Raw = log + + select { + case sink <- event: + case err := <-sub.Err(): + return err + case <-quit: + return nil + } + case err := <-sub.Err(): + return err + case <-quit: + return nil + } + } + }), nil +} + +// ParseEventImported is a log parse operation binding the contract event 0xfdb2f8239033f2b8c1122b2a4b6af55bb0b0b05e4050b5ecd9eafa153d3cd41d. +// +// Solidity: event EventImported(bytes32 indexed sourceBlockchainID, bytes32 indexed sourceBlockHash, address indexed loggerAddress, uint256 txIndex, uint256 logIndex) +func (_EventImporter *EventImporterFilterer) ParseEventImported(log types.Log) (*EventImporterEventImported, error) { + event := new(EventImporterEventImported) + if err := _EventImporter.contract.UnpackLog(event, "EventImported", log); err != nil { + return nil, err + } + event.Raw = log + return event, nil +} diff --git a/abi-bindings/eventimporter/pack.go b/abi-bindings/eventimporter/pack.go new file mode 100644 index 00000000..33631cb1 --- /dev/null +++ b/abi-bindings/eventimporter/pack.go @@ -0,0 +1,26 @@ +// (c) 2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package eventimporter + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" +) + +// PackImportEvent packs input to form a call to the importEvent function +func PackImportEvent( + blockHeader []byte, + txIndex *big.Int, + receiptProof [][]byte, + logIndex *big.Int, +) ([]byte, error) { + abi, err := EventImporterMetaData.GetAbi() + if err != nil { + return nil, errors.Wrap(err, "failed to get abi") + } + + return abi.Pack("importEvent", common.Hash{}, blockHeader, txIndex, receiptProof, logIndex) +} diff --git a/messages/chainlink/message_handler.go b/messages/chainlink/message_handler.go new file mode 100644 index 00000000..1693a3c5 --- /dev/null +++ b/messages/chainlink/message_handler.go @@ -0,0 +1,171 @@ +package chainlink + +import ( + "context" + "fmt" + "math/big" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/vms/platformvm/warp" + "github.com/ava-labs/awm-relayer/abi-bindings/eventimporter" + "github.com/ava-labs/awm-relayer/messages" + "github.com/ava-labs/awm-relayer/relayer/config" + "github.com/ava-labs/awm-relayer/utils" + "github.com/ava-labs/awm-relayer/vms" + "github.com/ava-labs/coreth/core/types" + "github.com/ava-labs/coreth/ethclient" + + "github.com/ethereum/go-ethereum/common" + "go.uber.org/zap" +) + +type factory struct { + logger logging.Logger + aggregator common.Address +} + +type ChainlinkMessageHandler struct { + unsignedMessage *warp.UnsignedMessage + logger logging.Logger + aggregator common.Address +} + +func NewMessageHandlerFactory( + logger logging.Logger, + messageProtocolConfig config.MessageProtocolConfig, +) (messages.MessageHandlerFactory, error) { + return &factory{ + logger: logger, + aggregator: common.Address{}, + }, nil +} + +func (f *factory) NewMessageHandler(unsignedMessage *warp.UnsignedMessage) (messages.MessageHandler, error) { + return &ChainlinkMessageHandler{ + logger: f.logger, + unsignedMessage: unsignedMessage, + aggregator: f.aggregator, + }, nil +} + +func CalculateImportEventGasLimit() (uint64, error) { + return 0, nil +} + +func (c *ChainlinkMessageHandler) ShouldSendMessage(destinationClient vms.DestinationClient) (bool, error) { + return true, nil +} + +func (c *ChainlinkMessageHandler) SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) (common.Hash, error) { + destinationBlockchainID := destinationClient.DestinationBlockchainID() + + c.logger.Info( + "Sending message to destination chain", + zap.String("destinationBlockchainID", destinationBlockchainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + ) + + gasLimit, err := CalculateImportEventGasLimit() + if err != nil { + c.logger.Error( + "Failed to calculate gas limit for receiveCrossChainMessage call", + zap.String("destinationBlockchainID", destinationBlockchainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + ) + return common.Hash{}, err + } + blockHeader := []byte{} + txIndex := big.NewInt(0) + receiptProof := [][]byte{} + logIndex := big.NewInt(0) + callData, err := eventimporter.PackImportEvent(blockHeader, txIndex, receiptProof, logIndex) + if err != nil { + c.logger.Error( + "Failed packing importEvent call data", + // zap.String("destinationBlockchainID", destinationBlockchainID.String()), + // zap.String("warpMessageID", signedMessage.ID().String()), + ) + return common.Hash{}, err + } + + txHash, err := destinationClient.SendTx( + signedMessage, + c.aggregator.Hex(), + gasLimit, + callData, + ) + if err != nil { + c.logger.Error( + "Failed to send tx.", + zap.String("destinationBlockchainID", destinationBlockchainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + zap.Error(err), + ) + return common.Hash{}, err + } + + // Wait for the message to be included in a block before returning + err = c.waitForReceipt(signedMessage, destinationClient, txHash) + if err != nil { + return common.Hash{}, err + } + + c.logger.Info( + "Delivered message to destination chain", + zap.String("destinationBlockchainID", destinationBlockchainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + zap.String("txHash", txHash.String()), + ) + return txHash, nil +} + +func (c *ChainlinkMessageHandler) waitForReceipt( + signedMessage *warp.Message, + destinationClient vms.DestinationClient, + txHash common.Hash, +) error { + destinationBlockchainID := destinationClient.DestinationBlockchainID() + callCtx, callCtxCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer callCtxCancel() + receipt, err := utils.CallWithRetry[*types.Receipt]( + callCtx, + func() (*types.Receipt, error) { + return destinationClient.Client().(ethclient.Client).TransactionReceipt(callCtx, txHash) + }, + ) + if err != nil { + c.logger.Error( + "Failed to get transaction receipt", + zap.String("destinationBlockchainID", destinationBlockchainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + zap.Error(err), + ) + return err + } + if receipt.Status != types.ReceiptStatusSuccessful { + c.logger.Error( + "Transaction failed", + zap.String("destinationBlockchainID", destinationBlockchainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + zap.String("txHash", txHash.String()), + ) + return fmt.Errorf("transaction failed with status: %d", receipt.Status) + } + return nil +} + +func (c *ChainlinkMessageHandler) GetMessageRoutingInfo() ( + ids.ID, + common.Address, + ids.ID, + common.Address, + error, +) { + return ids.Empty, common.Address{}, ids.Empty, common.Address{}, nil +} + +func (c *ChainlinkMessageHandler) GetUnsignedMessage() *warp.UnsignedMessage { + return c.unsignedMessage +} From fcd11dd9b9c073e70be5cbc8c4f5b301a4536f89 Mon Sep 17 00:00:00 2001 From: Francois Hardrouyere Date: Mon, 16 Sep 2024 11:05:43 +0200 Subject: [PATCH 03/10] wait for receipt --- messages/chainlink/message_handler.go | 48 +++----------------------- messages/message_handler.go | 48 ++++++++++++++++++++++++++ messages/teleporter/message_handler.go | 42 +--------------------- 3 files changed, 53 insertions(+), 85 deletions(-) diff --git a/messages/chainlink/message_handler.go b/messages/chainlink/message_handler.go index 1693a3c5..a040a1fa 100644 --- a/messages/chainlink/message_handler.go +++ b/messages/chainlink/message_handler.go @@ -1,10 +1,7 @@ package chainlink import ( - "context" - "fmt" "math/big" - "time" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/logging" @@ -12,10 +9,7 @@ import ( "github.com/ava-labs/awm-relayer/abi-bindings/eventimporter" "github.com/ava-labs/awm-relayer/messages" "github.com/ava-labs/awm-relayer/relayer/config" - "github.com/ava-labs/awm-relayer/utils" "github.com/ava-labs/awm-relayer/vms" - "github.com/ava-labs/coreth/core/types" - "github.com/ava-labs/coreth/ethclient" "github.com/ethereum/go-ethereum/common" "go.uber.org/zap" @@ -84,8 +78,8 @@ func (c *ChainlinkMessageHandler) SendMessage(signedMessage *warp.Message, desti if err != nil { c.logger.Error( "Failed packing importEvent call data", - // zap.String("destinationBlockchainID", destinationBlockchainID.String()), - // zap.String("warpMessageID", signedMessage.ID().String()), + zap.String("destinationBlockchainID", destinationBlockchainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), ) return common.Hash{}, err } @@ -106,8 +100,9 @@ func (c *ChainlinkMessageHandler) SendMessage(signedMessage *warp.Message, desti return common.Hash{}, err } + teleporterMessageID := ids.Empty // Wait for the message to be included in a block before returning - err = c.waitForReceipt(signedMessage, destinationClient, txHash) + err = messages.WaitForReceipt(c.logger, signedMessage, destinationClient, txHash, teleporterMessageID) if err != nil { return common.Hash{}, err } @@ -121,41 +116,6 @@ func (c *ChainlinkMessageHandler) SendMessage(signedMessage *warp.Message, desti return txHash, nil } -func (c *ChainlinkMessageHandler) waitForReceipt( - signedMessage *warp.Message, - destinationClient vms.DestinationClient, - txHash common.Hash, -) error { - destinationBlockchainID := destinationClient.DestinationBlockchainID() - callCtx, callCtxCancel := context.WithTimeout(context.Background(), 30*time.Second) - defer callCtxCancel() - receipt, err := utils.CallWithRetry[*types.Receipt]( - callCtx, - func() (*types.Receipt, error) { - return destinationClient.Client().(ethclient.Client).TransactionReceipt(callCtx, txHash) - }, - ) - if err != nil { - c.logger.Error( - "Failed to get transaction receipt", - zap.String("destinationBlockchainID", destinationBlockchainID.String()), - zap.String("warpMessageID", signedMessage.ID().String()), - zap.Error(err), - ) - return err - } - if receipt.Status != types.ReceiptStatusSuccessful { - c.logger.Error( - "Transaction failed", - zap.String("destinationBlockchainID", destinationBlockchainID.String()), - zap.String("warpMessageID", signedMessage.ID().String()), - zap.String("txHash", txHash.String()), - ) - return fmt.Errorf("transaction failed with status: %d", receipt.Status) - } - return nil -} - func (c *ChainlinkMessageHandler) GetMessageRoutingInfo() ( ids.ID, common.Address, diff --git a/messages/message_handler.go b/messages/message_handler.go index 8a7419bc..0a1ff6a6 100644 --- a/messages/message_handler.go +++ b/messages/message_handler.go @@ -6,10 +6,19 @@ package messages import ( + "context" + "fmt" + "time" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/platformvm/warp" + "github.com/ava-labs/awm-relayer/utils" "github.com/ava-labs/awm-relayer/vms" + "github.com/ava-labs/coreth/core/types" + "github.com/ava-labs/coreth/ethclient" "github.com/ethereum/go-ethereum/common" + "go.uber.org/zap" ) // MessageManager is specific to each message protocol. The interface handles choosing which messages to send @@ -43,3 +52,42 @@ type MessageHandler interface { // GetUnsignedMessage returns the unsigned message GetUnsignedMessage() *warp.UnsignedMessage } + +func WaitForReceipt( + logger logging.Logger, + signedMessage *warp.Message, + destinationClient vms.DestinationClient, + txHash common.Hash, + teleporterMessageID ids.ID, +) error { + destinationBlockchainID := destinationClient.DestinationBlockchainID() + callCtx, callCtxCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer callCtxCancel() + receipt, err := utils.CallWithRetry[*types.Receipt]( + callCtx, + func() (*types.Receipt, error) { + return destinationClient.Client().(ethclient.Client).TransactionReceipt(callCtx, txHash) + }, + ) + if err != nil { + logger.Error( + "Failed to get transaction receipt", + zap.String("destinationBlockchainID", destinationBlockchainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + zap.String("teleporterMessageID", teleporterMessageID.String()), + zap.Error(err), + ) + return err + } + if receipt.Status != types.ReceiptStatusSuccessful { + logger.Error( + "Transaction failed", + zap.String("destinationBlockchainID", destinationBlockchainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + zap.String("teleporterMessageID", teleporterMessageID.String()), + zap.String("txHash", txHash.String()), + ) + return fmt.Errorf("transaction failed with status: %d", receipt.Status) + } + return nil +} diff --git a/messages/teleporter/message_handler.go b/messages/teleporter/message_handler.go index 0335d6f6..4cbd6041 100644 --- a/messages/teleporter/message_handler.go +++ b/messages/teleporter/message_handler.go @@ -16,10 +16,8 @@ import ( "github.com/ava-labs/awm-relayer/messages" pbDecider "github.com/ava-labs/awm-relayer/proto/pb/decider" "github.com/ava-labs/awm-relayer/relayer/config" - "github.com/ava-labs/awm-relayer/utils" "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/subnet-evm/accounts/abi/bind" - "github.com/ava-labs/subnet-evm/core/types" "github.com/ava-labs/subnet-evm/ethclient" teleportermessenger "github.com/ava-labs/teleporter/abi-bindings/go/teleporter/TeleporterMessenger" gasUtils "github.com/ava-labs/teleporter/utils/gas-utils" @@ -340,7 +338,7 @@ func (m *messageHandler) SendMessage( } // Wait for the message to be included in a block before returning - err = m.waitForReceipt(signedMessage, destinationClient, txHash, teleporterMessageID) + err = messages.WaitForReceipt(m.logger, signedMessage, destinationClient, txHash, teleporterMessageID) if err != nil { return common.Hash{}, err } @@ -355,44 +353,6 @@ func (m *messageHandler) SendMessage( return txHash, nil } -func (m *messageHandler) waitForReceipt( - signedMessage *warp.Message, - destinationClient vms.DestinationClient, - txHash common.Hash, - teleporterMessageID ids.ID, -) error { - destinationBlockchainID := destinationClient.DestinationBlockchainID() - callCtx, callCtxCancel := context.WithTimeout(context.Background(), 30*time.Second) - defer callCtxCancel() - receipt, err := utils.CallWithRetry[*types.Receipt]( - callCtx, - func() (*types.Receipt, error) { - return destinationClient.Client().(ethclient.Client).TransactionReceipt(callCtx, txHash) - }, - ) - if err != nil { - m.logger.Error( - "Failed to get transaction receipt", - zap.String("destinationBlockchainID", destinationBlockchainID.String()), - zap.String("warpMessageID", signedMessage.ID().String()), - zap.String("teleporterMessageID", teleporterMessageID.String()), - zap.Error(err), - ) - return err - } - if receipt.Status != types.ReceiptStatusSuccessful { - m.logger.Error( - "Transaction failed", - zap.String("destinationBlockchainID", destinationBlockchainID.String()), - zap.String("warpMessageID", signedMessage.ID().String()), - zap.String("teleporterMessageID", teleporterMessageID.String()), - zap.String("txHash", txHash.String()), - ) - return fmt.Errorf("transaction failed with status: %d", receipt.Status) - } - return nil -} - // parseTeleporterMessage returns the Warp message's corresponding Teleporter message from the cache if it exists. // Otherwise parses the Warp message payload. func (f *factory) parseTeleporterMessage( From 0ab32a4ca57f16bb47009eea4877339c2f5e9b84 Mon Sep 17 00:00:00 2001 From: Francois Hardrouyere Date: Mon, 16 Sep 2024 14:28:03 +0200 Subject: [PATCH 04/10] implement default message decoder --- messages/chainlink/message_handler.go | 5 +- messages/message_handler.go | 94 +++++++++++++++++++++++++-- relayer/api/relay_message.go | 3 +- relayer/config/types.go | 1 + relayer/main/main.go | 14 +++- relayer/message_coordinator.go | 38 ++++++----- signature-aggregator/api/api.go | 4 +- types/types.go | 84 ------------------------ 8 files changed, 133 insertions(+), 110 deletions(-) diff --git a/messages/chainlink/message_handler.go b/messages/chainlink/message_handler.go index a040a1fa..c7a0b4fd 100644 --- a/messages/chainlink/message_handler.go +++ b/messages/chainlink/message_handler.go @@ -52,7 +52,10 @@ func (c *ChainlinkMessageHandler) ShouldSendMessage(destinationClient vms.Destin return true, nil } -func (c *ChainlinkMessageHandler) SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) (common.Hash, error) { +func (c *ChainlinkMessageHandler) SendMessage( + signedMessage *warp.Message, + destinationClient vms.DestinationClient, +) (common.Hash, error) { destinationBlockchainID := destinationClient.DestinationBlockchainID() c.logger.Info( diff --git a/messages/message_handler.go b/messages/message_handler.go index 0a1ff6a6..55f7f8b4 100644 --- a/messages/message_handler.go +++ b/messages/message_handler.go @@ -7,16 +7,22 @@ package messages import ( "context" + "errors" "fmt" "time" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/vms/platformvm/warp" + avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" + relayerTypes "github.com/ava-labs/awm-relayer/types" "github.com/ava-labs/awm-relayer/utils" "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/ethclient" + subnetTypes "github.com/ava-labs/subnet-evm/core/types" + subnetEthclient "github.com/ava-labs/subnet-evm/ethclient" + "github.com/ava-labs/subnet-evm/interfaces" + "github.com/ava-labs/subnet-evm/precompile/contracts/warp" "github.com/ethereum/go-ethereum/common" "go.uber.org/zap" ) @@ -25,7 +31,7 @@ import ( // for each message protocol, and performs the sending to the destination chain. type MessageHandlerFactory interface { // Create a message handler to relay the Warp message - NewMessageHandler(unsignedMessage *warp.UnsignedMessage) (MessageHandler, error) + NewMessageHandler(unsignedMessage *avalancheWarp.UnsignedMessage) (MessageHandler, error) } // MessageHandlers relay a single Warp message. A new instance should be created for each Warp message. @@ -37,7 +43,7 @@ type MessageHandler interface { // SendMessage sends the signed message to the destination chain. The payload parsed according to // the VM rules is also passed in, since MessageManager does not assume any particular VM // returns the transaction hash if the transaction is successful. - SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) (common.Hash, error) + SendMessage(signedMessage *avalancheWarp.Message, destinationClient vms.DestinationClient) (common.Hash, error) // GetMessageRoutingInfo returns the source chain ID, origin sender address, // destination chain ID, and destination address. @@ -50,12 +56,90 @@ type MessageHandler interface { ) // GetUnsignedMessage returns the unsigned message - GetUnsignedMessage() *warp.UnsignedMessage + GetUnsignedMessage() *avalancheWarp.UnsignedMessage +} + +type MessageDecoder interface { + Decode(header *subnetTypes.Header, ethClient subnetEthclient.Client) ([]*relayerTypes.WarpMessageInfo, error) +} + +type WarpMessageDecoder struct{} + +// Extract Warp logs from the block, if they exist +func (w *WarpMessageDecoder) Decode( + header *subnetTypes.Header, + ethClient subnetEthclient.Client, +) ([]*relayerTypes.WarpMessageInfo, error) { + var ( + logs []subnetTypes.Log + err error + ) + // Check if the block contains warp logs, and fetch them from the client if it does + if header.Bloom.Test(relayerTypes.WarpPrecompileLogFilter[:]) { + cctx, cancel := context.WithTimeout(context.Background(), utils.DefaultRPCRetryTimeout) + defer cancel() + logs, err = utils.CallWithRetry[[]subnetTypes.Log]( + cctx, + func() ([]subnetTypes.Log, error) { + return ethClient.FilterLogs(context.Background(), interfaces.FilterQuery{ + Topics: [][]common.Hash{{relayerTypes.WarpPrecompileLogFilter}}, + Addresses: []common.Address{warp.ContractAddress}, + FromBlock: header.Number, + ToBlock: header.Number, + }) + }) + if err != nil { + return nil, err + } + } + messages := make([]*relayerTypes.WarpMessageInfo, len(logs)) + for i, log := range logs { + warpLog, err := NewWarpMessageInfo(log) + if err != nil { + return nil, err + } + messages[i] = warpLog + } + + return messages, nil +} + +// Extract the Warp message information from the raw log +func NewWarpMessageInfo(log subnetTypes.Log) (*relayerTypes.WarpMessageInfo, error) { + if len(log.Topics) != 3 { + return nil, relayerTypes.ErrInvalidLog + } + if log.Topics[0] != relayerTypes.WarpPrecompileLogFilter { + return nil, relayerTypes.ErrInvalidLog + } + unsignedMsg, err := UnpackWarpMessage(log.Data) + if err != nil { + return nil, err + } + + return &relayerTypes.WarpMessageInfo{ + SourceAddress: common.BytesToAddress(log.Topics[1][:]), + UnsignedMessage: unsignedMsg, + }, nil +} + +func UnpackWarpMessage(unsignedMsgBytes []byte) (*avalancheWarp.UnsignedMessage, error) { + unsignedMsg, err := warp.UnpackSendWarpEventDataToMessage(unsignedMsgBytes) + if err != nil { + // If we failed to parse the message as a log, attempt to parse it as a standalone message + var standaloneErr error + unsignedMsg, standaloneErr = avalancheWarp.ParseUnsignedMessage(unsignedMsgBytes) + if standaloneErr != nil { + err = errors.Join(err, standaloneErr) + return nil, err + } + } + return unsignedMsg, nil } func WaitForReceipt( logger logging.Logger, - signedMessage *warp.Message, + signedMessage *avalancheWarp.Message, destinationClient vms.DestinationClient, txHash common.Hash, teleporterMessageID ids.ID, diff --git a/relayer/api/relay_message.go b/relayer/api/relay_message.go index 8b0d479a..bb6d7f19 100644 --- a/relayer/api/relay_message.go +++ b/relayer/api/relay_message.go @@ -6,6 +6,7 @@ import ( "net/http" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/awm-relayer/messages" "github.com/ava-labs/awm-relayer/relayer" "github.com/ava-labs/awm-relayer/types" "github.com/ava-labs/awm-relayer/utils" @@ -56,7 +57,7 @@ func relayMessageAPIHandler(logger logging.Logger, messageCoordinator *relayer.M return } - unsignedMessage, err := types.UnpackWarpMessage(req.UnsignedMessageBytes) + unsignedMessage, err := messages.UnpackWarpMessage(req.UnsignedMessageBytes) if err != nil { logger.Warn("Error unpacking warp message", zap.Error(err)) http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/relayer/config/types.go b/relayer/config/types.go index 394efa2d..e1238063 100644 --- a/relayer/config/types.go +++ b/relayer/config/types.go @@ -37,6 +37,7 @@ const ( UNKNOWN_MESSAGE_PROTOCOL MessageProtocol = iota TELEPORTER OFF_CHAIN_REGISTRY + CHAINLINK_PRICE_FEED ) func (msg MessageProtocol) String() string { diff --git a/relayer/main/main.go b/relayer/main/main.go index f6bf3762..3ed2a199 100644 --- a/relayer/main/main.go +++ b/relayer/main/main.go @@ -29,7 +29,6 @@ import ( "github.com/ava-labs/awm-relayer/relayer/config" "github.com/ava-labs/awm-relayer/signature-aggregator/aggregator" sigAggMetrics "github.com/ava-labs/awm-relayer/signature-aggregator/metrics" - relayerTypes "github.com/ava-labs/awm-relayer/types" "github.com/ava-labs/awm-relayer/utils" "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/subnet-evm/ethclient" @@ -251,12 +250,13 @@ func main() { logger.Fatal("Failed to create application relayers", zap.Error(err)) panic(err) } + messagesDecoders := createMessageDecoders(&cfg) messageCoordinator := relayer.NewMessageCoordinator( logger, messageHandlerFactories, applicationRelayers, sourceClients, - relayerTypes.DefaultNewWarpMessageInfo, + messagesDecoders, ) // Each Listener goroutine will have an atomic bool that it can set to false to indicate an unrecoverable error @@ -322,6 +322,12 @@ func createMessageHandlerFactories( logger, cfg, ) + case config.CHAINLINK_PRICE_FEED: + // TODO + m, err = offchainregistry.NewMessageHandlerFactory( + logger, + cfg, + ) default: m, err = nil, fmt.Errorf("invalid message format %s", format) } @@ -546,6 +552,10 @@ func createHealthTrackers(cfg *config.Config) map[ids.ID]*atomic.Bool { return healthTrackers } +func createMessageDecoders(cfg *config.Config) []messages.MessageDecoder { + return nil +} + func startMetricsServer(logger logging.Logger, gatherer prometheus.Gatherer, port uint16) { http.Handle("/metrics", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{})) diff --git a/relayer/message_coordinator.go b/relayer/message_coordinator.go index 91cf5772..54534080 100644 --- a/relayer/message_coordinator.go +++ b/relayer/message_coordinator.go @@ -31,7 +31,7 @@ type MessageCoordinator struct { messageHandlerFactories map[ids.ID]map[common.Address]messages.MessageHandlerFactory applicationRelayers map[common.Hash]*ApplicationRelayer sourceClients map[ids.ID]ethclient.Client - warpMessageDecoder func(log types.Log) (*relayerTypes.WarpMessageInfo, bool, error) + messageDecoders []messages.MessageDecoder } func NewMessageCoordinator( @@ -39,14 +39,14 @@ func NewMessageCoordinator( messageHandlerFactories map[ids.ID]map[common.Address]messages.MessageHandlerFactory, applicationRelayers map[common.Hash]*ApplicationRelayer, sourceClients map[ids.ID]ethclient.Client, - warpMessageDecoder func(log types.Log) (*relayerTypes.WarpMessageInfo, bool, error), + messageDecoders []messages.MessageDecoder, ) *MessageCoordinator { return &MessageCoordinator{ logger: logger, messageHandlerFactories: messageHandlerFactories, applicationRelayers: applicationRelayers, sourceClients: sourceClients, - warpMessageDecoder: warpMessageDecoder, + messageDecoders: messageDecoders, } } @@ -232,11 +232,25 @@ func (mc *MessageCoordinator) ProcessBlock( errChan chan error, ) { // Parse the logs in the block, and group by application relayer - block, err := relayerTypes.NewWarpBlockInfo(blockHeader, ethClient, mc.warpMessageDecoder) - if err != nil { - mc.logger.Error("Failed to create Warp block info", zap.Error(err)) - errChan <- err - return + number := blockHeader.Number.Uint64() + msgs := make([]*relayerTypes.WarpMessageInfo, 0) + for _, msgDecoder := range mc.messageDecoders { + message, err := msgDecoder.Decode(blockHeader, ethClient) + if err != nil { + mc.logger.Error( + "Failed to create Warp block info", + zap.Uint64("blockNumber", number), + zap.String("blockchainID", blockchainID.String()), + zap.Error(err), + ) + errChan <- err + return + } + msgs = append(msgs, message...) + } + block := &relayerTypes.WarpBlockInfo{ + BlockNumber: number, + Messages: msgs, } // Register each message in the block with the appropriate application relayer @@ -289,11 +303,5 @@ func FetchWarpMessage( return nil, fmt.Errorf("found more than 1 log: %d", len(logs)) } - message, ok, err := relayerTypes.DefaultNewWarpMessageInfo(logs[0]) - if err != nil { - return nil, err - } else if !ok { - return nil, errors.New("failed to decode log") - } - return message, nil + return messages.NewWarpMessageInfo(logs[0]) } diff --git a/signature-aggregator/api/api.go b/signature-aggregator/api/api.go index f1a324c1..4480f055 100644 --- a/signature-aggregator/api/api.go +++ b/signature-aggregator/api/api.go @@ -12,9 +12,9 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/awm-relayer/messages" "github.com/ava-labs/awm-relayer/signature-aggregator/aggregator" "github.com/ava-labs/awm-relayer/signature-aggregator/metrics" - "github.com/ava-labs/awm-relayer/types" "github.com/ava-labs/awm-relayer/utils" "go.uber.org/zap" ) @@ -109,7 +109,7 @@ func signatureAggregationAPIHandler( writeJSONError(logger, w, msg) return } - message, err := types.UnpackWarpMessage(decodedMessage) + message, err := messages.UnpackWarpMessage(decodedMessage) if err != nil { msg := "Error unpacking warp message" logger.Warn(msg, zap.Error(err)) diff --git a/types/types.go b/types/types.go index c9acae25..f878bdc5 100644 --- a/types/types.go +++ b/types/types.go @@ -4,14 +4,9 @@ package types import ( - "context" "errors" avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp" - "github.com/ava-labs/awm-relayer/utils" - "github.com/ava-labs/subnet-evm/core/types" - "github.com/ava-labs/subnet-evm/ethclient" - "github.com/ava-labs/subnet-evm/interfaces" "github.com/ava-labs/subnet-evm/precompile/contracts/warp" "github.com/ethereum/go-ethereum/common" ) @@ -36,82 +31,3 @@ type WarpMessageInfo struct { SourceAddress common.Address UnsignedMessage *avalancheWarp.UnsignedMessage } - -// Extract Warp logs from the block, if they exist -func NewWarpBlockInfo( - header *types.Header, - ethClient ethclient.Client, - warpMessageDecoder func(log types.Log) (*WarpMessageInfo, bool, error), -) (*WarpBlockInfo, error) { - var ( - logs []types.Log - err error - ) - // Check if the block contains warp logs, and fetch them from the client if it does - if header.Bloom.Test(WarpPrecompileLogFilter[:]) { - cctx, cancel := context.WithTimeout(context.Background(), utils.DefaultRPCRetryTimeout) - defer cancel() - logs, err = utils.CallWithRetry[[]types.Log]( - cctx, - func() ([]types.Log, error) { - return ethClient.FilterLogs(context.Background(), interfaces.FilterQuery{ - Topics: [][]common.Hash{{WarpPrecompileLogFilter}}, - Addresses: []common.Address{warp.ContractAddress}, - FromBlock: header.Number, - ToBlock: header.Number, - }) - }) - if err != nil { - return nil, err - } - } - messages := make([]*WarpMessageInfo, 0, len(logs)) - for i, log := range logs { - warpLog, ok, err := warpMessageDecoder(log) - if err != nil { - return nil, err - } - if !ok { - continue - } - messages[i] = warpLog - } - - return &WarpBlockInfo{ - BlockNumber: header.Number.Uint64(), - Messages: messages, - }, nil -} - -// Extract the Warp message information from the raw log -func DefaultNewWarpMessageInfo(log types.Log) (*WarpMessageInfo, bool, error) { - if len(log.Topics) != 3 { - return nil, false, nil - } - if log.Topics[0] != WarpPrecompileLogFilter { - return nil, false, nil - } - unsignedMsg, err := UnpackWarpMessage(log.Data) - if err != nil { - return nil, false, err - } - - return &WarpMessageInfo{ - SourceAddress: common.BytesToAddress(log.Topics[1][:]), - UnsignedMessage: unsignedMsg, - }, true, nil -} - -func UnpackWarpMessage(unsignedMsgBytes []byte) (*avalancheWarp.UnsignedMessage, error) { - unsignedMsg, err := warp.UnpackSendWarpEventDataToMessage(unsignedMsgBytes) - if err != nil { - // If we failed to parse the message as a log, attempt to parse it as a standalone message - var standaloneErr error - unsignedMsg, standaloneErr = avalancheWarp.ParseUnsignedMessage(unsignedMsgBytes) - if standaloneErr != nil { - err = errors.Join(err, standaloneErr) - return nil, err - } - } - return unsignedMsg, nil -} From c681ff3b61b3c64c47a33e6e745a8cf7a25952fa Mon Sep 17 00:00:00 2001 From: Francois Hardrouyere Date: Mon, 16 Sep 2024 14:34:35 +0200 Subject: [PATCH 05/10] pass message decoder --- messages/message_handler.go | 2 +- messages/mocks/mock_message_handler.go | 41 ++++++++++++++++++++++++++ relayer/main/main.go | 6 +--- 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/messages/message_handler.go b/messages/message_handler.go index 55f7f8b4..19709915 100644 --- a/messages/message_handler.go +++ b/messages/message_handler.go @@ -66,7 +66,7 @@ type MessageDecoder interface { type WarpMessageDecoder struct{} // Extract Warp logs from the block, if they exist -func (w *WarpMessageDecoder) Decode( +func (w WarpMessageDecoder) Decode( header *subnetTypes.Header, ethClient subnetEthclient.Client, ) ([]*relayerTypes.WarpMessageInfo, error) { diff --git a/messages/mocks/mock_message_handler.go b/messages/mocks/mock_message_handler.go index c7a43c37..61b4db67 100644 --- a/messages/mocks/mock_message_handler.go +++ b/messages/mocks/mock_message_handler.go @@ -15,7 +15,10 @@ import ( ids "github.com/ava-labs/avalanchego/ids" warp "github.com/ava-labs/avalanchego/vms/platformvm/warp" messages "github.com/ava-labs/awm-relayer/messages" + types "github.com/ava-labs/awm-relayer/types" vms "github.com/ava-labs/awm-relayer/vms" + types0 "github.com/ava-labs/subnet-evm/core/types" + ethclient "github.com/ava-labs/subnet-evm/ethclient" common "github.com/ethereum/go-ethereum/common" gomock "go.uber.org/mock/gomock" ) @@ -142,3 +145,41 @@ func (mr *MockMessageHandlerMockRecorder) ShouldSendMessage(destinationClient an mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShouldSendMessage", reflect.TypeOf((*MockMessageHandler)(nil).ShouldSendMessage), destinationClient) } + +// MockMessageDecoder is a mock of MessageDecoder interface. +type MockMessageDecoder struct { + ctrl *gomock.Controller + recorder *MockMessageDecoderMockRecorder +} + +// MockMessageDecoderMockRecorder is the mock recorder for MockMessageDecoder. +type MockMessageDecoderMockRecorder struct { + mock *MockMessageDecoder +} + +// NewMockMessageDecoder creates a new mock instance. +func NewMockMessageDecoder(ctrl *gomock.Controller) *MockMessageDecoder { + mock := &MockMessageDecoder{ctrl: ctrl} + mock.recorder = &MockMessageDecoderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMessageDecoder) EXPECT() *MockMessageDecoderMockRecorder { + return m.recorder +} + +// Decode mocks base method. +func (m *MockMessageDecoder) Decode(header *types0.Header, ethClient ethclient.Client) ([]*types.WarpMessageInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Decode", header, ethClient) + ret0, _ := ret[0].([]*types.WarpMessageInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Decode indicates an expected call of Decode. +func (mr *MockMessageDecoderMockRecorder) Decode(header, ethClient any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Decode", reflect.TypeOf((*MockMessageDecoder)(nil).Decode), header, ethClient) +} diff --git a/relayer/main/main.go b/relayer/main/main.go index 3ed2a199..2b85d9d2 100644 --- a/relayer/main/main.go +++ b/relayer/main/main.go @@ -250,7 +250,7 @@ func main() { logger.Fatal("Failed to create application relayers", zap.Error(err)) panic(err) } - messagesDecoders := createMessageDecoders(&cfg) + messagesDecoders := []messages.MessageDecoder{messages.WarpMessageDecoder{}} messageCoordinator := relayer.NewMessageCoordinator( logger, messageHandlerFactories, @@ -552,10 +552,6 @@ func createHealthTrackers(cfg *config.Config) map[ids.ID]*atomic.Bool { return healthTrackers } -func createMessageDecoders(cfg *config.Config) []messages.MessageDecoder { - return nil -} - func startMetricsServer(logger logging.Logger, gatherer prometheus.Gatherer, port uint16) { http.Handle("/metrics", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{})) From 297b49cf097b4d1a0e1bdc6ef3fc089a477a12b5 Mon Sep 17 00:00:00 2001 From: Francois Hardrouyere Date: Wed, 18 Sep 2024 10:29:41 +0200 Subject: [PATCH 06/10] encode info in message --- messages/chainlink/config.go | 44 ++++++ messages/chainlink/message_handler.go | 185 +++++++++++++++++++++++--- messages/message_handler.go | 10 +- relayer/listener.go | 1 + relayer/main/main.go | 4 +- relayer/message_coordinator.go | 3 +- 6 files changed, 227 insertions(+), 20 deletions(-) create mode 100644 messages/chainlink/config.go diff --git a/messages/chainlink/config.go b/messages/chainlink/config.go new file mode 100644 index 00000000..aff98f46 --- /dev/null +++ b/messages/chainlink/config.go @@ -0,0 +1,44 @@ +package chainlink + +import ( + "fmt" + "strconv" + + "github.com/ethereum/go-ethereum/common" +) + +type RawConfig struct { + AggregatorsToReplicas map[string]string `json:"aggregators-to-replicas"` + MaxFilterAdresses string `json:"max-filter-addresses"` +} + +type Config struct { + AggregatorsToReplicas map[common.Address]common.Address + MaxFilterAdresses uint64 +} + +func (c *RawConfig) Parse() (*Config, error) { + maxFilterAdresses, err := strconv.ParseUint(c.MaxFilterAdresses, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid integer for max-filter-addresses cannot be parsed to uint64: %s", c.MaxFilterAdresses) + } + aggregatorToReplicas := make(map[common.Address]common.Address, 0) + for aggregator, replica := range c.AggregatorsToReplicas { + if !common.IsHexAddress(aggregator) { + return nil, fmt.Errorf("invalid price feed aggregator address for EVM source subnet: %s", aggregator) + } + if !common.IsHexAddress(replica) { + return nil, fmt.Errorf("invalid price feed replica address for EVM source subnet: %s", replica) + } + aggregator, replica := common.HexToAddress(aggregator), common.HexToAddress(replica) + if _, ok := aggregatorToReplicas[aggregator]; ok { + return nil, fmt.Errorf("duplicate aggregator entry for %s", aggregator) + } + aggregatorToReplicas[aggregator] = replica + } + config := Config{ + AggregatorsToReplicas: aggregatorToReplicas, + MaxFilterAdresses: maxFilterAdresses, + } + return &config, nil +} diff --git a/messages/chainlink/message_handler.go b/messages/chainlink/message_handler.go index c7a0b4fd..bdf1cab9 100644 --- a/messages/chainlink/message_handler.go +++ b/messages/chainlink/message_handler.go @@ -1,6 +1,9 @@ package chainlink import ( + "context" + "encoding/json" + "fmt" "math/big" "github.com/ava-labs/avalanchego/ids" @@ -9,38 +12,178 @@ import ( "github.com/ava-labs/awm-relayer/abi-bindings/eventimporter" "github.com/ava-labs/awm-relayer/messages" "github.com/ava-labs/awm-relayer/relayer/config" + relayerTypes "github.com/ava-labs/awm-relayer/types" + "github.com/ava-labs/awm-relayer/utils" "github.com/ava-labs/awm-relayer/vms" + subnetTypes "github.com/ava-labs/subnet-evm/core/types" + subnetEthclient "github.com/ava-labs/subnet-evm/ethclient" + subnetInterfaces "github.com/ava-labs/subnet-evm/interfaces" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" "go.uber.org/zap" ) type factory struct { - logger logging.Logger - aggregator common.Address + logger logging.Logger + config *Config } type ChainlinkMessageHandler struct { - unsignedMessage *warp.UnsignedMessage - logger logging.Logger - aggregator common.Address + unsignedMessage *warp.UnsignedMessage + logger logging.Logger + maxFilterAdresses uint64 + aggregatorsToReplicas map[common.Address]common.Address + aggregators []common.Address +} + +type ChainlinkMessageDecoder struct { + handler *ChainlinkMessageHandler +} + +type ChainlinkMessage struct { + aggregator common.Address + + blockHeader []byte + txIndex *big.Int + receiptProof [][]byte + logIndex *big.Int + + current *big.Int + roundId *big.Int + updatedAt *big.Int + data []byte +} + +var ChainlinkPriceUpdatedFilter = common.HexToHash("0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f") + +func (c ChainlinkMessageDecoder) Decode( + ctx context.Context, + header *subnetTypes.Header, + ethClient subnetEthclient.Client, +) ([]*relayerTypes.WarpMessageInfo, error) { + var ( + logs []subnetTypes.Log + err error + ) + // Check if the block contains warp logs, and fetch them from the client if it does + if header.Bloom.Test(ChainlinkPriceUpdatedFilter[:]) { + cctx, cancel := context.WithTimeout(context.Background(), utils.DefaultRPCRetryTimeout) + defer cancel() + logs, err = utils.CallWithRetry[[]subnetTypes.Log]( + cctx, + func() ([]subnetTypes.Log, error) { + return ethClient.FilterLogs(context.Background(), subnetInterfaces.FilterQuery{ + Topics: [][]common.Hash{{ChainlinkPriceUpdatedFilter}}, + Addresses: c.handler.aggregators, + FromBlock: header.Number, + ToBlock: header.Number, + }) + }) + if err != nil { + return nil, err + } + } + messages := make([]*relayerTypes.WarpMessageInfo, len(logs)) + for i, log := range logs { + warpLog, err := NewWarpMessageInfo(ctx, log, ethClient) + if err != nil { + return nil, err + } + messages[i] = warpLog + } + + return messages, nil +} + +func NewWarpMessageInfo( + ctx context.Context, + log subnetTypes.Log, + ethclient subnetEthclient.Client, +) ( + *relayerTypes.WarpMessageInfo, + error, +) { + if len(log.Topics) != 4 { + return nil, relayerTypes.ErrInvalidLog + } + if log.Topics[0] != ChainlinkPriceUpdatedFilter { + return nil, relayerTypes.ErrInvalidLog + } + block, err := ethclient.BlockByHash(ctx, log.BlockHash) + if err != nil { + return nil, err + } + blockHeader, err := rlp.EncodeToBytes(block.Header) + if err != nil { + return nil, err + } + msg := ChainlinkMessage{ + aggregator: log.Address, + blockHeader: blockHeader, + current: log.Topics[1].Big(), + roundId: log.Topics[2].Big(), + updatedAt: log.Topics[3].Big(), + data: log.Data, + } + unsignedMsg, err := ConvertToUnsignedMessage(&msg) + if err != nil { + return nil, err + } + + return &relayerTypes.WarpMessageInfo{ + SourceAddress: common.BytesToAddress(log.Address[:]), + UnsignedMessage: unsignedMsg, + }, nil +} + +func ConvertToUnsignedMessage(msg *ChainlinkMessage) (*warp.UnsignedMessage, error) { + bytes, err := json.Marshal(msg) + if err != nil { + return nil, err + } + return warp.ParseUnsignedMessage(bytes) } func NewMessageHandlerFactory( logger logging.Logger, messageProtocolConfig config.MessageProtocolConfig, ) (messages.MessageHandlerFactory, error) { + data, err := json.Marshal(messageProtocolConfig.Settings) + if err != nil { + logger.Error("Failed to marshal Teleporter config") + return nil, err + } + var messageConfig RawConfig + if err := json.Unmarshal(data, &messageConfig); err != nil { + logger.Error("Failed to unmarshal Teleporter config") + return nil, err + } + + config, err := messageConfig.Parse() + if err != nil { + return nil, err + } + return &factory{ - logger: logger, - aggregator: common.Address{}, + logger: logger, + config: config, }, nil } func (f *factory) NewMessageHandler(unsignedMessage *warp.UnsignedMessage) (messages.MessageHandler, error) { + aggregatorsToReplicas := f.config.AggregatorsToReplicas + aggregators := make([]common.Address, len(aggregatorsToReplicas)) + for aggregator := range aggregatorsToReplicas { + aggregators = append(aggregators, aggregator) + } + return &ChainlinkMessageHandler{ - logger: f.logger, - unsignedMessage: unsignedMessage, - aggregator: f.aggregator, + logger: f.logger, + unsignedMessage: unsignedMessage, + maxFilterAdresses: f.config.MaxFilterAdresses, + aggregatorsToReplicas: aggregatorsToReplicas, + aggregators: aggregators, }, nil } @@ -73,11 +216,11 @@ func (c *ChainlinkMessageHandler) SendMessage( ) return common.Hash{}, err } - blockHeader := []byte{} - txIndex := big.NewInt(0) - receiptProof := [][]byte{} - logIndex := big.NewInt(0) - callData, err := eventimporter.PackImportEvent(blockHeader, txIndex, receiptProof, logIndex) + var msg ChainlinkMessage + if err := json.Unmarshal(signedMessage.Payload, &msg); err != nil { + return common.Hash{}, err + } + callData, err := eventimporter.PackImportEvent(msg.blockHeader, msg.txIndex, msg.receiptProof, msg.logIndex) if err != nil { c.logger.Error( "Failed packing importEvent call data", @@ -87,9 +230,19 @@ func (c *ChainlinkMessageHandler) SendMessage( return common.Hash{}, err } + replica, ok := c.aggregatorsToReplicas[msg.aggregator] + if !ok { + c.logger.Error( + "Failed to find replica for aggregator", + zap.String("destinationBlockchainID", destinationBlockchainID.String()), + zap.String("warpMessageID", signedMessage.ID().String()), + zap.Error(err), + ) + return common.Hash{}, fmt.Errorf("failed to find replica for aggregator: %s", msg.aggregator) + } txHash, err := destinationClient.SendTx( signedMessage, - c.aggregator.Hex(), + replica.Hex(), gasLimit, callData, ) diff --git a/messages/message_handler.go b/messages/message_handler.go index 19709915..5a410052 100644 --- a/messages/message_handler.go +++ b/messages/message_handler.go @@ -60,13 +60,21 @@ type MessageHandler interface { } type MessageDecoder interface { - Decode(header *subnetTypes.Header, ethClient subnetEthclient.Client) ([]*relayerTypes.WarpMessageInfo, error) + Decode( + ctx context.Context, + header *subnetTypes.Header, + ethClient subnetEthclient.Client, + ) ( + []*relayerTypes.WarpMessageInfo, + error, + ) } type WarpMessageDecoder struct{} // Extract Warp logs from the block, if they exist func (w WarpMessageDecoder) Decode( + ctx context.Context, header *subnetTypes.Header, ethClient subnetEthclient.Client, ) ([]*relayerTypes.WarpMessageInfo, error) { diff --git a/relayer/listener.go b/relayer/listener.go index 379af75f..5a0283bc 100644 --- a/relayer/listener.go +++ b/relayer/listener.go @@ -193,6 +193,7 @@ func (lstnr *Listener) processLogs(ctx context.Context) error { } case blockHeader := <-lstnr.Subscriber.Headers(): go lstnr.messageCoordinator.ProcessBlock( + ctx, blockHeader, lstnr.sourceBlockchain.GetBlockchainID(), lstnr.ethClient, diff --git a/relayer/main/main.go b/relayer/main/main.go index 2b85d9d2..655f837f 100644 --- a/relayer/main/main.go +++ b/relayer/main/main.go @@ -20,6 +20,7 @@ import ( "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/awm-relayer/database" "github.com/ava-labs/awm-relayer/messages" + "github.com/ava-labs/awm-relayer/messages/chainlink" offchainregistry "github.com/ava-labs/awm-relayer/messages/off-chain-registry" "github.com/ava-labs/awm-relayer/messages/teleporter" "github.com/ava-labs/awm-relayer/peers" @@ -323,8 +324,7 @@ func createMessageHandlerFactories( cfg, ) case config.CHAINLINK_PRICE_FEED: - // TODO - m, err = offchainregistry.NewMessageHandlerFactory( + m, err = chainlink.NewMessageHandlerFactory( logger, cfg, ) diff --git a/relayer/message_coordinator.go b/relayer/message_coordinator.go index 54534080..df8e396c 100644 --- a/relayer/message_coordinator.go +++ b/relayer/message_coordinator.go @@ -226,6 +226,7 @@ func (mc *MessageCoordinator) ProcessMessageID( // Meant to be ran asynchronously. Errors should be sent to errChan. func (mc *MessageCoordinator) ProcessBlock( + ctx context.Context, blockHeader *types.Header, blockchainID ids.ID, ethClient ethclient.Client, @@ -235,7 +236,7 @@ func (mc *MessageCoordinator) ProcessBlock( number := blockHeader.Number.Uint64() msgs := make([]*relayerTypes.WarpMessageInfo, 0) for _, msgDecoder := range mc.messageDecoders { - message, err := msgDecoder.Decode(blockHeader, ethClient) + message, err := msgDecoder.Decode(ctx, blockHeader, ethClient) if err != nil { mc.logger.Error( "Failed to create Warp block info", From 99fe8c4b30c5646ea07793c27804ce863cb90f9f Mon Sep 17 00:00:00 2001 From: Francois Hardrouyere Date: Thu, 19 Sep 2024 09:05:00 +0200 Subject: [PATCH 07/10] add state to chainlink message decoder --- messages/chainlink/config.go | 20 +++-- messages/chainlink/message_handler.go | 82 ++++++++++++++----- messages/message_handler.go | 2 +- .../off-chain-registry/message_handler.go | 3 +- messages/teleporter/message_handler.go | 3 +- relayer/main/main.go | 34 +++++++- relayer/message_coordinator.go | 2 +- 7 files changed, 113 insertions(+), 33 deletions(-) diff --git a/messages/chainlink/config.go b/messages/chainlink/config.go index aff98f46..421e57bd 100644 --- a/messages/chainlink/config.go +++ b/messages/chainlink/config.go @@ -4,17 +4,20 @@ import ( "fmt" "strconv" + "github.com/ava-labs/avalanchego/ids" "github.com/ethereum/go-ethereum/common" ) type RawConfig struct { - AggregatorsToReplicas map[string]string `json:"aggregators-to-replicas"` - MaxFilterAdresses string `json:"max-filter-addresses"` + AggregatorsToReplicas map[string]string `json:"aggregators-to-replicas"` + MaxFilterAdresses string `json:"max-filter-addresses"` + DestinationBlockchainID string `json:"destination-blockchain-id"` } type Config struct { - AggregatorsToReplicas map[common.Address]common.Address - MaxFilterAdresses uint64 + AggregatorsToReplicas map[common.Address]common.Address + MaxFilterAdresses uint64 + DestinationBlockchainID ids.ID } func (c *RawConfig) Parse() (*Config, error) { @@ -36,9 +39,14 @@ func (c *RawConfig) Parse() (*Config, error) { } aggregatorToReplicas[aggregator] = replica } + destinationBlockchainID, err := ids.FromString(c.DestinationBlockchainID) + if err != nil { + return nil, err + } config := Config{ - AggregatorsToReplicas: aggregatorToReplicas, - MaxFilterAdresses: maxFilterAdresses, + AggregatorsToReplicas: aggregatorToReplicas, + MaxFilterAdresses: maxFilterAdresses, + DestinationBlockchainID: destinationBlockchainID, } return &config, nil } diff --git a/messages/chainlink/message_handler.go b/messages/chainlink/message_handler.go index bdf1cab9..b0fbfdfb 100644 --- a/messages/chainlink/message_handler.go +++ b/messages/chainlink/message_handler.go @@ -30,15 +30,16 @@ type factory struct { } type ChainlinkMessageHandler struct { - unsignedMessage *warp.UnsignedMessage - logger logging.Logger - maxFilterAdresses uint64 - aggregatorsToReplicas map[common.Address]common.Address - aggregators []common.Address + unsignedMessage *warp.UnsignedMessage + logger logging.Logger + destinationBlockchainID ids.ID + maxFilterAdresses uint64 + aggregatorsToReplicas map[common.Address]common.Address + aggregators []common.Address } type ChainlinkMessageDecoder struct { - handler *ChainlinkMessageHandler + aggregators []common.Address } type ChainlinkMessage struct { @@ -57,6 +58,20 @@ type ChainlinkMessage struct { var ChainlinkPriceUpdatedFilter = common.HexToHash("0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f") +func NewMessageDecoder(messageProtocolConfig config.MessageProtocolConfig) (*ChainlinkMessageDecoder, error) { + cfg, err := ParseConfig(messageProtocolConfig) + aggregators := make([]common.Address, len(cfg.AggregatorsToReplicas)) + for aggregator := range cfg.AggregatorsToReplicas { + aggregators = append(aggregators, aggregator) + } + if err != nil { + return nil, err + } + return &ChainlinkMessageDecoder{ + aggregators: aggregators, + }, nil +} + func (c ChainlinkMessageDecoder) Decode( ctx context.Context, header *subnetTypes.Header, @@ -75,7 +90,7 @@ func (c ChainlinkMessageDecoder) Decode( func() ([]subnetTypes.Log, error) { return ethClient.FilterLogs(context.Background(), subnetInterfaces.FilterQuery{ Topics: [][]common.Hash{{ChainlinkPriceUpdatedFilter}}, - Addresses: c.handler.aggregators, + Addresses: c.aggregators, FromBlock: header.Number, ToBlock: header.Number, }) @@ -145,19 +160,14 @@ func ConvertToUnsignedMessage(msg *ChainlinkMessage) (*warp.UnsignedMessage, err return warp.ParseUnsignedMessage(bytes) } -func NewMessageHandlerFactory( - logger logging.Logger, - messageProtocolConfig config.MessageProtocolConfig, -) (messages.MessageHandlerFactory, error) { +func ParseConfig(messageProtocolConfig config.MessageProtocolConfig) (*Config, error) { data, err := json.Marshal(messageProtocolConfig.Settings) if err != nil { - logger.Error("Failed to marshal Teleporter config") - return nil, err + return nil, fmt.Errorf("Failed to marshal Teleporter config: %w", err) } var messageConfig RawConfig if err := json.Unmarshal(data, &messageConfig); err != nil { - logger.Error("Failed to unmarshal Teleporter config") - return nil, err + return nil, fmt.Errorf("Failed to unmarshal Teleporter config: %w", err) } config, err := messageConfig.Parse() @@ -165,6 +175,18 @@ func NewMessageHandlerFactory( return nil, err } + return config, nil +} + +func NewMessageHandlerFactory( + logger logging.Logger, + messageProtocolConfig config.MessageProtocolConfig, +) (messages.MessageHandlerFactory, error) { + config, err := ParseConfig(messageProtocolConfig) + if err != nil { + return nil, err + } + return &factory{ logger: logger, config: config, @@ -179,11 +201,12 @@ func (f *factory) NewMessageHandler(unsignedMessage *warp.UnsignedMessage) (mess } return &ChainlinkMessageHandler{ - logger: f.logger, - unsignedMessage: unsignedMessage, - maxFilterAdresses: f.config.MaxFilterAdresses, - aggregatorsToReplicas: aggregatorsToReplicas, - aggregators: aggregators, + logger: f.logger, + unsignedMessage: unsignedMessage, + destinationBlockchainID: f.config.DestinationBlockchainID, + maxFilterAdresses: f.config.MaxFilterAdresses, + aggregatorsToReplicas: aggregatorsToReplicas, + aggregators: aggregators, }, nil } @@ -272,14 +295,29 @@ func (c *ChainlinkMessageHandler) SendMessage( return txHash, nil } -func (c *ChainlinkMessageHandler) GetMessageRoutingInfo() ( +func (c *ChainlinkMessageHandler) GetMessageRoutingInfo(warpMessageInfo *relayerTypes.WarpMessageInfo) ( ids.ID, common.Address, ids.ID, common.Address, error, ) { - return ids.Empty, common.Address{}, ids.Empty, common.Address{}, nil + var msg ChainlinkMessage + err := json.Unmarshal(warpMessageInfo.UnsignedMessage.Payload, &msg) + if err != nil { + return ids.Empty, common.Address{}, ids.Empty, common.Address{}, err + } + + replica, ok := c.aggregatorsToReplicas[msg.aggregator] + if !ok { + return ids.Empty, common.Address{}, ids.Empty, common.Address{}, fmt.Errorf("replica not found for aggregator %s", msg.aggregator) + } + + return c.unsignedMessage.SourceChainID, + warpMessageInfo.SourceAddress, + c.destinationBlockchainID, + replica, + nil } func (c *ChainlinkMessageHandler) GetUnsignedMessage() *warp.UnsignedMessage { diff --git a/messages/message_handler.go b/messages/message_handler.go index 5a410052..b4908486 100644 --- a/messages/message_handler.go +++ b/messages/message_handler.go @@ -47,7 +47,7 @@ type MessageHandler interface { // GetMessageRoutingInfo returns the source chain ID, origin sender address, // destination chain ID, and destination address. - GetMessageRoutingInfo() ( + GetMessageRoutingInfo(warpMessageInfo *relayerTypes.WarpMessageInfo) ( ids.ID, common.Address, ids.ID, diff --git a/messages/off-chain-registry/message_handler.go b/messages/off-chain-registry/message_handler.go index 090c1a23..30570189 100644 --- a/messages/off-chain-registry/message_handler.go +++ b/messages/off-chain-registry/message_handler.go @@ -14,6 +14,7 @@ import ( warpPayload "github.com/ava-labs/avalanchego/vms/platformvm/warp/payload" "github.com/ava-labs/awm-relayer/messages" "github.com/ava-labs/awm-relayer/relayer/config" + relayerTypes "github.com/ava-labs/awm-relayer/types" "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/subnet-evm/accounts/abi/bind" "github.com/ava-labs/subnet-evm/ethclient" @@ -195,7 +196,7 @@ func (m *messageHandler) SendMessage( return txHash, nil } -func (m *messageHandler) GetMessageRoutingInfo() ( +func (m *messageHandler) GetMessageRoutingInfo(warpMessageInfo *relayerTypes.WarpMessageInfo) ( ids.ID, common.Address, ids.ID, diff --git a/messages/teleporter/message_handler.go b/messages/teleporter/message_handler.go index 4cbd6041..aa72c908 100644 --- a/messages/teleporter/message_handler.go +++ b/messages/teleporter/message_handler.go @@ -16,6 +16,7 @@ import ( "github.com/ava-labs/awm-relayer/messages" pbDecider "github.com/ava-labs/awm-relayer/proto/pb/decider" "github.com/ava-labs/awm-relayer/relayer/config" + relayerTypes "github.com/ava-labs/awm-relayer/types" "github.com/ava-labs/awm-relayer/vms" "github.com/ava-labs/subnet-evm/accounts/abi/bind" "github.com/ava-labs/subnet-evm/ethclient" @@ -134,7 +135,7 @@ func (m *messageHandler) GetUnsignedMessage() *warp.UnsignedMessage { return m.unsignedMessage } -func (m *messageHandler) GetMessageRoutingInfo() ( +func (m *messageHandler) GetMessageRoutingInfo(warpMessageInfo *relayerTypes.WarpMessageInfo) ( ids.ID, common.Address, ids.ID, diff --git a/relayer/main/main.go b/relayer/main/main.go index 655f837f..a89cfb07 100644 --- a/relayer/main/main.go +++ b/relayer/main/main.go @@ -251,7 +251,11 @@ func main() { logger.Fatal("Failed to create application relayers", zap.Error(err)) panic(err) } - messagesDecoders := []messages.MessageDecoder{messages.WarpMessageDecoder{}} + messagesDecoders, err := createMessageDecoders(logger, &cfg) + if err != nil { + logger.Fatal("Failed to create application relayers", zap.Error(err)) + panic(err) + } messageCoordinator := relayer.NewMessageCoordinator( logger, messageHandlerFactories, @@ -342,6 +346,34 @@ func createMessageHandlerFactories( return messageHandlerFactories, nil } +func createMessageDecoders(logger logging.Logger, globalConfig *config.Config) ([]messages.MessageDecoder, error) { + messageDecoders := make([]messages.MessageDecoder, 0) + for _, sourceBlockchain := range globalConfig.SourceBlockchains { + // Create message decoder for each supported message protocol + for _, cfg := range sourceBlockchain.MessageContracts { + format := cfg.MessageFormat + var ( + m messages.MessageDecoder + err error + ) + switch config.ParseMessageProtocol(format) { + case config.TELEPORTER, config.OFF_CHAIN_REGISTRY: + m = messages.WarpMessageDecoder{} + case config.CHAINLINK_PRICE_FEED: + m, err = chainlink.NewMessageDecoder(cfg) + default: + m, err = nil, fmt.Errorf("invalid message format %s", format) + } + if err != nil { + logger.Error("Failed to create message handler factory", zap.Error(err)) + return nil, err + } + messageDecoders = append(messageDecoders, m) + } + } + return messageDecoders, nil +} + func createSourceClients( ctx context.Context, logger logging.Logger, diff --git a/relayer/message_coordinator.go b/relayer/message_coordinator.go index df8e396c..2dc75a6f 100644 --- a/relayer/message_coordinator.go +++ b/relayer/message_coordinator.go @@ -81,7 +81,7 @@ func (mc *MessageCoordinator) getAppRelayerMessageHandler( // Fetch the message delivery data //nolint:lll - sourceBlockchainID, originSenderAddress, destinationBlockchainID, destinationAddress, err := messageHandler.GetMessageRoutingInfo() + sourceBlockchainID, originSenderAddress, destinationBlockchainID, destinationAddress, err := messageHandler.GetMessageRoutingInfo(warpMessageInfo) if err != nil { mc.logger.Error("Failed to get message routing information", zap.Error(err)) return nil, nil, err From 6cbbff5c5ab031482cbdd67e465750d06a52c2fa Mon Sep 17 00:00:00 2001 From: Francois Hardrouyere Date: Thu, 19 Sep 2024 09:25:18 +0200 Subject: [PATCH 08/10] fix err check --- messages/chainlink/message_handler.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/messages/chainlink/message_handler.go b/messages/chainlink/message_handler.go index b0fbfdfb..5448e466 100644 --- a/messages/chainlink/message_handler.go +++ b/messages/chainlink/message_handler.go @@ -60,6 +60,9 @@ var ChainlinkPriceUpdatedFilter = common.HexToHash("0559884fd3a460db3073b7fc896c func NewMessageDecoder(messageProtocolConfig config.MessageProtocolConfig) (*ChainlinkMessageDecoder, error) { cfg, err := ParseConfig(messageProtocolConfig) + if err != nil { + return nil, err + } aggregators := make([]common.Address, len(cfg.AggregatorsToReplicas)) for aggregator := range cfg.AggregatorsToReplicas { aggregators = append(aggregators, aggregator) @@ -310,7 +313,8 @@ func (c *ChainlinkMessageHandler) GetMessageRoutingInfo(warpMessageInfo *relayer replica, ok := c.aggregatorsToReplicas[msg.aggregator] if !ok { - return ids.Empty, common.Address{}, ids.Empty, common.Address{}, fmt.Errorf("replica not found for aggregator %s", msg.aggregator) + err = fmt.Errorf("replica not found for aggregator %s", msg.aggregator) + return ids.Empty, common.Address{}, ids.Empty, common.Address{}, err } return c.unsignedMessage.SourceChainID, From 4445736ede22309e4f524ed9823cab03bfe3183f Mon Sep 17 00:00:00 2001 From: Francois Hardrouyere Date: Fri, 20 Sep 2024 10:01:56 +0200 Subject: [PATCH 09/10] send receipt proof --- messages/chainlink/message_handler.go | 31 +++- messages/chainlink/proofs/proof_utils.go | 196 +++++++++++++++++++++++ 2 files changed, 220 insertions(+), 7 deletions(-) create mode 100644 messages/chainlink/proofs/proof_utils.go diff --git a/messages/chainlink/message_handler.go b/messages/chainlink/message_handler.go index 5448e466..3b4de5b8 100644 --- a/messages/chainlink/message_handler.go +++ b/messages/chainlink/message_handler.go @@ -11,6 +11,7 @@ import ( "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/awm-relayer/abi-bindings/eventimporter" "github.com/ava-labs/awm-relayer/messages" + "github.com/ava-labs/awm-relayer/messages/chainlink/proofs" "github.com/ava-labs/awm-relayer/relayer/config" relayerTypes "github.com/ava-labs/awm-relayer/types" "github.com/ava-labs/awm-relayer/utils" @@ -136,13 +137,25 @@ func NewWarpMessageInfo( if err != nil { return nil, err } + memDb, err := proofs.ConstructSubnetEVMReceiptProof(ctx, ethclient, log.BlockHash, log.TxIndex) + if err != nil { + return nil, err + } + it := memDb.NewIterator(nil, nil) + receiptProof := make([][]byte, 0) + for it.Next() { + receiptProof = append(receiptProof, it.Value()) + } msg := ChainlinkMessage{ - aggregator: log.Address, - blockHeader: blockHeader, - current: log.Topics[1].Big(), - roundId: log.Topics[2].Big(), - updatedAt: log.Topics[3].Big(), - data: log.Data, + aggregator: log.Address, + blockHeader: blockHeader, + txIndex: new(big.Int).SetUint64(uint64(log.TxIndex)), + receiptProof: receiptProof, + logIndex: new(big.Int).SetUint64(uint64(log.Index)), + current: log.Topics[1].Big(), + roundId: log.Topics[2].Big(), + updatedAt: log.Topics[3].Big(), + data: log.Data, } unsignedMsg, err := ConvertToUnsignedMessage(&msg) if err != nil { @@ -282,7 +295,7 @@ func (c *ChainlinkMessageHandler) SendMessage( return common.Hash{}, err } - teleporterMessageID := ids.Empty + teleporterMessageID := ids.Empty // TODO // Wait for the message to be included in a block before returning err = messages.WaitForReceipt(c.logger, signedMessage, destinationClient, txHash, teleporterMessageID) if err != nil { @@ -327,3 +340,7 @@ func (c *ChainlinkMessageHandler) GetMessageRoutingInfo(warpMessageInfo *relayer func (c *ChainlinkMessageHandler) GetUnsignedMessage() *warp.UnsignedMessage { return c.unsignedMessage } + +func GetReceiptProof(blockHash common.Hash, txIndex uint, ethclient subnetEthclient.Client) ([][]byte, error) { + return [][]byte{}, nil +} diff --git a/messages/chainlink/proofs/proof_utils.go b/messages/chainlink/proofs/proof_utils.go new file mode 100644 index 00000000..ef534447 --- /dev/null +++ b/messages/chainlink/proofs/proof_utils.go @@ -0,0 +1,196 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package proofs + +import ( + "context" + "encoding/hex" + "fmt" + + corethtypes "github.com/ava-labs/coreth/core/types" + corethethclient "github.com/ava-labs/coreth/ethclient" + corethtrie "github.com/ava-labs/coreth/trie" + subnetevmtypes "github.com/ava-labs/subnet-evm/core/types" + subnetevmethclient "github.com/ava-labs/subnet-evm/ethclient" + subnetevmtrie "github.com/ava-labs/subnet-evm/trie" + subnetevmtriedb "github.com/ava-labs/subnet-evm/triedb" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb/memorydb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +// TODO: Is there a better way to handle the different block hash constructions between coreth and subnet-evm +// that doesn't require duplicating the ConstructReceiptProof function? + +// Constructs a Merkle proof for the specified txIndex in the block with the given blockHash. +// The proof is returned as a memorydb.Database. +// Should only be used for chains that use coreth as their VM. Not compatible with subnet-evm chains. +func ConstructCorethReceiptProof( + ctx context.Context, + ethClient corethethclient.Client, + blockHash common.Hash, + txIndex uint, +) (*memorydb.Database, error) { + // Get the block info + blockInfo, err := ethClient.BlockByHash(ctx, blockHash) + if err != nil || blockInfo == nil { + log.Error("Failed to get block info", "blockHash", blockHash.String(), "err", err) + return nil, err + } + if blockInfo.Hash() != blockHash { + log.Error("Block hash does not match", "blockHash", blockHash.String()) + return nil, fmt.Errorf("block hash does not match") + } + + encodedBlockHeader, err := rlp.EncodeToBytes(blockInfo.Header()) + if err != nil { + log.Error("Failed to encode block header", "blockHash", blockHash.String(), "err", err) + return nil, err + } + log.Info("Fetched block header", "blockHash", blockHash.String(), "header", hex.EncodeToString(encodedBlockHeader)) + + // Get the receipts for each transaction in the block + receipts := make([]*corethtypes.Receipt, blockInfo.Transactions().Len()) + for i, tx := range blockInfo.Transactions() { + receipt, err := ethClient.TransactionReceipt(ctx, tx.Hash()) + if err != nil { + log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", err) + return nil, err + } + receipts[i] = receipt + encodedReceipt, err := rlp.EncodeToBytes(receipt) + if err != nil { + log.Error("Failed to encode receipt", "txHash", tx.Hash().String(), "err", err) + return nil, err + } + log.Info("Fetched encoded receipt", "txHash", tx.Hash().String(), "receipt", hex.EncodeToString(encodedReceipt)) + } + log.Info("Fetched all receipts for block", "blockHash", blockHash.String(), "numReceipts", len(receipts)) + + // Create a trie of the receipts + receiptTrie, err := corethtrie.New(corethtrie.StateTrieID(common.Hash{}), corethtrie.NewDatabase(nil, nil)) + if err != nil { + log.Error("Failed to create receipt trie", "err", err) + return nil, err + } + + // Defensive check that the receipts root matches the block header. + // This should always be the case. + receiptsRoot := corethtypes.DeriveSha(corethtypes.Receipts(receipts), receiptTrie) + log.Info("Computed receipts trie root", "root", receiptsRoot.String()) + if receiptsRoot != blockInfo.Header().ReceiptHash { + log.Error("Receipts root does not match", "blockHash", blockHash.String()) + return nil, err + } + + // Construct the proof of the request receipt against the trie. + key, err := rlp.EncodeToBytes(txIndex) + if err != nil { + log.Error("Failed to encode tx index", "err", err) + return nil, err + } + memoryDB := memorydb.New() + err = receiptTrie.Prove(key, memoryDB) + if err != nil { + log.Error("Failed to prove receipt", "err", err) + return nil, err + } + log.Info("Created Merkle proof for receipt", "txIndex", txIndex) + + // Double check that the proof is valid. + verifiedValue, err := corethtrie.VerifyProof(receiptsRoot, key, memoryDB) + if err != nil { + log.Error("Failed to verify proof", "err", err) + return nil, err + } + log.Info("Verified proof", "key", hex.EncodeToString(key), "value", hex.EncodeToString(verifiedValue)) + + return memoryDB, nil +} + +// Constructs a Merkle proof for the specified txIndex in the block with the given blockHash. +// The proof is returned as a memorydb.Database. +// Should only be used for chains that use subnet-evm as their VM. Not compatible with coreth chains. +func ConstructSubnetEVMReceiptProof( + ctx context.Context, + ethClient subnetevmethclient.Client, + blockHash common.Hash, + txIndex uint, +) (*memorydb.Database, error) { + // Get the block info + blockInfo, err := ethClient.BlockByHash(ctx, blockHash) + if err != nil || blockInfo == nil { + log.Error("Failed to get block info", "blockHash", blockHash.String(), "err", err) + return nil, err + } + if blockInfo.Hash() != blockHash { + log.Error("Block hash does not match", "blockHash", blockHash.String()) + return nil, fmt.Errorf("block hash does not match") + } + + // Get the receipts for each transaction in the block + receipts := make([]*subnetevmtypes.Receipt, blockInfo.Transactions().Len()) + for i, tx := range blockInfo.Transactions() { + receipt, err := ethClient.TransactionReceipt(ctx, tx.Hash()) + if err != nil { + log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", err) + return nil, err + } + receipts[i] = receipt + encodedReceipt, err := rlp.EncodeToBytes(receipt) + if err != nil { + log.Error("Failed to encode receipt", "txHash", tx.Hash().String(), "err", err) + return nil, err + } + log.Info("Got encoded receipt", "txHash", tx.Hash().String(), "receipt", hex.EncodeToString(encodedReceipt)) + } + + // Create a trie of the receipts + receiptTrie, err := subnetevmtrie.New(subnetevmtrie.StateTrieID(common.Hash{}), subnetevmtriedb.NewDatabase(nil, nil)) + if err != nil { + log.Error("Failed to create receipt trie", "err", err) + return nil, err + } + + // Defensive check that the receipts root matches the block header. + // This should always be the case. + receiptsRoot := subnetevmtypes.DeriveSha(subnetevmtypes.Receipts(receipts), receiptTrie) + if receiptsRoot != blockInfo.Header().ReceiptHash { + log.Error("Receipts root does not match", "blockHash", blockHash.String()) + return nil, err + } + + // Construct the proof of the request receipt against the trie. + key, err := rlp.EncodeToBytes(txIndex) + if err != nil { + log.Error("Failed to encode tx index", "err", err) + return nil, err + } + memoryDB := memorydb.New() + err = receiptTrie.Prove(key, memoryDB) + if err != nil { + log.Error("Failed to prove receipt", "err", err) + return nil, err + } + + // Double check that the proof is valid. + verifiedValue, err := subnetevmtrie.VerifyProof(receiptsRoot, key, memoryDB) + if err != nil { + log.Error("Failed to verify proof", "err", err) + return nil, err + } + log.Info("Verified proof", "value", hex.EncodeToString(verifiedValue)) + + return memoryDB, nil +} + +func EncodeMerkleProof(proofDB *memorydb.Database) [][]byte { + encodedProof := make([][]byte, 0) + it := proofDB.NewIterator(nil, nil) + for it.Next() { + encodedProof = append(encodedProof, it.Value()) + } + return encodedProof +} From cd1e5b5b32eb34caabfc5295addbcfdb3b4192ab Mon Sep 17 00:00:00 2001 From: Francois Hardrouyere Date: Fri, 20 Sep 2024 10:21:22 +0200 Subject: [PATCH 10/10] Construct unique teleporter message ID --- messages/chainlink/message_handler.go | 29 ++++++++++++++------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/messages/chainlink/message_handler.go b/messages/chainlink/message_handler.go index 3b4de5b8..21c5b464 100644 --- a/messages/chainlink/message_handler.go +++ b/messages/chainlink/message_handler.go @@ -19,6 +19,7 @@ import ( subnetTypes "github.com/ava-labs/subnet-evm/core/types" subnetEthclient "github.com/ava-labs/subnet-evm/ethclient" subnetInterfaces "github.com/ava-labs/subnet-evm/interfaces" + teleporterUtils "github.com/ava-labs/teleporter/utils/teleporter-utils" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" @@ -36,7 +37,6 @@ type ChainlinkMessageHandler struct { destinationBlockchainID ids.ID maxFilterAdresses uint64 aggregatorsToReplicas map[common.Address]common.Address - aggregators []common.Address } type ChainlinkMessageDecoder struct { @@ -210,19 +210,12 @@ func NewMessageHandlerFactory( } func (f *factory) NewMessageHandler(unsignedMessage *warp.UnsignedMessage) (messages.MessageHandler, error) { - aggregatorsToReplicas := f.config.AggregatorsToReplicas - aggregators := make([]common.Address, len(aggregatorsToReplicas)) - for aggregator := range aggregatorsToReplicas { - aggregators = append(aggregators, aggregator) - } - return &ChainlinkMessageHandler{ logger: f.logger, unsignedMessage: unsignedMessage, destinationBlockchainID: f.config.DestinationBlockchainID, maxFilterAdresses: f.config.MaxFilterAdresses, - aggregatorsToReplicas: aggregatorsToReplicas, - aggregators: aggregators, + aggregatorsToReplicas: f.config.AggregatorsToReplicas, }, nil } @@ -238,7 +231,20 @@ func (c *ChainlinkMessageHandler) SendMessage( signedMessage *warp.Message, destinationClient vms.DestinationClient, ) (common.Hash, error) { + var msg ChainlinkMessage + if err := json.Unmarshal(signedMessage.Payload, &msg); err != nil { + return common.Hash{}, err + } destinationBlockchainID := destinationClient.DestinationBlockchainID() + teleporterMessageID, err := teleporterUtils.CalculateMessageID( + msg.aggregator, + signedMessage.SourceChainID, + destinationBlockchainID, + msg.roundId, + ) + if err != nil { + return common.Hash{}, fmt.Errorf("failed to calculate Teleporter message ID: %w", err) + } c.logger.Info( "Sending message to destination chain", @@ -255,10 +261,6 @@ func (c *ChainlinkMessageHandler) SendMessage( ) return common.Hash{}, err } - var msg ChainlinkMessage - if err := json.Unmarshal(signedMessage.Payload, &msg); err != nil { - return common.Hash{}, err - } callData, err := eventimporter.PackImportEvent(msg.blockHeader, msg.txIndex, msg.receiptProof, msg.logIndex) if err != nil { c.logger.Error( @@ -295,7 +297,6 @@ func (c *ChainlinkMessageHandler) SendMessage( return common.Hash{}, err } - teleporterMessageID := ids.Empty // TODO // Wait for the message to be included in a block before returning err = messages.WaitForReceipt(c.logger, signedMessage, destinationClient, txHash, teleporterMessageID) if err != nil {