From 4506a8d028d4f91a6fa6a78bfdd52289006ff2ae Mon Sep 17 00:00:00 2001 From: envestcc Date: Wed, 20 Nov 2024 18:48:22 +0800 Subject: [PATCH 1/4] pending action retry --- api/action_radio.go | 88 ++++++++++++++++++++++++++++++++++++++++++--- api/coreservice.go | 14 +++++++- 2 files changed, 97 insertions(+), 5 deletions(-) diff --git a/api/action_radio.go b/api/action_radio.go index 6beda0ea06..8b1e5a577d 100644 --- a/api/action_radio.go +++ b/api/action_radio.go @@ -3,7 +3,10 @@ package api import ( "context" "encoding/hex" + "sync" + "time" + "github.com/iotexproject/go-pkgs/hash" "github.com/iotexproject/iotex-proto/golang/iotextypes" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -25,11 +28,32 @@ func WithMessageBatch() ActionRadioOption { } } +// WithRetry enables retry for action broadcast +func WithRetry(fetchFn func() chan *action.SealedEnvelope, max int, interval time.Duration) ActionRadioOption { + return func(ar *ActionRadio) { + ar.fetchFn = fetchFn + ar.retryMax = max + ar.retryInterval = interval + } +} + // ActionRadio broadcasts actions to the network type ActionRadio struct { broadcastHandler BroadcastOutbound messageBatcher *batch.Manager chainID uint32 + unconfirmedActs map[hash.Hash256]*radioAction + mutex sync.Mutex + quit chan struct{} + fetchFn func() chan *action.SealedEnvelope + retryMax int + retryInterval time.Duration +} + +type radioAction struct { + act *action.SealedEnvelope + lastRadioTime time.Time + retry int } // NewActionRadio creates a new ActionRadio @@ -37,6 +61,8 @@ func NewActionRadio(broadcastHandler BroadcastOutbound, chainID uint32, opts ... 
ar := &ActionRadio{ broadcastHandler: broadcastHandler, chainID: chainID, + unconfirmedActs: make(map[hash.Hash256]*radioAction), + quit: make(chan struct{}), } for _, opt := range opts { opt(ar) @@ -49,11 +75,26 @@ func (ar *ActionRadio) Start() error { if ar.messageBatcher != nil { return ar.messageBatcher.Start() } + go func() { + ticker := time.NewTicker(time.Second * 10) + defer ticker.Stop() + for { + select { + case <-ticker.C: + ar.mutex.Lock() + ar.autoRadio() + ar.mutex.Unlock() + case <-ar.quit: + break + } + } + }() return nil } // Stop stops the action radio func (ar *ActionRadio) Stop() error { + close(ar.quit) if ar.messageBatcher != nil { return ar.messageBatcher.Stop() } @@ -62,6 +103,38 @@ func (ar *ActionRadio) Stop() error { // OnAdded broadcasts the action to the network func (ar *ActionRadio) OnAdded(selp *action.SealedEnvelope) { + ar.mutex.Lock() + defer ar.mutex.Unlock() + ar.radio(selp) +} + +// OnRemoved drops the action from the unconfirmed set +func (ar *ActionRadio) OnRemoved(selp *action.SealedEnvelope) { + ar.mutex.Lock() + defer ar.mutex.Unlock() + hash, _ := selp.Hash() + delete(ar.unconfirmedActs, hash) +} + +// autoRadio broadcasts long time pending actions periodically +func (ar *ActionRadio) autoRadio() { + now := time.Now() + for pending := range ar.fetchFn() { + hash, _ := pending.Hash() + if radioAct, ok := ar.unconfirmedActs[hash]; ok { + if radioAct.retry < ar.retryMax && now.Sub(radioAct.lastRadioTime) > ar.retryInterval { + ar.radio(radioAct.act) + } + continue + } + // weird case, add it to unconfirmedActs and broadcast it + log.L().Warn("Found missing pending action", zap.String("actionHash", hex.EncodeToString(hash[:]))) + ar.radio(pending) + } +} + +func (ar *ActionRadio) radio(selp *action.SealedEnvelope) { + // broadcast action var ( hasSidecar = selp.BlobTxSidecar() != nil hash, _ = selp.Hash() @@ -85,9 +158,16 @@ func (ar *ActionRadio) OnAdded(selp *action.SealedEnvelope) { err = ar.broadcastHandler(context.Background(), ar.chainID, out) }
if err != nil { - log.L().Warn("Failed to broadcast SendAction request.", zap.Error(err), zap.String("actionHash", hex.EncodeToString(hash[:]))) + log.L().Warn("Failed to broadcast action.", zap.Error(err), zap.String("actionHash", hex.EncodeToString(hash[:]))) + } + // update unconfirmed action + if radio, ok := ar.unconfirmedActs[hash]; ok { + radio.lastRadioTime = time.Now() + radio.retry++ + } else { + ar.unconfirmedActs[hash] = &radioAction{ + act: selp, + lastRadioTime: time.Now(), + } } } - -// OnRemoved does nothing -func (ar *ActionRadio) OnRemoved(act *action.SealedEnvelope) {} diff --git a/api/coreservice.go b/api/coreservice.go index e8203aae6b..7f20a38a28 100644 --- a/api/coreservice.go +++ b/api/coreservice.go @@ -296,7 +296,19 @@ func newCoreService( } if core.broadcastHandler != nil { - core.actionRadio = NewActionRadio(core.broadcastHandler, core.bc.ChainID(), WithMessageBatch()) + core.actionRadio = NewActionRadio(core.broadcastHandler, core.bc.ChainID(), WithMessageBatch(), WithRetry(func() chan *action.SealedEnvelope { + // fetch the first pending action of each account + pendings := make(chan *action.SealedEnvelope, 100) + go func() { + for _, pendingAcc := range core.ap.PendingActionMap() { + if len(pendingAcc) > 0 { + pendings <- pendingAcc[0] + } + } + close(pendings) + }() + return pendings + }, 3, time.Minute)) actPool.AddSubscriber(core.actionRadio) } From 9ffdbf69e22d70e3ee9e47660c62571ce9792195 Mon Sep 17 00:00:00 2001 From: envestcc Date: Thu, 21 Nov 2024 11:53:05 +0800 Subject: [PATCH 2/4] alleviate network congestion --- actpool/actpool.go | 13 ++++++++++++- actpool/validator.go | 2 ++ api/action_radio.go | 21 +++++++++++++++++++++ api/context.go | 14 ++++++++++++++ api/coreservice.go | 1 + 5 files changed, 50 insertions(+), 1 deletion(-) diff --git a/actpool/actpool.go b/actpool/actpool.go index 5716435a2b..32bfaeb947 100644 --- a/actpool/actpool.go +++ b/actpool/actpool.go @@ -89,6 +89,7 @@ type ActPool interface { type
Subscriber interface { OnAdded(*action.SealedEnvelope) OnRemoved(*action.SealedEnvelope) + OnRejected(context.Context, *action.SealedEnvelope, error) } // SortedActions is a slice of actions that implements sort.Interface to sort by Value. @@ -281,7 +282,11 @@ func (ap *actPool) PendingActionMap() map[string][]*action.SealedEnvelope { } func (ap *actPool) Add(ctx context.Context, act *action.SealedEnvelope) error { - return ap.add(ctx, act) + err := ap.add(ctx, act) + if err != nil { + ap.onRejected(ctx, act, err) + } + return err } func (ap *actPool) add(ctx context.Context, act *action.SealedEnvelope) error { @@ -586,6 +591,12 @@ func (ap *actPool) onRemoved(act *action.SealedEnvelope) { } } +func (ap *actPool) onRejected(ctx context.Context, act *action.SealedEnvelope, err error) { + for _, sub := range ap.subs { + sub.OnRejected(ctx, act, err) + } +} + type destinationMap struct { mu sync.Mutex acts map[string]map[hash.Hash256]*action.SealedEnvelope diff --git a/actpool/validator.go b/actpool/validator.go index 201af354c8..813b854108 100644 --- a/actpool/validator.go +++ b/actpool/validator.go @@ -61,3 +61,5 @@ func (v *blobValidator) OnRemoved(act *action.SealedEnvelope) { } v.blobCntPerAcc[sender]-- } + +func (v *blobValidator) OnRejected(context.Context, *action.SealedEnvelope, error) {} diff --git a/api/action_radio.go b/api/action_radio.go index 8b1e5a577d..c3d7ddd3c3 100644 --- a/api/action_radio.go +++ b/api/action_radio.go @@ -3,6 +3,7 @@ package api import ( "context" "encoding/hex" + "errors" "sync" "time" @@ -116,6 +117,26 @@ func (ar *ActionRadio) OnRemoved(selp *action.SealedEnvelope) { delete(ar.unconfirmedActs, hash) } +func (ar *ActionRadio) OnRejected(ctx context.Context, selp *action.SealedEnvelope, err error) { + if !errors.Is(err, action.ErrExistedInPool) { + return + } + if _, fromAPI := GetAPIContext(ctx); fromAPI { + // ignore action rejected from API + return + } + // retry+1 for action broadcast from other nodes, alleviate the network 
congestion + hash, _ := selp.Hash() + ar.mutex.Lock() + defer ar.mutex.Unlock() + if radioAct, ok := ar.unconfirmedActs[hash]; ok { + radioAct.retry++ + radioAct.lastRadioTime = time.Now() + } else { + log.L().Warn("Found rejected action not in unconfirmedActs", zap.String("actionHash", hex.EncodeToString(hash[:]))) + } +} + // autoRadio broadcasts long time pending actions periodically func (ar *ActionRadio) autoRadio() { now := time.Now() diff --git a/api/context.go b/api/context.go index d772ad6644..185009394b 100644 --- a/api/context.go +++ b/api/context.go @@ -8,6 +8,8 @@ import ( type ( streamContextKey struct{} + apiContextKey struct{} + StreamContext struct { listenerIDs map[string]struct{} mutex sync.Mutex @@ -46,3 +48,15 @@ func StreamFromContext(ctx context.Context) (*StreamContext, bool) { sc, ok := ctx.Value(streamContextKey{}).(*StreamContext) return sc, ok } + +func WithAPIContext(ctx context.Context) context.Context { + return context.WithValue(ctx, apiContextKey{}, struct{}{}) +} + +func GetAPIContext(ctx context.Context) (struct{}, bool) { + c := ctx.Value(apiContextKey{}) + if c == nil { + return struct{}{}, false + } + return c.(struct{}), true +} diff --git a/api/coreservice.go b/api/coreservice.go index 7f20a38a28..1e4ff6f8ee 100644 --- a/api/coreservice.go +++ b/api/coreservice.go @@ -483,6 +483,7 @@ func (core *coreService) SendAction(ctx context.Context, in *iotextypes.Action) return "", err } l := log.Logger("api").With(zap.String("actionHash", hex.EncodeToString(hash[:]))) + ctx = WithAPIContext(ctx) if err = core.ap.Add(ctx, selp); err != nil { txBytes, serErr := proto.Marshal(in) if serErr != nil { From 17ad2187556e38aa5463ed76454c4941166180b3 Mon Sep 17 00:00:00 2001 From: envestcc Date: Fri, 22 Nov 2024 11:57:05 +0800 Subject: [PATCH 3/4] fix --- api/action_radio.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/api/action_radio.go b/api/action_radio.go index c3d7ddd3c3..8dfce38eea 
100644 --- a/api/action_radio.go +++ b/api/action_radio.go @@ -35,6 +35,7 @@ func WithRetry(fetchFn func() chan *action.SealedEnvelope, max int, interval tim ar.fetchFn = fetchFn ar.retryMax = max ar.retryInterval = interval + ar.tickInterval = time.Second * 10 } } @@ -49,6 +50,7 @@ type ActionRadio struct { fetchFn func() chan *action.SealedEnvelope retryMax int retryInterval time.Duration + tickInterval time.Duration } type radioAction struct { @@ -76,20 +78,22 @@ func (ar *ActionRadio) Start() error { if ar.messageBatcher != nil { return ar.messageBatcher.Start() } - go func() { - ticker := time.NewTicker(time.Second * 10) - defer ticker.Stop() - for { - select { - case <-ticker.C: - ar.mutex.Lock() - ar.autoRadio() - ar.mutex.Unlock() - case <-ar.quit: - break + if ar.tickInterval > 0 { + go func() { + ticker := time.NewTicker(ar.tickInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + ar.mutex.Lock() + ar.autoRadio() + ar.mutex.Unlock() + case <-ar.quit: + break + } } - } - }() + }() + } return nil } From acb48b795419ed5fa5a0e80f563e9fb4c1c83ebe Mon Sep 17 00:00:00 2001 From: envestcc Date: Fri, 22 Nov 2024 12:00:01 +0800 Subject: [PATCH 4/4] add test --- api/action_radio_test.go | 85 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/api/action_radio_test.go b/api/action_radio_test.go index 1fec4274d3..f39d363801 100644 --- a/api/action_radio_test.go +++ b/api/action_radio_test.go @@ -3,8 +3,10 @@ package api import ( "context" "math/big" + "sync" "sync/atomic" "testing" + "time" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -35,3 +37,86 @@ func TestActionRadio(t *testing.T) { radio.OnAdded(selp) r.Equal(uint64(1), atomic.LoadUint64(&broadcastCount)) } + +func TestActionRadioRetry(t *testing.T) { + r := require.New(t) + broadcastCnt := uint64(0) + pendings := make([]*action.SealedEnvelope, 0) + mutex := sync.Mutex{} + ar := NewActionRadio(func(ctx context.Context, chainID 
uint32, msg proto.Message) error { + atomic.AddUint64(&broadcastCnt, 1) + return nil + }, 0, WithRetry(func() chan *action.SealedEnvelope { + ch := make(chan *action.SealedEnvelope, 1) + go func() { + mutex.Lock() + for _, p := range pendings { + ch <- p + } + mutex.Unlock() + close(ch) + }() + return ch + }, 3, 20*time.Millisecond)) + ar.tickInterval = 10 * time.Millisecond + + r.NoError(ar.Start()) + defer ar.Stop() + + setPending := func(acts ...*action.SealedEnvelope) { + mutex.Lock() + pendings = acts + mutex.Unlock() + } + + tsf1, err := action.SignedTransfer(identityset.Address(1).String(), identityset.PrivateKey(1), 1, big.NewInt(1), nil, 10000, big.NewInt(1)) + r.NoError(err) + tsf2, err := action.SignedTransfer(identityset.Address(1).String(), identityset.PrivateKey(1), 2, big.NewInt(1), nil, 10000, big.NewInt(1)) + r.NoError(err) + tsf3, err := action.SignedTransfer(identityset.Address(1).String(), identityset.PrivateKey(1), 3, big.NewInt(1), nil, 10000, big.NewInt(1)) + r.NoError(err) + + // -- case 1: retry pending actions at most 3 times + r.Equal(uint64(0), atomic.LoadUint64(&broadcastCnt)) + // add first action + ar.OnAdded(tsf1) + r.Equal(uint64(1), atomic.LoadUint64(&broadcastCnt)) + // add second action + ar.OnAdded(tsf2) + r.Equal(uint64(2), atomic.LoadUint64(&broadcastCnt)) + // set tsf1 as pending + time.Sleep(ar.retryInterval) + setPending(tsf1) + // first retry after interval + time.Sleep(ar.retryInterval) + r.Equal(uint64(3), atomic.LoadUint64(&broadcastCnt)) + // retry 3 at most + time.Sleep(ar.retryInterval * 10) + r.Equal(uint64(5), atomic.LoadUint64(&broadcastCnt)) + // tsf1 confirmed + setPending() + ar.OnRemoved(tsf1) + + // -- case 2: retry + 1 if receive again + setPending(tsf2) + // first retry after interval + time.Sleep(ar.retryInterval) + r.Equal(uint64(6), atomic.LoadUint64(&broadcastCnt)) + // receive tsf2 again and retry+1 + ar.OnRejected(context.Background(), tsf2, action.ErrExistedInPool) + time.Sleep(ar.retryInterval * 10) 
+ r.Equal(uint64(7), atomic.LoadUint64(&broadcastCnt)) + + // -- case 3: ignore if receive again from API + ar.OnAdded(tsf3) + r.Equal(uint64(8), atomic.LoadUint64(&broadcastCnt)) + time.Sleep(ar.retryInterval) + setPending(tsf3) + // first retry after interval + time.Sleep(ar.retryInterval) + r.Equal(uint64(9), atomic.LoadUint64(&broadcastCnt)) + // receive tsf3 again from API + ar.OnRejected(WithAPIContext(context.Background()), tsf3, action.ErrExistedInPool) + time.Sleep(ar.retryInterval * 10) + r.Equal(uint64(11), atomic.LoadUint64(&broadcastCnt)) +}