Skip to content

Commit

Permalink
Simplify sectors cache by removing FileContract association (#1661)
Browse files Browse the repository at this point in the history
Another PR that will eventually help with moving the uploader into its
own package by moving the `ObjectStore` further up the stack, out of the
`uploader`.

This should also make pruning a bit more robust when it comes to
renewals.
  • Loading branch information
ChrisSchinnerl authored Nov 12, 2024
2 parents 199a567 + 99a15da commit 92ed015
Show file tree
Hide file tree
Showing 14 changed files with 99 additions and 228 deletions.
7 changes: 0 additions & 7 deletions api/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,6 @@ type (
RenterFunds types.Currency `json:"renterFunds"`
}

// ContractRootsResponse is the response type for the /contract/:id/roots
// endpoint.
ContractRootsResponse struct {
Roots []types.Hash256 `json:"roots"`
Uploading []types.Hash256 `json:"uploading"`
}

// ContractsArchiveRequest is the request type for the /contracts/archive endpoint.
ContractsArchiveRequest = map[types.FileContractID]string

Expand Down
6 changes: 0 additions & 6 deletions api/slab.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ type (
Slabs []UploadedPackedSlab `json:"slabs"`
}

// UploadSectorRequest is the request type for the /upload/:id/sector endpoint.
UploadSectorRequest struct {
ContractID types.FileContractID `json:"contractID"`
Root types.Hash256 `json:"root"`
}

UnhealthySlabsResponse struct {
Slabs []UnhealthySlab `json:"slabs"`
}
Expand Down
7 changes: 2 additions & 5 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,9 @@ type (
}

UploadingSectorsCache interface {
AddSector(uID api.UploadID, fcid types.FileContractID, root types.Hash256) error
AddSectors(uID api.UploadID, roots ...types.Hash256) error
FinishUpload(uID api.UploadID)
HandleRenewal(fcid, renewedFrom types.FileContractID)
Pending(fcid types.FileContractID) (size uint64)
Sectors(fcid types.FileContractID) (roots []types.Hash256)
Sectors() (sectors []types.Hash256)
StartUpload(uID api.UploadID) error
}

Expand Down Expand Up @@ -590,7 +588,6 @@ func (b *Bus) addRenewal(ctx context.Context, renewedFrom types.FileContractID,
return api.ContractMetadata{}, err
}

b.sectors.HandleRenewal(renewal.ID, renewal.RenewedFrom)
b.broadcastAction(webhooks.Event{
Module: api.ModuleContract,
Event: api.EventRenew,
Expand Down
9 changes: 3 additions & 6 deletions bus/client/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,9 @@ func (c *Client) Contract(ctx context.Context, id types.FileContractID) (contrac

// ContractRoots returns the sector roots, as well as the ones that are still
// uploading, for the contract with given id.
func (c *Client) ContractRoots(ctx context.Context, contractID types.FileContractID) (roots, uploading []types.Hash256, err error) {
var resp api.ContractRootsResponse
if err = c.c.WithContext(ctx).GET(fmt.Sprintf("/contract/%s/roots", contractID), &resp); err != nil {
return
}
return resp.Roots, resp.Uploading, nil
func (c *Client) ContractRoots(ctx context.Context, contractID types.FileContractID) (roots []types.Hash256, err error) {
err = c.c.WithContext(ctx).GET(fmt.Sprintf("/contract/%s/roots", contractID), &roots)
return
}

// ContractSets returns the contract sets of the bus.
Expand Down
9 changes: 3 additions & 6 deletions bus/client/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,9 @@ import (
"go.sia.tech/renterd/api"
)

// AddUploadingSector adds the given sector to the upload with given id.
func (c *Client) AddUploadingSector(ctx context.Context, uID api.UploadID, id types.FileContractID, root types.Hash256) (err error) {
err = c.c.WithContext(ctx).POST(fmt.Sprintf("/upload/%s/sector", uID), api.UploadSectorRequest{
ContractID: id,
Root: root,
}, nil)
// AddUploadingSectors adds the given sectors to the upload with given id.
func (c *Client) AddUploadingSectors(ctx context.Context, uID api.UploadID, roots []types.Hash256) (err error) {
err = c.c.WithContext(ctx).POST(fmt.Sprintf("/upload/%s/sector", uID), &roots, nil)
return
}

Expand Down
34 changes: 5 additions & 29 deletions bus/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ func (b *Bus) contractPruneHandlerPOST(jc jape.Context) {

// build map of uploading sectors
pending := make(map[types.Hash256]struct{})
for _, root := range b.sectors.Sectors(fcid) {
for _, root := range b.sectors.Sectors() {
pending[root] = struct{}{}
}

Expand Down Expand Up @@ -1000,16 +1000,6 @@ func (b *Bus) contractsPrunableDataHandlerGET(jc jape.Context) {

// build the response
for fcid, size := range sizes {
// adjust the amount of prunable data with the pending uploads, due to
// how we record contract spending a contract's size might already
// include pending sectors
pending := b.sectors.Pending(fcid)
if pending > size.Prunable {
size.Prunable = 0
} else {
size.Prunable -= pending
}

contracts = append(contracts, api.ContractPrunableData{
ID: fcid,
ContractSize: size,
Expand Down Expand Up @@ -1046,17 +1036,6 @@ func (b *Bus) contractSizeHandlerGET(jc jape.Context) {
} else if jc.Check("failed to fetch contract size", err) != nil {
return
}

// adjust the amount of prunable data with the pending uploads, due to how
// we record contract spending a contract's size might already include
// pending sectors
pending := b.sectors.Pending(id)
if pending > size.Prunable {
size.Prunable = 0
} else {
size.Prunable -= pending
}

jc.Encode(size)
}

Expand Down Expand Up @@ -1188,10 +1167,7 @@ func (b *Bus) contractIDRootsHandlerGET(jc jape.Context) {

roots, err := b.store.ContractRoots(jc.Request.Context(), id)
if jc.Check("couldn't fetch contract sectors", err) == nil {
jc.Encode(api.ContractRootsResponse{
Roots: roots,
Uploading: b.sectors.Sectors(id),
})
jc.Encode(roots)
}
}

Expand Down Expand Up @@ -2075,11 +2051,11 @@ func (b *Bus) uploadAddSectorHandlerPOST(jc jape.Context) {
if jc.DecodeParam("id", &id) != nil {
return
}
var req api.UploadSectorRequest
if jc.Decode(&req) != nil {
var roots []types.Hash256
if jc.Decode(&roots) != nil {
return
}
jc.Check("failed to add sector", b.sectors.AddSector(id, req.ContractID, req.Root))
jc.Check("failed to add sectors", b.sectors.AddSectors(id, roots...))
}

func (b *Bus) uploadFinishedHandlerDELETE(jc jape.Context) {
Expand Down
76 changes: 10 additions & 66 deletions internal/bus/sectorscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sync"
"time"

"go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
)
Expand All @@ -19,36 +18,23 @@ const (

type (
SectorsCache struct {
mu sync.Mutex
uploads map[api.UploadID]*ongoingUpload
renewedTo map[types.FileContractID]types.FileContractID
mu sync.Mutex
uploads map[api.UploadID]*ongoingUpload
}

ongoingUpload struct {
started time.Time
contractSectors map[types.FileContractID][]types.Hash256
started time.Time
sectors []types.Hash256
}
)

func (ou *ongoingUpload) addSector(fcid types.FileContractID, root types.Hash256) {
ou.contractSectors[fcid] = append(ou.contractSectors[fcid], root)
}

func (ou *ongoingUpload) sectors(fcid types.FileContractID) (roots []types.Hash256) {
if sectors, exists := ou.contractSectors[fcid]; exists && time.Since(ou.started) < cacheExpiry {
roots = append(roots, sectors...)
}
return
}

func NewSectorsCache() *SectorsCache {
return &SectorsCache{
uploads: make(map[api.UploadID]*ongoingUpload),
renewedTo: make(map[types.FileContractID]types.FileContractID),
uploads: make(map[api.UploadID]*ongoingUpload),
}
}

func (sc *SectorsCache) AddSector(uID api.UploadID, fcid types.FileContractID, root types.Hash256) error {
func (sc *SectorsCache) AddSectors(uID api.UploadID, roots ...types.Hash256) error {
sc.mu.Lock()
defer sc.mu.Unlock()

Expand All @@ -57,8 +43,7 @@ func (sc *SectorsCache) AddSector(uID api.UploadID, fcid types.FileContractID, r
return fmt.Errorf("%w; id '%v'", api.ErrUnknownUpload, uID)
}

fcid = sc.latestFCID(fcid)
ongoing.addSector(fcid, root)
ongoing.sectors = append(ongoing.sectors, roots...)
return nil
}

Expand All @@ -73,46 +58,13 @@ func (sc *SectorsCache) FinishUpload(uID api.UploadID) {
delete(sc.uploads, uID)
}
}

// prune renewed to map
for old, new := range sc.renewedTo {
if _, exists := sc.renewedTo[new]; exists {
delete(sc.renewedTo, old)
}
}
}

func (sc *SectorsCache) HandleRenewal(fcid, renewedFrom types.FileContractID) {
sc.mu.Lock()
defer sc.mu.Unlock()

for _, upload := range sc.uploads {
if _, exists := upload.contractSectors[renewedFrom]; exists {
upload.contractSectors[fcid] = upload.contractSectors[renewedFrom]
upload.contractSectors[renewedFrom] = nil
}
}
sc.renewedTo[renewedFrom] = fcid
}

func (sc *SectorsCache) Pending(fcid types.FileContractID) (size uint64) {
sc.mu.Lock()
defer sc.mu.Unlock()

fcid = sc.latestFCID(fcid)
for _, ongoing := range sc.uploads {
size += uint64(len(ongoing.sectors(fcid))) * rhp.SectorSize
}
return
}

func (sc *SectorsCache) Sectors(fcid types.FileContractID) (roots []types.Hash256) {
func (sc *SectorsCache) Sectors() (sectors []types.Hash256) {
sc.mu.Lock()
defer sc.mu.Unlock()

fcid = sc.latestFCID(fcid)
for _, ongoing := range sc.uploads {
roots = append(roots, ongoing.sectors(fcid)...)
sectors = append(sectors, ongoing.sectors...)
}
return
}
Expand All @@ -127,15 +79,7 @@ func (sc *SectorsCache) StartUpload(uID api.UploadID) error {
}

sc.uploads[uID] = &ongoingUpload{
started: time.Now(),
contractSectors: make(map[types.FileContractID][]types.Hash256),
started: time.Now(),
}
return nil
}

func (um *SectorsCache) latestFCID(fcid types.FileContractID) types.FileContractID {
if latest, ok := um.renewedTo[fcid]; ok {
return latest
}
return fcid
}
Loading

0 comments on commit 92ed015

Please sign in to comment.