Skip to content

Commit

Permalink
Make block cache evict function synchronous
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Feb 6, 2024
1 parent 7bbbf23 commit 0a05c39
Showing 1 changed file with 49 additions and 49 deletions.
98 changes: 49 additions & 49 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package bloomshipper

import (
"fmt"
"os"
"path"
"time"
Expand Down Expand Up @@ -37,26 +36,18 @@ func NewBlocksCache(config config.Config, reg prometheus.Registerer, logger log.
logger,
stats.BloomBlocksCache,
calculateBlockDirectorySize,
func(_ string, value BlockDirectory) {
value.removeDirectoryAsync()
})
}

func calculateBlockDirectorySize(entry *cache.Entry[string, BlockDirectory]) uint64 {
value := entry.Value
bloomFileStats, _ := os.Lstat(path.Join(value.Path, v1.BloomFileName))
seriesFileStats, _ := os.Lstat(path.Join(value.Path, v1.SeriesFileName))
return uint64(bloomFileStats.Size() + seriesFileStats.Size())
removeBlockDirectory,
)
}

func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirectory {
return BlockDirectory{
BlockRef: ref,
Path: path,
activeQueriers: atomic.NewInt32(0),
removeDirectoryTimeout: time.Minute,
refCount: atomic.NewInt32(0),
removeDirectoryTimeout: 10 * time.Second,
activeQueriersCheckInterval: 100 * time.Millisecond,
logger: logger,
activeQueriersCheckInterval: defaultActiveQueriersCheckInterval,
}
}

Expand All @@ -65,63 +56,72 @@ func NewBlockDirectory(ref BlockRef, path string, logger log.Logger) BlockDirect
type BlockDirectory struct {
BlockRef
Path string
refCount *atomic.Int32
removeDirectoryTimeout time.Duration
activeQueriers *atomic.Int32
logger log.Logger
activeQueriersCheckInterval time.Duration
logger log.Logger
}

// Convenience function to create a new block from a directory.
// Must not be called outside of BlockQuerier().
func (b BlockDirectory) Block() *v1.Block {
return v1.NewBlock(v1.NewDirectoryBlockReader(b.Path))
}

// Acquire increases the ref counter on the directory.
func (b BlockDirectory) Acquire() {
_ = b.refCount.Inc()
}

// Release decreases the ref counter on the directory.
func (b BlockDirectory) Release() error {
_ = b.refCount.Dec()
return nil
}

// BlockQuerier returns a new block querier from the directory.
// It increments the counter of active queriers for this directory.
// The counter is decreased when the returned querier is closed.
func (b BlockDirectory) BlockQuerier() *ClosableBlockQuerier {
b.activeQueriers.Inc()
b.Acquire()
return &ClosableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(b.Block()),
close: func() error {
_ = b.activeQueriers.Dec()
return nil
},
close: b.Release,
}
}

const defaultActiveQueriersCheckInterval = 100 * time.Millisecond
func calculateBlockDirectorySize(entry *cache.Entry[string, BlockDirectory]) uint64 {
value := entry.Value
bloomFileStats, _ := os.Lstat(path.Join(value.Path, v1.BloomFileName))
seriesFileStats, _ := os.Lstat(path.Join(value.Path, v1.SeriesFileName))
return uint64(bloomFileStats.Size() + seriesFileStats.Size())
}

func (b *BlockDirectory) removeDirectoryAsync() {
go func() {
timeout := time.After(b.removeDirectoryTimeout)
ticker := time.NewTicker(b.activeQueriersCheckInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if b.activeQueriers.Load() == 0 {
err := deleteFolder(b.Path)
if err == nil {
return
}
// removeBlockDirectory is called by the cache when an item is evicted
// The cache key and the cache value are passed to this function.
// The function needs to be synchronous, because otherwise we could get a cache
// race condition where the item is already evicted from the cache, but the
// underlying directory isn't.
func removeBlockDirectory(_ string, b BlockDirectory) {
timeout := time.After(b.removeDirectoryTimeout)
ticker := time.NewTicker(b.activeQueriersCheckInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if b.refCount.Load() == 0 {
if err := os.RemoveAll(b.Path); err != nil {
level.Error(b.logger).Log("msg", "error deleting block directory", "err", err)
}
case <-timeout:
level.Warn(b.logger).Log("msg", "force deleting block folder after timeout", "timeout", b.removeDirectoryTimeout)
err := deleteFolder(b.Path)
if err == nil {
return
}
return
}
case <-timeout:
level.Warn(b.logger).Log("msg", "force deleting block folder after timeout", "timeout", b.removeDirectoryTimeout)
if err := os.RemoveAll(b.Path); err != nil {
level.Error(b.logger).Log("msg", "error force deleting block directory", "err", err)
}
return
}
}()
}

func deleteFolder(folderPath string) error {
err := os.RemoveAll(folderPath)
if err != nil {
return fmt.Errorf("error deleting bloom block directory: %w", err)
}
return nil
}

0 comments on commit 0a05c39

Please sign in to comment.