Skip to content

Commit

Permalink
dst: block WAL operations on shutdown
Browse files Browse the repository at this point in the history
This is in preparation for hard restarts.
  • Loading branch information
asubiotto committed Jun 5, 2024
1 parent db2625b commit 6eafeb6
Showing 1 changed file with 64 additions and 11 deletions.
75 changes: 64 additions & 11 deletions dst/dst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,13 @@ func (w *writerHelper) write(ctx context.Context) (uint64, error) {
// store to track writes that are successfully committed in special cases (e.g.
// when a hard shutdown copies the WAL's directory from under it).
type testLogStore struct {
internalWal.LogStore
// accOnNoError is set to true if the LogStore should accumulate logs even
// on successful commit.
accOnNoError atomic.Bool
lStore internalWal.LogStore

// shutdown is set when the store is closing. This is a temporary solution
// to prevent the WAL using open file descriptors if we want to capture
// a snapshot of the WAL directory.
shutdown atomic.Bool

// acc does not need to be protected by a mutex since StoreLogs is called
// synchronously.
acc int64checksum
Expand Down Expand Up @@ -268,18 +271,68 @@ func (s *testLogStore) droppedLogsCallback(logs []types.LogEntry) {
_ = s.accumulateLogs(logs)
}

func (s *testLogStore) Close() error {
return s.lStore.Close()
}

func (s *testLogStore) StoreLogs(logs []types.LogEntry) error {
if err := s.LogStore.StoreLogs(logs); err != nil {
if s.shutdown.Load() {
_ = s.accumulateLogs(logs)
return err
return errors.New("store is shutting down")
}
// Successful commit.
if s.accOnNoError.Load() {
return s.accumulateLogs(logs)

if err := s.lStore.StoreLogs(logs); err != nil {
_ = s.accumulateLogs(logs)
return err
}

return nil
}

// FirstIndex returns the first index written. 0 for no entries.
func (s *testLogStore) FirstIndex() (uint64, error) {
if s.shutdown.Load() {
return 0, errors.New("store is shutting down")
}
return s.lStore.FirstIndex()
}

// LastIndex returns the last index written. 0 for no entries.
func (s *testLogStore) LastIndex() (uint64, error) {
if s.shutdown.Load() {
return 0, errors.New("store is shutting down")
}
return s.lStore.LastIndex()
}

// GetLog gets a log entry at a given index.
func (s *testLogStore) GetLog(index uint64, log *types.LogEntry) error {
if s.shutdown.Load() {
return errors.New("store is shutting down")
}
return s.lStore.GetLog(index, log)
}

// TruncateBack truncates the back of the log by removing all entries that
// are after the provided `index`. In other words the entry at `index`
// becomes the last entry in the log.
func (s *testLogStore) TruncateBack(index uint64) error {
if s.shutdown.Load() {
return errors.New("store is shutting down")
}
return s.lStore.TruncateBack(index)
}

// TruncateFront truncates the front of the log by removing all entries that
// are before the provided `index`. In other words the entry at
// `index` becomes the first entry in the log.
func (s *testLogStore) TruncateFront(index uint64) error {
if s.shutdown.Load() {
return errors.New("store is shutting down")
}
return s.lStore.TruncateFront(index)
}

// canIgnoreError returns whether the given error can be ignored. Specifically,
// errors returned by operations when the database is closing are not a problem.
func canIgnoreError(err error) bool {
Expand Down Expand Up @@ -365,7 +418,7 @@ func TestDST(t *testing.T) {
storeID := 0
c, err := newStore(
storageDir, log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
logStoreWrapper.LogStore = logStore
logStoreWrapper.lStore = logStore
return logStoreWrapper
}, logStoreWrapper.droppedLogsCallback, walTicker,
)
Expand Down Expand Up @@ -472,7 +525,7 @@ func TestDST(t *testing.T) {
c, err = newStore(
storageDir,
log.WithPrefix(logger, "storeID", storeID), objectStorage, func(logStore internalWal.LogStore) internalWal.LogStore {
logStoreWrapper.LogStore = logStore
logStoreWrapper.lStore = logStore
return logStoreWrapper
}, logStoreWrapper.droppedLogsCallback, walTicker,
)
Expand Down

0 comments on commit 6eafeb6

Please sign in to comment.