Skip to content

Commit

Permalink
Migrate LoadSlabBuffer to raw SQL (#1323)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisSchinnerl authored Jun 21, 2024
1 parent f87d122 commit 866bf8f
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 79 deletions.
11 changes: 0 additions & 11 deletions stores/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,6 @@ type (
Shards []dbSector `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete shards too
}

dbBufferedSlab struct {
Model

DBSlab dbSlab

Filename string
}

dbSector struct {
Model

Expand Down Expand Up @@ -330,9 +322,6 @@ func (dbSector) TableName() string { return "sectors" }
// TableName implements the gorm.Tabler interface.
func (dbSlab) TableName() string { return "slabs" }

// TableName implements the gorm.Tabler interface.
func (dbBufferedSlab) TableName() string { return "buffered_slabs" }

// TableName implements the gorm.Tabler interface.
func (dbSlice) TableName() string { return "slices" }

Expand Down
28 changes: 16 additions & 12 deletions stores/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2722,7 +2722,13 @@ func TestPartialSlab(t *testing.T) {
t.Fatal("wrong data")
}

var buffer dbBufferedSlab
type bufferedSlab struct {
ID uint
DBSlab dbSlab `gorm:"foreignKey:DBBufferedSlabID"`
Filename string
}

var buffer bufferedSlab
sk, _ := slabs[0].Key.MarshalBinary()
if err := ss.db.Joins("DBSlab").Take(&buffer, "DBSlab.key = ?", secretKey(sk)).Error; err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -2784,7 +2790,7 @@ func TestPartialSlab(t *testing.T) {
} else if !bytes.Equal(data, slab2Data) {
t.Fatal("wrong data")
}
buffer = dbBufferedSlab{}
buffer = bufferedSlab{}
sk, _ = slabs[0].Key.MarshalBinary()
if err := ss.db.Joins("DBSlab").Take(&buffer, "DBSlab.key = ?", secretKey(sk)).Error; err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -2825,13 +2831,13 @@ func TestPartialSlab(t *testing.T) {
} else if !bytes.Equal(slab3Data, append(data1, data2...)) {
t.Fatal("wrong data")
}
buffer = dbBufferedSlab{}
buffer = bufferedSlab{}
sk, _ = slabs[0].Key.MarshalBinary()
if err := ss.db.Joins("DBSlab").Take(&buffer, "DBSlab.key = ?", secretKey(sk)).Error; err != nil {
t.Fatal(err)
}
assertBuffer(buffer1Name, rhpv2.SectorSize, true, false)
buffer = dbBufferedSlab{}
buffer = bufferedSlab{}
sk, _ = slabs[1].Key.MarshalBinary()
if err := ss.db.Joins("DBSlab").Take(&buffer, "DBSlab.key = ?", secretKey(sk)).Error; err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -2860,11 +2866,11 @@ func TestPartialSlab(t *testing.T) {
assertBuffer(buffer1Name, rhpv2.SectorSize, true, true)
assertBuffer(buffer2Name, 1, false, false)

var foo []dbBufferedSlab
var foo []bufferedSlab
if err := ss.db.Find(&foo).Error; err != nil {
t.Fatal(err)
}
buffer = dbBufferedSlab{}
buffer = bufferedSlab{}
if err := ss.db.Take(&buffer, "id = ?", packedSlabs[0].BufferID).Error; err != nil {
t.Fatal(err)
}
Expand All @@ -2883,7 +2889,7 @@ func TestPartialSlab(t *testing.T) {
t.Fatal(err)
}

buffer = dbBufferedSlab{}
buffer = bufferedSlab{}
if err := ss.db.Take(&buffer, "id = ?", packedSlabs[0].BufferID).Error; !errors.Is(err, gorm.ErrRecordNotFound) {
t.Fatal("shouldn't be able to find buffer", err)
}
Expand Down Expand Up @@ -4131,10 +4137,8 @@ func TestSlabCleanup(t *testing.T) {
}

// create buffered slab
bs := dbBufferedSlab{
Filename: "foo",
}
if err := ss.db.Create(&bs).Error; err != nil {
bsID := uint(1)
if err := ss.db.Exec("INSERT INTO buffered_slabs (filename) VALUES ('foo');").Error; err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -4223,7 +4227,7 @@ func TestSlabCleanup(t *testing.T) {
// create another object that references a slab with buffer
ek, _ = object.GenerateEncryptionKey().MarshalBinary()
bufferedSlab := dbSlab{
DBBufferedSlabID: bs.ID,
DBBufferedSlabID: bsID,
DBContractSet: cs,
Health: 1,
Key: ek,
Expand Down
92 changes: 39 additions & 53 deletions stores/slabbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
sql "go.sia.tech/renterd/stores/sql"
"go.uber.org/zap"
"lukechampine.com/frand"
)

Expand All @@ -40,94 +41,79 @@ type SlabBuffer struct {
type bufferGroupID [6]byte

type SlabBufferManager struct {
alerts alerts.Alerter
bufferedSlabCompletionThreshold int64
db sql.Database
dir string
s *SQLStore
logger *zap.SugaredLogger

mu sync.Mutex
completeBuffers map[bufferGroupID][]*SlabBuffer
incompleteBuffers map[bufferGroupID][]*SlabBuffer
buffersByKey map[string]*SlabBuffer
}

func newSlabBufferManager(sqlStore *SQLStore, slabBufferCompletionThreshold int64, partialSlabDir string) (*SlabBufferManager, error) {
func newSlabBufferManager(ctx context.Context, a alerts.Alerter, db sql.Database, logger *zap.SugaredLogger, slabBufferCompletionThreshold int64, partialSlabDir string) (*SlabBufferManager, error) {
if slabBufferCompletionThreshold < 0 || slabBufferCompletionThreshold > 1<<22 {
return nil, fmt.Errorf("invalid slabBufferCompletionThreshold %v", slabBufferCompletionThreshold)
}

// load existing buffers
var buffers []dbBufferedSlab
err := sqlStore.db.
Joins("DBSlab").
Find(&buffers).
Error
buffers, orphans, err := db.LoadSlabBuffers(ctx)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to load slab buffers: %w", err)
}

mgr := &SlabBufferManager{
alerts: a,
bufferedSlabCompletionThreshold: slabBufferCompletionThreshold,
db: db,
dir: partialSlabDir,
s: sqlStore,
completeBuffers: make(map[bufferGroupID][]*SlabBuffer),
incompleteBuffers: make(map[bufferGroupID][]*SlabBuffer),
buffersByKey: make(map[string]*SlabBuffer),
logger: logger,

completeBuffers: make(map[bufferGroupID][]*SlabBuffer),
incompleteBuffers: make(map[bufferGroupID][]*SlabBuffer),
buffersByKey: make(map[string]*SlabBuffer),
}
for _, buffer := range buffers {
if buffer.DBSlab.ID == 0 {
// Buffer doesn't have a slab. We can delete it.
sqlStore.logger.Warn(fmt.Sprintf("buffer %v has no associated slab, deleting it", buffer.Filename))
if err := sqlStore.db.Delete(&buffer).Error; err != nil {
return nil, fmt.Errorf("failed to delete buffer %v: %v", buffer.ID, err)
}
if err := os.RemoveAll(filepath.Join(partialSlabDir, buffer.Filename)); err != nil {
return nil, fmt.Errorf("failed to remove buffer file %v: %v", buffer.Filename, err)
}
continue
}
// Get the encryption key.
var ec object.EncryptionKey
if err := ec.UnmarshalBinary(buffer.DBSlab.Key); err != nil {
return nil, err

for _, orphan := range orphans {
// Buffer doesn't have a slab. We can delete it.
logger.Warn(fmt.Sprintf("buffer '%v' has no associated slab, deleting it", orphan))
if err := os.RemoveAll(filepath.Join(partialSlabDir, orphan)); err != nil {
return nil, fmt.Errorf("failed to remove buffer file %v: %v", orphan, err)
}
}

for _, buffer := range buffers {
// Open the file.
file, err := os.OpenFile(filepath.Join(partialSlabDir, buffer.Filename), os.O_RDWR, 0600)
if err != nil {
_ = sqlStore.alerts.RegisterAlert(sqlStore.shutdownCtx, alerts.Alert{
_ = a.RegisterAlert(ctx, alerts.Alert{
ID: types.HashBytes([]byte(buffer.Filename)),
Severity: alerts.SeverityCritical,
Message: "failed to read buffer file on startup",
Data: map[string]interface{}{
"filename": buffer.Filename,
"slabKey": ec,
"slabKey": buffer.Key,
},
Timestamp: time.Now(),
})
sqlStore.logger.Errorf("failed to open buffer file %v for slab %v: %v", buffer.Filename, buffer.DBSlab.Key, err)
logger.Errorf("failed to open buffer file %v for slab %v: %v", buffer.Filename, buffer.Key, err)
continue
}
// Get the size of the buffer by looking at all slices using it
var size int64
err = sqlStore.db.Model(&dbSlab{}).
Joins("INNER JOIN slices sli ON slabs.id = sli.db_slab_id").
Select("COALESCE(MAX(offset+length), 0) as Size").
Where("slabs.db_buffered_slab_id = ?", buffer.ID).
Scan(&size).
Error
if err != nil {
return nil, err
}

// Create the slab buffer.
sb := &SlabBuffer{
dbID: buffer.ID,
dbID: uint(buffer.ID),
filename: buffer.Filename,
slabKey: ec,
maxSize: int64(bufferedSlabSize(buffer.DBSlab.MinShards)),
slabKey: buffer.Key,
maxSize: int64(bufferedSlabSize(buffer.MinShards)),
file: file,
size: size,
size: buffer.Size,
}
// Add the buffer to the manager.
gid := bufferGID(buffer.DBSlab.MinShards, buffer.DBSlab.TotalShards, uint32(buffer.DBSlab.DBContractSetID))
if size >= int64(sb.maxSize-slabBufferCompletionThreshold) {
gid := bufferGID(buffer.MinShards, buffer.TotalShards, uint32(buffer.ContractSetID))
if sb.size >= int64(sb.maxSize-slabBufferCompletionThreshold) {
mgr.completeBuffers[gid] = append(mgr.completeBuffers[gid], sb)
} else {
mgr.incompleteBuffers[gid] = append(mgr.incompleteBuffers[gid], sb)
Expand Down Expand Up @@ -204,7 +190,7 @@ func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, m
// If there is still data left, create a new buffer.
if len(data) > 0 {
var sb *SlabBuffer
err = mgr.s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error {
err = mgr.db.Transaction(ctx, func(tx sql.DatabaseTx) error {
sb, err = createSlabBuffer(ctx, tx, contractSet, mgr.dir, minShards, totalShards)
return err
})
Expand Down Expand Up @@ -319,7 +305,7 @@ func (mgr *SlabBufferManager) SlabsForUpload(ctx context.Context, lockingDuratio
data := make([]byte, buffer.size)
_, err := buffer.file.ReadAt(data, 0)
if err != nil {
mgr.s.alerts.RegisterAlert(ctx, alerts.Alert{
mgr.alerts.RegisterAlert(ctx, alerts.Alert{
ID: types.HashBytes([]byte(buffer.filename)),
Severity: alerts.SeverityCritical,
Message: "failed to read data from buffer",
Expand All @@ -329,7 +315,7 @@ func (mgr *SlabBufferManager) SlabsForUpload(ctx context.Context, lockingDuratio
},
Timestamp: time.Now(),
})
mgr.s.logger.Error(ctx, fmt.Sprintf("failed to read buffer %v: %s", buffer.filename, err))
mgr.logger.Error(ctx, fmt.Sprintf("failed to read buffer %v: %s", buffer.filename, err))
return nil, err
}
slabs = append(slabs, api.PackedSlab{
Expand Down Expand Up @@ -361,9 +347,9 @@ func (mgr *SlabBufferManager) RemoveBuffers(fileNames ...string) {
// an error because the buffers are not meant to be used anymore
// anyway.
if err := buffers[i].file.Close(); err != nil {
mgr.s.logger.Errorf("failed to close buffer %v: %v", buffers[i].filename, err)
mgr.logger.Errorf("failed to close buffer %v: %v", buffers[i].filename, err)
} else if err := os.RemoveAll(filepath.Join(mgr.dir, buffers[i].filename)); err != nil {
mgr.s.logger.Errorf("failed to remove buffer %v: %v", buffers[i].filename, err)
mgr.logger.Errorf("failed to remove buffer %v: %v", buffers[i].filename, err)
}
delete(mgr.buffersByKey, buffers[i].slabKey.String())
buffers[i] = buffers[len(buffers)-1]
Expand Down
4 changes: 2 additions & 2 deletions stores/slabbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestRecordAppendToCompletedBuffer(t *testing.T) {
defer ss.Close()

completionThreshold := int64(1000)
mgr, err := newSlabBufferManager(ss.SQLStore, completionThreshold, t.TempDir())
mgr, err := newSlabBufferManager(context.Background(), ss.alerts, ss.bMain, ss.logger, completionThreshold, t.TempDir())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestMarkBufferCompleteTwice(t *testing.T) {
ss := newTestSQLStore(t, defaultTestSQLStoreConfig)
defer ss.Close()

mgr, err := newSlabBufferManager(ss.SQLStore, 0, t.TempDir())
mgr, err := newSlabBufferManager(context.Background(), ss.alerts, ss.bMain, ss.logger, 0, t.TempDir())
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion stores/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func NewSQLStore(cfg Config) (*SQLStore, modules.ConsensusChangeID, error) {
shutdownCtxCancel: shutdownCtxCancel,
}

ss.slabBufferMgr, err = newSlabBufferManager(ss, cfg.SlabBufferCompletionThreshold, cfg.PartialSlabDir)
ss.slabBufferMgr, err = newSlabBufferManager(shutdownCtx, cfg.Alerts, dbMain, l.Named("slabbuffers"), cfg.SlabBufferCompletionThreshold, cfg.PartialSlabDir)
if err != nil {
return nil, modules.ConsensusChangeID{}, err
}
Expand Down
13 changes: 13 additions & 0 deletions stores/sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type (
Database interface {
io.Closer

// LoadSlabBuffers loads the slab buffers from the database.
LoadSlabBuffers(ctx context.Context) ([]LoadedSlabBuffer, []string, error)

// Migrate runs all missing migrations on the database.
Migrate(ctx context.Context) error

Expand Down Expand Up @@ -304,6 +307,16 @@ type (
WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error)
}

LoadedSlabBuffer struct {
ID int64
ContractSetID int64
Filename string
Key object.EncryptionKey
MinShards uint8
Size int64
TotalShards uint8
}

UsedContract struct {
ID int64
FCID FileContractID
Expand Down
Loading

0 comments on commit 866bf8f

Please sign in to comment.