Skip to content

Commit

Permalink
storage: Path type filters
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Jul 12, 2022
1 parent e7c8082 commit 6ac5c16
Show file tree
Hide file tree
Showing 12 changed files with 286 additions and 33 deletions.
33 changes: 22 additions & 11 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,29 @@ type StorageMiner interface {
SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin
SealingAbort(ctx context.Context, call storiface.CallID) error //perm:admin

// SectorIndex
StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin
StorageInfo(context.Context, storiface.ID) (storiface.StorageInfo, error) //perm:admin
StorageReportHealth(context.Context, storiface.ID, storiface.HealthReport) error //perm:admin
StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error //perm:admin
StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error //perm:admin
// paths.SectorIndex
StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin
StorageInfo(context.Context, storiface.ID) (storiface.StorageInfo, error) //perm:admin
StorageReportHealth(context.Context, storiface.ID, storiface.HealthReport) error //perm:admin
StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error //perm:admin
StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error //perm:admin
// StorageFindSector returns list of paths where the specified sector files exist.
//
// If allowFetch is set, list of paths to which the sector can be fetched will also be returned.
// - Paths which have sector files locally (don't require fetching) will be listed first.
// - Paths which have sector files locally will not be filtered based on based on AllowTypes/DenyTypes.
// - Paths which require fetching will be filtered based on AllowTypes/DenyTypes. If multiple
// file types are specified, each type will be considered individually, and a union of all paths
// which can accommodate each file type will be returned.
StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) //perm:admin
StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) //perm:admin
StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error //perm:admin
StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) //perm:admin
StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) //perm:admin
StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) //perm:admin
// StorageBestAlloc returns list of paths where sector files of the specified type can be allocated, ordered by preference.
// Paths with more weight and more % of free space are preferred.
// Note: This method doesn't filter paths based on AllowTypes/DenyTypes.
StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) //perm:admin
StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error //perm:admin
StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) //perm:admin
StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) //perm:admin
StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) //perm:admin

StorageLocal(ctx context.Context) (map[storiface.ID]string, error) //perm:admin
StorageStat(ctx context.Context, id storiface.ID) (fsutil.FsStat, error) //perm:admin
Expand Down
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
39 changes: 37 additions & 2 deletions documentation/en/api-v0-methods-miner.md
Original file line number Diff line number Diff line change
Expand Up @@ -3271,7 +3271,7 @@ Inputs:
Response: `{}`

### StorageAttach
SectorIndex
paths.SectorIndex


Perms: admin
Expand All @@ -3293,6 +3293,12 @@ Inputs:
],
"AllowTo": [
"string value"
],
"AllowTypes": [
"string value"
],
"DenyTypes": [
"string value"
]
},
{
Expand Down Expand Up @@ -3328,6 +3334,9 @@ Response:
```

### StorageBestAlloc
StorageBestAlloc returns list of paths where sector files of the specified type can be allocated, ordered by preference.
Paths with more weight and more % of free space are preferred.
Note: This method doesn't filter paths based on AllowTypes/DenyTypes.


Perms: admin
Expand Down Expand Up @@ -3358,6 +3367,12 @@ Response:
],
"AllowTo": [
"string value"
],
"AllowTypes": [
"string value"
],
"DenyTypes": [
"string value"
]
}
]
Expand Down Expand Up @@ -3403,6 +3418,14 @@ Inputs:
Response: `{}`

### StorageFindSector
StorageFindSector returns list of paths where the specified sector files exist.

If allowFetch is set, list of paths to which the sector can be fetched will also be returned.
- Paths which have sector files locally (don't require fetching) will be listed first.
- Paths which have sector files locally will not be filtered based on based on AllowTypes/DenyTypes.
- Paths which require fetching will be filtered based on AllowTypes/DenyTypes. If multiple
file types are specified, each type will be considered individually, and a union of all paths
which can accommodate each file type will be returned.


Perms: admin
Expand Down Expand Up @@ -3434,7 +3457,13 @@ Response:
"Weight": 42,
"CanSeal": true,
"CanStore": true,
"Primary": true
"Primary": true,
"AllowTypes": [
"string value"
],
"DenyTypes": [
"string value"
]
}
]
```
Expand Down Expand Up @@ -3502,6 +3531,12 @@ Response:
],
"AllowTo": [
"string value"
],
"AllowTypes": [
"string value"
],
"DenyTypes": [
"string value"
]
}
```
Expand Down
29 changes: 29 additions & 0 deletions storage/paths/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,22 @@ func (i *Index) StorageList(ctx context.Context) (map[storiface.ID][]storiface.D
}

func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st fsutil.FsStat) error {
for i, typ := range si.AllowTypes {
_, err := storiface.TypeFromString(typ)
if err != nil {
// No need no hard-fail here, just warn the user
// (note that even with all-invalid entries we'll deny all types, so nothing unexpected should enter the path)
log.Errorw("bad path type in AllowTypes", "path", si.ID, "idx", i, "type", typ, "error", err)
}
}
for i, typ := range si.DenyTypes {
_, err := storiface.TypeFromString(typ)
if err != nil {
// No need no hard-fail here, just warn the user
log.Errorw("bad path type in DenyTypes", "path", si.ID, "idx", i, "type", typ, "error", err)
}
}

i.lk.Lock()
defer i.lk.Unlock()

Expand Down Expand Up @@ -136,6 +152,8 @@ func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st
i.stores[si.ID].info.CanStore = si.CanStore
i.stores[si.ID].info.Groups = si.Groups
i.stores[si.ID].info.AllowTo = si.AllowTo
i.stores[si.ID].info.AllowTypes = si.AllowTypes
i.stores[si.ID].info.DenyTypes = si.DenyTypes

return nil
}
Expand Down Expand Up @@ -312,6 +330,9 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
CanStore: st.info.CanStore,

Primary: isprimary[id],

AllowTypes: st.info.AllowTypes,
DenyTypes: st.info.DenyTypes,
})
}

Expand Down Expand Up @@ -345,6 +366,11 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
continue
}

if !ft.AnyAllowed(st.info.AllowTypes, st.info.DenyTypes) {
log.Debugf("not selecting on %s, not allowed by file type filters", st.info.ID)
continue
}

if allowTo != nil {
allow := false
for _, group := range st.info.Groups {
Expand Down Expand Up @@ -383,6 +409,9 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif
CanStore: st.info.CanStore,

Primary: false,

AllowTypes: st.info.AllowTypes,
DenyTypes: st.info.DenyTypes,
})
}
}
Expand Down
4 changes: 4 additions & 0 deletions storage/paths/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,10 @@ func (st *Local) AcquireSector(ctx context.Context, sid storiface.SectorRef, exi
continue
}

if !fileType.Allowed(si.AllowTypes, si.DenyTypes) {
continue
}

// TODO: Check free space

best = p.sectorPath(sid.ID, fileType)
Expand Down
11 changes: 6 additions & 5 deletions storage/paths/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,21 @@ func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, exist
}
}

apaths, ids, err := r.local.AcquireSector(ctx, s, storiface.FTNone, toFetch, pathType, op)
// get a list of paths to fetch data into. Note: file type filters will apply inside this call.
fetchPaths, ids, err := r.local.AcquireSector(ctx, s, storiface.FTNone, toFetch, pathType, op)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err)
}

odt := storiface.FSOverheadSeal
overheadTable := storiface.FSOverheadSeal
if pathType == storiface.PathStorage {
odt = storiface.FsOverheadFinalized
overheadTable = storiface.FsOverheadFinalized
}

// If any path types weren't found in local storage, try fetching them

// First reserve storage
releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, odt)
releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, overheadTable)
if err != nil {
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err)
}
Expand All @@ -168,7 +169,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, exist
continue
}

dest := storiface.PathByType(apaths, fileType)
dest := storiface.PathByType(fetchPaths, fileType)
storageID := storiface.PathByType(ids, fileType)

url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest)
Expand Down
11 changes: 9 additions & 2 deletions storage/sealer/selector_alloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,20 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
return false, false, xerrors.Errorf("finding best alloc storage: %w", err)
}

requested := s.alloc

for _, info := range best {
if _, ok := have[info.ID]; ok {
return true, false, nil
requested = requested.SubAllowed(info.AllowTypes, info.AllowTypes)

// got all paths
if requested == storiface.FTNone {
break
}
}
}

return false, false, nil
return requested == storiface.FTNone, false, nil
}

func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
Expand Down
22 changes: 17 additions & 5 deletions storage/sealer/selector_existing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (
type existingSelector struct {
index paths.SectorIndex
sector abi.SectorID
alloc storiface.SectorFileType
fileType storiface.SectorFileType
allowFetch bool
}

func newExistingSelector(index paths.SectorIndex, sector abi.SectorID, alloc storiface.SectorFileType, allowFetch bool) *existingSelector {
return &existingSelector{
index: index,
sector: sector,
alloc: alloc,
fileType: alloc,
allowFetch: allowFetch,
}
}
Expand Down Expand Up @@ -52,18 +52,30 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
return false, false, xerrors.Errorf("getting sector size: %w", err)
}

best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, s.allowFetch)
best, err := s.index.StorageFindSector(ctx, s.sector, s.fileType, ssize, s.allowFetch)
if err != nil {
return false, false, xerrors.Errorf("finding best storage: %w", err)
}

requested := s.fileType

for _, info := range best {
if _, ok := have[info.ID]; ok {
return true, false, nil
// we're not putting new sector files anywhere
if !s.allowFetch {
return true, false, nil
}

requested = requested.SubAllowed(info.AllowTypes, info.AllowTypes)

// got all paths
if requested == storiface.FTNone {
break
}
}
}

return false, false, nil
return requested == storiface.FTNone, false, nil
}

func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
Expand Down
14 changes: 11 additions & 3 deletions storage/sealer/selector_move.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return false, false, xerrors.Errorf("finding best dest storage: %w", err)
}

var ok bool
var ok, pref bool
requested := s.alloc

for _, info := range best {
if n, has := workerPaths[info.ID]; has {
Expand All @@ -83,12 +84,19 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
// either a no-op because the sector is already in the correct path,
// or the move a local move.
if n > 0 {
return true, true, nil
pref = true
}

requested = requested.SubAllowed(info.AllowTypes, info.AllowTypes)

// got all paths
if requested == storiface.FTNone {
break
}
}
}

return ok && s.allowRemote, false, nil
return ok && s.allowRemote, pref, nil
}

func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
Expand Down
Loading

0 comments on commit 6ac5c16

Please sign in to comment.