From 3040907d3f890ff91317bc3f0d90a97c542e633d Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Wed, 8 Jan 2025 13:50:27 +0800 Subject: [PATCH 1/3] support reserved stream list in p2p stream manager --- p2p/stream/common/streammanager/config.go | 2 + .../common/streammanager/streammanager.go | 100 ++++++++++++++---- 2 files changed, 83 insertions(+), 19 deletions(-) diff --git a/p2p/stream/common/streammanager/config.go b/p2p/stream/common/streammanager/config.go index 671b174f36..808add96c1 100644 --- a/p2p/stream/common/streammanager/config.go +++ b/p2p/stream/common/streammanager/config.go @@ -10,6 +10,8 @@ const ( discTimeout = 10 * time.Second // connectTimeout is the timeout for setting up a stream with a discovered peer connectTimeout = 60 * time.Second + // MaxReservedStreams is the maximum number of reserved streams + MaxReservedStreams = 100 ) // Config is the config for stream manager diff --git a/p2p/stream/common/streammanager/streammanager.go b/p2p/stream/common/streammanager/streammanager.go index 0bfc9bbd5b..49374029b9 100644 --- a/p2p/stream/common/streammanager/streammanager.go +++ b/p2p/stream/common/streammanager/streammanager.go @@ -44,6 +44,8 @@ type streamManager struct { // Note that it could happen that remote node does not share exactly the same // protocol ID (e.g. different version) streams *streamSet + // reserved streams + reservedStreams *streamSet // libp2p utilities host host pf peerFinder @@ -85,22 +87,23 @@ func newStreamManager(pid sttypes.ProtoID, host host, pf peerFinder, handleStrea } return &streamManager{ - myProtoID: pid, - myProtoSpec: protoSpec, - config: c, - streams: newStreamSet(), - host: host, - pf: pf, - handleStream: handleStream, - addStreamCh: make(chan addStreamTask), - rmStreamCh: make(chan rmStreamTask), - stopCh: make(chan stopTask), - discCh: make(chan discTask, 1), // discCh is a buffered channel to avoid overuse of goroutine - coolDown: abool.New(), - coolDownCache: newCoolDownCache(), - logger: logger, - ctx: ctx, - cancel: cancel, + myProtoID: pid, + myProtoSpec: protoSpec, + config: c, + streams: newStreamSet(), + reservedStreams: newStreamSet(), + host: host, + pf: pf, + handleStream: handleStream, + addStreamCh: make(chan addStreamTask), + rmStreamCh: make(chan rmStreamTask), + stopCh: make(chan stopTask), + discCh: make(chan discTask, 1), // discCh is a buffered channel to avoid overuse of goroutine + coolDown: abool.New(), + coolDownCache: newCoolDownCache(), + logger: logger, + ctx: ctx, + cancel: cancel, } } @@ -212,6 +215,16 @@ func (sm *streamManager) GetStreamByID(id sttypes.StreamID) (sttypes.Stream, boo return sm.streams.get(id) } +// GetReservedStreams return the reserved streams. +func (sm *streamManager) GetReservedStreams() []sttypes.Stream { + return sm.reservedStreams.getStreams() +} + +// NumReservedStreams return the number of reserved streams. +func (sm *streamManager) NumReservedStreams() int { + return sm.reservedStreams.size() +} + type ( addStreamTask struct { st sttypes.Stream @@ -254,12 +267,23 @@ func (sm *streamManager) sanityCheckStream(st sttypes.Stream) error { func (sm *streamManager) handleAddStream(st sttypes.Stream) error { id := st.ID() - if sm.streams.size() >= sm.config.HiCap { - return ErrTooManyStreams - } + // check if stream exists if _, ok := sm.streams.get(id); ok { return ErrStreamAlreadyExist } + if _, ok := sm.reservedStreams.get(id); ok { + return ErrStreamAlreadyExist + } + if sm.streams.size() >= sm.config.HiCap { + if sm.reservedStreams.size() < MaxReservedStreams { + if _, ok := sm.reservedStreams.get(id); ok { + return nil + } + sm.reservedStreams.addStream(st) + return nil + } + return ErrTooManyStreams + } sm.streams.addStream(st) @@ -269,6 +293,22 @@ func (sm *streamManager) handleAddStream(st sttypes.Stream) error { return nil } +func (sm *streamManager) addStreamFromReserved(count int) (int, error) { + if sm.reservedStreams.size() == 0 { + return 0, errors.New("reserved streams list is empty") + } + added := 0 + for added < count && sm.reservedStreams.size() > 0 { + st, err := sm.reservedStreams.popStream() + if err != nil { + return added, err + } + sm.streams.addStream(st) + added++ + } + return added, nil +} + func (sm *streamManager) handleRemoveStream(id sttypes.StreamID) error { st, ok := sm.streams.get(id) if !ok { @@ -276,6 +316,17 @@ func (sm *streamManager) handleRemoveStream(id sttypes.StreamID) error { } sm.streams.deleteStream(st) + + // try to replace removed streams from reserved list + requiredStreams := sm.hardRequiredStreams() + if added, err := sm.addStreamFromReserved(requiredStreams); added > 0 { + sm.logger.Info(). + Err(err). // in case if some new streams added and others failed + Int("requiredStreams", requiredStreams). + Int("added", added). + Msg("added new streams from reserved list") + } + // if stream number is smaller than HardLoCap, spin up the discover if !sm.hardHaveEnoughStream() { select { @@ -330,6 +381,9 @@ func (sm *streamManager) discoverAndSetupStream(discCtx context.Context) (int, e if _, ok := sm.streams.get(sttypes.StreamID(peer.ID)); ok { continue } + if _, ok := sm.reservedStreams.get(sttypes.StreamID(peer.ID)); ok { + continue + } discoveredPeersCounterVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Inc() connecting += 1 go func(pid libp2p_peer.ID) { @@ -401,3 +455,11 @@ func (sm *streamManager) hardHaveEnoughStream() bool { availStreams := sm.streams.numStreamsWithMinProtoSpec(sm.myProtoSpec) return availStreams >= sm.config.HardLoCap } + +func (sm *streamManager) hardRequiredStreams() int { + availStreams := sm.streams.numStreamsWithMinProtoSpec(sm.myProtoSpec) + if availStreams >= sm.config.HardLoCap { + return 0 + } + return sm.config.HardLoCap - availStreams +} From 5a4616d494c23c6f39b12df12e957572b9d4695c Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Wed, 8 Jan 2025 13:50:57 +0800 Subject: [PATCH 2/3] add test for reserved stream list --- .../common/streammanager/interface_test.go | 8 +-- .../streammanager/streammanager_test.go | 55 +++++++++++++++++++ p2p/stream/common/streammanager/streamset.go | 15 +++++ 3 files changed, 74 insertions(+), 4 deletions(-) diff --git a/p2p/stream/common/streammanager/interface_test.go b/p2p/stream/common/streammanager/interface_test.go index 368716e8a6..a98ff882f4 100644 --- a/p2p/stream/common/streammanager/interface_test.go +++ b/p2p/stream/common/streammanager/interface_test.go @@ -22,10 +22,10 @@ var ( ) const ( - defHardLoCap = 16 // discovery trigger immediately when size smaller than this number - defSoftLoCap = 32 // discovery trigger for routine check - defHiCap = 128 // Hard cap of the stream number - defDiscBatch = 16 // batch size for discovery + defHardLoCap = 16 // discovery trigger immediately when size smaller than this number + defSoftLoCap = 32 // discovery trigger for routine check + defHiCap = 64 // Hard cap of the stream number + defDiscBatch = 16 // batch size for discovery ) var defConfig = Config{ diff --git a/p2p/stream/common/streammanager/streammanager_test.go b/p2p/stream/common/streammanager/streammanager_test.go index 2b49a5f16c..f00f8f0ce0 100644 --- a/p2p/stream/common/streammanager/streammanager_test.go +++ b/p2p/stream/common/streammanager/streammanager_test.go @@ -145,6 +145,61 @@ func TestStreamManager_HandleNewStream(t *testing.T) { } } +func TestStreamManager_ReservedStreams(t *testing.T) { + sm := newTestStreamManager() + sm.Start() + time.Sleep(defTestWait) + + if sm.streams.size() != defHardLoCap { + t.Errorf("unexpected stream size: %v / %v", sm.streams.size(), defHardLoCap) + } + if sm.reservedStreams.size() != 0 { + t.Errorf("unexpected reserved stream size: %v / %v", sm.reservedStreams.size(), 0) + } + + // Add more streams to get to Hi Cap + for i := sm.streams.size() + 1; i <= defHiCap; i++ { + stream := newTestStream(makeStreamID(i), testProtoID) + if err := sm.NewStream(stream); err != nil { + t.Errorf("unexpected add stream error: %v", err) + } + } + if sm.streams.size() != defHiCap { + t.Errorf("unexpected stream size: %v / %v", sm.streams.size(), defHiCap) + } + if sm.reservedStreams.size() != 0 { + t.Errorf("unexpected reserved stream size after getting to hi cap: %v / %v", sm.reservedStreams.size(), 0) + } + + // Add more streams to fill up reserved list + for i := sm.streams.size() + 1; i <= MaxReservedStreams+defHiCap; i++ { + stream := newTestStream(makeStreamID(i), testProtoID) + if err := sm.NewStream(stream); err != nil { + t.Errorf("unexpected add stream error: %v", err) + } + } + if sm.streams.size() != defHiCap { + t.Errorf("unexpected stream size: %v / %v", sm.streams.size(), defHiCap) + } + if sm.reservedStreams.size() != MaxReservedStreams { + t.Errorf("unexpected reserved stream size after getting to hi cap: %v / %v", sm.reservedStreams.size(), MaxReservedStreams) + } + + // try to add one more stream, it should be rejected + stream := newTestStream(makeStreamID(1234), testProtoID) + err := sm.NewStream(stream) + if assErr := assertError(err, ErrTooManyStreams); assErr != nil { + t.Errorf("unexpected add stream error: %v", err) + } + if sm.streams.size() != defHiCap { + t.Errorf("unexpected stream size: %v / %v", sm.streams.size(), defHiCap) + } + if sm.reservedStreams.size() != MaxReservedStreams { + t.Errorf("unexpected reserved stream size after getting to hi cap: %v / %v", sm.reservedStreams.size(), MaxReservedStreams) + } + +} + func TestStreamManager_HandleRemoveStream(t *testing.T) { tests := []struct { id sttypes.StreamID diff --git a/p2p/stream/common/streammanager/streamset.go b/p2p/stream/common/streammanager/streamset.go index c8d086b00d..5a1f84e01f 100644 --- a/p2p/stream/common/streammanager/streamset.go +++ b/p2p/stream/common/streammanager/streamset.go @@ -4,6 +4,7 @@ import ( "sync" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/pkg/errors" ) // streamSet is the concurrency safe stream set. @@ -95,6 +96,20 @@ func (ss *streamSet) getStreams() []sttypes.Stream { return res } +func (ss *streamSet) popStream() (sttypes.Stream, error) { + ss.lock.RLock() + defer ss.lock.RUnlock() + + if len(ss.streams) == 0 { + return nil, errors.New("no available stream") + } + for id, stream := range ss.streams { + delete(ss.streams, id) + return stream, nil + } + return nil, errors.New("pop stream failed") +} + func (ss *streamSet) numStreamsWithMinProtoSpec(minSpec sttypes.ProtoSpec) int { ss.lock.RLock() defer ss.lock.RUnlock() From bb63cf8f315d969a7955c465c8f85434a036ab72 Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Wed, 8 Jan 2025 19:12:41 +0800 Subject: [PATCH 3/3] improve handling new streams for reserved list, improve test comments --- p2p/stream/common/streammanager/streammanager.go | 10 ++++------ p2p/stream/common/streammanager/streammanager_test.go | 6 +++--- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/p2p/stream/common/streammanager/streammanager.go b/p2p/stream/common/streammanager/streammanager.go index 49374029b9..1396b2ea5e 100644 --- a/p2p/stream/common/streammanager/streammanager.go +++ b/p2p/stream/common/streammanager/streammanager.go @@ -271,15 +271,13 @@ func (sm *streamManager) handleAddStream(st sttypes.Stream) error { if _, ok := sm.streams.get(id); ok { return ErrStreamAlreadyExist } - if _, ok := sm.reservedStreams.get(id); ok { - return ErrStreamAlreadyExist - } + + // If the stream list has sufficient capacity, the stream can be added to the reserved list if sm.streams.size() >= sm.config.HiCap { if sm.reservedStreams.size() < MaxReservedStreams { - if _, ok := sm.reservedStreams.get(id); ok { - return nil + if _, ok := sm.reservedStreams.get(id); !ok { + sm.reservedStreams.addStream(st) } - sm.reservedStreams.addStream(st) return nil } return ErrTooManyStreams diff --git a/p2p/stream/common/streammanager/streammanager_test.go b/p2p/stream/common/streammanager/streammanager_test.go index f00f8f0ce0..a898fa765d 100644 --- a/p2p/stream/common/streammanager/streammanager_test.go +++ b/p2p/stream/common/streammanager/streammanager_test.go @@ -168,7 +168,7 @@ func TestStreamManager_ReservedStreams(t *testing.T) { t.Errorf("unexpected stream size: %v / %v", sm.streams.size(), defHiCap) } if sm.reservedStreams.size() != 0 { - t.Errorf("unexpected reserved stream size after getting to hi cap: %v / %v", sm.reservedStreams.size(), 0) + t.Errorf("unexpected reserved stream size after reaching the high capacity limit: %v / %v", sm.reservedStreams.size(), 0) } // Add more streams to fill up reserved list @@ -182,7 +182,7 @@ func TestStreamManager_ReservedStreams(t *testing.T) { t.Errorf("unexpected stream size: %v / %v", sm.streams.size(), defHiCap) } if sm.reservedStreams.size() != MaxReservedStreams { - t.Errorf("unexpected reserved stream size after getting to hi cap: %v / %v", sm.reservedStreams.size(), MaxReservedStreams) + t.Errorf("unexpected reserved stream size after retrieving the maximum allowed reserved streams: %v / %v", sm.reservedStreams.size(), MaxReservedStreams) } // try to add one more stream, it should be rejected @@ -195,7 +195,7 @@ func TestStreamManager_ReservedStreams(t *testing.T) { t.Errorf("unexpected stream size: %v / %v", sm.streams.size(), defHiCap) } if sm.reservedStreams.size() != MaxReservedStreams { - t.Errorf("unexpected reserved stream size after getting to hi cap: %v / %v", sm.reservedStreams.size(), MaxReservedStreams) + t.Errorf("unexpected reserved stream size after attempting to add a stream beyond the allowed limit: %v / %v", sm.reservedStreams.size(), MaxReservedStreams) } }