Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance P2P Streams with Reserved Stream Replacement #4826

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions p2p/stream/common/streammanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ const (
discTimeout = 10 * time.Second
// connectTimeout is the timeout for setting up a stream with a discovered peer
connectTimeout = 60 * time.Second
// MaxReservedStreams is the maximum number of reserved streams
MaxReservedStreams = 100
)

// Config is the config for stream manager
Expand Down
8 changes: 4 additions & 4 deletions p2p/stream/common/streammanager/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ var (
)

const (
defHardLoCap = 16 // discovery trigger immediately when size smaller than this number
defSoftLoCap = 32 // discovery trigger for routine check
defHiCap = 128 // Hard cap of the stream number
defDiscBatch = 16 // batch size for discovery
defHardLoCap = 16 // discovery trigger immediately when size smaller than this number
defSoftLoCap = 32 // discovery trigger for routine check
defHiCap = 64 // Hard cap of the stream number
defDiscBatch = 16 // batch size for discovery
)

var defConfig = Config{
Expand Down
98 changes: 79 additions & 19 deletions p2p/stream/common/streammanager/streammanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type streamManager struct {
// Note that it could happen that remote node does not share exactly the same
// protocol ID (e.g. different version)
streams *streamSet
// reserved streams
reservedStreams *streamSet
// libp2p utilities
host host
pf peerFinder
Expand Down Expand Up @@ -85,22 +87,23 @@ func newStreamManager(pid sttypes.ProtoID, host host, pf peerFinder, handleStrea
}

return &streamManager{
myProtoID: pid,
myProtoSpec: protoSpec,
config: c,
streams: newStreamSet(),
host: host,
pf: pf,
handleStream: handleStream,
addStreamCh: make(chan addStreamTask),
rmStreamCh: make(chan rmStreamTask),
stopCh: make(chan stopTask),
discCh: make(chan discTask, 1), // discCh is a buffered channel to avoid overuse of goroutine
coolDown: abool.New(),
coolDownCache: newCoolDownCache(),
logger: logger,
ctx: ctx,
cancel: cancel,
myProtoID: pid,
myProtoSpec: protoSpec,
config: c,
streams: newStreamSet(),
reservedStreams: newStreamSet(),
host: host,
pf: pf,
handleStream: handleStream,
addStreamCh: make(chan addStreamTask),
rmStreamCh: make(chan rmStreamTask),
stopCh: make(chan stopTask),
discCh: make(chan discTask, 1), // discCh is a buffered channel to avoid overuse of goroutine
coolDown: abool.New(),
coolDownCache: newCoolDownCache(),
logger: logger,
ctx: ctx,
cancel: cancel,
}
}

Expand Down Expand Up @@ -212,6 +215,16 @@ func (sm *streamManager) GetStreamByID(id sttypes.StreamID) (sttypes.Stream, boo
return sm.streams.get(id)
}

// GetReservedStreams return the reserved streams.
func (sm *streamManager) GetReservedStreams() []sttypes.Stream {
return sm.reservedStreams.getStreams()
}

// NumReservedStreams return the number of reserved streams.
func (sm *streamManager) NumReservedStreams() int {
return sm.reservedStreams.size()
}

type (
addStreamTask struct {
st sttypes.Stream
Expand Down Expand Up @@ -254,13 +267,22 @@ func (sm *streamManager) sanityCheckStream(st sttypes.Stream) error {

func (sm *streamManager) handleAddStream(st sttypes.Stream) error {
id := st.ID()
if sm.streams.size() >= sm.config.HiCap {
return ErrTooManyStreams
}
// check if stream exists
if _, ok := sm.streams.get(id); ok {
return ErrStreamAlreadyExist
}

// If the stream list has sufficient capacity, the stream can be added to the reserved list
if sm.streams.size() >= sm.config.HiCap {
if sm.reservedStreams.size() < MaxReservedStreams {
if _, ok := sm.reservedStreams.get(id); !ok {
sm.reservedStreams.addStream(st)
}
return nil
}
return ErrTooManyStreams
}

sm.streams.addStream(st)

sm.addStreamFeed.Send(EvtStreamAdded{st})
Expand All @@ -269,13 +291,40 @@ func (sm *streamManager) handleAddStream(st sttypes.Stream) error {
return nil
}

func (sm *streamManager) addStreamFromReserved(count int) (int, error) {
if sm.reservedStreams.size() == 0 {
return 0, errors.New("reserved streams list is empty")
}
added := 0
for added < count && sm.reservedStreams.size() > 0 {
st, err := sm.reservedStreams.popStream()
if err != nil {
return added, err
}
sm.streams.addStream(st)
added++
}
return added, nil
}

func (sm *streamManager) handleRemoveStream(id sttypes.StreamID) error {
st, ok := sm.streams.get(id)
if !ok {
return ErrStreamAlreadyRemoved
}

sm.streams.deleteStream(st)

// try to replace removed streams from reserved list
requiredStreams := sm.hardRequiredStreams()
if added, err := sm.addStreamFromReserved(requiredStreams); added > 0 {
sm.logger.Info().
Err(err). // in case if some new streams added and others failed
Int("requiredStreams", requiredStreams).
Int("added", added).
Msg("added new streams from reserved list")
}

// if stream number is smaller than HardLoCap, spin up the discover
if !sm.hardHaveEnoughStream() {
select {
Expand Down Expand Up @@ -330,6 +379,9 @@ func (sm *streamManager) discoverAndSetupStream(discCtx context.Context) (int, e
if _, ok := sm.streams.get(sttypes.StreamID(peer.ID)); ok {
continue
}
if _, ok := sm.reservedStreams.get(sttypes.StreamID(peer.ID)); ok {
continue
}
discoveredPeersCounterVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Inc()
connecting += 1
go func(pid libp2p_peer.ID) {
Expand Down Expand Up @@ -401,3 +453,11 @@ func (sm *streamManager) hardHaveEnoughStream() bool {
availStreams := sm.streams.numStreamsWithMinProtoSpec(sm.myProtoSpec)
return availStreams >= sm.config.HardLoCap
}

func (sm *streamManager) hardRequiredStreams() int {
availStreams := sm.streams.numStreamsWithMinProtoSpec(sm.myProtoSpec)
if availStreams >= sm.config.HardLoCap {
return 0
}
return sm.config.HardLoCap - availStreams
}
55 changes: 55 additions & 0 deletions p2p/stream/common/streammanager/streammanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,61 @@ func TestStreamManager_HandleNewStream(t *testing.T) {
}
}

func TestStreamManager_ReservedStreams(t *testing.T) {
sm := newTestStreamManager()
sm.Start()
time.Sleep(defTestWait)

if sm.streams.size() != defHardLoCap {
t.Errorf("unexpected stream size: %v / %v", sm.streams.size(), defHardLoCap)
}
if sm.reservedStreams.size() != 0 {
t.Errorf("unexpected reserved stream size: %v / %v", sm.reservedStreams.size(), 0)
}

// Add more streams to get to Hi Cap
for i := sm.streams.size() + 1; i <= defHiCap; i++ {
stream := newTestStream(makeStreamID(i), testProtoID)
if err := sm.NewStream(stream); err != nil {
t.Errorf("unexpected add stream error: %v", err)
}
}
if sm.streams.size() != defHiCap {
t.Errorf("unexpected stream size: %v / %v", sm.streams.size(), defHiCap)
}
if sm.reservedStreams.size() != 0 {
t.Errorf("unexpected reserved stream size after reaching the high capacity limit: %v / %v", sm.reservedStreams.size(), 0)
}

// Add more streams to fill up reserved list
for i := sm.streams.size() + 1; i <= MaxReservedStreams+defHiCap; i++ {
stream := newTestStream(makeStreamID(i), testProtoID)
if err := sm.NewStream(stream); err != nil {
t.Errorf("unexpected add stream error: %v", err)
}
}
if sm.streams.size() != defHiCap {
t.Errorf("unexpected stream size: %v / %v", sm.streams.size(), defHiCap)
}
if sm.reservedStreams.size() != MaxReservedStreams {
t.Errorf("unexpected reserved stream size after retrieving the maximum allowed reserved streams: %v / %v", sm.reservedStreams.size(), MaxReservedStreams)
}

// try to add one more stream, it should be rejected
stream := newTestStream(makeStreamID(1234), testProtoID)
err := sm.NewStream(stream)
if assErr := assertError(err, ErrTooManyStreams); assErr != nil {
t.Errorf("unexpected add stream error: %v", err)
}
if sm.streams.size() != defHiCap {
t.Errorf("unexpected stream size: %v / %v", sm.streams.size(), defHiCap)
}
if sm.reservedStreams.size() != MaxReservedStreams {
t.Errorf("unexpected reserved stream size after attempting to add a stream beyond the allowed limit: %v / %v", sm.reservedStreams.size(), MaxReservedStreams)
}

}

func TestStreamManager_HandleRemoveStream(t *testing.T) {
tests := []struct {
id sttypes.StreamID
Expand Down
15 changes: 15 additions & 0 deletions p2p/stream/common/streammanager/streamset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"

sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/pkg/errors"
)

// streamSet is the concurrency safe stream set.
Expand Down Expand Up @@ -95,6 +96,20 @@ func (ss *streamSet) getStreams() []sttypes.Stream {
return res
}

func (ss *streamSet) popStream() (sttypes.Stream, error) {
ss.lock.RLock()
defer ss.lock.RUnlock()

if len(ss.streams) == 0 {
return nil, errors.New("no available stream")
}
for id, stream := range ss.streams {
delete(ss.streams, id)
return stream, nil
}
return nil, errors.New("pop stream failed")
}

func (ss *streamSet) numStreamsWithMinProtoSpec(minSpec sttypes.ProtoSpec) int {
ss.lock.RLock()
defer ss.lock.RUnlock()
Expand Down