perf: various improvements in pusher, pushsync, salud, reacher #4958

Merged: 13 commits, Feb 3, 2025

fix: various improvements in pusher, pushsync, salud, reacher
istae committed Jan 28, 2025

commit fc29b7e4c2583bfdf3bc4ed75a7306c228de4e97 (verified signature)
4 changes: 2 additions & 2 deletions pkg/node/node.go
@@ -723,7 +723,7 @@ func NewBee(
Batchstore: batchStore,
StateStore: stateStore,
RadiusSetter: kad,
WarmupDuration: o.WarmupTime,
WarmupDuration: warmupTime,
Logger: logger,
Tracer: tracer,
CacheMinEvictCount: cacheMinEvictCount,
@@ -964,7 +964,7 @@ func NewBee(
retrieval := retrieval.New(swarmAddress, waitNetworkRFunc, localStore, p2ps, kad, logger, acc, pricer, tracer, o.RetrievalCaching)
localStore.SetRetrievalService(retrieval)

pusherService := pusher.New(networkID, localStore, pushSyncProtocol, validStamp, logger, warmupTime, pusher.DefaultRetryCount)
pusherService := pusher.New(networkID, localStore, pushSyncProtocol, batchStore, logger, warmupTime, pusher.DefaultRetryCount)
b.pusherCloser = pusherService

pusherService.AddFeed(localStore.PusherFeed())
36 changes: 13 additions & 23 deletions pkg/p2p/libp2p/internal/reacher/reacher.go
@@ -18,7 +18,7 @@ import (

const (
pingTimeout = time.Second * 15
workers = 8
workers = 16
retryAfterDuration = time.Minute * 5
)

@@ -32,8 +32,8 @@ type reacher struct {
mu sync.Mutex
peers map[string]*peer

work chan struct{}
quit chan struct{}
newPeer chan struct{}
quit chan struct{}

pinger p2p.Pinger
notifier p2p.ReachableNotifier
@@ -53,7 +53,7 @@ type Options struct {
func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options) *reacher {

r := &reacher{
work: make(chan struct{}, 1),
newPeer: make(chan struct{}, 1),
quit: make(chan struct{}),
pinger: streamer,
peers: make(map[string]*peer),
@@ -103,7 +103,7 @@ func (r *reacher) manage() {
select {
case <-r.quit:
return
case <-r.work:
case <-r.newPeer:
continue
case <-time.After(tryAfter):
continue
@@ -115,12 +115,12 @@ func (r *reacher) manage() {
select {
case <-r.quit:
return
case <-r.work:
case <-r.newPeer:
continue
}
}

// send p to channel
// ping peer
select {
case <-r.quit:
return
@@ -135,10 +135,6 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) {

for p := range c {

r.mu.Lock()
overlay := p.overlay
r.mu.Unlock()

now := time.Now()

ctxt, cancel := context.WithTimeout(ctx, r.options.PingTimeout)
@@ -149,14 +145,12 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) {
if err == nil {
r.metrics.Pings.WithLabelValues("success").Inc()
r.metrics.PingTime.WithLabelValues("success").Observe(time.Since(now).Seconds())
r.notifier.Reachable(overlay, p2p.ReachabilityStatusPublic)
r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPublic)
} else {
r.metrics.Pings.WithLabelValues("failure").Inc()
r.metrics.PingTime.WithLabelValues("failure").Observe(time.Since(now).Seconds())
r.notifier.Reachable(overlay, p2p.ReachabilityStatusPrivate)
r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPrivate)
}

r.notifyManage()
}
}

@@ -191,13 +185,6 @@ func (r *reacher) tryAcquirePeer() (*peer, time.Duration) {
return nil, time.Until(nextClosest)
}

func (r *reacher) notifyManage() {
select {
case r.work <- struct{}{}:
default:
}
}

// Connected adds a new peer to the queue for testing reachability.
func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) {
r.mu.Lock()
@@ -207,7 +194,10 @@ func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) {
r.peers[overlay.ByteString()] = &peer{overlay: overlay, addr: addr}
}

r.notifyManage()
select {
case r.newPeer <- struct{}{}:
default:
}
}

// Disconnected removes a peer from the queue.
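
Taken together, the reacher changes double the ping workers from 8 to 16, rename the wake-up channel to newPeer, and drop the per-ping notifyManage call, so the manage loop is only woken by new connections or its own retry timer. A minimal runnable sketch of that wake-up pattern, assuming simplified types (only newPeer, quit, and the select shapes mirror the diff):

```go
// Sketch of the reacher's wake-up pattern after this change: Connected
// signals the manage loop through a 1-buffered channel with a non-blocking
// send, and the loop otherwise sleeps until the next retry deadline.
package main

import (
	"fmt"
	"time"
)

type reacher struct {
	newPeer chan struct{}
	quit    chan struct{}
}

func newReacher() *reacher {
	return &reacher{
		newPeer: make(chan struct{}, 1), // buffer of 1 coalesces bursts of connections
		quit:    make(chan struct{}),
	}
}

// Connected wakes the manage loop without ever blocking the caller.
func (r *reacher) Connected() {
	select {
	case r.newPeer <- struct{}{}:
	default: // a wake-up is already pending, nothing to do
	}
}

// manage waits for either a new peer or the retry timer; the real loop then
// tries to acquire a peer and hands it to one of the ping workers.
func (r *reacher) manage(tryAfter time.Duration) {
	for {
		select {
		case <-r.quit:
			return
		case <-r.newPeer:
			fmt.Println("woken by new peer")
		case <-time.After(tryAfter):
			fmt.Println("woken by retry timer")
		}
	}
}

func main() {
	r := newReacher()
	go r.manage(200 * time.Millisecond)
	r.Connected()
	time.Sleep(500 * time.Millisecond)
	close(r.quit)
}
```
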
9 changes: 6 additions & 3 deletions pkg/postage/interface.go
@@ -42,15 +42,13 @@ type ChainSnapshot struct {
// on the current (highest available) block.
type Storer interface {
ChainStateGetter
BatchExist

Radius() uint8

// Get returns a batch from the store with the given ID.
Get([]byte) (*Batch, error)

// Exists reports whether batch referenced by the give id exists.
Exists([]byte) (bool, error)

// Iterate iterates through stored batches.
Iterate(func(*Batch) (bool, error)) error

@@ -73,6 +71,11 @@ type Storer interface {
SetBatchExpiryHandler(BatchExpiryHandler)
}

type BatchExist interface {
// Exists reports whether batch referenced by the give id exists.
Exists([]byte) (bool, error)
}

// StorageRadiusSetter is used to calculate total batch commitment of the network.
type CommitmentGetter interface {
Commitment() (uint64, error)
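
The extracted BatchExist interface lets components that only care about batch existence, such as the pusher below, depend on a single method instead of the full postage Storer. A small sketch of coding against the narrower interface; the BatchExist and Exists names come from the diff, while memStore and skipIfBatchGone are illustrative stand-ins:

```go
// Sketch: a consumer that only needs batch existence depends on the narrow
// BatchExist interface rather than the full postage.Storer.
package main

import "fmt"

// BatchExist mirrors the interface added in pkg/postage/interface.go.
type BatchExist interface {
	// Exists reports whether the batch referenced by the given id exists.
	Exists([]byte) (bool, error)
}

// memStore is an illustrative in-memory implementation.
type memStore struct{ batches map[string]struct{} }

func (m *memStore) Exists(id []byte) (bool, error) {
	_, ok := m.batches[string(id)]
	return ok, nil
}

// skipIfBatchGone is a hypothetical helper showing the pusher-style check:
// skip syncing when the batch is missing or the lookup fails.
func skipIfBatchGone(be BatchExist, batchID []byte) bool {
	ok, err := be.Exists(batchID)
	return !ok || err != nil
}

func main() {
	store := &memStore{batches: map[string]struct{}{"b1": {}}}
	fmt.Println(skipIfBatchGone(store, []byte("b1"))) // false: batch exists, keep syncing
	fmt.Println(skipIfBatchGone(store, []byte("b2"))) // true: batch gone, skip the chunk
}
```
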
20 changes: 10 additions & 10 deletions pkg/pusher/pusher.go
@@ -51,7 +51,7 @@ type Service struct {
networkID uint64
storer Storer
pushSyncer pushsync.PushSyncer
validStamp postage.ValidStampFn
batchExist postage.BatchExist
logger log.Logger
metrics metrics
quit chan struct{}
@@ -75,7 +75,7 @@ func New(
networkID uint64,
storer Storer,
pushSyncer pushsync.PushSyncer,
validStamp postage.ValidStampFn,
batchExist postage.BatchExist,
logger log.Logger,
warmupTime time.Duration,
retryCount int,
@@ -84,7 +84,7 @@ func New(
networkID: networkID,
storer: storer,
pushSyncer: pushSyncer,
validStamp: validStamp,
batchExist: batchExist,
logger: logger.WithName(loggerName).Register(),
metrics: newMetrics(),
quit: make(chan struct{}),
@@ -251,14 +251,14 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (

defer s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())

if _, err := s.validStamp(op.Chunk); err != nil {
loggerV1.Warning(
"stamp with is no longer valid, skipping syncing for chunk",
ok, err := s.batchExist.Exists(op.Chunk.Stamp().BatchID())
if !ok || err != nil {
logger.Warning(
"stamp is no longer valid, skipping syncing for chunk",
"batch_id", hex.EncodeToString(op.Chunk.Stamp().BatchID()),
"chunk_address", op.Chunk.Address(),
"error", err,
)

return false, errors.Join(err, s.storer.Report(ctx, op.Chunk, storage.ChunkCouldNotSync))
}

@@ -311,10 +311,10 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err
}
}()

_, err = s.validStamp(op.Chunk)
if err != nil {
ok, err := s.batchExist.Exists(op.Chunk.Stamp().BatchID())

Reviewer (Contributor): If the batch is not found, it will return nil? Is this also ok?

Author (Member): Yes. For direct uploads, the chunk resync is retried if this error is not nil; because the batch is no longer valid at this point, we should stop syncing, hence the nil return.

if !ok || err != nil {
logger.Warning(
"stamp with is no longer valid, skipping direct upload for chunk",
"stamp is no longer valid, skipping direct upload for chunk",
"batch_id", hex.EncodeToString(op.Chunk.Stamp().BatchID()),
"chunk_address", op.Chunk.Address(),
"error", err,
28 changes: 15 additions & 13 deletions pkg/pusher/pusher_test.go
@@ -17,6 +17,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/postage"
batchstoremock "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock"
"github.com/ethersphere/bee/v2/pkg/pusher"
"github.com/ethersphere/bee/v2/pkg/pushsync"
pushsyncmock "github.com/ethersphere/bee/v2/pkg/pushsync/mock"
@@ -33,9 +34,9 @@ const spinTimeout = time.Second * 3

var (
block = common.HexToHash("0x1").Bytes()
defaultMockValidStamp = func(ch swarm.Chunk) (swarm.Chunk, error) {
return ch, nil
}
defaultMockBatchStore = batchstoremock.New(batchstoremock.WithExistsFunc(func(b []byte) (bool, error) {
return true, nil
}))
defaultRetryCount = 3
)

@@ -134,7 +135,7 @@ func TestChunkSyncing(t *testing.T) {
t,
storer,
pushSyncService,
defaultMockValidStamp,
defaultMockBatchStore,
defaultRetryCount,
)

@@ -181,7 +182,7 @@ func TestChunkStored(t *testing.T) {
t,
storer,
pushSyncService,
defaultMockValidStamp,
defaultMockBatchStore,
defaultRetryCount,
)

@@ -239,7 +240,7 @@ func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) {
t,
storer,
pushSyncService,
defaultMockValidStamp,
defaultMockBatchStore,
defaultRetryCount,
)

@@ -283,7 +284,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) {
t,
storer,
pushSyncService,
defaultMockValidStamp,
defaultMockBatchStore,
defaultRetryCount,
)

@@ -326,7 +327,7 @@ func TestPusherRetryShallow(t *testing.T) {
t,
storer,
pushSyncService,
defaultMockValidStamp,
defaultMockBatchStore,
defaultRetryCount,
)

@@ -364,9 +365,10 @@ func TestChunkWithInvalidStampSkipped(t *testing.T) {
})

wantErr := errors.New("dummy error")
validStamp := func(ch swarm.Chunk) (swarm.Chunk, error) {
return nil, wantErr
}

bmock := batchstoremock.New(batchstoremock.WithExistsFunc(func(b []byte) (bool, error) {
return false, wantErr
}))

storer := &mockStorer{
chunks: make(chan swarm.Chunk),
@@ -376,7 +378,7 @@ func TestChunkWithInvalidStampSkipped(t *testing.T) {
t,
storer,
pushSyncService,
validStamp,
bmock,
defaultRetryCount,
)

@@ -412,7 +414,7 @@ func createPusher(
t *testing.T,
storer pusher.Storer,
pushSyncService pushsync.PushSyncer,
validStamp postage.ValidStampFn,
validStamp postage.BatchExist,
retryCount int,
) *pusher.Service {
t.Helper()
11 changes: 7 additions & 4 deletions pkg/pushsync/pushsync.go
@@ -609,10 +609,13 @@ func (ps *PushSync) pushChunkToPeer(ctx context.Context, peer swarm.Address, ch
return nil, err
}

err = ps.store.Report(ctx, ch, storage.ChunkSent)
if err != nil && !errors.Is(err, storage.ErrNotFound) {
err = fmt.Errorf("tag %d increment: %w", ch.TagID(), err)
return
// if the chunk has a tag, then it's from a local deferred upload
if ch.TagID() != 0 {
err = ps.store.Report(ctx, ch, storage.ChunkSent)
if err != nil {
err = fmt.Errorf("tag %d increment: %w", ch.TagID(), err)
return
}
}

var rec pb.Receipt
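
The Report call is now gated on the chunk's tag: a non-zero TagID marks a chunk that came from a local deferred upload and whose tag counters must be incremented, while forwarded or direct chunks skip the store lookup entirely. A minimal sketch of that guard, with a stand-in chunk type and a stubbed report in place of storer.Report(ctx, ch, storage.ChunkSent):

```go
// Sketch of the tag-gated ChunkSent report added to pushChunkToPeer.
package main

import "fmt"

// chunk is a stand-in for swarm.Chunk; only the tag id matters here.
type chunk struct{ tagID uint32 }

func (c chunk) TagID() uint32 { return c.tagID }

// reportSent stands in for storer.Report(ctx, ch, storage.ChunkSent).
func reportSent(c chunk) error {
	fmt.Printf("tag %d incremented\n", c.TagID())
	return nil
}

// afterSend shows the guard: only locally uploaded (tagged) chunks are reported.
func afterSend(c chunk) error {
	// if the chunk has a tag, then it's from a local deferred upload
	if c.TagID() != 0 {
		if err := reportSent(c); err != nil {
			return fmt.Errorf("tag %d increment: %w", c.TagID(), err)
		}
	}
	return nil
}

func main() {
	_ = afterSend(chunk{tagID: 1}) // local deferred upload: tag counter incremented
	_ = afterSend(chunk{tagID: 0}) // forwarded chunk: no report, no store lookup
}
```
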
14 changes: 10 additions & 4 deletions pkg/pushsync/pushsync_test.go
@@ -76,7 +76,7 @@ func TestPushClosest(t *testing.T) {

// pivot node needs the streamer since the chunk is intercepted by
// the chunk worker, then gets sent by opening a new stream
psPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), mock.WithClosestPeer(closestPeer))
psPivot, pivotStorer, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), mock.WithClosestPeer(closestPeer))

// Trigger the sending of chunk to the closest node
receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
@@ -93,6 +93,12 @@ func TestPushClosest(t *testing.T) {

// this intercepts the incoming receipt message
waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)

found, _ := pivotStorer.hasReported(t, chunk.Address())
if found {
t.Fatalf("chunk %s reported", chunk.Address())
}

balance, err := pivotAccounting.Balance(closestPeer)
if err != nil {
t.Fatal(err)
@@ -245,7 +251,7 @@ func TestShallowReceipt(t *testing.T) {
func TestPushChunkToClosest(t *testing.T) {
t.Parallel()
// chunk data to upload
chunk := testingc.FixtureChunk("7000")
chunk := testingc.FixtureChunk("7000").WithTagID(1)

// create a pivot node and a mocked closest node
pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
@@ -318,7 +324,7 @@ func TestPushChunkToNextClosest(t *testing.T) {
t.Skip("flaky test")

// chunk data to upload
chunk := testingc.FixtureChunk("7000")
chunk := testingc.FixtureChunk("7000").WithTagID(1)

// create a pivot node and a mocked closest node
pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
@@ -428,7 +434,7 @@ func TestPushChunkToClosestErrorAttemptRetry(t *testing.T) {
t.Parallel()

// chunk data to upload
chunk := testingc.FixtureChunk("7000")
chunk := testingc.FixtureChunk("7000").WithTagID(1)

// create a pivot node and a mocked closest node
pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
13 changes: 9 additions & 4 deletions pkg/salud/salud.go
@@ -184,6 +184,11 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,

s.logger.Debug("computed", "avg_dur", avgDur, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment)

// sort peers by duration, highest first to give priority to the fastest peers
sort.Slice(peers, func(i, j int) bool {
return peers[i].dur > peers[j].dur // descending
})

for _, peer := range peers {

var healthy bool
@@ -196,13 +201,13 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
}

if networkRadius > 0 && peer.status.CommittedDepth < uint32(networkRadius-2) {
s.logger.Debug("radius health failure", "radius", peer.status.CommittedDepth, "peer_address", peer.addr)
s.logger.Debug("radius health failure", "radius", peer.status.CommittedDepth, "peer_address", peer.addr, "bin", peer.bin)
} else if peer.dur.Seconds() > pDur {
s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr)
s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr, "bin", peer.bin)
} else if peer.status.ConnectedPeers < pConns {
s.logger.Debug("connections count below threshold", "connections", peer.status.ConnectedPeers, "peer_address", peer.addr)
s.logger.Debug("connections count below threshold", "connections", peer.status.ConnectedPeers, "peer_address", peer.addr, "bin", peer.bin)
} else if peer.status.BatchCommitment != commitment {
s.logger.Debug("batch commitment check failure", "commitment", peer.status.BatchCommitment, "peer_address", peer.addr)
s.logger.Debug("batch commitment check failure", "commitment", peer.status.BatchCommitment, "peer_address", peer.addr, "bin", peer.bin)
} else {
healthy = true
}
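
The added sort orders the sampled peers by observed response duration, highest first, before the per-peer health checks run, and each failure log now also records the peer's bin. A small illustrative example of that ordering on a hypothetical peer slice (only the sort.Slice comparison mirrors the diff):

```go
// Illustrative: sort peers by response duration in descending order,
// mirroring the sort.Slice call added in pkg/salud/salud.go.
package main

import (
	"fmt"
	"sort"
	"time"
)

type peer struct {
	addr string
	bin  uint8
	dur  time.Duration
}

func main() {
	peers := []peer{
		{addr: "aa", bin: 3, dur: 120 * time.Millisecond},
		{addr: "bb", bin: 1, dur: 40 * time.Millisecond},
		{addr: "cc", bin: 2, dur: 300 * time.Millisecond},
	}

	// descending by duration, as in the diff
	sort.Slice(peers, func(i, j int) bool {
		return peers[i].dur > peers[j].dur
	})

	for _, p := range peers {
		fmt.Printf("peer=%s bin=%d dur=%s\n", p.addr, p.bin, p.dur)
	}
	// prints cc (300ms), then aa (120ms), then bb (40ms)
}
```
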