
Commit

Merge branch 'main' into ramin/gateway-refactor
ramin authored Dec 21, 2023
2 parents dac865c + 150378f commit 9d898de
Showing 7 changed files with 125 additions and 147 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -36,7 +36,7 @@ require (
github.com/ipfs/go-ipld-format v0.6.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-car v0.6.2
- github.com/libp2p/go-libp2p v0.32.0
+ github.com/libp2p/go-libp2p v0.32.1
github.com/libp2p/go-libp2p-kad-dht v0.25.1
github.com/libp2p/go-libp2p-pubsub v0.10.0
github.com/libp2p/go-libp2p-record v0.2.0
4 changes: 2 additions & 2 deletions go.sum
@@ -1414,8 +1414,8 @@ github.com/libp2p/go-libp2p v0.22.0/go.mod h1:UDolmweypBSjQb2f7xutPnwZ/fxioLbMBx
github.com/libp2p/go-libp2p v0.23.4/go.mod h1:s9DEa5NLR4g+LZS+md5uGU4emjMWFiqkZr6hBTY8UxI=
github.com/libp2p/go-libp2p v0.25.0/go.mod h1:vXHmFpcfl+xIGN4qW58Bw3a0/SKGAesr5/T4IuJHE3o=
github.com/libp2p/go-libp2p v0.25.1/go.mod h1:xnK9/1d9+jeQCVvi/f1g12KqtVi/jP/SijtKV1hML3g=
- github.com/libp2p/go-libp2p v0.32.0 h1:86I4B7nBUPIyTgw3+5Ibq6K7DdKRCuZw8URCfPc1hQM=
- github.com/libp2p/go-libp2p v0.32.0/go.mod h1:hXXC3kXPlBZ1eu8Q2hptGrMB4mZ3048JUoS4EKaHW5c=
+ github.com/libp2p/go-libp2p v0.32.1 h1:wy1J4kZIZxOaej6NveTWCZmHiJ/kY7GoAqXgqNCnPps=
+ github.com/libp2p/go-libp2p v0.32.1/go.mod h1:hXXC3kXPlBZ1eu8Q2hptGrMB4mZ3048JUoS4EKaHW5c=
github.com/libp2p/go-libp2p-asn-util v0.1.0/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
github.com/libp2p/go-libp2p-asn-util v0.2.0/go.mod h1:WoaWxbHKBymSN41hWSq/lGKJEca7TNm58+gGJi2WsLI=
github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s=
15 changes: 7 additions & 8 deletions share/getters/shrex.go
@@ -135,9 +135,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader
utils.SetStatusAndEnd(span, err)
}()

- dah := header.DAH
// short circuit if the data root is empty
- if dah.Equals(share.EmptyRoot()) {
+ if header.DAH.Equals(share.EmptyRoot()) {
return share.EmptyExtendedDataSquare(), nil
}
for {
@@ -147,10 +146,10 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader
}
attempt++
start := time.Now()
- peer, setStatus, getErr := sg.peerManager.Peer(ctx, dah.Hash())
+ peer, setStatus, getErr := sg.peerManager.Peer(ctx, header.DAH.Hash(), header.Height())
if getErr != nil {
log.Debugw("eds: couldn't find peer",
"hash", dah.String(),
"hash", header.DAH.String(),
"err", getErr,
"finished (s)", time.Since(start))
sg.metrics.recordEDSAttempt(ctx, attempt, false)
@@ -159,11 +158,11 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader

reqStart := time.Now()
reqCtx, cancel := ctxWithSplitTimeout(ctx, sg.minAttemptsCount-attempt+1, sg.minRequestTimeout)
- eds, getErr := sg.edsClient.RequestEDS(reqCtx, dah.Hash(), peer)
+ eds, getErr := sg.edsClient.RequestEDS(reqCtx, header.DAH.Hash(), peer)
cancel()
switch {
case getErr == nil:
- setStatus(peers.ResultSynced)
+ setStatus(peers.ResultNoop)
sg.metrics.recordEDSAttempt(ctx, attempt, true)
return eds, nil
case errors.Is(getErr, context.DeadlineExceeded),
@@ -182,7 +181,7 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, header *header.ExtendedHeader
err = errors.Join(err, getErr)
}
log.Debugw("eds: request failed",
"hash", dah.String(),
"hash", header.DAH.String(),
"peer", peer.String(),
"attempt", attempt,
"err", getErr,
@@ -223,7 +222,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
}
attempt++
start := time.Now()
- peer, setStatus, getErr := sg.peerManager.Peer(ctx, dah.Hash())
+ peer, setStatus, getErr := sg.peerManager.Peer(ctx, header.DAH.Hash(), header.Height())
if getErr != nil {
log.Debugw("nd: couldn't find peer",
"hash", dah.String(),
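For readers following the getter change above, here is a minimal, self-contained sketch (not the repository's code) of how a caller drives the updated peer-manager API: Manager.Peer now takes the header height alongside the data root hash, and a successful fetch reports ResultNoop rather than the removed ResultSynced. The interface shapes, `fetchEDSOnce`, and the 10-second timeout are illustrative assumptions.

```go
package shrexsketch

import (
	"context"
	"errors"
	"time"
)

// Illustrative stand-ins for the real types in share/p2p/peers and
// share/p2p/shrexeds; only the shapes relevant to this diff are reproduced.
type (
	Result   string
	DoneFunc func(Result)
	PeerID   string
)

const (
	ResultNoop          Result = "result_noop"
	ResultCooldownPeer  Result = "result_cooldown_peer"
	ResultBlacklistPeer Result = "result_blacklist_peer"
)

// PeerManager mirrors the updated Peer signature: the header height is now
// passed alongside the data root hash so the manager can file the request
// under the right height-scoped pool.
type PeerManager interface {
	Peer(ctx context.Context, datahash []byte, height uint64) (PeerID, DoneFunc, error)
}

// EDSClient is a placeholder for the shrex-eds client used by the getter.
type EDSClient interface {
	RequestEDS(ctx context.Context, datahash []byte, p PeerID) ([]byte, error)
}

// fetchEDSOnce sketches a single attempt in the spirit of ShrexGetter.GetEDS:
// ask the manager for a peer tracking (hash, height), request the square, and
// report the outcome through the returned DoneFunc.
func fetchEDSOnce(
	ctx context.Context, pm PeerManager, cl EDSClient, hash []byte, height uint64,
) ([]byte, error) {
	p, setStatus, err := pm.Peer(ctx, hash, height)
	if err != nil {
		// no peer became available before the context expired
		return nil, err
	}

	reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	eds, err := cl.RequestEDS(reqCtx, hash, p)
	switch {
	case err == nil:
		// success is now just ResultNoop; the pool is no longer marked "synced"
		setStatus(ResultNoop)
		return eds, nil
	case errors.Is(err, context.DeadlineExceeded), errors.Is(err, context.Canceled):
		// slow peer: put it on cooldown and let the caller retry with another
		setStatus(ResultCooldownPeer)
	default:
		// the real getter distinguishes "not found" (cooldown) from an invalid
		// response (blacklist); collapsed to cooldown here for brevity
		setStatus(ResultCooldownPeer)
	}
	return nil, err
}
```

In the real getter the retry loop wraps this per-attempt logic in ctxWithSplitTimeout and records metrics per attempt, as visible in the hunks above.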
103 changes: 50 additions & 53 deletions share/p2p/peers/manager.go
@@ -27,8 +27,6 @@ import (
const (
// ResultNoop indicates operation was successful and no extra action is required
ResultNoop result = "result_noop"
- // ResultSynced will save the status of pool as "synced" and will remove peers from it
- ResultSynced = "result_synced"
// ResultCooldownPeer will put returned peer on cooldown, meaning it won't be available by Peer
// method for some time
ResultCooldownPeer = "result_cooldown_peer"
@@ -39,6 +37,9 @@ const (
// eventbusBufSize is the size of the buffered channel to handle
// events in libp2p
eventbusBufSize = 32

+ // storedPoolsAmount is the amount of pools for recent headers that will be stored in the peer manager
+ storedPoolsAmount = 10
)

type result string
@@ -56,11 +57,14 @@ type Manager struct {
host host.Host
connGater *conngater.BasicConnectionGater

- // pools collecting peers from shrexSub
+ // pools collecting peers from shrexSub and stores them by datahash
pools map[string]*syncPool
- // messages from shrex.Sub with height below initialHeight will be ignored, since we don't need to
- // track peers for those headers

+ // initialHeight is the height of the first header received from headersub
initialHeight atomic.Uint64
+ // messages from shrex.Sub with height below storeFrom will be ignored, since we don't need to
+ // track peers for those headers
+ storeFrom atomic.Uint64

// fullNodes collects full nodes peer.ID found via discovery
fullNodes *pool
@@ -85,11 +89,8 @@ type syncPool struct {
// isValidatedDataHash indicates if datahash was validated by receiving corresponding extended
// header from headerSub
isValidatedDataHash atomic.Bool
- // headerHeight is the height of header corresponding to syncpool
- headerHeight atomic.Uint64
- // isSynced will be true if DoneFunc was called with ResultSynced. It indicates that given datahash
- // was synced and peer-manager no longer need to keep peers for it
- isSynced atomic.Bool
+ // height is the height of the header that corresponds to datahash
+ height uint64
// createdAt is the syncPool creation time
createdAt time.Time
}
@@ -190,16 +191,15 @@ func (m *Manager) Stop(ctx context.Context) error {
// full nodes, it will wait until any peer appear in either source or timeout happen.
// After fetching data using given peer, caller is required to call returned DoneFunc using
// appropriate result value
- func (m *Manager) Peer(
- ctx context.Context, datahash share.DataHash,
+ func (m *Manager) Peer(ctx context.Context, datahash share.DataHash, height uint64,
) (peer.ID, DoneFunc, error) {
- p := m.validatedPool(datahash.String())
+ p := m.validatedPool(datahash.String(), height)

// first, check if a peer is available for the given datahash
peerID, ok := p.tryGet()
if ok {
if m.removeIfUnreachable(p, peerID) {
- return m.Peer(ctx, datahash)
+ return m.Peer(ctx, datahash, height)
}
return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0)
}
@@ -216,7 +216,7 @@ func (m *Manager) Peer(
select {
case peerID = <-p.next(ctx):
if m.removeIfUnreachable(p, peerID) {
- return m.Peer(ctx, datahash)
+ return m.Peer(ctx, datahash, height)
}
return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start))
case peerID = <-m.fullNodes.next(ctx):
@@ -270,14 +270,12 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS
m.metrics.observeDoneResult(source, result)
switch result {
case ResultNoop:
- case ResultSynced:
- m.markPoolAsSynced(datahash.String())
case ResultCooldownPeer:
if source == sourceFullNodes {
m.fullNodes.putOnCooldown(peerID)
return
}
- m.getOrCreatePool(datahash.String()).putOnCooldown(peerID)
+ m.getPool(datahash.String()).putOnCooldown(peerID)
case ResultBlacklistPeer:
m.blacklistPeers(reasonMisbehave, peerID)
}
@@ -298,12 +296,16 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri
log.Errorw("get next header from sub", "err", err)
continue
}
- m.validatedPool(h.DataHash.String())
+ m.validatedPool(h.DataHash.String(), h.Height())

// store first header for validation purposes
if m.initialHeight.CompareAndSwap(0, h.Height()) {
log.Debugw("stored initial height", "height", h.Height())
}

+ // update storeFrom if the header height has advanced
+ m.storeFrom.Store(uint64(max(0, int(h.Height())-storedPoolsAmount)))
+ log.Debugw("updated lowest stored height", "height", h.Height())
}
}

@@ -355,22 +357,12 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif
return pubsub.ValidationReject
}

- if msg.Height == 0 {
- logger.Debug("received message with 0 height")
- return pubsub.ValidationReject
- }
-
- if msg.Height < m.initialHeight.Load() {
- // we can use peers from discovery for headers before the first one from headerSub
- // if we allow pool creation for those headers, there is chance the pool will not be validated in
- // time and will be false-positively trigger blacklisting of hash and all peers that sent msgs for
- // that hash
+ if msg.Height < m.storeFrom.Load() {
logger.Debug("received message for past header")
return pubsub.ValidationIgnore
}

- p := m.getOrCreatePool(msg.DataHash.String())
- p.headerHeight.Store(msg.Height)
+ p := m.getOrCreatePool(msg.DataHash.String(), msg.Height)
logger.Debugw("got hash from shrex-sub")

p.add(peerID)
@@ -381,13 +373,20 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif
return pubsub.ValidationIgnore
}

- func (m *Manager) getOrCreatePool(datahash string) *syncPool {
+ func (m *Manager) getPool(datahash string) *syncPool {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ return m.pools[datahash]
+ }
+
+ func (m *Manager) getOrCreatePool(datahash string, height uint64) *syncPool {
m.lock.Lock()
defer m.lock.Unlock()

p, ok := m.pools[datahash]
if !ok {
p = &syncPool{
+ height: height,
pool: newPool(m.params.PeerCooldown),
createdAt: time.Now(),
}
@@ -432,8 +431,8 @@ func (m *Manager) isBlacklistedHash(hash share.DataHash) bool {
return m.blacklistedHashes[hash.String()]
}

- func (m *Manager) validatedPool(hashStr string) *syncPool {
- p := m.getOrCreatePool(hashStr)
+ func (m *Manager) validatedPool(hashStr string, height uint64) *syncPool {
+ p := m.getOrCreatePool(hashStr, height)
if p.isValidatedDataHash.CompareAndSwap(false, true) {
log.Debugw("pool marked validated", "datahash", hashStr)
// if pool is proven to be valid, add all collected peers to full nodes
@@ -482,12 +481,24 @@ func (m *Manager) cleanUp() []peer.ID {

addToBlackList := make(map[peer.ID]struct{})
for h, p := range m.pools {
- if !p.isValidatedDataHash.Load() && time.Since(p.createdAt) > m.params.PoolValidationTimeout {
- delete(m.pools, h)
- if p.headerHeight.Load() < m.initialHeight.Load() {
- // outdated pools could still be valid even if not validated, no need to blacklist
- continue
- }
+ if p.isValidatedDataHash.Load() {
+ // remove pools that are outdated
+ if p.height < m.storeFrom.Load() {
+ delete(m.pools, h)
+ }
+ continue
+ }
+
+ // can't validate datahashes below initial height
+ if p.height < m.initialHeight.Load() {
+ delete(m.pools, h)
+ continue
+ }
+
+ // find pools that are not validated in time
+ if time.Since(p.createdAt) > m.params.PoolValidationTimeout {
+ delete(m.pools, h)
+
log.Debug("blacklisting datahash with all corresponding peers",
"hash", h,
"peer_list", p.peersList)
@@ -507,17 +518,3 @@ func (m *Manager) cleanUp() []peer.ID {
}
return blacklist
}

- func (m *Manager) markPoolAsSynced(datahash string) {
- p := m.getOrCreatePool(datahash)
- if p.isSynced.CompareAndSwap(false, true) {
- p.isSynced.Store(true)
- p.reset()
- }
- }
-
- func (p *syncPool) add(peers ...peer.ID) {
- if !p.isSynced.Load() {
- p.pool.add(peers...)
- }
- }
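To make the new retention window concrete, here is a small sketch (the names `lowestStoredHeight`, `poolAction`, and `decide` are illustrative assumptions, not the repository's code) of the arithmetic behind storeFrom and the branch structure cleanUp now follows: validated pools are kept until they fall storedPoolsAmount heights behind the latest header, pools below initialHeight are dropped quietly, and unvalidated pools that outlive PoolValidationTimeout are dropped and blacklisted.

```go
package peerswindow

// storedPoolsAmount mirrors the constant introduced in this diff: pools are
// retained only for the most recent N header heights.
const storedPoolsAmount = 10

// lowestStoredHeight reproduces the update done in subscribeHeader:
// storeFrom = max(0, newHeight - storedPoolsAmount).
func lowestStoredHeight(newHeight uint64) uint64 {
	if newHeight <= storedPoolsAmount {
		return 0
	}
	return newHeight - storedPoolsAmount
}

// poolAction is an illustrative reduction of the cleanUp branches.
type poolAction int

const (
	keep poolAction = iota
	drop
	dropAndBlacklist
)

// decide sketches how cleanUp treats a single pool, given its validation state,
// whether time.Since(createdAt) exceeded PoolValidationTimeout, and the current
// storeFrom / initialHeight watermarks.
func decide(validated, timedOut bool, poolHeight, storeFrom, initialHeight uint64) poolAction {
	switch {
	case validated:
		// validated pools live until they slide out of the retention window
		if poolHeight < storeFrom {
			return drop
		}
		return keep
	case poolHeight < initialHeight:
		// hashes below the first headersub height can never be validated; drop quietly
		return drop
	case timedOut:
		// unvalidated pools that outlive the validation timeout are dropped and
		// their hash and peers are blacklisted
		return dropAndBlacklist
	default:
		return keep
	}
}
```

The same watermark explains the simplified Validate above: messages with msg.Height below storeFrom are ignored outright, replacing the previous zero-height and initialHeight checks.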

