feat: add committed depth field to status protocol (#4892)
martinconic authored Nov 5, 2024
1 parent e161ee8 commit fb6c2e8
Showing 18 changed files with 151 additions and 92 deletions.
2 changes: 2 additions & 0 deletions openapi/SwarmCommon.yaml
@@ -928,6 +928,8 @@ components:
type: boolean
lastSyncedBlock:
type: integer
committedDepth:
type: integer

StatusResponse:
type: object
1 change: 0 additions & 1 deletion pkg/api/api_test.go
@@ -712,7 +712,6 @@ func createRedistributionAgentService(
tranService,
&mockHealth{},
log.Noop,
0,
)
}

3 changes: 3 additions & 0 deletions pkg/api/status.go
@@ -30,6 +30,7 @@ type statusSnapshotResponse struct {
BatchCommitment uint64 `json:"batchCommitment"`
IsReachable bool `json:"isReachable"`
LastSyncedBlock uint64 `json:"lastSyncedBlock"`
CommittedDepth uint8 `json:"committedDepth"`
}

type statusResponse struct {
@@ -94,6 +95,7 @@ func (s *Service) statusGetHandler(w http.ResponseWriter, _ *http.Request) {
BatchCommitment: ss.BatchCommitment,
IsReachable: ss.IsReachable,
LastSyncedBlock: ss.LastSyncedBlock,
CommittedDepth: uint8(ss.CommittedDepth),
})
}

@@ -141,6 +143,7 @@ func (s *Service) statusGetPeersHandler(w http.ResponseWriter, r *http.Request)
snapshot.BatchCommitment = ss.BatchCommitment
snapshot.IsReachable = ss.IsReachable
snapshot.LastSyncedBlock = ss.LastSyncedBlock
snapshot.CommittedDepth = uint8(ss.CommittedDepth)
}

mu.Lock()
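The handler changes above copy the snapshot's committed depth straight into the JSON response. A minimal client-side sketch of reading the new field, assuming a node whose API listens on localhost:1633 and serves the status snapshot at /status; only the JSON field names are taken from this diff:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// statusSnapshot mirrors only the response fields visible in this diff.
type statusSnapshot struct {
	BatchCommitment uint64 `json:"batchCommitment"`
	IsReachable     bool   `json:"isReachable"`
	LastSyncedBlock uint64 `json:"lastSyncedBlock"`
	CommittedDepth  uint8  `json:"committedDepth"`
}

func main() {
	resp, err := http.Get("http://localhost:1633/status") // host, port and path are assumptions
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var ss statusSnapshot
	if err := json.NewDecoder(resp.Body).Decode(&ss); err != nil {
		panic(err)
	}
	fmt.Printf("committed depth: %d (last synced block %d)\n", ss.CommittedDepth, ss.LastSyncedBlock)
}
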
4 changes: 4 additions & 0 deletions pkg/api/status_test.go
@@ -40,6 +40,7 @@ func TestGetStatus(t *testing.T) {
BatchCommitment: 1,
IsReachable: true,
LastSyncedBlock: 6092500,
CommittedDepth: 1,
}

ssMock := &statusSnapshotMock{
@@ -49,6 +50,7 @@ func TestGetStatus(t *testing.T) {
storageRadius: ssr.StorageRadius,
commitment: ssr.BatchCommitment,
chainState: &postage.ChainState{Block: ssr.LastSyncedBlock},
committedDepth: ssr.CommittedDepth,
}

statusSvc := status.NewService(
@@ -122,6 +124,7 @@ type statusSnapshotMock struct {
commitment uint64
chainState *postage.ChainState
neighborhoods []*storer.NeighborhoodStat
committedDepth uint8
}

func (m *statusSnapshotMock) SyncRate() float64 { return m.syncRate }
@@ -135,3 +138,4 @@ func (m *statusSnapshotMock) ReserveSizeWithinRadius() uint64 {
func (m *statusSnapshotMock) NeighborhoodsStat(ctx context.Context) ([]*storer.NeighborhoodStat, error) {
return m.neighborhoods, nil
}
func (m *statusSnapshotMock) CommittedDepth() uint8 { return m.committedDepth }
3 changes: 1 addition & 2 deletions pkg/node/node.go
@@ -907,7 +907,7 @@ func NewBee(
return nil, fmt.Errorf("status service: %w", err)
}

saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile, uint8(o.ReserveCapacityDoubling))
saludService := salud.New(nodeStatus, kad, localStore, logger, warmupTime, api.FullMode.String(), salud.DefaultMinPeersPerBin, salud.DefaultDurPercentile, salud.DefaultConnsPercentile)
b.saludCloser = saludService

rC, unsub := saludService.SubscribeNetworkStorageRadius()
@@ -1086,7 +1086,6 @@ func NewBee(
transactionService,
saludService,
logger,
uint8(o.ReserveCapacityDoubling),
)
if err != nil {
return nil, fmt.Errorf("storage incentives agent: %w", err)
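Both call sites drop the explicit uint8(o.ReserveCapacityDoubling) argument: the depth it was used to derive is now reported directly as the committed depth. A minimal sketch of the assumed relationship, consistent with the removed salud code (s.reserve.StorageRadius() + s.capacityDoubling) and with the test values further down (radius 6, capacity doubling 2, healthy peers at depth 8):

package main

import "fmt"

func main() {
	// Illustrative values only, matching the updated salud tests below.
	storageRadius := uint8(6)    // radius reported by the reserve
	capacityDoubling := uint8(2) // reserve capacity doubling factor
	committedDepth := storageRadius + capacityDoubling

	fmt.Println("committed depth:", committedDepth) // prints 8
}
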
48 changes: 21 additions & 27 deletions pkg/salud/salud.go
@@ -52,8 +52,6 @@ type service struct {

radiusSubsMtx sync.Mutex
radiusC []chan uint8

capacityDoubling uint8
}

func New(
@@ -66,20 +64,18 @@ func New(
minPeersPerbin int,
durPercentile float64,
connsPercentile float64,
capacityDoubling uint8,
) *service {

metrics := newMetrics()

s := &service{
quit: make(chan struct{}),
logger: logger.WithName(loggerName).Register(),
status: status,
topology: topology,
metrics: metrics,
isSelfHealthy: atomic.NewBool(true),
reserve: reserve,
capacityDoubling: capacityDoubling,
quit: make(chan struct{}),
logger: logger.WithName(loggerName).Register(),
status: status,
topology: topology,
metrics: metrics,
isSelfHealthy: atomic.NewBool(true),
reserve: reserve,
}

s.wg.Add(1)
@@ -173,7 +169,7 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
return
}

networkRadius, nHoodRadius := s.radius(peers)
networkRadius, nHoodRadius := s.committedDepth(peers)
avgDur := totaldur / float64(len(peers))
pDur := percentileDur(peers, durPercentile)
pConns := percentileConns(peers, connsPercentile)
@@ -199,8 +195,8 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
continue
}

if networkRadius > 0 && peer.status.StorageRadius < uint32(networkRadius-2) {
s.logger.Debug("radius health failure", "radius", peer.status.StorageRadius, "peer_address", peer.addr)
if networkRadius > 0 && peer.status.CommittedDepth < uint32(networkRadius-2) {
s.logger.Debug("radius health failure", "radius", peer.status.CommittedDepth, "peer_address", peer.addr)
} else if peer.dur.Seconds() > pDur {
s.logger.Debug("response duration below threshold", "duration", peer.dur, "peer_address", peer.addr)
} else if peer.status.ConnectedPeers < pConns {
@@ -220,12 +216,10 @@ func (s *service) salud(mode string, minPeersPerbin int, durPercentile float64,
}
}

networkRadiusEstimation := s.reserve.StorageRadius() + s.capacityDoubling

selfHealth := true
if nHoodRadius == networkRadius && networkRadiusEstimation != networkRadius {
if nHoodRadius == networkRadius && s.reserve.CommittedDepth() != networkRadius {
selfHealth = false
s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", networkRadiusEstimation, "network_radius", networkRadius)
s.logger.Warning("node is unhealthy due to storage radius discrepancy", "self_radius", s.reserve.CommittedDepth(), "network_radius", networkRadius)
}

s.isSelfHealthy.Store(selfHealth)
@@ -294,24 +288,24 @@ func percentileConns(peers []peer, p float64) uint64 {
}

// radius finds the most common radius.
func (s *service) radius(peers []peer) (uint8, uint8) {
func (s *service) committedDepth(peers []peer) (uint8, uint8) {

var networkRadius [swarm.MaxBins]int
var nHoodRadius [swarm.MaxBins]int
var networkDepth [swarm.MaxBins]int
var nHoodDepth [swarm.MaxBins]int

for _, peer := range peers {
if peer.status.StorageRadius < uint32(swarm.MaxBins) {
if peer.status.CommittedDepth < uint32(swarm.MaxBins) {
if peer.neighbor {
nHoodRadius[peer.status.StorageRadius]++
nHoodDepth[peer.status.CommittedDepth]++
}
networkRadius[peer.status.StorageRadius]++
networkDepth[peer.status.CommittedDepth]++
}
}

networkR := maxIndex(networkRadius[:])
hoodR := maxIndex(nHoodRadius[:])
networkD := maxIndex(networkDepth[:])
hoodD := maxIndex(nHoodDepth[:])

return uint8(networkR), uint8(hoodR)
return uint8(networkD), uint8(hoodD)
}

// commitment finds the most common batch commitment.
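The renamed committedDepth helper keeps the shape of the old radius function: it histograms each peer's committed depth (once over all peers, once over neighbors only) and returns the most common value of each via maxIndex. A small self-contained sketch of that pattern, with the bin count standing in for swarm.MaxBins as an assumption:

package main

import "fmt"

// mostCommonDepth tallies depths into a fixed-size histogram and returns the
// depth that occurs most often, mirroring the committedDepth/maxIndex pattern.
func mostCommonDepth(depths []uint8) uint8 {
	const maxBins = 32 // stands in for swarm.MaxBins
	var counts [maxBins]int
	for _, d := range depths {
		if int(d) < maxBins {
			counts[d]++
		}
	}
	best := 0
	for i := 1; i < maxBins; i++ {
		if counts[i] > counts[best] {
			best = i
		}
	}
	return uint8(best)
}

func main() {
	// Most peers report depth 8, a couple lag behind: the mode is 8.
	fmt.Println(mostCommonDepth([]uint8{8, 8, 8, 8, 7, 5}))
}
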
43 changes: 23 additions & 20 deletions pkg/salud/salud_test.go
@@ -31,28 +31,28 @@ func TestSalud(t *testing.T) {
t.Parallel()
peers := []peer{
// fully healhy
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, true},

// healthy since radius >= most common radius - 2
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 7, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 7, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 7}, 1, true},

// radius too low
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 5, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 5, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 5}, 1, false},

// dur too long
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 2, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 2, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 2, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 2, false},

// connections not enough
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 90, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100}, 1, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 90, StorageRadius: 8, BeeMode: "full", BatchCommitment: 50, ReserveSize: 100, CommittedDepth: 8}, 1, false},

// commitment wrong
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 35, ReserveSize: 100}, 1, false},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", BatchCommitment: 35, ReserveSize: 100, CommittedDepth: 8}, 1, false},
}

statusM := &statusMock{make(map[string]peer)}
@@ -66,11 +66,12 @@ func TestSalud(t *testing.T) {
topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

reserve := mockstorer.NewReserve(
mockstorer.WithRadius(8),
mockstorer.WithRadius(6),
mockstorer.WithReserveSize(100),
mockstorer.WithCapacityDoubling(2),
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 0)
service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)

err := spinlock.Wait(time.Minute, func() bool {
return len(topM.PeersHealth()) == len(peers)
@@ -114,9 +115,10 @@ func TestSelfUnhealthyRadius(t *testing.T) {
reserve := mockstorer.NewReserve(
mockstorer.WithRadius(7),
mockstorer.WithReserveSize(100),
mockstorer.WithCapacityDoubling(0),
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 0)
service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
testutil.CleanupCloser(t, service)

err := spinlock.Wait(time.Minute, func() bool {
@@ -135,8 +137,8 @@ func TestSelfHealthyCapacityDoubling(t *testing.T) {
t.Parallel()
peers := []peer{
// fully healhy
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full"}, 0, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", CommittedDepth: 8}, 0, true},
{swarm.RandAddress(t), &status.Snapshot{ConnectedPeers: 100, StorageRadius: 8, BeeMode: "full", CommittedDepth: 8}, 0, true},
}

statusM := &statusMock{make(map[string]peer)}
@@ -151,9 +153,10 @@ func TestSelfHealthyCapacityDoubling(t *testing.T) {
reserve := mockstorer.NewReserve(
mockstorer.WithRadius(6),
mockstorer.WithReserveSize(100),
mockstorer.WithCapacityDoubling(2),
)

service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8, 2)
service := salud.New(statusM, topM, reserve, log.Noop, -1, "full", 0, 0.8, 0.8)
testutil.CleanupCloser(t, service)

err := spinlock.Wait(time.Minute, func() bool {
@@ -183,7 +186,7 @@ func TestSubToRadius(t *testing.T) {

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8, 0)
service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)

c, unsub := service.SubscribeNetworkStorageRadius()
t.Cleanup(unsub)
@@ -216,7 +219,7 @@ func TestUnsub(t *testing.T) {

topM := topMock.NewTopologyDriver(topMock.WithPeers(addrs...))

service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8, 0)
service := salud.New(&statusMock{make(map[string]peer)}, topM, mockstorer.NewReserve(), log.Noop, -1, "full", 0, 0.8, 0.8)
testutil.CleanupCloser(t, service)

c, unsub := service.SubscribeNetworkStorageRadius()
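The depth expectations in these tests follow the check in salud.go above: a peer fails the depth criterion only when its committed depth is more than two bins below the most common committed depth, and the node itself stays healthy only while its reserve's committed depth matches the network's. A tiny sketch of the peer-side rule with the depths used in TestSalud:

package main

import "fmt"

func main() {
	networkDepth := uint8(8) // most common committed depth across the test peers
	for _, peerDepth := range []uint8{8, 7, 5} {
		tooShallow := networkDepth > 0 && peerDepth < networkDepth-2
		fmt.Printf("peer committed depth %d -> passes depth check: %v\n", peerDepth, !tooShallow)
	}
	// Output: 8 and 7 pass, 5 fails, matching the healthy/unhealthy flags above.
}
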