Skip to content

Commit

Permalink
dst: add hardRestart command
Browse files Browse the repository at this point in the history
This simulates a hard restart by renaming the storage directory. This method
is not foolproof, since previously opened file descriptors can still be
written to. This commit mitigates that somewhat by disallowing any WAL writes
after a hard shutdown has occurred. However, there could still be open file
descriptors somewhere else.

Eventually, this will hopefully be implemented at the syscall layer if
possible by rejecting any writes during a hard shutdown.
  • Loading branch information
asubiotto committed Jun 5, 2024
1 parent 6eafeb6 commit 77ce331
Showing 1 changed file with 93 additions and 45 deletions.
138 changes: 93 additions & 45 deletions dst/dst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ const (
compact
snapshot
rotate
restart
gracefulRestart
hardRestart
)

func (c command) String() string {
Expand All @@ -65,23 +66,26 @@ func (c command) String() string {
return "snapshot"
case rotate:
return "rotate"
case restart:
return "restart"
case gracefulRestart:
return "gracefulRestart"
case hardRestart:
return "hardRestart"
default:
return "<unknown>"
}
}

var commands = []command{insert, compact, snapshot, rotate, restart}
var commands = []command{insert, compact, snapshot, rotate, gracefulRestart, hardRestart}

// probabilities are command probabilities. It is not strictly necessary that
// these sum to 1.
var probabilities = map[command]float64{
insert: 0.75,
compact: 0.25,
snapshot: 0.1,
rotate: 0.05,
restart: 0.01,
insert: 0.75,
compact: 0.25,
snapshot: 0.1,
rotate: 0.05,
gracefulRestart: 0.01,
hardRestart: 0.01,
}

var cumulativeProbabilities []float64
Expand Down Expand Up @@ -163,6 +167,12 @@ type fakeTicker struct {
}

// C returns the channel on which the fake ticker's ticks are delivered.
// It implements the ticker interface consumed by the WAL under test.
func (t *fakeTicker) C() <-chan time.Time {
// Fake sleep to simulate preemption. This avoids hot loops in the WAL that
// can starve other goroutines from running since async preemption is not
// implemented in WASM.
if rand.Float64() < 0.2 {
time.Sleep(1 * time.Millisecond)
}
return t.c
}

Expand Down Expand Up @@ -458,6 +468,46 @@ func TestDST(t *testing.T) {
commandDistribution := make(map[command]int)

ignoreGoroutinesAtStartOfTest := goleak.IgnoreCurrent()

// waitForRunningGoroutines blocks until every goroutine spawned since the
// start of the test has exited, polling goleak up to maxRetries times with a
// short sleep between attempts. Fails the test if goroutines are still
// running after the final retry.
waitForRunningGoroutines := func(t *testing.T) {
t.Helper()
// Unfortunately frostdb doesn't have goroutine lifecycle management
// and adding it could lead to subtle issues (e.g. on Close with
// many DBs). Instead, this test simply verifies all goroutines
// spawned up until this restart eventually exit after n retries.
const maxRetries = 10
for i := 0; i < maxRetries; i++ {
if err := goleak.Find(ignoreGoroutinesAtStartOfTest); err == nil {
// No leaked goroutines remain; safe to proceed.
break
} else if i == maxRetries-1 {
t.Fatalf("leaked goroutines found on Close: %v", err)
} else {
// Give straggling goroutines a chance to finish before retrying.
time.Sleep(1 * time.Millisecond)
}
}
}

// restart brings up a fresh store over the existing storage directory,
// re-opens the database and table, swaps them into the shared test state
// (db/tp), and verifies the store is usable by performing one write that
// must succeed. Shared with both the graceful and hard restart commands.
restart := func(t *testing.T) {
t.Helper()
// Bump the store ID so log lines from the new store are distinguishable.
storeID++
c, err = newStore(
storageDir,
log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
// Re-point the wrapper at the new store's LogStore so the test
// keeps observing WAL writes across restarts.
logStoreWrapper.lStore = logStore
return logStoreWrapper
}, logStoreWrapper.droppedLogsCallback, walTicker,
)
require.NoError(t, err)
newDB, err := c.DB(ctx, dbName)
require.NoError(t, err)
table, err := newDB.Table(tableName, tableConfig)
require.NoError(t, err)
// Publish the new DB/table to the writer goroutines.
db.Store(newDB)
tp.Update(table)
_, err = w.write(ctx)
// This write should succeed.
require.NoError(t, err)
}
for i := 0; i < numCommands; i++ {
cmd := genCommand()
commandDistribution[cmd]++
Expand Down Expand Up @@ -497,48 +547,43 @@ func TestDST(t *testing.T) {
}
return nil
})
case restart:
case gracefulRestart:
// This is a hack to ensure some goroutines are scheduled before
// this restart.
// this gracefulRestart.
// TODO(asubiotto): Figure out if we still need this.
time.Sleep(1 * time.Millisecond)
// Graceful shutdown.
require.NoError(t, c.Close())
_ = errg.Wait()

// Unfortunately frostdb doesn't have goroutine lifecycle management
// and adding it could lead to subtle issues (e.g. on Close with
// many DBs). Instead, this test simply verifies all goroutines
// spawned up until this restart eventually exit after n retries.
const maxRetries = 10
for i := 0; i < maxRetries; i++ {
if err := goleak.Find(ignoreGoroutinesAtStartOfTest); err == nil {
break
} else if i == maxRetries-1 {
t.Fatalf("leaked goroutines found on Close: %v", err)
} else {
time.Sleep(1 * time.Millisecond)
}
}

storeID++
c, err = newStore(
storageDir,
log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
logStoreWrapper.lStore = logStore
return logStoreWrapper
}, logStoreWrapper.droppedLogsCallback, walTicker,
)
require.NoError(t, err)
newDB, err := c.DB(ctx, dbName)
require.NoError(t, err)
table, err := newDB.Table(tableName, tableConfig)
require.NoError(t, err)
db.Store(newDB)
tp.Update(table)
_, err = w.write(ctx)
// This write should succeed.
require.NoError(t, err)
waitForRunningGoroutines(t)
restart(t)
case hardRestart:
// Simulate a hard restart by recursively copying the storage
// directory at this point in time.
tmpDir := storageDir + ".tmp"
require.NoError(t, os.Rename(storageDir, tmpDir))
// Close to terminate all goroutines, although they shouldn't have
// any secondary effects. Accumulate any timestamps written to the
// WAL even on successful commit, since these will be lost and we
// don't expect to see them on startup.
logStoreWrapper.shutdown.Store(true)
// Wait before closing to avoid writes writing to the closed
// channel below. The snapshot of the directory has already been
// taken, so all of this is just cleanup code.
_ = errg.Wait()
// Close the WAL ticker chan to ensure draining of all WAL entries.
// This is done as a workaround so that pending WAL entries are
// fully written to the LogStore and recorded as "lost".
close(walTicker.c)
require.NoError(t, c.Close())
waitForRunningGoroutines(t)
logStoreWrapper.shutdown.Store(false)
walTicker.c = make(chan time.Time, 1)
// storageDir can get recreated by straggling goroutines (e.g. WAL
// writes), so ensure these are completely removed.
require.NoError(t, os.RemoveAll(storageDir))
require.NoError(t, os.Rename(tmpDir, storageDir))
restart(t)
}
}

Expand All @@ -555,6 +600,9 @@ func TestDST(t *testing.T) {

listFiles := func(dir string) string {
de, err := os.ReadDir(filepath.Join(storageDir, "databases", dbName, dir))
if errors.Is(err, os.ErrNotExist) {
return ""
}
require.NoError(t, err)
var files []string
for _, e := range de {
Expand Down

0 comments on commit 77ce331

Please sign in to comment.