Skip to content

Commit

Permalink
update Failures option names, remove beaconNode from stream protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
GheisMohammadi committed Oct 1, 2024
1 parent b1760d5 commit f708c17
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 75 deletions.
2 changes: 1 addition & 1 deletion p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func (host *HostV2) AddStreamProtocol(protocols ...sttypes.Protocol) {
host.streamProtos = append(host.streamProtos, proto)
host.h.SetStreamHandlerMatch(protocol.ID(proto.ProtoID()), proto.Match, proto.HandleStream)
// TODO: do we need to add handler match for shard proto id?
// if proto.IsBeaconNode() {
// if proto.IsBeaconValidator() {
// host.h.SetStreamHandlerMatch(protocol.ID(proto.ShardProtoID()), proto.Match, proto.HandleStream)
// }
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/stream/common/requestmanager/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (st *testStream) CloseOnExit() error {
return nil
}

func (st *testStream) FailedTimes() int {
func (st *testStream) Failures() int {
return 0
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/stream/common/streammanager/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (st *testStream) ReadBytes() ([]byte, error) {
return nil, nil
}

func (st *testStream) FailedTimes() int {
func (st *testStream) Failures() int {
return 0
}

Expand Down
12 changes: 7 additions & 5 deletions p2p/stream/common/streammanager/streammanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func newStreamManager(pid sttypes.ProtoID, host host, pf peerFinder, handleStrea
protoSpec, _ := sttypes.ProtoIDToProtoSpec(pid)

// if it is a beacon node or shard node, print the peer id and proto id
if protoSpec.BeaconNode || protoSpec.ShardID != shard.BeaconChainShardID {
if protoSpec.IsBeaconValidator || protoSpec.ShardID != shard.BeaconChainShardID {
fmt.Println("My peer id: ", host.ID().String())
fmt.Println("My proto id: ", pid)
}
Expand Down Expand Up @@ -344,10 +344,12 @@ func (sm *streamManager) discoverAndSetupStream(discCtx context.Context) (int, e
}

func (sm *streamManager) discover(ctx context.Context) (<-chan libp2p_peer.AddrInfo, error) {
numStreams := sm.streams.size()

protoID := sm.targetProtoID()
discBatch := sm.config.DiscBatch
if sm.config.HiCap-sm.streams.size() < sm.config.DiscBatch {
discBatch = sm.config.HiCap - sm.streams.size()
if sm.config.HiCap-numStreams < sm.config.DiscBatch {
discBatch = sm.config.HiCap - numStreams
}
if discBatch < 0 {
return nil, nil
Expand All @@ -364,7 +366,7 @@ func (sm *streamManager) discover(ctx context.Context) (<-chan libp2p_peer.AddrI
func (sm *streamManager) targetProtoID() string {
targetSpec := sm.myProtoSpec
if targetSpec.ShardID == shard.BeaconChainShardID { // for beacon chain, only connect to beacon nodes
targetSpec.BeaconNode = true
targetSpec.IsBeaconValidator = true
}
return string(targetSpec.ToProtoID())
}
Expand Down Expand Up @@ -478,7 +480,7 @@ func (ss *streamSet) getStreams() []sttypes.Stream {
ss.lock.RLock()
defer ss.lock.RUnlock()

res := make([]sttypes.Stream, 0, len(ss.streams))
res := make([]sttypes.Stream, 0)
for _, st := range ss.streams {
res = append(res, st)
}
Expand Down
84 changes: 47 additions & 37 deletions p2p/stream/protocols/sync/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,12 @@ var (
type (
// Protocol is the protocol for sync streaming
Protocol struct {
chain engine.ChainReader // provide SYNC data
beaconNode bool // is beacon node or shard chain node
schedule shardingconfig.Schedule // provide schedule information
rl ratelimiter.RateLimiter // limit the incoming request rate
sm streammanager.StreamManager // stream management
rm requestmanager.RequestManager // deliver the response from stream
disc discovery.Discovery
chain engine.ChainReader // provide SYNC data
schedule shardingconfig.Schedule // provide schedule information
rl ratelimiter.RateLimiter // limit the incoming request rate
sm streammanager.StreamManager // stream management
rm requestmanager.RequestManager // deliver the response from stream
disc discovery.Discovery

config Config
logger zerolog.Logger
Expand All @@ -60,12 +59,14 @@ type (

// Config is the sync protocol config
Config struct {
Chain engine.ChainReader
Host libp2p_host.Host
Discovery discovery.Discovery
ShardID nodeconfig.ShardID
Network nodeconfig.NetworkType
BeaconNode bool
Chain engine.ChainReader
Host libp2p_host.Host
Discovery discovery.Discovery
ShardID nodeconfig.ShardID
Network nodeconfig.NetworkType
isValidator bool
isExplorer bool

MaxAdvertiseWaitTime int
// stream manager config
SmSoftLowCap int
Expand All @@ -80,13 +81,12 @@ func NewProtocol(config Config) *Protocol {
ctx, cancel := context.WithCancel(context.Background())

sp := &Protocol{
chain: config.Chain,
beaconNode: config.BeaconNode,
disc: config.Discovery,
config: config,
ctx: ctx,
cancel: cancel,
closeC: make(chan struct{}),
chain: config.Chain,
disc: config.Discovery,
config: config,
ctx: ctx,
cancel: cancel,
closeC: make(chan struct{}),
}
smConfig := streammanager.Config{
SoftLoCap: config.SmSoftLowCap,
Expand All @@ -111,7 +111,7 @@ func (p *Protocol) Start() {
p.rm.Start()
p.rl.Start()
// If it's not EpochChain, advertise
if p.beaconNode || p.chain.ShardID() != shard.BeaconChainShardID {
if p.config.isValidator || p.chain.ShardID() != shard.BeaconChainShardID {
go p.advertiseLoop()
}
}
Expand Down Expand Up @@ -145,9 +145,19 @@ func (p *Protocol) Version() *version.Version {
return MyVersion
}

// IsBeaconNode returns true if it is a beacon chain node
func (p *Protocol) IsBeaconNode() bool {
return p.beaconNode
// IsBeaconValidator returns true if it is a beacon chain validator
func (p *Protocol) IsBeaconValidator() bool {
return p.config.ShardID == shard.BeaconChainShardID && p.config.isValidator
}

// IsValidator returns true if it is a validator node
func (p *Protocol) IsValidator() bool {
return p.config.isValidator
}

// IsExplorer returns true if it is an explorer node
func (p *Protocol) IsExplorer() bool {
return p.config.isExplorer
}

// Match checks the compatibility to the target protocol ID.
Expand Down Expand Up @@ -234,7 +244,7 @@ func (p *Protocol) supportedProtoIDs() []sttypes.ProtoID {
pids = append(pids, p.protoIDByVersion(v))
// beacon node needs to inform shard nodes about it supports them as well for EpochChain
// basically beacon node can accept connection from shard nodes to share last epoch blocks
if p.beaconNode {
if p.IsBeaconValidator() {
pids = append(pids, p.protoIDByVersionForShardNodes(v))
}
}
Expand All @@ -247,22 +257,22 @@ func (p *Protocol) supportedVersions() []*version.Version {

func (p *Protocol) protoIDByVersion(v *version.Version) sttypes.ProtoID {
spec := sttypes.ProtoSpec{
Service: serviceSpecifier,
NetworkType: p.config.Network,
ShardID: p.config.ShardID,
Version: v,
BeaconNode: p.beaconNode,
Service: serviceSpecifier,
NetworkType: p.config.Network,
ShardID: p.config.ShardID,
Version: v,
IsBeaconValidator: p.config.isValidator && p.config.ShardID == shard.BeaconChainShardID,
}
return spec.ToProtoID()
}

func (p *Protocol) protoIDByVersionForShardNodes(v *version.Version) sttypes.ProtoID {
spec := sttypes.ProtoSpec{
Service: serviceSpecifier,
NetworkType: p.config.Network,
ShardID: p.config.ShardID,
Version: v,
BeaconNode: false,
Service: serviceSpecifier,
NetworkType: p.config.Network,
ShardID: p.config.ShardID,
Version: v,
IsBeaconValidator: false,
}
return spec.ToProtoID()
}
Expand All @@ -286,10 +296,10 @@ func (p *Protocol) StreamFailed(stID sttypes.StreamID, reason string) {
st.AddFailedTimes(FaultRecoveryThreshold)
p.logger.Info().
Str("stream ID", string(st.ID())).
Int("num failures", st.FailedTimes()).
Int("num failures", st.Failures()).
Str("reason", reason).
Msg("stream failed")
if st.FailedTimes() >= MaxStreamFailures {
if st.Failures() >= MaxStreamFailures {
st.Close()
p.logger.Warn().
Str("stream ID", string(st.ID())).
Expand Down
1 change: 0 additions & 1 deletion p2p/stream/protocols/sync/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func TestProtocol_Match(t *testing.T) {

for i, test := range tests {
p := &Protocol{
beaconNode: true,
config: Config{
Network: "unitest",
ShardID: 0,
Expand Down
1 change: 1 addition & 0 deletions p2p/stream/protocols/sync/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (st *syncStream) deliverMsg(msg protobuf.Message) {
return
}

// handleReqLoop replies to incoming requests
func (st *syncStream) handleReqLoop() {
for {
select {
Expand Down
2 changes: 1 addition & 1 deletion p2p/stream/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Protocol interface {
Version() *version.Version
ProtoID() ProtoID
// ShardProtoID() ProtoID
IsBeaconNode() bool
IsBeaconValidator() bool
Match(id protocol.ID) bool
HandleStream(st libp2p_network.Stream)
}
Expand Down
26 changes: 10 additions & 16 deletions p2p/stream/types/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"io"
"sync"
"sync/atomic"
"time"

libp2p_network "github.com/libp2p/go-libp2p/core/network"
Expand All @@ -22,7 +23,7 @@ type Stream interface {
ReadBytes() ([]byte, error)
Close() error
CloseOnExit() error
FailedTimes() int
Failures() int
AddFailedTimes(faultRecoveryThreshold time.Duration)
ResetFailedTimes()
}
Expand All @@ -39,17 +40,17 @@ type BaseStream struct {
specErr error
specOnce sync.Once

failedTimes int
failures int32
lastFailureTime time.Time
}

// NewBaseStream creates BaseStream as the wrapper of libp2p Stream
func NewBaseStream(st libp2p_network.Stream) *BaseStream {
reader := bufio.NewReader(st)
return &BaseStream{
raw: st,
reader: reader,
failedTimes: 0,
raw: st,
reader: reader,
failures: 0,
}
}

Expand Down Expand Up @@ -80,23 +81,16 @@ func (st *BaseStream) Close() error {
return st.raw.Reset()
}

func (st *BaseStream) FailedTimes() int {
return st.failedTimes
func (st *BaseStream) Failures() int {
return int(atomic.LoadInt32(&st.failures))
}

func (st *BaseStream) AddFailedTimes(faultRecoveryThreshold time.Duration) {
if st.failedTimes > 0 {
durationSinceLastFailure := time.Now().Sub(st.lastFailureTime)
if durationSinceLastFailure >= faultRecoveryThreshold {
st.ResetFailedTimes()
}
}
st.failedTimes++
st.lastFailureTime = time.Now()
atomic.AddInt32(&st.failures, 1)
}

func (st *BaseStream) ResetFailedTimes() {
st.failedTimes = 0
atomic.StoreInt32(&st.failures, 0)
}

const (
Expand Down
24 changes: 12 additions & 12 deletions p2p/stream/types/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,18 @@ const (
// 2. NetworkType - mainnet, testnet, stn, e.t.c.
// 3. ShardID - shard ID of the current protocol.
// 4. Version - Stream protocol version for backward compatibility.
// 5. BeaconNode - whether stream is from a beacon chain node or shard chain node
// 5. IsBeaconValidator - whether stream is from a beacon chain node or shard chain node
type ProtoID libp2p_proto.ID

// ProtoSpec is the un-serialized stream proto id specification
// TODO: move this to service wise module since different protocol might have different
// protoID information
type ProtoSpec struct {
Service string
NetworkType nodeconfig.NetworkType
ShardID nodeconfig.ShardID
Version *version.Version
BeaconNode bool
Service string
NetworkType nodeconfig.NetworkType
ShardID nodeconfig.ShardID
Version *version.Version
IsBeaconValidator bool
}

// ToProtoID convert a ProtoSpec to ProtoID.
Expand All @@ -54,7 +54,7 @@ func (spec ProtoSpec) ToProtoID() ProtoID {
versionStr = spec.Version.String()
}
s := fmt.Sprintf(ProtoIDFormat, ProtoIDCommonPrefix, spec.Service,
spec.NetworkType, spec.ShardID, versionStr, bool2int(spec.BeaconNode))
spec.NetworkType, spec.ShardID, versionStr, bool2int(spec.IsBeaconValidator))
return ProtoID(s)
}

Expand Down Expand Up @@ -88,11 +88,11 @@ func ProtoIDToProtoSpec(id ProtoID) (ProtoSpec, error) {
return ProtoSpec{}, errors.Wrap(err, "invalid beacon node flag")
}
return ProtoSpec{
Service: service,
NetworkType: nodeconfig.NetworkType(networkType),
ShardID: nodeconfig.ShardID(uint32(shardID)),
Version: version,
BeaconNode: int2bool(isBeaconNode),
Service: service,
NetworkType: nodeconfig.NetworkType(networkType),
ShardID: nodeconfig.ShardID(uint32(shardID)),
Version: version,
IsBeaconValidator: int2bool(isBeaconNode),
}, nil
}

Expand Down

0 comments on commit f708c17

Please sign in to comment.