Skip to content

Commit

Permalink
Merge pull request #89 from perun-network/84-use-generic-subs
Browse files Browse the repository at this point in the history
Use EventSub instead of Filter/Watch.
  • Loading branch information
matthiasgeihs authored Jun 16, 2021
2 parents 7eaa521 + 69ab9e0 commit f5f7b3e
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 227 deletions.
77 changes: 55 additions & 22 deletions backend/ethereum/bindings/abi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package bindings

import (
"fmt"
"strings"

"github.com/ethereum/go-ethereum/accounts/abi"
Expand All @@ -27,37 +28,69 @@ import (
"perun.network/go-perun/backend/ethereum/bindings/trivialapp"
)

// This file contains all the parsed ABI definitions of our contracts.
// ABI contains all the parsed ABI definitions of our contracts.
// Use it together with `bind.NewBoundContract` to create a bound contract.
var ABI = struct {
// ERC20Token is the parsed ABI definition of contract ERC20Token.
ERC20Token abi.ABI
// Adjudicator is the parsed ABI definition of contract Adjudicator.
Adjudicator abi.ABI
// AssetHolder is the parsed ABI definition of contract AssetHolder.
AssetHolder abi.ABI
// ETHAssetHolder is the parsed ABI definition of contract ETHAssetHolder.
ETHAssetHolder abi.ABI
// ERC20AssetHolder is the parsed ABI definition of contract ERC20AssetHolder.
ERC20AssetHolder abi.ABI
// TrivialApp is the parsed ABI definition of contract TrivialApp.
TrivialApp abi.ABI
}{}

var (
// ERC20TokenABI is the parsed ABI definition of contract ERC20Token.
ERC20TokenABI abi.ABI
// AdjudicatorABI is the parsed ABI definition of contract Adjudicator.
AdjudicatorABI abi.ABI
// AssetHolderABI is the parsed ABI definition of contract AssetHolder.
AssetHolderABI abi.ABI
// ETHAssetHolderABI is the parsed ABI definition of contract ETHAssetHolder.
ETHAssetHolderABI abi.ABI
// ERC20AssetHolderABI is the parsed ABI definition of contract ERC20AssetHolder.
ERC20AssetHolderABI abi.ABI
// TrivialAppABI is the parsed ABI definition of contract TrivialApp.
TrivialAppABI abi.ABI
)
// Events contains the event names for specific events.
var Events = struct {
// AdjChannelUpdate is the ChannelUpdate event of the Adjudicator contract.
AdjChannelUpdate string
// AhDeposited is the Deposited event of the Assetholder contract.
AhDeposited string
// AhWithdrawn is the Withdrawn event of the Assetholder contract.
AhWithdrawn string
// ERC20Approval is the Approval event of the ERC20Token contract.
ERC20Approval string
}{}

func init() {
parseABI := func(raw string) abi.ABI {
parseABIs()
extractEvents()
}

func parseABIs() {
parse := func(raw string) abi.ABI {
abi, err := abi.JSON(strings.NewReader(raw))
if err != nil {
panic(err)
}
return abi
}

ERC20TokenABI = parseABI(peruntoken.ERC20ABI)
AdjudicatorABI = parseABI(adjudicator.AdjudicatorABI)
AssetHolderABI = parseABI(assetholder.AssetHolderABI)
ETHAssetHolderABI = parseABI(assetholdereth.AssetHolderETHABI)
ERC20AssetHolderABI = parseABI(assetholdererc20.AssetHolderERC20ABI)
TrivialAppABI = parseABI(trivialapp.TrivialAppABI)
ABI.ERC20Token = parse(peruntoken.ERC20ABI)
ABI.Adjudicator = parse(adjudicator.AdjudicatorABI)
ABI.AssetHolder = parse(assetholder.AssetHolderABI)
ABI.ETHAssetHolder = parse(assetholdereth.AssetHolderETHABI)
ABI.ERC20AssetHolder = parse(assetholdererc20.AssetHolderERC20ABI)
ABI.TrivialApp = parse(trivialapp.TrivialAppABI)
}

// extractEvents sets the event names and panics if any event does not exist.
func extractEvents() {
extract := func(abi abi.ABI, eName string) string {
e, ok := abi.Events[eName]
if !ok {
panic(fmt.Sprintf("Event '%s' not found.", eName))
}
return e.Name
}

Events.AdjChannelUpdate = extract(ABI.Adjudicator, "ChannelUpdate")
Events.AhDeposited = extract(ABI.AssetHolder, "Deposited")
Events.AhWithdrawn = extract(ABI.AssetHolder, "Withdrawn")
Events.ERC20Approval = extract(ABI.ERC20Token, "Approval")
}
4 changes: 4 additions & 0 deletions backend/ethereum/channel/adjudicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/pkg/errors"

"perun.network/go-perun/backend/ethereum/bindings"
"perun.network/go-perun/backend/ethereum/bindings/adjudicator"
cherrors "perun.network/go-perun/backend/ethereum/channel/errors"
"perun.network/go-perun/channel"
Expand All @@ -40,6 +41,7 @@ var _ channel.Adjudicator = (*Adjudicator)(nil)
type Adjudicator struct {
ContractBackend
contract *adjudicator.Adjudicator
bound *bind.BoundContract
// The address to which we send all funds.
Receiver common.Address
// Structured logger
Expand All @@ -57,9 +59,11 @@ func NewAdjudicator(backend ContractBackend, contract common.Address, receiver c
if err != nil {
panic("Could not create a new instance of adjudicator")
}
bound := bind.NewBoundContract(contract, bindings.ABI.Adjudicator, backend, backend, backend)
return &Adjudicator{
ContractBackend: backend,
contract: contr,
bound: bound,
Receiver: receiver,
txSender: txSender,
log: log.WithField("txSender", txSender.Address),
Expand Down
122 changes: 68 additions & 54 deletions backend/ethereum/channel/conclude.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/pkg/errors"

"perun.network/go-perun/backend/ethereum/bindings"
"perun.network/go-perun/backend/ethereum/bindings/adjudicator"
cherrors "perun.network/go-perun/backend/ethereum/channel/errors"
"perun.network/go-perun/backend/ethereum/subscription"
"perun.network/go-perun/channel"
)

Expand All @@ -31,35 +33,36 @@ const secondaryWaitBlocks = 2
// ensureConcluded ensures that conclude or concludeFinal (for non-final and
// final states, resp.) is called on the adjudicator.
// - a subscription on Concluded events is established
// - it searches for a past concluded event
// - it searches for a past concluded event by calling `isConcluded`
// - if found, channel is already concluded and success is returned
// - if none found, conclude/concludeFinal is called on the adjudicator
// - it waits for a Concluded event from the blockchain.
func (a *Adjudicator) ensureConcluded(ctx context.Context, req channel.AdjudicatorReq, subStates channel.StateMap) error {
// Listen for Concluded event.
watchOpts, err := a.NewWatchOpts(ctx)
sub, err := subscription.NewEventSub(ctx, a.ContractBackend, a.bound, updateEventType(req.Params.ID()), startBlockOffset)
if err != nil {
return errors.WithMessage(err, "creating watchOpts")
return errors.WithMessage(err, "subscribing")
}
events := make(chan *adjudicator.AdjudicatorChannelUpdate)
sub, err := a.contract.WatchChannelUpdate(watchOpts, events, [][32]byte{req.Params.ID()})
if err != nil {
err = cherrors.CheckIsChainNotReachableError(err)
return errors.WithMessage(err, "creating subscription failed")
}
defer sub.Unsubscribe()

if found, err := a.filterConcluded(ctx, req.Params.ID()); err != nil {
return errors.WithMessage(err, "filtering old Concluded events")
} else if found {
defer sub.Close()
// Check whether it is already concluded.
if concluded, err := a.isConcluded(ctx, sub); err != nil {
return errors.WithMessage(err, "isConcluded")
} else if concluded {
return nil
}

events := make(chan *subscription.Event, 10)
subErr := make(chan error, 1)
waitCtx, cancel := context.WithCancel(ctx)
go func() {
subErr <- sub.Read(ctx, events)
cancel()
}()

// In final Register calls, as the non-initiator, we optimistically wait for
// the other party to send the transaction first for secondaryWaitBlocks many
// blocks.
if req.Tx.IsFinal && req.Secondary {
isConcluded, err := waitConcludedForNBlocks(ctx, a, sub, events, secondaryWaitBlocks)
isConcluded, err := waitConcludedForNBlocks(waitCtx, a, events, secondaryWaitBlocks)
if err != nil {
return err
} else if isConcluded {
Expand All @@ -79,14 +82,51 @@ func (a *Adjudicator) ensureConcluded(ctx context.Context, req channel.Adjudicat
return err
}

select {
case <-events:
return nil
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "context cancelled")
case err = <-sub.Err():
err = cherrors.CheckIsChainNotReachableError(err)
return errors.Wrap(err, "subscription error")
for {
select {
case _e := <-events:
e := _e.Data.(*adjudicator.AdjudicatorChannelUpdate)
if e.Phase == phaseConcluded {
return nil
}
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "context cancelled")
case err = <-subErr:
if err != nil {
return errors.WithMessage(err, "subscription error")
}
return errors.New("subscription closed")
}
}
}

// isConcluded returns whether a channel is already concluded.
func (a *Adjudicator) isConcluded(ctx context.Context, sub *subscription.EventSub) (bool, error) {
events := make(chan *subscription.Event, 10)
subErr := make(chan error, 1)
// Write the events into events.
go func() {
defer close(events)
subErr <- sub.ReadPast(ctx, events)
}()
// Read all events and check for concluded.
for _e := range events {
e := _e.Data.(*adjudicator.AdjudicatorChannelUpdate)
if e.Phase == phaseConcluded {
return true, nil
}
}
return false, errors.WithMessage(<-subErr, "reading past events")
}

func updateEventType(channelID [32]byte) subscription.EventFactory {
return func() *subscription.Event {
return &subscription.Event{
Name: bindings.Events.AdjChannelUpdate,
Data: new(adjudicator.AdjudicatorChannelUpdate),
// In the best case we could already filter for 'Concluded' phase only here.
Filter: [][]interface{}{{channelID}},
}
}
}

Expand All @@ -98,11 +138,10 @@ func (a *Adjudicator) ensureConcluded(ctx context.Context, req channel.Adjudicat
// the Concluded event subscription instance.
func waitConcludedForNBlocks(ctx context.Context,
cr ethereum.ChainReader,
sub ethereum.Subscription,
concluded chan *adjudicator.AdjudicatorChannelUpdate,
concluded chan *subscription.Event,
numBlocks int,
) (bool, error) {
h := make(chan *types.Header)
h := make(chan *types.Header, 10)
hsub, err := cr.SubscribeNewHead(ctx, h)
if err != nil {
err = cherrors.CheckIsChainNotReachableError(err)
Expand All @@ -112,7 +151,8 @@ func waitConcludedForNBlocks(ctx context.Context,
for i := 0; i < numBlocks; i++ {
select {
case <-h: // do nothing, wait another block
case e := <-concluded: // other participant performed transaction
case _e := <-concluded: // other participant performed transaction
e := _e.Data.(*adjudicator.AdjudicatorChannelUpdate)
if e.Phase == phaseConcluded {
return true, nil
}
Expand All @@ -121,33 +161,7 @@ func waitConcludedForNBlocks(ctx context.Context,
case err = <-hsub.Err():
err = cherrors.CheckIsChainNotReachableError(err)
return false, errors.WithMessage(err, "header subscription error")
case err = <-sub.Err():
err = cherrors.CheckIsChainNotReachableError(err)
return false, errors.WithMessage(err, "concluded subscription error")
}
}
return false, nil
}

// filterConcluded returns whether there has been a Concluded event in the past.
func (a *Adjudicator) filterConcluded(ctx context.Context, channelID channel.ID) (bool, error) {
filterOpts, err := a.NewFilterOpts(ctx)
if err != nil {
return false, err
}
iter, err := a.contract.FilterChannelUpdate(filterOpts, [][32]byte{channelID})
if err != nil {
err = cherrors.CheckIsChainNotReachableError(err)
return false, errors.WithMessage(err, "creating iterator")
}

found := false
for iter.Next() {
if iter.Event.Phase == phaseConcluded {
found = true
break
}
}

return found, nil
}
4 changes: 2 additions & 2 deletions backend/ethereum/channel/funder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/pkg/errors"

"perun.network/go-perun/backend/ethereum/bindings"
"perun.network/go-perun/backend/ethereum/bindings/assetholder"
"perun.network/go-perun/backend/ethereum/subscription"
"perun.network/go-perun/backend/ethereum/wallet"
Expand Down Expand Up @@ -240,7 +241,7 @@ func (f *Funder) depositedSub(ctx context.Context, contract *bind.BoundContract,
}
event := func() *subscription.Event {
return &subscription.Event{
Name: "Deposited",
Name: bindings.Events.AhDeposited,
Data: new(assetholder.AssetHolderDeposited),
Filter: [][]interface{}{filter},
}
Expand All @@ -252,7 +253,6 @@ func (f *Funder) depositedSub(ctx context.Context, contract *bind.BoundContract,
// waitForFundingConfirmation waits for the confirmation events on the blockchain that
// both we and all peers successfully funded the channel for the specified asset
// according to the funding agreement.
// nolint: funlen
func (f *Funder) waitForFundingConfirmation(ctx context.Context, request channel.FundingReq, asset assetHolder, fundingIDs [][32]byte) error {
deposited := make(chan *subscription.Event)
subErr := make(chan error, 1)
Expand Down
Loading

0 comments on commit f5f7b3e

Please sign in to comment.