Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[api] Re-send pending actions #4498

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion actpool/actpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type ActPool interface {
type Subscriber interface {
OnAdded(*action.SealedEnvelope)
OnRemoved(*action.SealedEnvelope)
OnRejected(context.Context, *action.SealedEnvelope, error)
}

// SortedActions is a slice of actions that implements sort.Interface to sort by Value.
Expand Down Expand Up @@ -281,7 +282,11 @@ func (ap *actPool) PendingActionMap() map[string][]*action.SealedEnvelope {
}

// Add validates and inserts the action into the pool. When insertion
// fails, subscribers are notified via onRejected before the error is
// returned to the caller.
func (ap *actPool) Add(ctx context.Context, act *action.SealedEnvelope) error {
	if err := ap.add(ctx, act); err != nil {
		ap.onRejected(ctx, act, err)
		return err
	}
	return nil
}

func (ap *actPool) add(ctx context.Context, act *action.SealedEnvelope) error {
Expand Down Expand Up @@ -586,6 +591,12 @@ func (ap *actPool) onRemoved(act *action.SealedEnvelope) {
}
}

// onRejected fans a rejection event out to every registered subscriber.
func (ap *actPool) onRejected(ctx context.Context, act *action.SealedEnvelope, err error) {
	for i := range ap.subs {
		ap.subs[i].OnRejected(ctx, act, err)
	}
}

type destinationMap struct {
mu sync.Mutex
acts map[string]map[hash.Hash256]*action.SealedEnvelope
Expand Down
2 changes: 2 additions & 0 deletions actpool/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,5 @@ func (v *blobValidator) OnRemoved(act *action.SealedEnvelope) {
}
v.blobCntPerAcc[sender]--
}

// OnRejected is a no-op: blobValidator only needs to track actions that
// actually enter or leave the pool (OnAdded/OnRemoved), so a rejected
// action requires no bookkeeping. Present to satisfy the Subscriber interface.
func (v *blobValidator) OnRejected(context.Context, *action.SealedEnvelope, error) {}
113 changes: 109 additions & 4 deletions api/action_radio.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package api
import (
"context"
"encoding/hex"
"errors"
"sync"
"time"

"github.com/iotexproject/go-pkgs/hash"
"github.com/iotexproject/iotex-proto/golang/iotextypes"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
Expand All @@ -25,18 +29,43 @@ func WithMessageBatch() ActionRadioOption {
}
}

// WithRetry enables retry for action broadcast. fetchFn supplies the
// currently pending actions, max caps the number of re-broadcasts per
// action, and interval is the minimum gap between re-broadcasts.
func WithRetry(fetchFn func() chan *action.SealedEnvelope, max int, interval time.Duration) ActionRadioOption {
	return func(ar *ActionRadio) {
		// tickInterval is the scan cadence for pending actions; the
		// per-action re-broadcast is still gated by retryInterval.
		ar.tickInterval = 10 * time.Second
		ar.fetchFn = fetchFn
		ar.retryMax = max
		ar.retryInterval = interval
	}
}

// ActionRadio broadcasts actions to the network
type ActionRadio struct {
broadcastHandler BroadcastOutbound
messageBatcher *batch.Manager
chainID uint32
unconfirmedActs map[hash.Hash256]*radioAction
mutex sync.Mutex
Copy link
Member

@dustinxie dustinxie Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use ThreadSafeLRU to hold unconfirmedActs, then we can remove the mutex?

quit chan struct{}
fetchFn func() chan *action.SealedEnvelope
retryMax int
retryInterval time.Duration
tickInterval time.Duration
}

// radioAction tracks an action that has been broadcast but not yet
// confirmed in a block, together with its re-broadcast bookkeeping.
type radioAction struct {
	act           *action.SealedEnvelope // the broadcast action
	lastRadioTime time.Time              // time of the most recent broadcast
	retry         int                    // number of re-broadcasts consumed so far
}

// NewActionRadio creates a new ActionRadio
func NewActionRadio(broadcastHandler BroadcastOutbound, chainID uint32, opts ...ActionRadioOption) *ActionRadio {
ar := &ActionRadio{
broadcastHandler: broadcastHandler,
chainID: chainID,
unconfirmedActs: make(map[hash.Hash256]*radioAction),
quit: make(chan struct{}),
}
for _, opt := range opts {
opt(ar)
Expand All @@ -49,11 +78,28 @@ func (ar *ActionRadio) Start() error {
if ar.messageBatcher != nil {
return ar.messageBatcher.Start()
}
if ar.tickInterval > 0 {
go func() {
ticker := time.NewTicker(ar.tickInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
ar.mutex.Lock()
ar.autoRadio()
ar.mutex.Unlock()
case <-ar.quit:
break
}
}
}()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we are able to remove the mutex, this can be replaced by existing RecurringTask?

return nil
}

// Stop stops the action radio
func (ar *ActionRadio) Stop() error {
close(ar.quit)
if ar.messageBatcher != nil {
return ar.messageBatcher.Stop()
}
Expand All @@ -62,6 +108,58 @@ func (ar *ActionRadio) Stop() error {

// OnAdded broadcasts the action to the network
func (ar *ActionRadio) OnAdded(selp *action.SealedEnvelope) {
ar.mutex.Lock()
defer ar.mutex.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comments about removing mutex

ar.radio(selp)
}

// OnRemoved stops tracking a confirmed (or evicted) action so it is no
// longer re-broadcast.
func (ar *ActionRadio) OnRemoved(selp *action.SealedEnvelope) {
	ar.mutex.Lock()
	defer ar.mutex.Unlock()
	// h instead of hash: avoid shadowing the imported hash package.
	// Hash error is ignored, matching the other subscriber callbacks.
	h, _ := selp.Hash()
	delete(ar.unconfirmedActs, h)
}

// OnRejected handles an action the pool rejected. Only duplicates
// (ErrExistedInPool) received from peers are of interest: a duplicate from
// the network means other nodes already have the action, so one unit of its
// retry budget is consumed to alleviate congestion. Duplicates submitted
// through the local API are ignored.
func (ar *ActionRadio) OnRejected(ctx context.Context, selp *action.SealedEnvelope, err error) {
	if !errors.Is(err, action.ErrExistedInPool) {
		return
	}
	if _, fromAPI := GetAPIContext(ctx); fromAPI {
		// ignore action rejected from API
		return
	}
	// retry+1 for action broadcast from other nodes, alleviate the network congestion
	// h instead of hash: avoid shadowing the imported hash package.
	h, _ := selp.Hash()
	ar.mutex.Lock()
	defer ar.mutex.Unlock()
	if radioAct, ok := ar.unconfirmedActs[h]; ok {
		radioAct.retry++
		radioAct.lastRadioTime = time.Now()
	} else {
		log.L().Warn("Found rejected action not in unconfirmedActs", zap.String("actionHash", hex.EncodeToString(h[:])))
	}
}

// autoRadio broadcasts long time pending actions periodically.
// The caller must hold ar.mutex (the Start ticker loop does).
func (ar *ActionRadio) autoRadio() {
	now := time.Now()
	for pending := range ar.fetchFn() {
		// h instead of hash: avoid shadowing the imported hash package.
		h, _ := pending.Hash()
		if radioAct, ok := ar.unconfirmedActs[h]; ok {
			// re-broadcast only while the retry budget lasts and the last
			// broadcast is older than retryInterval
			if radioAct.retry < ar.retryMax && now.Sub(radioAct.lastRadioTime) > ar.retryInterval {
				ar.radio(radioAct.act)
			}
			continue
		}
		// weird case: a pending action we never tracked — log it, then let
		// radio() add it to unconfirmedActs and broadcast it
		log.L().Warn("Found missing pending action", zap.String("actionHash", hex.EncodeToString(h[:])))
		ar.radio(pending)
	}
}

func (ar *ActionRadio) radio(selp *action.SealedEnvelope) {
// broadcast action
var (
hasSidecar = selp.BlobTxSidecar() != nil
hash, _ = selp.Hash()
Expand All @@ -85,9 +183,16 @@ func (ar *ActionRadio) OnAdded(selp *action.SealedEnvelope) {
err = ar.broadcastHandler(context.Background(), ar.chainID, out)
}
if err != nil {
log.L().Warn("Failed to broadcast SendAction request.", zap.Error(err), zap.String("actionHash", hex.EncodeToString(hash[:])))
log.L().Warn("Failed to broadcast action.", zap.Error(err), zap.String("actionHash", hex.EncodeToString(hash[:])))
}
// update unconfirmed action
if radio, ok := ar.unconfirmedActs[hash]; ok {
radio.lastRadioTime = time.Now()
radio.retry++
} else {
ar.unconfirmedActs[hash] = &radioAction{
act: selp,
lastRadioTime: time.Now(),
}
}
}

// OnRemoved does nothing
func (ar *ActionRadio) OnRemoved(act *action.SealedEnvelope) {}
85 changes: 85 additions & 0 deletions api/action_radio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package api
import (
"context"
"math/big"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -35,3 +37,86 @@ func TestActionRadio(t *testing.T) {
radio.OnAdded(selp)
r.Equal(uint64(1), atomic.LoadUint64(&broadcastCount))
}

// TestActionRadioRetry exercises the re-broadcast path of ActionRadio:
// pending actions are re-radioed at most retryMax times at retryInterval
// spacing, a duplicate received from a peer consumes one retry, and a
// duplicate received via the API is ignored.
// NOTE(review): assertions are driven by real sleeps against a 10ms tick;
// this may be flaky on a heavily loaded CI machine — confirm.
func TestActionRadioRetry(t *testing.T) {
	r := require.New(t)
	broadcastCnt := uint64(0)
	pendings := make([]*action.SealedEnvelope, 0)
	mutex := sync.Mutex{}
	// broadcast handler just counts invocations; fetchFn streams the
	// current pendings slice through a fresh channel on every tick
	ar := NewActionRadio(func(ctx context.Context, chainID uint32, msg proto.Message) error {
		atomic.AddUint64(&broadcastCnt, 1)
		return nil
	}, 0, WithRetry(func() chan *action.SealedEnvelope {
		ch := make(chan *action.SealedEnvelope, 1)
		go func() {
			mutex.Lock()
			for _, p := range pendings {
				ch <- p
			}
			mutex.Unlock()
			close(ch)
		}()
		return ch
	}, 3, 20*time.Millisecond))
	ar.tickInterval = 10 * time.Millisecond

	r.NoError(ar.Start())
	defer ar.Stop()

	// setPending replaces the simulated pending-action set under the mutex
	setPending := func(acts ...*action.SealedEnvelope) {
		mutex.Lock()
		pendings = acts
		mutex.Unlock()
	}

	tsf1, err := action.SignedTransfer(identityset.Address(1).String(), identityset.PrivateKey(1), 1, big.NewInt(1), nil, 10000, big.NewInt(1))
	r.NoError(err)
	tsf2, err := action.SignedTransfer(identityset.Address(1).String(), identityset.PrivateKey(1), 2, big.NewInt(1), nil, 10000, big.NewInt(1))
	r.NoError(err)
	tsf3, err := action.SignedTransfer(identityset.Address(1).String(), identityset.PrivateKey(1), 3, big.NewInt(1), nil, 10000, big.NewInt(1))
	r.NoError(err)

	// -- case 1: retry pending actions at most 3 times
	r.Equal(uint64(0), atomic.LoadUint64(&broadcastCnt))
	// add first action
	ar.OnAdded(tsf1)
	r.Equal(uint64(1), atomic.LoadUint64(&broadcastCnt))
	// add second action
	ar.OnAdded(tsf2)
	r.Equal(uint64(2), atomic.LoadUint64(&broadcastCnt))
	// set tsf1 as pending
	time.Sleep(ar.retryInterval)
	setPending(tsf1)
	// first retry after interval
	time.Sleep(ar.retryInterval)
	r.Equal(uint64(3), atomic.LoadUint64(&broadcastCnt))
	// retry 3 at most
	time.Sleep(ar.retryInterval * 10)
	r.Equal(uint64(5), atomic.LoadUint64(&broadcastCnt))
	// tsf1 confirmed
	setPending()
	ar.OnRemoved(tsf1)

	// -- case 2: retry + 1 if receive again
	setPending(tsf2)
	// first retry after interval
	time.Sleep(ar.retryInterval)
	r.Equal(uint64(6), atomic.LoadUint64(&broadcastCnt))
	// receive tsf2 again and retry+1
	ar.OnRejected(context.Background(), tsf2, action.ErrExistedInPool)
	time.Sleep(ar.retryInterval * 10)
	r.Equal(uint64(7), atomic.LoadUint64(&broadcastCnt))

	// -- case 3: ignore if receive again from API
	ar.OnAdded(tsf3)
	r.Equal(uint64(8), atomic.LoadUint64(&broadcastCnt))
	time.Sleep(ar.retryInterval)
	setPending(tsf3)
	// first retry after interval
	time.Sleep(ar.retryInterval)
	r.Equal(uint64(9), atomic.LoadUint64(&broadcastCnt))
	// receive tsf3 again from API
	ar.OnRejected(WithAPIContext(context.Background()), tsf3, action.ErrExistedInPool)
	time.Sleep(ar.retryInterval * 10)
	r.Equal(uint64(11), atomic.LoadUint64(&broadcastCnt))
}
14 changes: 14 additions & 0 deletions api/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
type (
streamContextKey struct{}

apiContextKey struct{}

StreamContext struct {
listenerIDs map[string]struct{}
mutex sync.Mutex
Expand Down Expand Up @@ -46,3 +48,15 @@ func StreamFromContext(ctx context.Context) (*StreamContext, bool) {
sc, ok := ctx.Value(streamContextKey{}).(*StreamContext)
return sc, ok
}

// WithAPIContext tags ctx as originating from a local API request, so
// downstream consumers (e.g. ActionRadio.OnRejected) can distinguish it
// from actions received over the network.
func WithAPIContext(ctx context.Context) context.Context {
	return context.WithValue(ctx, apiContextKey{}, struct{}{})
}

// GetAPIContext reports whether ctx was tagged by WithAPIContext. The
// struct{} value carries no data; callers only use the boolean.
func GetAPIContext(ctx context.Context) (struct{}, bool) {
	v, ok := ctx.Value(apiContextKey{}).(struct{})
	return v, ok
}
15 changes: 14 additions & 1 deletion api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,19 @@ func newCoreService(
}

if core.broadcastHandler != nil {
core.actionRadio = NewActionRadio(core.broadcastHandler, core.bc.ChainID(), WithMessageBatch())
core.actionRadio = NewActionRadio(core.broadcastHandler, core.bc.ChainID(), WithMessageBatch(), WithRetry(func() chan *action.SealedEnvelope {
// fetch the exactly pending action of all accounts
pendings := make(chan *action.SealedEnvelope, 100)
go func() {
for _, pendingAcc := range core.ap.PendingActionMap() {
if len(pendingAcc) > 0 {
pendings <- pendingAcc[0]
}
}
close(pendings)
}()
return pendings
}, 3, time.Minute))
actPool.AddSubscriber(core.actionRadio)
}

Expand Down Expand Up @@ -471,6 +483,7 @@ func (core *coreService) SendAction(ctx context.Context, in *iotextypes.Action)
return "", err
}
l := log.Logger("api").With(zap.String("actionHash", hex.EncodeToString(hash[:])))
ctx = WithAPIContext(ctx)
if err = core.ap.Add(ctx, selp); err != nil {
txBytes, serErr := proto.Marshal(in)
if serErr != nil {
Expand Down
Loading