From fc29b7e4c2583bfdf3bc4ed75a7306c228de4e97 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Fri, 24 Jan 2025 02:04:56 +0300 Subject: [PATCH 01/13] fix: various improvements in pusher, pushsync, salud, reacher --- pkg/node/node.go | 4 +-- pkg/p2p/libp2p/internal/reacher/reacher.go | 36 ++++++++-------------- pkg/postage/interface.go | 9 ++++-- pkg/pusher/pusher.go | 20 ++++++------ pkg/pusher/pusher_test.go | 28 +++++++++-------- pkg/pushsync/pushsync.go | 11 ++++--- pkg/pushsync/pushsync_test.go | 14 ++++++--- pkg/salud/salud.go | 13 +++++--- 8 files changed, 72 insertions(+), 63 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index a9cbcf10580..2cd64d80177 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -723,7 +723,7 @@ func NewBee( Batchstore: batchStore, StateStore: stateStore, RadiusSetter: kad, - WarmupDuration: o.WarmupTime, + WarmupDuration: warmupTime, Logger: logger, Tracer: tracer, CacheMinEvictCount: cacheMinEvictCount, @@ -964,7 +964,7 @@ func NewBee( retrieval := retrieval.New(swarmAddress, waitNetworkRFunc, localStore, p2ps, kad, logger, acc, pricer, tracer, o.RetrievalCaching) localStore.SetRetrievalService(retrieval) - pusherService := pusher.New(networkID, localStore, pushSyncProtocol, validStamp, logger, warmupTime, pusher.DefaultRetryCount) + pusherService := pusher.New(networkID, localStore, pushSyncProtocol, batchStore, logger, warmupTime, pusher.DefaultRetryCount) b.pusherCloser = pusherService pusherService.AddFeed(localStore.PusherFeed()) diff --git a/pkg/p2p/libp2p/internal/reacher/reacher.go b/pkg/p2p/libp2p/internal/reacher/reacher.go index ad888df4d0f..18b1431229f 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher.go @@ -18,7 +18,7 @@ import ( const ( pingTimeout = time.Second * 15 - workers = 8 + workers = 16 retryAfterDuration = time.Minute * 5 ) @@ -32,8 +32,8 @@ type reacher struct { mu sync.Mutex peers map[string]*peer - work chan struct{} - quit chan struct{} + newPeer chan struct{} + quit chan struct{} pinger p2p.Pinger notifier p2p.ReachableNotifier @@ -53,7 +53,7 @@ type Options struct { func New(streamer p2p.Pinger, notifier p2p.ReachableNotifier, o *Options) *reacher { r := &reacher{ - work: make(chan struct{}, 1), + newPeer: make(chan struct{}, 1), quit: make(chan struct{}), pinger: streamer, peers: make(map[string]*peer), @@ -103,7 +103,7 @@ func (r *reacher) manage() { select { case <-r.quit: return - case <-r.work: + case <-r.newPeer: continue case <-time.After(tryAfter): continue @@ -115,12 +115,12 @@ func (r *reacher) manage() { select { case <-r.quit: return - case <-r.work: + case <-r.newPeer: continue } } - // send p to channel + // ping peer select { case <-r.quit: return @@ -135,10 +135,6 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) { for p := range c { - r.mu.Lock() - overlay := p.overlay - r.mu.Unlock() - now := time.Now() ctxt, cancel := context.WithTimeout(ctx, r.options.PingTimeout) @@ -149,14 +145,12 @@ func (r *reacher) ping(c chan *peer, ctx context.Context) { if err == nil { r.metrics.Pings.WithLabelValues("success").Inc() r.metrics.PingTime.WithLabelValues("success").Observe(time.Since(now).Seconds()) - r.notifier.Reachable(overlay, p2p.ReachabilityStatusPublic) + r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPublic) } else { r.metrics.Pings.WithLabelValues("failure").Inc() r.metrics.PingTime.WithLabelValues("failure").Observe(time.Since(now).Seconds()) - r.notifier.Reachable(overlay, 
p2p.ReachabilityStatusPrivate)
+			r.notifier.Reachable(p.overlay, p2p.ReachabilityStatusPrivate)
 		}
-
-		r.notifyManage()
 	}
 }
 
@@ -191,13 +185,6 @@ func (r *reacher) tryAcquirePeer() (*peer, time.Duration) {
 	return nil, time.Until(nextClosest)
 }
 
-func (r *reacher) notifyManage() {
-	select {
-	case r.work <- struct{}{}:
-	default:
-	}
-}
-
 // Connected adds a new peer to the queue for testing reachability.
 func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) {
 	r.mu.Lock()
@@ -207,7 +194,10 @@ func (r *reacher) Connected(overlay swarm.Address, addr ma.Multiaddr) {
 		r.peers[overlay.ByteString()] = &peer{overlay: overlay, addr: addr}
 	}
 
-	r.notifyManage()
+	select {
+	case r.newPeer <- struct{}{}:
+	default:
+	}
 }
 
 // Disconnected removes a peer from the queue.
diff --git a/pkg/postage/interface.go b/pkg/postage/interface.go
index 9d93918307b..865a88408c4 100644
--- a/pkg/postage/interface.go
+++ b/pkg/postage/interface.go
@@ -42,15 +42,13 @@ type ChainSnapshot struct {
 // on the current (highest available) block.
 type Storer interface {
 	ChainStateGetter
+	BatchExist
 
 	Radius() uint8
 
 	// Get returns a batch from the store with the given ID.
 	Get([]byte) (*Batch, error)
 
-	// Exists reports whether batch referenced by the give id exists.
-	Exists([]byte) (bool, error)
-
 	// Iterate iterates through stored batches.
 	Iterate(func(*Batch) (bool, error)) error
 
@@ -73,6 +71,11 @@ type Storer interface {
 	SetBatchExpiryHandler(BatchExpiryHandler)
 }
 
+type BatchExist interface {
+	// Exists reports whether the batch referenced by the given id exists.
+	Exists([]byte) (bool, error)
+}
+
 // StorageRadiusSetter is used to calculate total batch commitment of the network.
 type CommitmentGetter interface {
 	Commitment() (uint64, error)
diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go
index e71ba7fda73..f8c3a7cd168 100644
--- a/pkg/pusher/pusher.go
+++ b/pkg/pusher/pusher.go
@@ -51,7 +51,7 @@ type Service struct {
 	networkID  uint64
 	storer     Storer
 	pushSyncer pushsync.PushSyncer
-	validStamp postage.ValidStampFn
+	batchExist postage.BatchExist
 	logger     log.Logger
 	metrics    metrics
 	quit       chan struct{}
@@ -75,7 +75,7 @@ func New(
 	networkID uint64,
 	storer Storer,
 	pushSyncer pushsync.PushSyncer,
-	validStamp postage.ValidStampFn,
+	batchExist postage.BatchExist,
 	logger log.Logger,
 	warmupTime time.Duration,
 	retryCount int,
@@ -84,7 +84,7 @@ func New(
 		networkID:  networkID,
 		storer:     storer,
 		pushSyncer: pushSyncer,
-		validStamp: validStamp,
+		batchExist: batchExist,
 		logger:     logger.WithName(loggerName).Register(),
 		metrics:    newMetrics(),
 		quit:       make(chan struct{}),
@@ -251,14 +251,14 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (
 
 	defer s.inflight.delete(op.identityAddress, op.Chunk.Stamp().BatchID())
 
-	if _, err := s.validStamp(op.Chunk); err != nil {
-		loggerV1.Warning(
-			"stamp with is no longer valid, skipping syncing for chunk",
+	ok, err := s.batchExist.Exists(op.Chunk.Stamp().BatchID())
+	if !ok || err != nil {
+		logger.Warning(
+			"stamp is no longer valid, skipping syncing for chunk",
 			"batch_id", hex.EncodeToString(op.Chunk.Stamp().BatchID()),
 			"chunk_address", op.Chunk.Address(),
 			"error", err,
 		)
-
 		return false, errors.Join(err, s.storer.Report(ctx, op.Chunk, storage.ChunkCouldNotSync))
 	}
 
@@ -311,10 +311,10 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err
 		}
 	}()
 
-	_, err = s.validStamp(op.Chunk)
-	if err != nil {
+	ok, err := s.batchExist.Exists(op.Chunk.Stamp().BatchID())
+	if !ok || err != nil {
 		logger.Warning(
-			"stamp with is no longer valid, 
skipping direct upload for chunk", + "stamp is no longer valid, skipping direct upload for chunk", "batch_id", hex.EncodeToString(op.Chunk.Stamp().BatchID()), "chunk_address", op.Chunk.Address(), "error", err, diff --git a/pkg/pusher/pusher_test.go b/pkg/pusher/pusher_test.go index fbdfb3a9acd..6424e667db7 100644 --- a/pkg/pusher/pusher_test.go +++ b/pkg/pusher/pusher_test.go @@ -17,6 +17,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/postage" + batchstoremock "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock" "github.com/ethersphere/bee/v2/pkg/pusher" "github.com/ethersphere/bee/v2/pkg/pushsync" pushsyncmock "github.com/ethersphere/bee/v2/pkg/pushsync/mock" @@ -33,9 +34,9 @@ const spinTimeout = time.Second * 3 var ( block = common.HexToHash("0x1").Bytes() - defaultMockValidStamp = func(ch swarm.Chunk) (swarm.Chunk, error) { - return ch, nil - } + defaultMockBatchStore = batchstoremock.New(batchstoremock.WithExistsFunc(func(b []byte) (bool, error) { + return true, nil + })) defaultRetryCount = 3 ) @@ -134,7 +135,7 @@ func TestChunkSyncing(t *testing.T) { t, storer, pushSyncService, - defaultMockValidStamp, + defaultMockBatchStore, defaultRetryCount, ) @@ -181,7 +182,7 @@ func TestChunkStored(t *testing.T) { t, storer, pushSyncService, - defaultMockValidStamp, + defaultMockBatchStore, defaultRetryCount, ) @@ -239,7 +240,7 @@ func TestSendChunkAndReceiveInvalidReceipt(t *testing.T) { t, storer, pushSyncService, - defaultMockValidStamp, + defaultMockBatchStore, defaultRetryCount, ) @@ -283,7 +284,7 @@ func TestSendChunkAndTimeoutinReceivingReceipt(t *testing.T) { t, storer, pushSyncService, - defaultMockValidStamp, + defaultMockBatchStore, defaultRetryCount, ) @@ -326,7 +327,7 @@ func TestPusherRetryShallow(t *testing.T) { t, storer, pushSyncService, - defaultMockValidStamp, + defaultMockBatchStore, defaultRetryCount, ) @@ -364,9 +365,10 @@ func TestChunkWithInvalidStampSkipped(t *testing.T) { }) wantErr := errors.New("dummy error") - validStamp := func(ch swarm.Chunk) (swarm.Chunk, error) { - return nil, wantErr - } + + bmock := batchstoremock.New(batchstoremock.WithExistsFunc(func(b []byte) (bool, error) { + return false, wantErr + })) storer := &mockStorer{ chunks: make(chan swarm.Chunk), @@ -376,7 +378,7 @@ func TestChunkWithInvalidStampSkipped(t *testing.T) { t, storer, pushSyncService, - validStamp, + bmock, defaultRetryCount, ) @@ -412,7 +414,7 @@ func createPusher( t *testing.T, storer pusher.Storer, pushSyncService pushsync.PushSyncer, - validStamp postage.ValidStampFn, + validStamp postage.BatchExist, retryCount int, ) *pusher.Service { t.Helper() diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 10f7ac39811..7430bdfe6a9 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -609,10 +609,13 @@ func (ps *PushSync) pushChunkToPeer(ctx context.Context, peer swarm.Address, ch return nil, err } - err = ps.store.Report(ctx, ch, storage.ChunkSent) - if err != nil && !errors.Is(err, storage.ErrNotFound) { - err = fmt.Errorf("tag %d increment: %w", ch.TagID(), err) - return + // if the chunk has a tag, then it's from a local deferred upload + if ch.TagID() != 0 { + err = ps.store.Report(ctx, ch, storage.ChunkSent) + if err != nil { + err = fmt.Errorf("tag %d increment: %w", ch.TagID(), err) + return + } } var rec pb.Receipt diff --git a/pkg/pushsync/pushsync_test.go b/pkg/pushsync/pushsync_test.go index 8773c693de0..d1cc75e14df 100644 --- 
a/pkg/pushsync/pushsync_test.go +++ b/pkg/pushsync/pushsync_test.go @@ -76,7 +76,7 @@ func TestPushClosest(t *testing.T) { // pivot node needs the streamer since the chunk is intercepted by // the chunk worker, then gets sent by opening a new stream - psPivot, _, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), mock.WithClosestPeer(closestPeer)) + psPivot, pivotStorer, pivotAccounting := createPushSyncNode(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), mock.WithClosestPeer(closestPeer)) // Trigger the sending of chunk to the closest node receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk) @@ -93,6 +93,12 @@ func TestPushClosest(t *testing.T) { // this intercepts the incoming receipt message waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil) + + found, _ := pivotStorer.hasReported(t, chunk.Address()) + if found { + t.Fatalf("chunk %s reported", chunk.Address()) + } + balance, err := pivotAccounting.Balance(closestPeer) if err != nil { t.Fatal(err) @@ -245,7 +251,7 @@ func TestShallowReceipt(t *testing.T) { func TestPushChunkToClosest(t *testing.T) { t.Parallel() // chunk data to upload - chunk := testingc.FixtureChunk("7000") + chunk := testingc.FixtureChunk("7000").WithTagID(1) // create a pivot node and a mocked closest node pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000 @@ -318,7 +324,7 @@ func TestPushChunkToNextClosest(t *testing.T) { t.Skip("flaky test") // chunk data to upload - chunk := testingc.FixtureChunk("7000") + chunk := testingc.FixtureChunk("7000").WithTagID(1) // create a pivot node and a mocked closest node pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000 @@ -428,7 +434,7 @@ func TestPushChunkToClosestErrorAttemptRetry(t *testing.T) { t.Parallel() // chunk data to upload - chunk := testingc.FixtureChunk("7000") + chunk := testingc.FixtureChunk("7000").WithTagID(1) // create a pivot node and a mocked closest node pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000 diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go index d47abf3fd76..95a02b065fa 100644 --- a/pkg/salud/salud.go +++ b/pkg/salud/salud.go @@ -184,6 +184,11 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, s.logger.Debug("computed", "avg_dur", avgDur, "pDur", pDur, "pConns", pConns, "network_radius", networkRadius, "neighborhood_radius", nHoodRadius, "batch_commitment", commitment) + // sort peers by duration, highest first to give priority to the fastest peers + sort.Slice(peers, func(i, j int) bool { + return peers[i].dur > peers[j].dur // descending + }) + for _, peer := range peers { var healthy bool @@ -196,13 +201,13 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, } if networkRadius > 0 && peer.status.CommittedDepth < uint32(networkRadius-2) { - s.logger.Debug("radius health failure", "radius", peer.status.CommittedDepth, "peer_address", peer.addr) + s.logger.Debug("radius health failure", "radius", peer.status.CommittedDepth, "peer_address", peer.addr, "bin", peer.bin) } else if peer.dur.Seconds() > pDur { - s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr) + s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr, 
"bin", peer.bin) } else if peer.status.ConnectedPeers < pConns { - s.logger.Debug("connections count below threshold", "connections", peer.status.ConnectedPeers, "peer_address", peer.addr) + s.logger.Debug("connections count below threshold", "connections", peer.status.ConnectedPeers, "peer_address", peer.addr, "bin", peer.bin) } else if peer.status.BatchCommitment != commitment { - s.logger.Debug("batch commitment check failure", "commitment", peer.status.BatchCommitment, "peer_address", peer.addr) + s.logger.Debug("batch commitment check failure", "commitment", peer.status.BatchCommitment, "peer_address", peer.addr, "bin", peer.bin) } else { healthy = true } From 3316b16d440aed0e1bf8b482d861596e036e2762 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Fri, 24 Jan 2025 02:53:18 +0300 Subject: [PATCH 02/13] perf: improve concurrency numberS --- pkg/pusher/pusher.go | 6 +----- pkg/pushsync/pushsync.go | 1 - pkg/storer/storer.go | 2 +- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index f8c3a7cd168..b03bd670aed 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -63,14 +63,10 @@ type Service struct { const ( traceDuration = 30 * time.Second // duration for every root tracing span - ConcurrentPushes = 100 // how many chunks to push simultaneously + ConcurrentPushes = swarm.Branches // how many chunks to push simultaneously DefaultRetryCount = 6 ) -var ( - ErrInvalidAddress = errors.New("invalid address") -) - func New( networkID uint64, storer Storer, diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 7430bdfe6a9..f2e4f00c95d 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -528,7 +528,6 @@ func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptRes select { case resultChan <- receiptResult{pushTime: now, peer: peer, err: err, receipt: receipt}: case <-parentCtx.Done(): - ps.logger.Debug("push result parent context canceled", "chunk_address", ch.Address(), "peer_address", peer) } }() diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index 2628807a24e..32223e401de 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -243,7 +243,7 @@ const ( defaultWriteBufferSize = uint64(32 * 1024 * 1024) defaultDisableSeeksCompaction = false defaultCacheCapacity = uint64(1_000_000) - defaultBgCacheWorkers = 16 + defaultBgCacheWorkers = 128 DefaultReserveCapacity = 1 << 22 // 4194304 chunks indexPath = "indexstore" From 37288bbdc8c3ab5272b323f2a7458be6e828a56c Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 27 Jan 2025 18:32:03 +0300 Subject: [PATCH 03/13] fix: shallow receipt tolerance --- pkg/node/node.go | 10 ++++- pkg/pushsync/pb/pushsync.pb.go | 62 +++++++++++++++++++++++------- pkg/pushsync/pb/pushsync.proto | 1 + pkg/pushsync/pushsync.go | 54 ++++++++++++++------------ pkg/pushsync/pushsync_test.go | 70 ++++++++++++++++++++++++++++++++-- pkg/storage/testing/chunk.go | 2 +- 6 files changed, 154 insertions(+), 45 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index 2cd64d80177..e9f327edbb3 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -250,9 +250,15 @@ func NewBee( } }(b) - if o.ReserveCapacityDoubling < 0 || o.ReserveCapacityDoubling > 1 { + if !o.FullNodeMode && o.ReserveCapacityDoubling != 0 { + return nil, fmt.Errorf("reserve capacity doubling is only allowed for full nodes") + } + + const maxAllowedDoubling = 1 + if o.ReserveCapacityDoubling < 0 
|| o.ReserveCapacityDoubling > maxAllowedDoubling { return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: 1") } + var shallowReceiptTolerance = maxAllowedDoubling - o.ReserveCapacityDoubling reserveCapacity := (1 << o.ReserveCapacityDoubling) * storer.DefaultReserveCapacity @@ -955,7 +961,7 @@ func NewBee( } } - pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, acc, pricer, signer, tracer, warmupTime) + pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, acc, pricer, signer, tracer, warmupTime, shallowReceiptTolerance) b.pushSyncCloser = pushSyncProtocol // set the pushSyncer in the PSS diff --git a/pkg/pushsync/pb/pushsync.pb.go b/pkg/pushsync/pb/pushsync.pb.go index d3141357249..352abd774d7 100644 --- a/pkg/pushsync/pb/pushsync.pb.go +++ b/pkg/pushsync/pb/pushsync.pb.go @@ -83,10 +83,11 @@ func (m *Delivery) GetStamp() []byte { } type Receipt struct { - Address []byte `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"` - Signature []byte `protobuf:"bytes,2,opt,name=Signature,proto3" json:"Signature,omitempty"` - Nonce []byte `protobuf:"bytes,3,opt,name=Nonce,proto3" json:"Nonce,omitempty"` - Err string `protobuf:"bytes,4,opt,name=Err,proto3" json:"Err,omitempty"` + Address []byte `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"` + Signature []byte `protobuf:"bytes,2,opt,name=Signature,proto3" json:"Signature,omitempty"` + Nonce []byte `protobuf:"bytes,3,opt,name=Nonce,proto3" json:"Nonce,omitempty"` + Err string `protobuf:"bytes,4,opt,name=Err,proto3" json:"Err,omitempty"` + StorageRadius uint32 `protobuf:"varint,5,opt,name=StorageRadius,proto3" json:"StorageRadius,omitempty"` } func (m *Receipt) Reset() { *m = Receipt{} } @@ -150,6 +151,13 @@ func (m *Receipt) GetErr() string { return "" } +func (m *Receipt) GetStorageRadius() uint32 { + if m != nil { + return m.StorageRadius + } + return 0 +} + func init() { proto.RegisterType((*Delivery)(nil), "pushsync.Delivery") proto.RegisterType((*Receipt)(nil), "pushsync.Receipt") @@ -158,20 +166,21 @@ func init() { func init() { proto.RegisterFile("pushsync.proto", fileDescriptor_723cf31bfc02bfd6) } var fileDescriptor_723cf31bfc02bfd6 = []byte{ - // 197 bytes of a gzipped FileDescriptorProto + // 224 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2b, 0x28, 0x2d, 0xce, 0x28, 0xae, 0xcc, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x80, 0xf1, 0x95, 0xfc, 0xb8, 0x38, 0x5c, 0x52, 0x73, 0x32, 0xcb, 0x52, 0x8b, 0x2a, 0x85, 0x24, 0xb8, 0xd8, 0x1d, 0x53, 0x52, 0x8a, 0x52, 0x8b, 0x8b, 0x25, 0x18, 0x15, 0x18, 0x35, 0x78, 0x82, 0x60, 0x5c, 0x21, 0x21, 0x2e, 0x16, 0x97, 0xc4, 0x92, 0x44, 0x09, 0x26, 0xb0, 0x30, 0x98, 0x2d, 0x24, 0xc2, 0xc5, 0x1a, - 0x5c, 0x92, 0x98, 0x5b, 0x20, 0xc1, 0x0c, 0x16, 0x84, 0x70, 0x94, 0x32, 0xb9, 0xd8, 0x83, 0x52, - 0x93, 0x53, 0x33, 0x0b, 0x4a, 0xf0, 0x18, 0x27, 0xc3, 0xc5, 0x19, 0x9c, 0x99, 0x9e, 0x97, 0x58, - 0x52, 0x5a, 0x94, 0x0a, 0x35, 0x13, 0x21, 0x00, 0x32, 0xd8, 0x2f, 0x3f, 0x2f, 0x39, 0x15, 0x66, - 0x30, 0x98, 0x23, 0x24, 0xc0, 0xc5, 0xec, 0x5a, 0x54, 0x24, 0xc1, 0xa2, 0xc0, 0xa8, 0xc1, 0x19, - 0x04, 0x62, 0x3a, 0xc9, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 
0xe3, 0x83, 0x47, 0x72, - 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, 0x14, 0x53, - 0x41, 0x52, 0x12, 0x1b, 0xd8, 0xa7, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0xbe, 0xdb, 0x14, - 0x12, 0xfb, 0x00, 0x00, 0x00, + 0x5c, 0x92, 0x98, 0x5b, 0x20, 0xc1, 0x0c, 0x16, 0x84, 0x70, 0x94, 0xfa, 0x19, 0xb9, 0xd8, 0x83, + 0x52, 0x93, 0x53, 0x33, 0x0b, 0x4a, 0xf0, 0x98, 0x27, 0xc3, 0xc5, 0x19, 0x9c, 0x99, 0x9e, 0x97, + 0x58, 0x52, 0x5a, 0x94, 0x0a, 0x35, 0x14, 0x21, 0x00, 0x32, 0xd9, 0x2f, 0x3f, 0x2f, 0x39, 0x15, + 0x66, 0x32, 0x98, 0x23, 0x24, 0xc0, 0xc5, 0xec, 0x5a, 0x54, 0x24, 0xc1, 0xa2, 0xc0, 0xa8, 0xc1, + 0x19, 0x04, 0x62, 0x0a, 0xa9, 0x70, 0xf1, 0x06, 0x97, 0xe4, 0x17, 0x25, 0xa6, 0xa7, 0x06, 0x25, + 0xa6, 0x64, 0x96, 0x16, 0x4b, 0xb0, 0x2a, 0x30, 0x6a, 0xf0, 0x06, 0xa1, 0x0a, 0x3a, 0xc9, 0x9c, + 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x13, 0x1e, 0xcb, 0x31, + 0x5c, 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, 0x14, 0x53, 0x41, 0x52, 0x12, 0x1b, 0x38, + 0x40, 0x8c, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x80, 0xc6, 0x9e, 0x01, 0x22, 0x01, 0x00, 0x00, } func (m *Delivery) Marshal() (dAtA []byte, err error) { @@ -238,6 +247,11 @@ func (m *Receipt) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.StorageRadius != 0 { + i = encodeVarintPushsync(dAtA, i, uint64(m.StorageRadius)) + i-- + dAtA[i] = 0x28 + } if len(m.Err) > 0 { i -= len(m.Err) copy(dAtA[i:], m.Err) @@ -323,6 +337,9 @@ func (m *Receipt) Size() (n int) { if l > 0 { n += 1 + l + sovPushsync(uint64(l)) } + if m.StorageRadius != 0 { + n += 1 + sovPushsync(uint64(m.StorageRadius)) + } return n } @@ -650,6 +667,25 @@ func (m *Receipt) Unmarshal(dAtA []byte) error { } m.Err = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StorageRadius", wireType) + } + m.StorageRadius = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPushsync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StorageRadius |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipPushsync(dAtA[iNdEx:]) diff --git a/pkg/pushsync/pb/pushsync.proto b/pkg/pushsync/pb/pushsync.proto index e76c510902b..8fc293dca88 100644 --- a/pkg/pushsync/pb/pushsync.proto +++ b/pkg/pushsync/pb/pushsync.proto @@ -19,4 +19,5 @@ message Receipt { bytes Signature = 2; bytes Nonce = 3; string Err = 4; + uint32 StorageRadius = 5; } diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index f2e4f00c95d..9aaba824f98 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -96,6 +96,8 @@ type PushSync struct { fullNode bool errSkip *skippeers.List warmupPeriod time.Time + + shallowReceiptTolerance int } type receiptResult struct { @@ -123,26 +125,28 @@ func New( signer crypto.Signer, tracer *tracing.Tracer, warmupTime time.Duration, + shallowReceiptTolerance int, ) *PushSync { ps := &PushSync{ - address: address, - radius: radius, - networkID: networkID, - nonce: nonce, - streamer: streamer, - store: store, - topologyDriver: topology, - fullNode: fullNode, - unwrap: unwrap, - gsocHandler: gsocHandler, - logger: logger.WithName(loggerName).Register(), - accounting: accounting, - pricer: pricer, - metrics: newMetrics(), - tracer: tracer, - signer: signer, - errSkip: skippeers.NewList(), - warmupPeriod: time.Now().Add(warmupTime), + address: address, + radius: radius, + 
networkID: networkID, + nonce: nonce, + streamer: streamer, + store: store, + topologyDriver: topology, + fullNode: fullNode, + unwrap: unwrap, + gsocHandler: gsocHandler, + logger: logger.WithName(loggerName).Register(), + accounting: accounting, + pricer: pricer, + metrics: newMetrics(), + tracer: tracer, + signer: signer, + errSkip: skippeers.NewList(), + warmupPeriod: time.Now().Add(warmupTime), + shallowReceiptTolerance: shallowReceiptTolerance, } ps.validStamp = ps.validStampWrapper(validStamp) @@ -236,6 +240,11 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) price := ps.pricer.Price(chunkAddress) + rad, err := ps.radius() + if err != nil { + return fmt.Errorf("pushsync: storage radius: %w", err) + } + store := func(ctx context.Context) error { ps.metrics.Storer.Inc() @@ -263,7 +272,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) attemptedWrite = true - receipt := pb.Receipt{Address: chunkToPut.Address().Bytes(), Signature: signature, Nonce: ps.nonce} + receipt := pb.Receipt{Address: chunkToPut.Address().Bytes(), Signature: signature, Nonce: ps.nonce, StorageRadius: uint32(rad)} if err := w.WriteMsgWithContext(ctx, &receipt); err != nil { return fmt.Errorf("send receipt to peer %s: %w", p.Address.String(), err) } @@ -271,11 +280,6 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) return debit.Apply() } - rad, err := ps.radius() - if err != nil { - return fmt.Errorf("pushsync: storage radius: %w", err) - } - if ps.topologyDriver.IsReachable() && swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) >= rad { stored, reason = true, "is within AOR" return store(ctx) @@ -566,7 +570,7 @@ func (ps *PushSync) checkReceipt(receipt *pb.Receipt) error { return fmt.Errorf("pushsync: storage radius: %w", err) } - if po < d { + if po < (d-uint8(ps.shallowReceiptTolerance)) || uint32(po) < receipt.StorageRadius { ps.metrics.ShallowReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc() ps.metrics.ShallowReceipt.Inc() ps.logger.Debug("shallow receipt", "chunk_address", addr, "peer_address", peer, "proximity_order", po) diff --git a/pkg/pushsync/pushsync_test.go b/pkg/pushsync/pushsync_test.go index d1cc75e14df..218cfcd9e7a 100644 --- a/pkg/pushsync/pushsync_test.go +++ b/pkg/pushsync/pushsync_test.go @@ -221,13 +221,13 @@ func TestShallowReceipt(t *testing.T) { // peer is the node responding to the chunk receipt message // mock should return ErrWantSelf since there's no one to forward to - psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), highPO, mock.WithClosestPeerErr(topology.ErrWantSelf)) + psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeerErr(topology.ErrWantSelf)) recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode)) // pivot node needs the streamer since the chunk is intercepted by // the chunk worker, then gets sent by opening a new stream - psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), highPO, mock.WithClosestPeer(closestPeer)) + psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeer(closestPeer)) // Trigger the sending of chunk to the closest node receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk) @@ -246,6 +246,67 @@ func 
TestShallowReceipt(t *testing.T) {
 	waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)
 }
 
+// TestShallowReceiptTolerance sends back a shallow receipt, but because of the tolerance level, the origin node accepts the receipt.
+func TestShallowReceiptTolerance(t *testing.T) {
+	t.Parallel()
+
+	key, err := crypto.GenerateSecp256k1Key()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	signer := crypto.NewDefaultSigner(key)
+
+	pubKey, err := signer.PublicKey()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	closestPeer, err := crypto.NewOverlayAddress(*pubKey, 1, blockHash.Bytes())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	storerRadius := 2
+	chunkProximity := 2
+
+	pivotRadius := 4
+	pivotTolerance := 2
+
+	// create a pivot node and a mocked closest node
+	pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
+
+	chunk := testingc.GenerateValidRandomChunkAt(t, closestPeer, chunkProximity)
+
+	// peer is the node responding to the chunk receipt message
+	// mock should return ErrWantSelf since there's no one to forward to
+	psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, signer, uint8(storerRadius), 0, mock.WithClosestPeerErr(topology.ErrWantSelf))
+
+	recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode))
+
+	// pivot node needs the streamer since the chunk is intercepted by
+	// the chunk worker, then gets sent by opening a new stream
+	psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, nil, uint8(pivotRadius), pivotTolerance, mock.WithClosestPeer(closestPeer))
+
+	// Trigger the sending of chunk to the closest node
+	receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
+	if err != nil {
+		t.Fatalf("got %v, want %v", err, nil)
+	}
+	if !chunk.Address().Equal(receipt.Address) {
+		t.Fatal("invalid receipt")
+	}
+	if got := swarm.Proximity(receipt.Address.Bytes(), closestPeer.Bytes()); got < uint8(chunkProximity) {
+		t.Fatalf("got %v, want at least %v", got, chunkProximity)
+	}
+
+	// this intercepts the outgoing delivery message
+	waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), chunk.Data())
+
+	// this intercepts the incoming receipt message
+	waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)
+}
+
 // PushChunkToClosest tests the sending of chunk to closest peer from the origination source perspective. 
// it also checks whether the tags are incremented properly if they are present
 func TestPushChunkToClosest(t *testing.T) {
@@ -862,6 +923,7 @@ func createPushSyncNodeWithRadius(
 	unwrap func(swarm.Chunk),
 	signer crypto.Signer,
 	radius uint8,
+	shallowReceiptTolerance int,
 	mockOpts ...mock.Option,
 ) (*pushsync.PushSync, *testStorer) {
 	t.Helper()
@@ -884,7 +946,7 @@ func createPushSyncNodeWithRadius(
 
 	radiusFunc := func() (uint8, error) { return radius, nil }
 
-	ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, func(*soc.SOC) {}, validStamp, log.Noop, accountingmock.NewAccounting(), mockPricer, signer, nil, -1)
+	ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, func(*soc.SOC) {}, validStamp, log.Noop, accountingmock.NewAccounting(), mockPricer, signer, nil, -1, shallowReceiptTolerance)
 	t.Cleanup(func() { ps.Close() })
 
 	return ps, storer
@@ -925,7 +987,7 @@ func createPushSyncNodeWithAccounting(
 
 	radiusFunc := func() (uint8, error) { return 0, nil }
 
-	ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, gsocListener, validStamp, logger, acct, mockPricer, signer, nil, -1)
+	ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, gsocListener, validStamp, logger, acct, mockPricer, signer, nil, -1, 0)
 	t.Cleanup(func() { ps.Close() })
 
 	return ps, storer
diff --git a/pkg/storage/testing/chunk.go b/pkg/storage/testing/chunk.go
index c5e83f845aa..4ddfdae2dc6 100644
--- a/pkg/storage/testing/chunk.go
+++ b/pkg/storage/testing/chunk.go
@@ -94,7 +94,7 @@ func GenerateTestRandomChunkAt(tb testing.TB, target swarm.Address, po int) swar
 
 }
 
-// GenerateTestRandomChunkAt generates an invalid (!) chunk with address of proximity order po wrt target.
+// GenerateValidRandomChunkAt generates a valid chunk with address of proximity order po wrt target. 
func GenerateValidRandomChunkAt(tb testing.TB, target swarm.Address, po int) swarm.Chunk {
 	tb.Helper()
 

From 1ac6bdbf7efd19792a3ea485be6cf6d02b1dfba7 Mon Sep 17 00:00:00 2001
From: istae <14264581+istae@users.noreply.github.com>
Date: Tue, 28 Jan 2025 00:29:36 +0300
Subject: [PATCH 04/13] fix: forwarder nodes do not check shallow receipt

---
 pkg/pushsync/pushsync.go | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go
index 9aaba824f98..13eb302a49c 100644
--- a/pkg/pushsync/pushsync.go
+++ b/pkg/pushsync/pushsync.go
@@ -289,8 +289,6 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
 	case errors.Is(err, topology.ErrWantSelf):
 		stored, reason = true, "want self"
 		return store(ctx)
-	case errors.Is(err, ErrShallowReceipt):
-		fallthrough
 	case err == nil:
 		ps.metrics.Forwarder.Inc()
 
@@ -469,6 +467,11 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
 		ps.measurePushPeer(result.pushTime, result.err)
 
 		if result.err == nil {
+
+			if !origin { // forwarder nodes do not need to check the receipt
+				return result.receipt, nil
+			}
+
 			switch err := ps.checkReceipt(result.receipt); {
 			case err == nil:
 				return result.receipt, nil

From 8e853b9b02d55efb9d2ab114f1a8664ffb893fd7 Mon Sep 17 00:00:00 2001
From: istae <14264581+istae@users.noreply.github.com>
Date: Tue, 28 Jan 2025 00:51:19 +0300
Subject: [PATCH 05/13] fix: bump version

---
 pkg/pushsync/pushsync.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go
index 13eb302a49c..526ec6fb1c2 100644
--- a/pkg/pushsync/pushsync.go
+++ b/pkg/pushsync/pushsync.go
@@ -38,7 +38,7 @@ const loggerName = "pushsync"
 
 const (
 	protocolName    = "pushsync"
-	protocolVersion = "1.3.0"
+	protocolVersion = "1.4.0"
 	streamName      = "pushsync"
 )

From ee85784930e41b92e01391fe6ee7588fd62b9771 Mon Sep 17 00:00:00 2001
From: istae <14264581+istae@users.noreply.github.com>
Date: Wed, 29 Jan 2025 02:18:08 +0300
Subject: [PATCH 06/13] perf: kad less work

---
 pkg/topology/kademlia/kademlia.go | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go
index a310e47b6c6..d0ef7b8866b 100644
--- a/pkg/topology/kademlia/kademlia.go
+++ b/pkg/topology/kademlia/kademlia.go
@@ -1279,7 +1279,15 @@ func (k *Kad) ClosestPeer(addr swarm.Address, includeSelf bool, filter topology.
 		closest = k.base
 	}
 
-	err := k.EachConnectedPeerRev(func(peer swarm.Address, po uint8) (bool, bool, error) {
+	prox := swarm.Proximity(k.base.Bytes(), addr.Bytes())
+
+	// iterate starting from bin 0 to the maximum bin
+	err := k.EachConnectedPeerRev(func(peer swarm.Address, bin uint8) (bool, bool, error) {
+
+		if bin > prox && !closest.IsZero() {
+			return true, false, nil
+		}
+
 		if swarm.ContainsAddress(skipPeers, peer) {
 			return false, false, nil
 		}
@@ -1317,9 +1325,9 @@ func (k *Kad) ClosestPeer(addr swarm.Address, includeSelf bool, filter topology.
 
 // EachConnectedPeer implements topology.PeerIterator interface.
 func (k *Kad) EachConnectedPeer(f topology.EachPeerFunc, filter topology.Select) error {
-	filters := excludeFromIterator(filter)
+	excludeFunc := k.opt.ExcludeFunc(excludeFromIterator(filter)...) 
return k.connectedPeers.EachBin(func(addr swarm.Address, po uint8) (bool, bool, error) { - if k.opt.ExcludeFunc(filters...)(addr) { + if excludeFunc(addr) { return false, false, nil } return f(addr, po) @@ -1328,9 +1336,9 @@ func (k *Kad) EachConnectedPeer(f topology.EachPeerFunc, filter topology.Select) // EachConnectedPeerRev implements topology.PeerIterator interface. func (k *Kad) EachConnectedPeerRev(f topology.EachPeerFunc, filter topology.Select) error { - filters := excludeFromIterator(filter) + excludeFunc := k.opt.ExcludeFunc(excludeFromIterator(filter)...) return k.connectedPeers.EachBinRev(func(addr swarm.Address, po uint8) (bool, bool, error) { - if k.opt.ExcludeFunc(filters...)(addr) { + if excludeFunc(addr) { return false, false, nil } return f(addr, po) @@ -1397,7 +1405,6 @@ func (k *Kad) SubscribeTopologyChange() (c <-chan struct{}, unsubscribe func()) func excludeFromIterator(filter topology.Select) []im.ExcludeOp { ops := make([]im.ExcludeOp, 0, 3) - ops = append(ops, im.Bootnode()) if filter.Reachable { From 79bb702057f152cfc8e756f8a7632d1160af6ece Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 29 Jan 2025 02:18:49 +0300 Subject: [PATCH 07/13] revert: version bump --- pkg/pushsync/pushsync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 526ec6fb1c2..13eb302a49c 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -38,7 +38,7 @@ const loggerName = "pushsync" const ( protocolName = "pushsync" - protocolVersion = "1.4.0" + protocolVersion = "1.3.0" streamName = "pushsync" ) From e7922827d85189294f6d89ef61b09b2f9ec78b9f Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 29 Jan 2025 02:44:19 +0300 Subject: [PATCH 08/13] fix: tag --- pkg/pusher/pusher.go | 4 ++-- pkg/pushsync/pushsync.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index b03bd670aed..85d2deb1788 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -249,7 +249,7 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) ( ok, err := s.batchExist.Exists(op.Chunk.Stamp().BatchID()) if !ok || err != nil { - logger.Warning( + loggerV1.Warning( "stamp is no longer valid, skipping syncing for chunk", "batch_id", hex.EncodeToString(op.Chunk.Stamp().BatchID()), "chunk_address", op.Chunk.Address(), @@ -309,7 +309,7 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err ok, err := s.batchExist.Exists(op.Chunk.Stamp().BatchID()) if !ok || err != nil { - logger.Warning( + loggerV1.Warning( "stamp is no longer valid, skipping direct upload for chunk", "batch_id", hex.EncodeToString(op.Chunk.Stamp().BatchID()), "chunk_address", op.Chunk.Address(), diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 13eb302a49c..2b63b2e214d 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -618,7 +618,7 @@ func (ps *PushSync) pushChunkToPeer(ctx context.Context, peer swarm.Address, ch // if the chunk has a tag, then it's from a local deferred upload if ch.TagID() != 0 { err = ps.store.Report(ctx, ch, storage.ChunkSent) - if err != nil { + if err != nil && !errors.Is(err, storage.ErrNotFound) { err = fmt.Errorf("tag %d increment: %w", ch.TagID(), err) return } From 662536f25b7f51fb0488b3b4dcee20e3522a70df Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 29 
Jan 2025 04:05:36 +0300
Subject: [PATCH 09/13] ci: custom branch

---
 .github/workflows/beekeeper.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/beekeeper.yml b/.github/workflows/beekeeper.yml
index a2e5c93639d..1b7346951cf 100644
--- a/.github/workflows/beekeeper.yml
+++ b/.github/workflows/beekeeper.yml
@@ -14,7 +14,7 @@ env:
   SETUP_CONTRACT_IMAGE: "ethersphere/bee-localchain"
   SETUP_CONTRACT_IMAGE_TAG: "0.9.2"
   BEELOCAL_BRANCH: "main"
-  BEEKEEPER_BRANCH: "master"
+  BEEKEEPER_BRANCH: "pss-fix"
   BEEKEEPER_METRICS_ENABLED: false
   REACHABILITY_OVERRIDE_PUBLIC: true
   BATCHFACTOR_OVERRIDE_PUBLIC: 2

From e25b6c32f438b13d2cac1e0d77f5c6031bc17bfa Mon Sep 17 00:00:00 2001
From: istae <14264581+istae@users.noreply.github.com>
Date: Wed, 29 Jan 2025 22:13:03 +0300
Subject: [PATCH 10/13] fix: underflow uint8

---
 .github/workflows/beekeeper.yml | 10 +++++-----
 pkg/node/node.go                |  2 +-
 pkg/pushsync/pushsync.go        | 15 ++++++++++-----
 pkg/pushsync/pushsync_test.go   |  4 ++--
 4 files changed, 18 insertions(+), 13 deletions(-)

diff --git a/.github/workflows/beekeeper.yml b/.github/workflows/beekeeper.yml
index 1b7346951cf..dd2c9ce4e64 100644
--- a/.github/workflows/beekeeper.yml
+++ b/.github/workflows/beekeeper.yml
@@ -132,11 +132,11 @@ jobs:
       - name: Test fullconnectivity
         id: fullconnectivity
         run: timeout ${TIMEOUT} bash -c 'until beekeeper check --cluster-name local-dns --checks=ci-full-connectivity; do echo "waiting for full connectivity..."; sleep .3; done'
-      - name: Test settlements
-        id: settlements
-        run: timeout ${TIMEOUT} beekeeper check --cluster-name local-dns --checks=ci-settlements
-      - name: Sleep for time allowance to replenish
-        run: sleep 2
+      # - name: Test settlements
+      #   id: settlements
+      #   run: timeout ${TIMEOUT} beekeeper check --cluster-name local-dns --checks=ci-settlements
+      # - name: Sleep for time allowance to replenish
+      #   run: sleep 2
       - name: Test pss
         id: pss
         run: timeout ${TIMEOUT} beekeeper check --cluster-name local-dns --checks=ci-pss
diff --git a/pkg/node/node.go b/pkg/node/node.go
index e9f327edbb3..44e600e85d1 100644
--- a/pkg/node/node.go
+++ b/pkg/node/node.go
@@ -961,7 +961,7 @@ func NewBee(
 		}
 	}
 
-	pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, acc, pricer, signer, tracer, warmupTime, shallowReceiptTolerance)
+	pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, acc, pricer, signer, tracer, warmupTime, uint8(shallowReceiptTolerance))
 	b.pushSyncCloser = pushSyncProtocol
 
 	// set the pushSyncer in the PSS
diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go
index 2b63b2e214d..5db434059dd 100644
--- a/pkg/pushsync/pushsync.go
+++ b/pkg/pushsync/pushsync.go
@@ -97,7 +97,7 @@ type PushSync struct {
 	errSkip        *skippeers.List
 	warmupPeriod   time.Time
 
-	shallowReceiptTolerance int
+	shallowReceiptTolerance uint8
 }
 
 type receiptResult struct {
@@ -125,7 +125,7 @@ func New(
 	signer crypto.Signer,
 	tracer *tracing.Tracer,
 	warmupTime time.Duration,
-	shallowReceiptTolerance int,
+	shallowReceiptTolerance uint8,
 ) *PushSync {
 	ps := &PushSync{
 		address: address,
 		radius:  radius,
@@ -568,15 +568,20 @@ func (ps *PushSync) checkReceipt(receipt *pb.Receipt) error {
 
 	po := swarm.Proximity(addr.Bytes(), peer.Bytes())
 
-	d, err := ps.radius()
+	r, err := ps.radius()
 	if err != 
nil { return fmt.Errorf("pushsync: storage radius: %w", err) } - if po < (d-uint8(ps.shallowReceiptTolerance)) || uint32(po) < receipt.StorageRadius { + var tolerance uint8 + if r >= ps.shallowReceiptTolerance { // check for underflow of uint8 + tolerance = r - ps.shallowReceiptTolerance + } + + if po < tolerance || uint32(po) < receipt.StorageRadius { ps.metrics.ShallowReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc() ps.metrics.ShallowReceipt.Inc() - ps.logger.Debug("shallow receipt", "chunk_address", addr, "peer_address", peer, "proximity_order", po) + ps.logger.Debug("shallow receipt", "chunk_address", addr, "peer_address", peer, "proximity_order", po, "peer_radius", receipt.StorageRadius, "self_radius", r) return ErrShallowReceipt } diff --git a/pkg/pushsync/pushsync_test.go b/pkg/pushsync/pushsync_test.go index 218cfcd9e7a..c41c301fe35 100644 --- a/pkg/pushsync/pushsync_test.go +++ b/pkg/pushsync/pushsync_test.go @@ -271,7 +271,7 @@ func TestShallowReceiptTolerance(t *testing.T) { chunkProximity := 2 pivotRadius := 4 - pivotTolerance := 2 + pivotTolerance := uint8(2) // create a pivot node and a mocked closest node pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") @@ -923,7 +923,7 @@ func createPushSyncNodeWithRadius( unwrap func(swarm.Chunk), signer crypto.Signer, radius uint8, - shallowReceiptTolerance int, + shallowReceiptTolerance uint8, mockOpts ...mock.Option, ) (*pushsync.PushSync, *testStorer) { t.Helper() From a43e8905037e1ba27a2637ba6ea41e5ab278cde5 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 29 Jan 2025 22:27:58 +0300 Subject: [PATCH 11/13] ci: beekeeper master --- .github/workflows/beekeeper.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/beekeeper.yml b/.github/workflows/beekeeper.yml index dd2c9ce4e64..a2e5c93639d 100644 --- a/.github/workflows/beekeeper.yml +++ b/.github/workflows/beekeeper.yml @@ -14,7 +14,7 @@ env: SETUP_CONTRACT_IMAGE: "ethersphere/bee-localchain" SETUP_CONTRACT_IMAGE_TAG: "0.9.2" BEELOCAL_BRANCH: "main" - BEEKEEPER_BRANCH: "pss-fix" + BEEKEEPER_BRANCH: "master" BEEKEEPER_METRICS_ENABLED: false REACHABILITY_OVERRIDE_PUBLIC: true BATCHFACTOR_OVERRIDE_PUBLIC: 2 @@ -132,11 +132,11 @@ jobs: - name: Test fullconnectivity id: fullconnectivity run: timeout ${TIMEOUT} bash -c 'until beekeeper check --cluster-name local-dns --checks=ci-full-connectivity; do echo "waiting for full connectivity..."; sleep .3; done' - # - name: Test settlements - # id: settlements - # run: timeout ${TIMEOUT} beekeeper check --cluster-name local-dns --checks=ci-settlements - # - name: Sleep for time allowance to replenish - # run: sleep 2 + - name: Test settlements + id: settlements + run: timeout ${TIMEOUT} beekeeper check --cluster-name local-dns --checks=ci-settlements + - name: Sleep for time allowance to replenish + run: sleep 2 - name: Test pss id: pss run: timeout ${TIMEOUT} beekeeper check --cluster-name local-dns --checks=ci-pss From 0eee54a909eb9dd3173fea6f954eae7a0bff9062 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sat, 1 Feb 2025 04:16:28 +0300 Subject: [PATCH 12/13] perf: skippeers allocated buffer --- pkg/skippeers/skippeers.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/skippeers/skippeers.go b/pkg/skippeers/skippeers.go index 743d383e10d..1b89c9c8e93 100644 --- a/pkg/skippeers/skippeers.go +++ b/pkg/skippeers/skippeers.go @@ -86,6 +86,7 @@ func (l 
*List) ChunkPeers(ch swarm.Address) (peers []swarm.Address) { now := time.Now().UnixNano() if p, ok := l.skip[ch.ByteString()]; ok { + peers = make([]swarm.Address, 0, len(p)) for peer, exp := range p { if exp > now { peers = append(peers, swarm.NewAddress([]byte(peer))) From 983b62dfa63e3951b6e0b3367fcadc25a7e2a67a Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 3 Feb 2025 14:58:21 +0300 Subject: [PATCH 13/13] chore: bump versions --- openapi/Swarm.yaml | 2 +- openapi/SwarmCommon.yaml | 2 +- pkg/pushsync/pushsync.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 8f765d2c066..a62033b9937 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -1,7 +1,7 @@ openapi: 3.0.3 info: - version: 7.2.0 + version: 7.3.0 title: Bee API description: "A list of the currently provided Interfaces to interact with the swarm, implementing file operations and sending messages" diff --git a/openapi/SwarmCommon.yaml b/openapi/SwarmCommon.yaml index dcd54c3853a..2d8024caa82 100644 --- a/openapi/SwarmCommon.yaml +++ b/openapi/SwarmCommon.yaml @@ -1,6 +1,6 @@ openapi: 3.0.3 info: - version: 4.2.0 + version: 4.3.0 title: Common Data Types description: | \*****bzzz***** diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 5db434059dd..37165d6a481 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -38,7 +38,7 @@ const loggerName = "pushsync" const ( protocolName = "pushsync" - protocolVersion = "1.3.0" + protocolVersion = "1.3.1" streamName = "pushsync" )
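
For reference, the shallow-receipt rule that PATCH 03 introduces and PATCH 10 makes underflow-safe reduces to a single predicate. A minimal sketch in Go, assuming a hypothetical helper name (acceptReceipt) that does not appear in the patches:

	package main

	import "fmt"

	// acceptReceipt mirrors checkReceipt after PATCH 10: the origin's radius
	// minus the configured tolerance is the minimum acceptable proximity, and
	// the subtraction is performed only when it cannot underflow uint8; the
	// receipt's StorageRadius field (added in PATCH 03) imposes a second
	// lower bound on the proximity.
	func acceptReceipt(po, selfRadius, tolerance uint8, peerStorageRadius uint32) bool {
		var threshold uint8
		if selfRadius >= tolerance { // guard against uint8 underflow
			threshold = selfRadius - tolerance
		}
		return po >= threshold && uint32(po) >= peerStorageRadius
	}

	func main() {
		// self radius 4, tolerance 2: proximity 2 clears the radius check,
		fmt.Println(acceptReceipt(2, 4, 2, 2)) // true
		// but not when the storer itself reports a deeper storage radius.
		fmt.Println(acceptReceipt(2, 4, 2, 3)) // false
	}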