Skip to content

Commit

Permalink
Rename directory upon eviction
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Mar 14, 2024
1 parent f838b1f commit 53de5c3
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 36 deletions.
84 changes: 53 additions & 31 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ func calculateBlockDirectorySize(entry *cache.Entry[string, BlockDirectory]) uin
// NewBlockDirectory creates a new BlockDirectory. Must exist on disk.
func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirectory {
bd := BlockDirectory{
BlockRef: ref,
Path: path,
refCount: atomic.NewInt32(0),
removeDirectoryTimeout: 5 * time.Second,
activeQueriersCheckInterval: 100 * time.Millisecond,
logger: logger,
BlockRef: ref,
Path: path,
refCount: atomic.NewInt32(0),
deleteTimeout: 5 * time.Second,
checkInterval: 50 * time.Millisecond,
logger: logger,
}
if err := bd.resolveSize(); err != nil {
panic(err)
Expand All @@ -111,12 +111,12 @@ func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirect
// It maintains a counter for currently active readers.
type BlockDirectory struct {
BlockRef
Path string
refCount *atomic.Int32
removeDirectoryTimeout time.Duration
activeQueriersCheckInterval time.Duration
size int64
logger log.Logger
Path string
refCount *atomic.Int32
deleteTimeout time.Duration
checkInterval time.Duration
size int64
logger log.Logger
}

// Convenience function to create a new block from a directory.
Expand Down Expand Up @@ -175,31 +175,53 @@ const defaultActiveQueriersCheckInterval = 100 * time.Millisecond

// removeBlockDirectory is called by the cache when an item is evicted
// The cache key and the cache value are passed to this function.
// The function needs to be synchronous, because otherwise we could get a cache
// race condition where the item is already evicted from the cache, but the
// underlying directory isn't.
// This function does not immediately remove the block directory, but only
// renames it, which allows that existing readers can still used it. Once the
// reader count is down to 0 the moved directory is deleted.
func removeBlockDirectory(entry *cache.Entry[string, BlockDirectory]) {
b := entry.Value

timeout := time.After(b.removeDirectoryTimeout)
ticker := time.NewTicker(b.activeQueriersCheckInterval)
defer ticker.Stop()
// Shortcut: Remove directory immediately if there are no readers accessing the directory.
if b.refCount.Load() == 0 {
if err := os.RemoveAll(b.Path); err != nil {
level.Error(b.logger).Log("msg", "error deleting block directory", "path", b.Path, "err", err)
}
return
}

// Otherwise, rename the current block directory.
// Existing readers will still be able to access the files via their inodes.
newPath := b.Path + "-removed"
if err := os.Rename(b.Path, newPath); err != nil {
level.Error(b.logger).Log("msg", "failed to move block directory", "err", err)
return
}

for {
select {
case <-ticker.C:
if b.refCount.Load() == 0 {
if err := os.RemoveAll(b.Path); err != nil {
level.Error(b.logger).Log("msg", "error deleting block directory", "err", err)
// NB(chaudum): If a single gorouting for each directory turns out to be a
// problem then run a a single goroutine cleanup job instead.
go func(bd BlockDirectory, path string) {
timeout := time.After(bd.deleteTimeout)
ticker := time.NewTicker(bd.checkInterval)
defer ticker.Stop()

start := time.Now()
for {
select {
case <-ticker.C:
if b.refCount.Load() == 0 {
if err := os.RemoveAll(path); err != nil {
level.Error(b.logger).Log("msg", "error deleting block directory", "path", path, "err", err)
}
level.Debug(b.logger).Log("msg", "deleted block directory", "after", time.Since(start))
return
}
case <-timeout:
level.Warn(b.logger).Log("msg", "force deleting block folder after timeout", "timeout", bd.deleteTimeout)
if err := os.RemoveAll(path); err != nil {
level.Error(b.logger).Log("msg", "error force deleting block directory", "path", path, "err", err)
}
return
}
case <-timeout:
level.Warn(b.logger).Log("msg", "force deleting block folder after timeout", "timeout", b.removeDirectoryTimeout)
if err := os.RemoveAll(b.Path); err != nil {
level.Error(b.logger).Log("msg", "error force deleting block directory", "err", err)
}
return
}
}
}(b, newPath)
}
10 changes: 5 additions & 5 deletions pkg/storage/stores/shipper/bloomshipper/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ func TestBlockDirectory_Cleanup(t *testing.T) {
require.DirExists(t, extractedBlockDirectory)

blockDir := BlockDirectory{
Path: extractedBlockDirectory,
removeDirectoryTimeout: timeout,
activeQueriersCheckInterval: checkInterval,
logger: log.NewNopLogger(),
refCount: atomic.NewInt32(0),
Path: extractedBlockDirectory,
deleteTimeout: timeout,
checkInterval: checkInterval,
logger: log.NewNopLogger(),
refCount: atomic.NewInt32(0),
}
// acquire directory
blockDir.refCount.Inc()
Expand Down

0 comments on commit 53de5c3

Please sign in to comment.