diff --git a/openapi/SwarmCommon.yaml b/openapi/SwarmCommon.yaml index f871006c7e2..dcd54c3853a 100644 --- a/openapi/SwarmCommon.yaml +++ b/openapi/SwarmCommon.yaml @@ -928,6 +928,8 @@ components: type: boolean lastSyncedBlock: type: integer + committedDepth: + type: integer StatusResponse: type: object diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go index 596327783e6..08167adbd72 100644 --- a/pkg/api/api_test.go +++ b/pkg/api/api_test.go @@ -712,7 +712,6 @@ func createRedistributionAgentService( tranService, &mockHealth{}, log.Noop, - 0, ) } diff --git a/pkg/api/status.go b/pkg/api/status.go index fdd6f0cc584..804cd94aef5 100644 --- a/pkg/api/status.go +++ b/pkg/api/status.go @@ -30,6 +30,7 @@ type statusSnapshotResponse struct { BatchCommitment uint64 `json:"batchCommitment"` IsReachable bool `json:"isReachable"` LastSyncedBlock uint64 `json:"lastSyncedBlock"` + CommittedDepth uint8 `json:"committedDepth"` } type statusResponse struct { @@ -94,6 +95,7 @@ func (s *Service) statusGetHandler(w http.ResponseWriter, _ *http.Request) { BatchCommitment: ss.BatchCommitment, IsReachable: ss.IsReachable, LastSyncedBlock: ss.LastSyncedBlock, + CommittedDepth: uint8(ss.CommittedDepth), }) } @@ -141,6 +143,7 @@ func (s *Service) statusGetPeersHandler(w http.ResponseWriter, r *http.Request) snapshot.BatchCommitment = ss.BatchCommitment snapshot.IsReachable = ss.IsReachable snapshot.LastSyncedBlock = ss.LastSyncedBlock + snapshot.CommittedDepth = uint8(ss.CommittedDepth) } mu.Lock() diff --git a/pkg/api/status_test.go b/pkg/api/status_test.go index 654e94708a8..0ef4dc6c95c 100644 --- a/pkg/api/status_test.go +++ b/pkg/api/status_test.go @@ -40,6 +40,7 @@ func TestGetStatus(t *testing.T) { BatchCommitment: 1, IsReachable: true, LastSyncedBlock: 6092500, + CommittedDepth: 1, } ssMock := &statusSnapshotMock{ @@ -49,6 +50,7 @@ func TestGetStatus(t *testing.T) { storageRadius: ssr.StorageRadius, commitment: ssr.BatchCommitment, chainState: &postage.ChainState{Block: ssr.LastSyncedBlock}, + committedDepth: ssr.CommittedDepth, } statusSvc := status.NewService( @@ -122,6 +124,7 @@ type statusSnapshotMock struct { commitment uint64 chainState *postage.ChainState neighborhoods []*storer.NeighborhoodStat + committedDepth uint8 } func (m *statusSnapshotMock) SyncRate() float64 { return m.syncRate } @@ -135,3 +138,4 @@ func (m *statusSnapshotMock) ReserveSizeWithinRadius() uint64 { func (m *statusSnapshotMock) NeighborhoodsStat(ctx context.Context) ([]*storer.NeighborhoodStat, error) { return m.neighborhoods, nil } +func (m *statusSnapshotMock) CommittedDepth() uint8 { return m.committedDepth } diff --git a/pkg/node/node.go b/pkg/node/node.go index 09f24712056..bd68ada5f26 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -907,7 +907,7 @@ func NewBee( return nil, fmt.Errorf("status service: %w", err) } - saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile, uint8(o.ReserveCapacityDoubling)) + saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile) b.saludCloser = saludService rC, unsub := saludService.SubscribeNetworkStorageRadius() @@ -1086,7 +1086,6 @@ func NewBee( transactionService, saludService, logger, - uint8(o.ReserveCapacityDoubling), ) if err != nil { return nil, fmt.Errorf("storage incentives agent: %w", err) diff --git 
a/pkg/salud/salud.go b/pkg/salud/salud.go index 397362499b4..d47abf3fd76 100644 --- a/pkg/salud/salud.go +++ b/pkg/salud/salud.go @@ -52,8 +52,6 @@ type service struct { radiusSubsMtx sync.Mutex radiusC []chan uint8 - - capacityDoubling uint8 } func New( @@ -66,20 +64,18 @@ func New( minPeersPerbin int, durPercentile float64, connsPercentile float64, - capacityDoubling uint8, ) *service { metrics := newMetrics() s := &service{ - quit: make(chan struct{}), - logger: logger.WithName(loggerName).Register(), - status: status, - topology: topology, - metrics: metrics, - isSelfHealthy: atomic.NewBool(true), - reserve: reserve, - capacityDoubling: capacityDoubling, + quit: make(chan struct{}), + logger: logger.WithName(loggerName).Register(), + status: status, + topology: topology, + metrics: metrics, + isSelfHealthy: atomic.NewBool(true), + reserve: reserve, } s.wg.Add(1) @@ -173,7 +169,7 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, return } - networkRadius, nHoodRadius := s.radius(peers) + networkRadius, nHoodRadius := s.committedDepth(peers) avgDur := totaldur / float64(len(peers)) pDur := percentileDur(peers, durPercentile) pConns := percentileConns(peers, connsPercentile) @@ -199,8 +195,8 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, continue } - if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-2) { - s.logger.Debug("radius health failure", "radius", peer.status.StorageRadius, "peer_address", peer.addr) + if networkRadius > 0 && peer.status.CommittedDepth < uint32(networkRadius-2) { + s.logger.Debug("radius health failure", "radius", peer.status.CommittedDepth, "peer_address", peer.addr) } else if peer.dur.Seconds() > pDur { s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr) } else if peer.status.ConnectedPeers < pConns { @@ -220,12 +216,10 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64, } } - networkRadiusEstimation := s.reserve.StorageRadius() + s.capacityDoubling - selfHealth := true - if nHoodRadius == networkRadius && networkRadiusEstimation != networkRadius { + if nHoodRadius == networkRadius && s.reserve.CommittedDepth() != networkRadius { selfHealth = false - s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", networkRadiusEstimation, "network_radius", networkRadius) + s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", s.reserve.CommittedDepth(), "network_radius", networkRadius) } s.isSelfHealthy.Store(selfHealth) @@ -294,24 +288,24 @@ func percentileConns(peers []peer, p float64) uint64 { } // radius finds the most common radius. 
-func (s *service) radius(peers []peer) (uint8, uint8) { +func (s *service) committedDepth(peers []peer) (uint8, uint8) { - var networkRadius [swarm.MaxBins]int - var nHoodRadius [swarm.MaxBins]int + var networkDepth [swarm.MaxBins]int + var nHoodDepth [swarm.MaxBins]int for _, peer := range peers { - if peer.status.StorageRadius < uint32(swarm.MaxBins) { + if peer.status.CommittedDepth < uint32(swarm.MaxBins) { if peer.neighbor { - nHoodRadius[peer.status.StorageRadius]++ + nHoodDepth[peer.status.CommittedDepth]++ } - networkRadius[peer.status.StorageRadius]++ + networkDepth[peer.status.CommittedDepth]++ } } - networkR := maxIndex(networkRadius[:]) - hoodR := maxIndex(nHoodRadius[:]) + networkD := maxIndex(networkDepth[:]) + hoodD := maxIndex(nHoodDepth[:]) - return uint8(networkR), uint8(hoodR) + return uint8(networkD), uint8(hoodD) } // commitment finds the most common batch commitment. diff --git a/pkg/salud/salud_test.go b/pkg/salud/salud_test.go index 5fc4dda733d..e430bf1c868 100644 --- a/pkg/salud/salud_test.go +++ b/pkg/salud/salud_test.go @@ -31,28 +31,28 @@ func TestSalud(t *testing.T) { t.Parallel() peers := []peer{ // fully healhy - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true}, - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true}, - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true}, - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true}, - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true}, - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, true}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, true}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, true}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, true}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, true}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, true}, // healthy since radius >= most common radius - 2 - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 7, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 7, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 7}, 1, true}, // radius too low - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 5, BeeMode: "full", BatchCommitment: 50, 
ReserveSize: 100}, 1, false}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 5, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 5}, 1, false}, // dur too long - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 2, false}, - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 2, false}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 2, false}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 2, false}, // connections not enough - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 90, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 90, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, false}, // commitment wrong - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 35, ReserveSize: 100}, 1, false}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 35, ReserveSize: 100, CommittedDepth: 8}, 1, false}, } statusM := &statusMock{make(map[string]peer)} @@ -66,11 +66,12 @@ func TestSalud(t *testing.T) { topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...)) reserve := mockstorer.NewReserve( - mockstorer.WithRadius(8), + mockstorer.WithRadius(6), mockstorer.WithReserveSize(100), + mockstorer.WithCapacityDoubling(2), ) - service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 0) + service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8) err := spinlock.Wait(time.Minute, func() bool { return len(topM.PeersHealth()) == len(peers) @@ -114,9 +115,10 @@ func TestSelfUnhealthyRadius(t *testing.T) { reserve := mockstorer.NewReserve( mockstorer.WithRadius(7), mockstorer.WithReserveSize(100), + mockstorer.WithCapacityDoubling(0), ) - service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 0) + service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8) testutil.CleanupCloser(t, service) err := spinlock.Wait(time.Minute, func() bool { @@ -135,8 +137,8 @@ func TestSelfHealthyCapacityDoubling(t *testing.T) { t.Parallel() peers := []peer{ // fully healhy - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true}, - {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", CommittedDepth: 8}, 0, true}, + {swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", CommittedDepth: 8}, 0, true}, } statusM := &statusMock{make(map[string]peer)} @@ -151,9 +153,10 @@ func TestSelfHealthyCapacityDoubling(t *testing.T) { reserve := mockstorer.NewReserve( mockstorer.WithRadius(6), mockstorer.WithReserveSize(100), + mockstorer.WithCapacityDoubling(2), ) - service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 2) + service := salud.New(statusM, topM, reserve, log.Noop, -1, 
"full", 0, 0.8, 0.8) testutil.CleanupCloser(t, service) err := spinlock.Wait(time.Minute, func() bool { @@ -183,7 +186,7 @@ func TestSubToRadius(t *testing.T) { topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...)) - service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8, 0) + service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8) c, unsub := service.SubscribeNetworkStorageRadius() t.Cleanup(unsub) @@ -216,7 +219,7 @@ func TestUnsub(t *testing.T) { topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...)) - service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8, 0) + service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8) testutil.CleanupCloser(t, service) c, unsub := service.SubscribeNetworkStorageRadius() diff --git a/pkg/status/internal/pb/status.pb.go b/pkg/status/internal/pb/status.pb.go index fb057b77ae8..d5cd8454235 100644 --- a/pkg/status/internal/pb/status.pb.go +++ b/pkg/status/internal/pb/status.pb.go @@ -74,6 +74,7 @@ type Snapshot struct { IsReachable bool `protobuf:"varint,8,opt,name=IsReachable,proto3" json:"IsReachable,omitempty"` ReserveSizeWithinRadius uint64 `protobuf:"varint,9,opt,name=ReserveSizeWithinRadius,proto3" json:"ReserveSizeWithinRadius,omitempty"` LastSyncedBlock uint64 `protobuf:"varint,10,opt,name=LastSyncedBlock,proto3" json:"LastSyncedBlock,omitempty"` + CommittedDepth uint32 `protobuf:"varint,11,opt,name=CommittedDepth,proto3" json:"CommittedDepth,omitempty"` } func (m *Snapshot) Reset() { *m = Snapshot{} } @@ -179,6 +180,13 @@ func (m *Snapshot) GetLastSyncedBlock() uint64 { return 0 } +func (m *Snapshot) GetCommittedDepth() uint32 { + if m != nil { + return m.CommittedDepth + } + return 0 +} + func init() { proto.RegisterType((*Get)(nil), "status.Get") proto.RegisterType((*Snapshot)(nil), "status.Snapshot") @@ -187,27 +195,28 @@ func init() { func init() { proto.RegisterFile("status.proto", fileDescriptor_dfe4fce6682daf5b) } var fileDescriptor_dfe4fce6682daf5b = []byte{ - // 317 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0xd1, 0xbd, 0x4a, 0x03, 0x41, - 0x10, 0x07, 0xf0, 0x6c, 0xbe, 0x33, 0x26, 0x2a, 0xdb, 0xb8, 0x85, 0x1c, 0x47, 0x10, 0x39, 0x2c, - 0x6c, 0x6c, 0xac, 0x2f, 0x85, 0x08, 0x2a, 0x61, 0xaf, 0x10, 0xec, 0xf6, 0xee, 0x86, 0xdc, 0xe1, - 0x65, 0x37, 0xdc, 0x4e, 0x84, 0xf8, 0x14, 0x16, 0x3e, 0x94, 0x65, 0x4a, 0x4b, 0x49, 0x5e, 0x44, - 0xb2, 0x51, 0x48, 0x4e, 0x2c, 0xe7, 0xb7, 0xcb, 0xec, 0x7f, 0x67, 0xa0, 0x6f, 0x49, 0xd1, 0xdc, - 0x5e, 0xce, 0x4a, 0x43, 0x86, 0xb7, 0xb7, 0xd5, 0xb0, 0x05, 0x8d, 0x1b, 0xa4, 0xe1, 0x7b, 0x03, - 0xba, 0x91, 0x56, 0x33, 0x9b, 0x19, 0xe2, 0x3e, 0x1c, 0x48, 0xb4, 0x58, 0xbe, 0x60, 0x94, 0xbf, - 0xa2, 0x60, 0x3e, 0x0b, 0x9a, 0x72, 0x97, 0xf8, 0x10, 0xfa, 0xe3, 0x79, 0x51, 0xd8, 0x85, 0x4e, - 0xa4, 0x22, 0x14, 0x75, 0x9f, 0x05, 0x4c, 0xee, 0x19, 0x3f, 0x83, 0x41, 0x44, 0xa6, 0x54, 0x13, - 0x94, 0x2a, 0xcd, 0xe7, 0x56, 0x34, 0x7c, 0x16, 0x0c, 0xe4, 0x3e, 0xf2, 0x73, 0x38, 0x1c, 0x19, - 0xad, 0x31, 0x21, 0x4c, 0xc7, 0x88, 0xa5, 0x15, 0x4d, 0xf7, 0x5c, 0x45, 0xf9, 0x05, 0x1c, 0x3f, - 0x60, 0x3e, 0xc9, 0x62, 0x53, 0x66, 0xc6, 0xa4, 0x2e, 0x58, 0xcb, 0xdd, 0xfc, 0xe3, 0x5c, 0x40, - 0x27, 0x44, 0xbc, 0x37, 0x29, 0x8a, 0xb6, 0xcf, 0x82, 0x9e, 0xfc, 0x2d, 0x79, 0x00, 0x47, 0xa1, - 0xa2, 
0x24, 0x1b, 0x99, 0xe9, 0x34, 0xa7, 0x29, 0x6a, 0x12, 0x1d, 0xd7, 0xa4, 0xca, 0x9b, 0x19, - 0xdc, 0x5a, 0x89, 0x2a, 0xc9, 0x54, 0x5c, 0xa0, 0xe8, 0xfa, 0x2c, 0xe8, 0xca, 0x5d, 0xe2, 0xd7, - 0x70, 0xb2, 0x33, 0x92, 0xc7, 0x9c, 0xb2, 0x5c, 0xff, 0xfc, 0xb4, 0xe7, 0x7a, 0xfe, 0x77, 0xbc, - 0x49, 0x71, 0xa7, 0x2c, 0x45, 0x0b, 0x9d, 0x60, 0x1a, 0x16, 0x26, 0x79, 0x16, 0xb0, 0x4d, 0x51, - 0xe1, 0xf0, 0xf4, 0x63, 0xe5, 0xb1, 0xe5, 0xca, 0x63, 0x5f, 0x2b, 0x8f, 0xbd, 0xad, 0xbd, 0xda, - 0x72, 0xed, 0xd5, 0x3e, 0xd7, 0x5e, 0xed, 0xa9, 0x3e, 0x8b, 0xe3, 0xb6, 0x5b, 0xe5, 0xd5, 0x77, - 0x00, 0x00, 0x00, 0xff, 0xff, 0x92, 0x33, 0xb8, 0x1e, 0xda, 0x01, 0x00, 0x00, + // 332 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0xd1, 0x4f, 0x4b, 0xc3, 0x30, + 0x18, 0x06, 0xf0, 0x65, 0xff, 0x97, 0x6d, 0x2a, 0xb9, 0x98, 0x83, 0x94, 0x32, 0x44, 0x8a, 0x07, + 0x2f, 0x5e, 0x3c, 0x77, 0x82, 0x08, 0x2a, 0x23, 0x3d, 0x08, 0xde, 0xd2, 0xf6, 0x65, 0x29, 0x76, + 0x49, 0x69, 0xde, 0x09, 0xf3, 0x53, 0xf8, 0x81, 0xfc, 0x00, 0x1e, 0x77, 0xf4, 0x28, 0xdb, 0x17, + 0x91, 0x65, 0x0a, 0x5b, 0xc5, 0xe3, 0xfb, 0x6b, 0x79, 0xf3, 0xe4, 0x09, 0x1d, 0x58, 0x94, 0x38, + 0xb7, 0x17, 0x45, 0x69, 0xd0, 0xb0, 0xf6, 0x76, 0x1a, 0xb5, 0x68, 0xe3, 0x06, 0x70, 0xf4, 0xde, + 0xa0, 0xdd, 0x48, 0xcb, 0xc2, 0x2a, 0x83, 0xcc, 0xa7, 0x7d, 0x01, 0x16, 0xca, 0x17, 0x88, 0xb2, + 0x57, 0xe0, 0xc4, 0x27, 0x41, 0x53, 0xec, 0x12, 0x1b, 0xd1, 0xc1, 0x64, 0x9e, 0xe7, 0x76, 0xa1, + 0x13, 0x21, 0x11, 0x78, 0xdd, 0x27, 0x01, 0x11, 0x7b, 0xc6, 0x4e, 0xe9, 0x30, 0x42, 0x53, 0xca, + 0x29, 0x08, 0x99, 0x66, 0x73, 0xcb, 0x1b, 0x3e, 0x09, 0x86, 0x62, 0x1f, 0xd9, 0x19, 0x3d, 0x18, + 0x1b, 0xad, 0x21, 0x41, 0x48, 0x27, 0x00, 0xa5, 0xe5, 0x4d, 0x77, 0x5c, 0x45, 0xd9, 0x39, 0x3d, + 0x7a, 0x80, 0x6c, 0xaa, 0x62, 0x53, 0x2a, 0x63, 0x52, 0x17, 0xac, 0xe5, 0xfe, 0xfc, 0xe3, 0x8c, + 0xd3, 0x4e, 0x08, 0x70, 0x6f, 0x52, 0xe0, 0x6d, 0x9f, 0x04, 0x3d, 0xf1, 0x3b, 0xb2, 0x80, 0x1e, + 0x86, 0x12, 0x13, 0x35, 0x36, 0xb3, 0x59, 0x86, 0x33, 0xd0, 0xc8, 0x3b, 0x6e, 0x49, 0x95, 0x37, + 0x1d, 0xdc, 0x5a, 0x01, 0x32, 0x51, 0x32, 0xce, 0x81, 0x77, 0x7d, 0x12, 0x74, 0xc5, 0x2e, 0xb1, + 0x2b, 0x7a, 0xbc, 0x53, 0xc9, 0x63, 0x86, 0x2a, 0xd3, 0x3f, 0x37, 0xed, 0xb9, 0x9d, 0xff, 0x7d, + 0xde, 0xa4, 0xb8, 0x93, 0x16, 0xa3, 0x85, 0x4e, 0x20, 0x0d, 0x73, 0x93, 0x3c, 0x73, 0xba, 0x4d, + 0x51, 0xe1, 0x6d, 0x3b, 0x9b, 0x4c, 0x08, 0xe9, 0x35, 0x14, 0xa8, 0x78, 0xdf, 0x95, 0x58, 0xd1, + 0xf0, 0xe4, 0x63, 0xe5, 0x91, 0xe5, 0xca, 0x23, 0x5f, 0x2b, 0x8f, 0xbc, 0xad, 0xbd, 0xda, 0x72, + 0xed, 0xd5, 0x3e, 0xd7, 0x5e, 0xed, 0xa9, 0x5e, 0xc4, 0x71, 0xdb, 0x3d, 0xf9, 0xe5, 0x77, 0x00, + 0x00, 0x00, 0xff, 0xff, 0x97, 0x7e, 0x47, 0xd4, 0x02, 0x02, 0x00, 0x00, } func (m *Get) Marshal() (dAtA []byte, err error) { @@ -253,6 +262,11 @@ func (m *Snapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.CommittedDepth != 0 { + i = encodeVarintStatus(dAtA, i, uint64(m.CommittedDepth)) + i-- + dAtA[i] = 0x58 + } if m.LastSyncedBlock != 0 { i = encodeVarintStatus(dAtA, i, uint64(m.LastSyncedBlock)) i-- @@ -371,6 +385,9 @@ func (m *Snapshot) Size() (n int) { if m.LastSyncedBlock != 0 { n += 1 + sovStatus(uint64(m.LastSyncedBlock)) } + if m.CommittedDepth != 0 { + n += 1 + sovStatus(uint64(m.CommittedDepth)) + } return n } @@ -658,6 +675,25 @@ func (m *Snapshot) Unmarshal(dAtA []byte) error { break } } + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field CommittedDepth", wireType) + } + 
m.CommittedDepth = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStatus + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.CommittedDepth |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipStatus(dAtA[iNdEx:]) diff --git a/pkg/status/internal/pb/status.proto b/pkg/status/internal/pb/status.proto index 1cd76212b79..7885139fd9c 100644 --- a/pkg/status/internal/pb/status.proto +++ b/pkg/status/internal/pb/status.proto @@ -25,4 +25,5 @@ message Snapshot { bool IsReachable = 8; uint64 ReserveSizeWithinRadius = 9; uint64 LastSyncedBlock = 10; + uint32 CommittedDepth = 11; } diff --git a/pkg/status/status.go b/pkg/status/status.go index d8106b4e4fb..38cccc7bea1 100644 --- a/pkg/status/status.go +++ b/pkg/status/status.go @@ -22,7 +22,7 @@ const loggerName = "status" const ( protocolName = "status" - protocolVersion = "1.1.1" + protocolVersion = "1.1.2" streamName = "status" ) @@ -39,6 +39,7 @@ type Reserve interface { ReserveSize() int ReserveSizeWithinRadius() uint64 StorageRadius() uint8 + CommittedDepth() uint8 } type topologyDriver interface { @@ -86,12 +87,14 @@ func (s *Service) LocalSnapshot() (*Snapshot, error) { reserveSizeWithinRadius uint64 connectedPeers uint64 neighborhoodSize uint64 + committedDepth uint8 ) if s.reserve != nil { storageRadius = s.reserve.StorageRadius() reserveSize = uint64(s.reserve.ReserveSize()) reserveSizeWithinRadius = s.reserve.ReserveSizeWithinRadius() + committedDepth = s.reserve.CommittedDepth() } if s.sync != nil { @@ -128,6 +131,7 @@ func (s *Service) LocalSnapshot() (*Snapshot, error) { BatchCommitment: commitment, IsReachable: s.topologyDriver.IsReachable(), LastSyncedBlock: s.chainState.GetChainState().Block, + CommittedDepth: uint32(committedDepth), }, nil } diff --git a/pkg/status/status_test.go b/pkg/status/status_test.go index 019cdae578c..5c60e887959 100644 --- a/pkg/status/status_test.go +++ b/pkg/status/status_test.go @@ -33,6 +33,7 @@ func TestStatus(t *testing.T) { NeighborhoodSize: 1, IsReachable: true, LastSyncedBlock: 6092500, + CommittedDepth: 1, } sssMock := &statusSnapshotMock{want} @@ -203,3 +204,4 @@ func (m *statusSnapshotMock) GetChainState() *postage.ChainState { func (m *statusSnapshotMock) ReserveSizeWithinRadius() uint64 { return m.Snapshot.ReserveSizeWithinRadius } +func (m *statusSnapshotMock) CommittedDepth() uint8 { return uint8(m.Snapshot.CommittedDepth) } diff --git a/pkg/storageincentives/agent.go b/pkg/storageincentives/agent.go index 63e20b0c4bd..3be9ebb28ea 100644 --- a/pkg/storageincentives/agent.go +++ b/pkg/storageincentives/agent.go @@ -70,7 +70,6 @@ type Agent struct { chainStateGetter postage.ChainStateGetter commitLock sync.Mutex health Health - capacityDoubling uint8 } func New(overlay swarm.Address, @@ -90,7 +89,6 @@ func New(overlay swarm.Address, tranService transaction.Service, health Health, logger log.Logger, - capacityDoubling uint8, ) (*Agent, error) { a := &Agent{ overlay: overlay, @@ -106,7 +104,6 @@ func New(overlay swarm.Address, redistributionStatuser: redistributionStatuser, health: health, chainStateGetter: chainStateGetter, - capacityDoubling: capacityDoubling, } state, err := NewRedistributionState(logger, ethAddress, stateStore, erc20Service, tranService) @@ -394,14 +391,14 @@ func (a *Agent) handleClaim(ctx context.Context, round uint64) error { func (a *Agent) handleSample(ctx context.Context, round uint64) (bool, error) { // minimum proximity between the achor and 
the stored chunks - commitedDepth := a.store.StorageRadius() + a.capacityDoubling + committedDepth := a.store.CommittedDepth() if a.state.IsFrozen() { a.logger.Info("skipping round because node is frozen") return false, nil } - isPlaying, err := a.contract.IsPlaying(ctx, commitedDepth) + isPlaying, err := a.contract.IsPlaying(ctx, committedDepth) if err != nil { a.metrics.ErrCheckIsPlaying.Inc() return false, err @@ -434,21 +431,21 @@ func (a *Agent) handleSample(ctx context.Context, round uint64) (bool, error) { } now := time.Now() - sample, err := a.makeSample(ctx, commitedDepth) + sample, err := a.makeSample(ctx, committedDepth) if err != nil { return false, err } dur := time.Since(now) a.metrics.SampleDuration.Set(dur.Seconds()) - a.logger.Info("produced sample", "hash", sample.ReserveSampleHash, "radius", commitedDepth, "round", round) + a.logger.Info("produced sample", "hash", sample.ReserveSampleHash, "radius", committedDepth, "round", round) a.state.SetSampleData(round, sample, dur) return true, nil } -func (a *Agent) makeSample(ctx context.Context, commitedDepth uint8) (SampleData, error) { +func (a *Agent) makeSample(ctx context.Context, committedDepth uint8) (SampleData, error) { salt, err := a.contract.ReserveSalt(ctx) if err != nil { return SampleData{}, err @@ -459,7 +456,7 @@ func (a *Agent) makeSample(ctx context.Context, commitedDepth uint8) (SampleData return SampleData{}, err } - rSample, err := a.store.ReserveSample(ctx, salt, commitedDepth, uint64(timeLimiter), a.minBatchBalance()) + rSample, err := a.store.ReserveSample(ctx, salt, committedDepth, uint64(timeLimiter), a.minBatchBalance()) if err != nil { return SampleData{}, err } @@ -473,7 +470,7 @@ func (a *Agent) makeSample(ctx context.Context, commitedDepth uint8) (SampleData Anchor1: salt, ReserveSampleItems: rSample.Items, ReserveSampleHash: sampleHash, - StorageRadius: commitedDepth, + StorageRadius: committedDepth, } return sample, nil diff --git a/pkg/storageincentives/agent_test.go b/pkg/storageincentives/agent_test.go index 0ae0eda22f9..e3f05147981 100644 --- a/pkg/storageincentives/agent_test.go +++ b/pkg/storageincentives/agent_test.go @@ -182,6 +182,7 @@ func createService( reserve := resMock.NewReserve( resMock.WithRadius(radius), resMock.WithSample(storer.RandSample(t, nil)), + resMock.WithCapacityDoubling(int(doubling)), ) return storageincentives.New( @@ -201,7 +202,6 @@ func createService( transactionmock.New(), &mockHealth{}, log.Noop, - doubling, ) } diff --git a/pkg/storer/mock/mockreserve.go b/pkg/storer/mock/mockreserve.go index 897403fe4ce..7c6a1642aba 100644 --- a/pkg/storer/mock/mockreserve.go +++ b/pkg/storer/mock/mockreserve.go @@ -146,6 +146,11 @@ func (s *ReserveStore) SetStorageRadius(r uint8) { s.radius = r s.mtx.Unlock() } +func (s *ReserveStore) CommittedDepth() uint8 { + s.mtx.Lock() + defer s.mtx.Unlock() + return s.radius + uint8(s.capacityDoubling) +} // IntervalChunks returns a set of chunk in a requested interval. 
func (s *ReserveStore) SubscribeBin(ctx context.Context, bin uint8, start uint64) (<-chan *storer.BinC, func(), <-chan error) { diff --git a/pkg/storer/mock/mockstorer.go b/pkg/storer/mock/mockstorer.go index 69e8630d846..6ab457ab759 100644 --- a/pkg/storer/mock/mockstorer.go +++ b/pkg/storer/mock/mockstorer.go @@ -220,6 +220,8 @@ func (m *mockStorer) ChunkStore() storage.ReadOnlyChunkStore { func (m *mockStorer) StorageRadius() uint8 { return 0 } +func (m *mockStorer) CommittedDepth() uint8 { return 0 } + func (m *mockStorer) IsWithinStorageRadius(_ swarm.Address) bool { return true } func (m *mockStorer) DebugInfo(_ context.Context) (storer.Info, error) { diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go index ecbeb340588..3ae3e6df99d 100644 --- a/pkg/storer/reserve.go +++ b/pkg/storer/reserve.go @@ -416,6 +416,14 @@ func (db *DB) StorageRadius() uint8 { return db.reserve.Radius() } +func (db *DB) CommittedDepth() uint8 { + if db.reserve == nil { + return 0 + } + + return uint8(db.reserveOptions.capacityDoubling) + db.reserve.Radius() +} + func (db *DB) ReserveSize() int { if db.reserve == nil { return 0 @@ -506,21 +514,20 @@ type NeighborhoodStat struct { func (db *DB) NeighborhoodsStat(ctx context.Context) ([]*NeighborhoodStat, error) { radius := db.StorageRadius() - - responsibilityRadius := radius + uint8(db.reserveOptions.capacityDoubling) + committedDepth := db.CommittedDepth() prefixes := neighborhoodPrefixes(db.baseAddr, int(radius), db.reserveOptions.capacityDoubling) neighs := make([]*NeighborhoodStat, len(prefixes)) for i, n := range prefixes { neighs[i] = &NeighborhoodStat{ - Neighborhood: swarm.NewNeighborhood(n, responsibilityRadius), + Neighborhood: swarm.NewNeighborhood(n, committedDepth), ReserveSizeWithinRadius: 0, - Proximity: min(responsibilityRadius, swarm.Proximity(n.Bytes(), db.baseAddr.Bytes()))} + Proximity: min(committedDepth, swarm.Proximity(n.Bytes(), db.baseAddr.Bytes()))} } err := db.reserve.IterateChunksItems(0, func(ch *reserve.ChunkBinItem) (bool, error) { for _, n := range neighs { - if swarm.Proximity(ch.Address.Bytes(), n.Neighborhood.Bytes()) >= responsibilityRadius { + if swarm.Proximity(ch.Address.Bytes(), n.Neighborhood.Bytes()) >= committedDepth { n.ReserveSizeWithinRadius++ break } diff --git a/pkg/storer/sample.go b/pkg/storer/sample.go index 965e8e5b8ad..9ecd97423df 100644 --- a/pkg/storer/sample.go +++ b/pkg/storer/sample.go @@ -61,7 +61,7 @@ type Sample struct { func (db *DB) ReserveSample( ctx context.Context, anchor []byte, - commitedDepth uint8, + committedDepth uint8, consensusTime uint64, minBatchBalance *big.Int, ) (Sample, error) { @@ -98,7 +98,7 @@ func (db *DB) ReserveSample( }() err := db.reserve.IterateChunksItems(db.StorageRadius(), func(ch *reserve.ChunkBinItem) (bool, error) { - if swarm.Proximity(ch.Address.Bytes(), anchor) < commitedDepth { + if swarm.Proximity(ch.Address.Bytes(), anchor) < committedDepth { return false, nil } select { @@ -261,12 +261,12 @@ func (db *DB) ReserveSample( allStats.TotalDuration = time.Since(t) if err := g.Wait(); err != nil { - db.logger.Info("reserve sampler finished with error", "err", err, "duration", time.Since(t), "storage_radius", commitedDepth, "consensus_time_ns", consensusTime, "stats", fmt.Sprintf("%+v", allStats)) + db.logger.Info("reserve sampler finished with error", "err", err, "duration", time.Since(t), "storage_radius", committedDepth, "consensus_time_ns", consensusTime, "stats", fmt.Sprintf("%+v", allStats)) return Sample{}, fmt.Errorf("sampler: failed creating sample: 
%w", err) } - db.logger.Info("reserve sampler finished", "duration", time.Since(t), "storage_radius", commitedDepth, "consensus_time_ns", consensusTime, "stats", fmt.Sprintf("%+v", allStats)) + db.logger.Info("reserve sampler finished", "duration", time.Since(t), "storage_radius", committedDepth, "consensus_time_ns", consensusTime, "stats", fmt.Sprintf("%+v", allStats)) return Sample{Stats: *allStats, Items: sampleItems}, nil } diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index 2094596976f..2628807a24e 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -163,6 +163,7 @@ type ReserveStore interface { type RadiusChecker interface { IsWithinStorageRadius(addr swarm.Address) bool StorageRadius() uint8 + CommittedDepth() uint8 } // LocalStore is a read-only ChunkStore. It can be used to check if chunk is known