Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide an option to execute all messages with src or dest matching target chain(s) #1

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.18-alpine as builder
FROM golang:1.19-alpine as builder

RUN apk add --no-cache g++ git
# override git so go can access private repos
Expand All @@ -17,4 +17,4 @@ FROM alpine:latest
VOLUME /executor/env
WORKDIR /executor/env
COPY --from=builder /im-executor/bin/executor /usr/local/bin
CMD ["/bin/sh", "-c", "executor start --test --loglevel debug --home /executor/env 2> /executor/env/app.log"]
CMD ["/bin/sh", "-c", "executor start --test --loglevel debug --home /executor/env 2> /executor/env/app.log"]
8 changes: 5 additions & 3 deletions accounts/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ func newAccount(conf *AccountConfig) *Account {
transactors := make(map[uint64]*ethutils.Transactor)
signers := make(map[uint64]ethutils.Signer)
address := eth.ZeroAddr
for _, contract := range conf.ReceiverContracts {
chain := chains.GetChainMustExist(contract.ChainId)
for _, id := range chains.GetChainIDs() {
chain := chains.GetChainMustExist(id)
if _, ok := transactors[chain.ChainID]; !ok {
transactor, signer := newTransactor(chain, conf.Keystore, conf.Passphrase)
transactors[chain.ChainID] = transactor
signers[chain.ChainID] = signer
address = transactor.Address()
}
}

err := conf.SenderGroups.Validate()
if err != nil {
log.Fatalf("cannot initialize account: %s", err.Error())
Expand Down Expand Up @@ -77,7 +78,8 @@ func (a *Account) IsSenderAllowed(sender, receiver *contracts.ContractAddress) b
}
recvContract, ok := a.ReceiverContract(receiver)
if !ok {
log.Errorf("unable to check sender, receiver contract %s not found", receiver)
log.Warnf("unable to check sender, receiver contract %s not found", receiver)
return true
}
// if no allowed sender groups are configured, it is defaulted to allowed
if len(recvContract.AllowSenderGroups) == 0 {
Expand Down
8 changes: 8 additions & 0 deletions chains/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ func GetChainMustExist(chid uint64) *Chain {
return chain
}

func GetChainIDs() []uint64 {
var ids []uint64;
for _, chain := range chains.chains {
ids = append(ids, chain.ChainID)
}
return ids
}

func StartMonitoring(filters contracts.ReceiverContracts, signers map[uint64][]eth.Addr) {
for _, chain := range chains.chains {
go chain.startMonitoringEvents(filters)
Expand Down
2 changes: 1 addition & 1 deletion contracts/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (c ReceiverContracts) String() string {

func (c ReceiverContracts) Validate() error {
if len(c) == 0 {
return fmt.Errorf("empty executor contract filter")
log.Warnf("empty executor contract filter")
}
log.Infoln("executor will submit execution for these contracts:")
for _, f := range c {
Expand Down
12 changes: 9 additions & 3 deletions executor/execution_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ func (e *Executor) executeMessage(x *Execution) {
tx, err := x.Transactor.Transact(
newTransactionCallback(id, "execute message"),
func(transactor bind.ContractTransactor, opts *bind.TransactOpts) (*gethtypes.Transaction, error) {
setValue(opts, x.Receiver.PayableValue)
if x.Receiver != nil {
setValue(opts, x.Receiver.PayableValue)
}
method := func() (*gethtypes.Transaction, error) {
return x.ExecuteMessage(opts, msg, route, sigs, signers, powers)
}
Expand Down Expand Up @@ -73,7 +75,9 @@ func (e *Executor) executeDelayedMessage(x *Execution) {
tx, err := x.Transactor.Transact(
newDelayTransactionCallback(delayId, "execute delayed message"),
func(transactor bind.ContractTransactor, opts *bind.TransactOpts) (*gethtypes.Transaction, error) {
setValue(opts, x.Receiver.PayableValue)
if x.Receiver != nil {
setValue(opts, x.Receiver.PayableValue)
}
method := func() (*gethtypes.Transaction, error) {
return x.ExecuteDelayedMessage(opts, dm.Adapter, dm.SrcContract, dm.SrcChainID, dm.DstContract, dm.Calldata, dm.Nonce)
}
Expand Down Expand Up @@ -128,7 +132,9 @@ func (e *Executor) executeMessageWithTransfer(x *Execution) {
tx, err := x.Transactor.Transact(
newTransactionCallback(id, "execute message with transfer"),
func(transactor bind.ContractTransactor, opts *bind.TransactOpts) (*gethtypes.Transaction, error) {
setValue(opts, x.Receiver.PayableValue)
if x.Receiver != nil {
setValue(opts, x.Receiver.PayableValue)
}
method := func() (*gethtypes.Transaction, error) {
return x.Chain.MessageBus.ExecuteMessageWithTransfer(opts, msg, xfer, sigs, signers, powers)
}
Expand Down
21 changes: 11 additions & 10 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func (e *Executor) startFetchingExecCtxsFromSgn() {
log.Infoln("Start fetching execution contexts from SGN")
for {
time.Sleep(8 * time.Second)
execCtxs, err := e.sgn.GetExecutionContexts(e.accounts.ReceiverContracts())
// get all execution contexts for any enabled chain
execCtxs, err := e.sgn.GetExecutionContexts()
if err != nil {
log.Errorln("failed to get messages", err)
continue
Expand All @@ -85,6 +86,10 @@ func (e *Executor) startFetchingExecCtxsFromSgn() {
log.Tracef("Got %d execution contexts", len(execCtxs))
execCtxsToSave := []*msgtypes.ExecutionContext{}
for i := range execCtxs {
// TODO: process only messages to or from target chain, e.g., sapphire testnet (0x5aff)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: add a config option for target chain

//if execCtxs[i].Message.SrcChainId == 0x5aff || execCtxs[i].Message.DstChainId == 0x5aff {
// execCtxsToSave = append(execCtxsToSave, &execCtxs[i])
//}
execCtxsToSave = append(execCtxsToSave, &execCtxs[i])
}
db.SaveExecutionContexts(execCtxsToSave)
Expand Down Expand Up @@ -277,12 +282,11 @@ func (e *Executor) DelayedMessageToExecution(delayedMessages []*dal.DelayedMessa
func (e *Executor) newExecution(msgId common.Hash, sender, receiver *contracts.ContractAddress, chain *chains.Chain, record *models.ExecutionRecord, delayedMessage *dal.DelayedMessage, gasLimit uint64) (*Execution, error) {
acc, ok := e.accounts.AccountByReceiver(receiver)
if !ok {
if record != nil {
dal.GetDB().UpdateStatus(msgId.Bytes(), types.ExecutionStatus_Ignored)
} else {
dal.GetDB().UpdateDelayStatus(msgId, types.ExecutionStatus_Ignored)
// get the default account
acc, ok = e.accounts.AccountByID("")
if !ok {
return nil, fmt.Errorf("no account configured")
}
return nil, fmt.Errorf("ignoring message/delayed-message with id/delayId %x: cannot find account by receiver %s", msgId, receiver)
}
allowed := acc.IsSenderAllowed(sender, receiver)
if !allowed {
Expand All @@ -297,10 +301,7 @@ func (e *Executor) newExecution(msgId common.Hash, sender, receiver *contracts.C
if !ok {
return nil, fmt.Errorf("transactor not registered for chainId %d", chain.ChainID)
}
recvContract, ok := acc.ReceiverContract(receiver)
if !ok {
return nil, fmt.Errorf("receiver contract not registered for %s", receiver)
}
recvContract, _ := acc.ReceiverContract(receiver)
bal, err := chain.EthClient.PendingBalanceAt(context.Background(), acc.Address)
if err != nil {
log.Debugf("failed to query balance for account %s on chain %d", acc.ID, chain.ChainID)
Expand Down
17 changes: 14 additions & 3 deletions sgn/sgn_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,29 @@ import (
"context"

"github.com/celer-network/goutils/log"
"github.com/celer-network/im-executor/contracts"
"github.com/celer-network/im-executor/chains"
"github.com/celer-network/im-executor/sgn-v2/eth"
comtypes "github.com/celer-network/im-executor/sgn-v2/common/types"
cbrtypes "github.com/celer-network/im-executor/sgn-v2/x/cbridge/types"
msgtypes "github.com/celer-network/im-executor/sgn-v2/x/message/types"
pegbrtypes "github.com/celer-network/im-executor/sgn-v2/x/pegbridge/types"
"github.com/celer-network/im-executor/types"
)

func (c *SgnClient) GetExecutionContexts(filters contracts.ReceiverContracts) ([]msgtypes.ExecutionContext, error) {
func (c *SgnClient) GetExecutionContexts() ([]msgtypes.ExecutionContext, error) {
qc := msgtypes.NewQueryClient(c.grpcConn)

// filter execution contexts for all supported chains
filter := []*comtypes.ContractInfo{}
for _, id := range chains.GetChainIDs() {
info := &comtypes.ContractInfo{
ChainId: id,
}
filter = append(filter, info)
}

req := &msgtypes.QueryExecutionContextsRequest{
ContractInfos: filters.ContractInfoList(),
ContractInfos: filter,
}
ctx, cancel := context.WithTimeout(context.Background(), types.GatewayTimeout)
defer cancel()
Expand Down