Skip to content

Commit 45f7028

Browse files
committed
alleviate network congestion
1 parent 74a8d5d commit 45f7028

File tree

5 files changed

+50
-1
lines changed

5 files changed

+50
-1
lines changed

actpool/actpool.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ type ActPool interface {
8989
type Subscriber interface {
9090
OnAdded(*action.SealedEnvelope)
9191
OnRemoved(*action.SealedEnvelope)
92+
OnRejected(context.Context, *action.SealedEnvelope, error)
9293
}
9394

9495
// SortedActions is a slice of actions that implements sort.Interface to sort by Value.
@@ -281,7 +282,11 @@ func (ap *actPool) PendingActionMap() map[string][]*action.SealedEnvelope {
281282
}
282283

283284
func (ap *actPool) Add(ctx context.Context, act *action.SealedEnvelope) error {
284-
return ap.add(ctx, act)
285+
err := ap.add(ctx, act)
286+
if err != nil {
287+
ap.onRejected(ctx, act, err)
288+
}
289+
return err
285290
}
286291

287292
func (ap *actPool) add(ctx context.Context, act *action.SealedEnvelope) error {
@@ -586,6 +591,12 @@ func (ap *actPool) onRemoved(act *action.SealedEnvelope) {
586591
}
587592
}
588593

594+
func (ap *actPool) onRejected(ctx context.Context, act *action.SealedEnvelope, err error) {
595+
for _, sub := range ap.subs {
596+
sub.OnRejected(ctx, act, err)
597+
}
598+
}
599+
589600
type destinationMap struct {
590601
mu sync.Mutex
591602
acts map[string]map[hash.Hash256]*action.SealedEnvelope

actpool/validator.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,5 @@ func (v *blobValidator) OnRemoved(act *action.SealedEnvelope) {
6161
}
6262
v.blobCntPerAcc[sender]--
6363
}
64+
65+
func (v *blobValidator) OnRejected(context.Context, *action.SealedEnvelope, error) {}

api/action_radio.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package api
33
import (
44
"context"
55
"encoding/hex"
6+
"errors"
67
"sync"
78
"time"
89

@@ -117,6 +118,26 @@ func (ar *ActionRadio) OnRemoved(selp *action.SealedEnvelope) {
117118
delete(ar.unconfirmedActs, hash)
118119
}
119120

121+
func (ar *ActionRadio) OnRejected(ctx context.Context, selp *action.SealedEnvelope, err error) {
122+
if !errors.Is(err, action.ErrExistedInPool) {
123+
return
124+
}
125+
if _, fromAPI := GetAPIContext(ctx); fromAPI {
126+
// ignore action rejected from API
127+
return
128+
}
129+
// retry+1 for action broadcast from other nodes, alleviate the network congestion
130+
hash, _ := selp.Hash()
131+
ar.mutex.Lock()
132+
defer ar.mutex.Unlock()
133+
if radioAct, ok := ar.unconfirmedActs[hash]; ok {
134+
radioAct.retry++
135+
radioAct.lastRadioTime = time.Now()
136+
} else {
137+
log.L().Warn("Found rejected action not in unconfirmedActs", zap.String("actionHash", hex.EncodeToString(hash[:])))
138+
}
139+
}
140+
120141
// autoRadio broadcasts long time pending actions periodically
121142
func (ar *ActionRadio) autoRadio() {
122143
now := time.Now()

api/context.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
type (
99
streamContextKey struct{}
1010

11+
apiContextKey struct{}
12+
1113
StreamContext struct {
1214
listenerIDs map[string]struct{}
1315
mutex sync.Mutex
@@ -46,3 +48,15 @@ func StreamFromContext(ctx context.Context) (*StreamContext, bool) {
4648
sc, ok := ctx.Value(streamContextKey{}).(*StreamContext)
4749
return sc, ok
4850
}
51+
52+
func WithAPIContext(ctx context.Context) context.Context {
53+
return context.WithValue(ctx, apiContextKey{}, struct{}{})
54+
}
55+
56+
func GetAPIContext(ctx context.Context) (struct{}, bool) {
57+
c := ctx.Value(apiContextKey{})
58+
if c == nil {
59+
return struct{}{}, false
60+
}
61+
return c.(struct{}), true
62+
}

api/coreservice.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,7 @@ func (core *coreService) SendAction(ctx context.Context, in *iotextypes.Action)
483483
return "", err
484484
}
485485
l := log.Logger("api").With(zap.String("actionHash", hex.EncodeToString(hash[:])))
486+
ctx = WithAPIContext(ctx)
486487
if err = core.ap.Add(ctx, selp); err != nil {
487488
txBytes, serErr := proto.Marshal(in)
488489
if serErr != nil {

0 commit comments

Comments
 (0)