Skip to content

Commit

Permalink
Rename directory upon eviction
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Mar 14, 2024
1 parent f838b1f commit be54b1e
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 41 deletions.
84 changes: 53 additions & 31 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ func calculateBlockDirectorySize(entry *cache.Entry[string, BlockDirectory]) uin
// NewBlockDirectory creates a new BlockDirectory. Must exist on disk.
func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirectory {
bd := BlockDirectory{
BlockRef: ref,
Path: path,
refCount: atomic.NewInt32(0),
removeDirectoryTimeout: 5 * time.Second,
activeQueriersCheckInterval: 100 * time.Millisecond,
logger: logger,
BlockRef: ref,
Path: path,
refCount: atomic.NewInt32(0),
deleteTimeout: 5 * time.Second,
checkInterval: 50 * time.Millisecond,
logger: logger,
}
if err := bd.resolveSize(); err != nil {
panic(err)
Expand All @@ -111,12 +111,12 @@ func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirect
// It maintains a counter for currently active readers.
type BlockDirectory struct {
BlockRef
Path string
refCount *atomic.Int32
removeDirectoryTimeout time.Duration
activeQueriersCheckInterval time.Duration
size int64
logger log.Logger
Path string
refCount *atomic.Int32
deleteTimeout time.Duration
checkInterval time.Duration
size int64
logger log.Logger
}

// Convenience function to create a new block from a directory.
Expand Down Expand Up @@ -175,31 +175,53 @@ const defaultActiveQueriersCheckInterval = 100 * time.Millisecond

// removeBlockDirectory is called by the cache when an item is evicted
// The cache key and the cache value are passed to this function.
// The function needs to be synchronous, because otherwise we could get a cache
// race condition where the item is already evicted from the cache, but the
// underlying directory isn't.
// This function does not immediately remove the block directory, but only
// renames it, so that existing readers can still use it. Once the reader
// count is down to 0, the renamed directory is deleted.
func removeBlockDirectory(entry *cache.Entry[string, BlockDirectory]) {
b := entry.Value

timeout := time.After(b.removeDirectoryTimeout)
ticker := time.NewTicker(b.activeQueriersCheckInterval)
defer ticker.Stop()
// Shortcut: Remove directory immediately if there are no readers accessing the directory.
if b.refCount.Load() == 0 {
if err := os.RemoveAll(b.Path); err != nil {
level.Error(b.logger).Log("msg", "error deleting block directory", "path", b.Path, "err", err)
}
return
}

// Otherwise, rename the current block directory.
// Existing readers will still be able to access the files via their inodes.
newPath := b.Path + "-removed"
if err := os.Rename(b.Path, newPath); err != nil {
level.Error(b.logger).Log("msg", "failed to move block directory", "err", err)
return
}

for {
select {
case <-ticker.C:
if b.refCount.Load() == 0 {
if err := os.RemoveAll(b.Path); err != nil {
level.Error(b.logger).Log("msg", "error deleting block directory", "err", err)
// NB(chaudum): If a single goroutine for each directory turns out to be a
// problem, then run a single goroutine cleanup job instead.
go func(bd BlockDirectory, path string) {
timeout := time.After(bd.deleteTimeout)
ticker := time.NewTicker(bd.checkInterval)
defer ticker.Stop()

start := time.Now()
for {
select {
case <-ticker.C:
if b.refCount.Load() == 0 {
if err := os.RemoveAll(path); err != nil {
level.Error(b.logger).Log("msg", "error deleting block directory", "path", path, "err", err)
}
level.Debug(b.logger).Log("msg", "deleted block directory", "after", time.Since(start))
return
}
case <-timeout:
level.Warn(b.logger).Log("msg", "force deleting block folder after timeout", "timeout", bd.deleteTimeout)
if err := os.RemoveAll(path); err != nil {
level.Error(b.logger).Log("msg", "error force deleting block directory", "path", path, "err", err)
}
return
}
case <-timeout:
level.Warn(b.logger).Log("msg", "force deleting block folder after timeout", "timeout", b.removeDirectoryTimeout)
if err := os.RemoveAll(b.Path); err != nil {
level.Error(b.logger).Log("msg", "error force deleting block directory", "err", err)
}
return
}
}
}(b, newPath)
}
21 changes: 14 additions & 7 deletions pkg/storage/stores/shipper/bloomshipper/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ func TestBlockDirectory_Cleanup(t *testing.T) {
require.DirExists(t, extractedBlockDirectory)

blockDir := BlockDirectory{
Path: extractedBlockDirectory,
removeDirectoryTimeout: timeout,
activeQueriersCheckInterval: checkInterval,
logger: log.NewNopLogger(),
refCount: atomic.NewInt32(0),
Path: extractedBlockDirectory,
deleteTimeout: timeout,
checkInterval: checkInterval,
logger: log.NewNopLogger(),
refCount: atomic.NewInt32(0),
}
// acquire directory
blockDir.refCount.Inc()
Expand All @@ -96,7 +96,14 @@ func TestBlockDirectory_Cleanup(t *testing.T) {
Key: blockDir.Path,
Value: blockDir,
}
go removeBlockDirectory(e)
removeBlockDirectory(e)

// old block dir does not exist any more
require.NoDirExists(t, extractedBlockDirectory)

// has been renamed
newPath := extractedBlockDirectory + "-removed"
require.DirExists(t, newPath)

if tc.releaseQuerier {
// release directory
Expand All @@ -105,7 +112,7 @@ func TestBlockDirectory_Cleanup(t *testing.T) {

// ensure directory does not exist any more
require.Eventually(t, func() bool {
return directoryDoesNotExist(extractedBlockDirectory)
return !DirExists(newPath)
}, tc.expectDirectoryToBeDeletedWithin, 10*time.Millisecond)
})
}
Expand Down
16 changes: 13 additions & 3 deletions pkg/storage/stores/shipper/bloomshipper/compress_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomshipper

import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
Expand All @@ -13,9 +14,18 @@ import (
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

// directoryDoesNotExist reports whether os.Lstat fails for path.
// Any Lstat error (not only "not exist") is treated as the path being absent,
// and the kind of file system entry found at path is not inspected.
func directoryDoesNotExist(path string) bool {
	if _, statErr := os.Lstat(path); statErr != nil {
		return true
	}
	return false
}
// DirExists reports whether path is an existing directory.
//
// It returns false when os.Lstat reports that path does not exist and true
// when path is a directory. It panics when os.Lstat fails for any other
// reason, or when path exists but is not a directory. Because Lstat is used,
// a symbolic link is inspected itself rather than followed.
func DirExists(path string) bool {
	fi, statErr := os.Lstat(path)
	switch {
	case statErr == nil:
		// Stat succeeded; continue with the directory check below.
	case os.IsNotExist(statErr):
		return false
	default:
		panic(fmt.Sprintf("error running os.Lstat(%q): %s", path, statErr))
	}
	if fi.IsDir() {
		return true
	}
	panic(fmt.Sprintf("%q is not a directory", path))
}

const testArchiveFileName = "test-block-archive"
Expand Down

0 comments on commit be54b1e

Please sign in to comment.