diff --git a/dst/dst_test.go b/dst/dst_test.go index 09b995f7c1..44aad2b87b 100644 --- a/dst/dst_test.go +++ b/dst/dst_test.go @@ -52,7 +52,8 @@ const ( compact snapshot rotate - restart + gracefulRestart + hardRestart ) func (c command) String() string { @@ -65,23 +66,26 @@ func (c command) String() string { return "snapshot" case rotate: return "rotate" - case restart: - return "restart" + case gracefulRestart: + return "gracefulRestart" + case hardRestart: + return "hardRestart" default: return "" } } -var commands = []command{insert, compact, snapshot, rotate, restart} +var commands = []command{insert, compact, snapshot, rotate, gracefulRestart, hardRestart} // probabilities are command probabilities. It is not strictly necessary that // these sum to 1. var probabilities = map[command]float64{ - insert: 0.75, - compact: 0.25, - snapshot: 0.1, - rotate: 0.05, - restart: 0.01, + insert: 0.75, + compact: 0.25, + snapshot: 0.1, + rotate: 0.05, + gracefulRestart: 0.01, + hardRestart: 0.01, } var cumulativeProbabilities []float64 @@ -163,6 +167,12 @@ type fakeTicker struct { } func (t *fakeTicker) C() <-chan time.Time { + // Fake sleep to simulate preemption. This avoids hot loops in the WAL that + // can starve other goroutines from running since asyncpreemption is not + // implemented in WASM. + if rand.Float64() < 0.2 { + time.Sleep(1 * time.Millisecond) + } return t.c } @@ -458,6 +468,46 @@ func TestDST(t *testing.T) { commandDistribution := make(map[command]int) ignoreGoroutinesAtStartOfTest := goleak.IgnoreCurrent() + + waitForRunningGoroutines := func(t *testing.T) { + t.Helper() + // Unfortunately frostdb doesn't have goroutine lifecycle management + // and adding it could lead to subtle issues (e.g. on Close with + // many DBs). Instead, this test simply verifies all goroutines + // spawned up until this restart eventually exit after n retries. + const maxRetries = 10 + for i := 0; i < maxRetries; i++ { + if err := goleak.Find(ignoreGoroutinesAtStartOfTest); err == nil { + break + } else if i == maxRetries-1 { + t.Fatalf("leaked goroutines found on Close: %v", err) + } else { + time.Sleep(1 * time.Millisecond) + } + } + } + + restart := func(t *testing.T) { + t.Helper() + storeID++ + c, err = newStore( + storageDir, + log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore { + logStoreWrapper.lStore = logStore + return logStoreWrapper + }, logStoreWrapper.droppedLogsCallback, walTicker, + ) + require.NoError(t, err) + newDB, err := c.DB(ctx, dbName) + require.NoError(t, err) + table, err := newDB.Table(tableName, tableConfig) + require.NoError(t, err) + db.Store(newDB) + tp.Update(table) + _, err = w.write(ctx) + // This write should succeed. + require.NoError(t, err) + } for i := 0; i < numCommands; i++ { cmd := genCommand() commandDistribution[cmd]++ @@ -497,48 +547,43 @@ func TestDST(t *testing.T) { } return nil }) - case restart: + case gracefulRestart: // This is a hack to ensure some goroutines are scheduled before - // this restart. + // this gracefulRestart. // TODO(asubiotto): Figure out if we still need this. time.Sleep(1 * time.Millisecond) // Graceful shutdown. require.NoError(t, c.Close()) _ = errg.Wait() - - // Unfortunately frostdb doesn't have goroutine lifecycle management - // and adding it could lead to subtle issues (e.g. on Close with - // many DBs). Instead, this test simply verifies all goroutines - // spawned up until this restart eventually exit after n retries. - const maxRetries = 10 - for i := 0; i < maxRetries; i++ { - if err := goleak.Find(ignoreGoroutinesAtStartOfTest); err == nil { - break - } else if i == maxRetries-1 { - t.Fatalf("leaked goroutines found on Close: %v", err) - } else { - time.Sleep(1 * time.Millisecond) - } - } - - storeID++ - c, err = newStore( - storageDir, - log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore { - logStoreWrapper.lStore = logStore - return logStoreWrapper - }, logStoreWrapper.droppedLogsCallback, walTicker, - ) - require.NoError(t, err) - newDB, err := c.DB(ctx, dbName) - require.NoError(t, err) - table, err := newDB.Table(tableName, tableConfig) - require.NoError(t, err) - db.Store(newDB) - tp.Update(table) - _, err = w.write(ctx) - // This write should succeed. - require.NoError(t, err) + waitForRunningGoroutines(t) + restart(t) + case hardRestart: + // Simulate a hard restart by recursively copying the storage + // directory at this point in time. + tmpDir := storageDir + ".tmp" + require.NoError(t, os.Rename(storageDir, tmpDir)) + // Close to terminate all goroutines, although they shouldn't have + // any secondary effects. Accumulate any timestamps written to the + // WAL even on successful commit, since these will be lost and we + // don't expect to see them on startup. + logStoreWrapper.shutdown.Store(true) + // Wait before closing to avoid writes writing to the closed + // channel below. The snapshot of the directory has already been + // taken, so all of this is just cleanup code. + _ = errg.Wait() + // Close the WAL ticker chan to ensure draining of all WAL entries. + // This is done as a workaround so that pending WAL entries are + // fully written to the LogStore and recorded as "lost". + close(walTicker.c) + require.NoError(t, c.Close()) + waitForRunningGoroutines(t) + logStoreWrapper.shutdown.Store(false) + walTicker.c = make(chan time.Time, 1) + // storageDir can get recreated by straggling goroutines (e.g. WAL + // writes), so ensure these are completely removed. + require.NoError(t, os.RemoveAll(storageDir)) + require.NoError(t, os.Rename(tmpDir, storageDir)) + restart(t) } } @@ -555,6 +600,9 @@ func TestDST(t *testing.T) { listFiles := func(dir string) string { de, err := os.ReadDir(filepath.Join(storageDir, "databases", dbName, dir)) + if errors.Is(err, os.ErrNotExist) { + return "" + } require.NoError(t, err) var files []string for _, e := range de {