Skip to content

Commit

Permalink
(chore) BloomStore: Clean up FetchBlocks() (#11876)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:

* Removes `FetchBlocksWithQueue` and replaces it with `FetchBlocks`
* Operate on single `BlockRef` when loading a block directory from
cache/fs/storage.
* Ensure order of responses from the FetchBlocks request

---------

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Feb 6, 2024
1 parent af66ece commit 69d152b
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 86 deletions.
4 changes: 2 additions & 2 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,8 @@ func (s *mockBloomStore) GetBlockRefs(_ context.Context, tenant string, _ blooms
// Stop implements bloomshipper.Interface
func (s *mockBloomStore) Stop() {}

// Fetch implements bloomshipper.Interface
func (s *mockBloomStore) Fetch(_ context.Context, _ string, _ []bloomshipper.BlockRef, callback bloomshipper.ForEachBlockCallback) error {
// ForEach implements bloomshipper.Interface
func (s *mockBloomStore) ForEach(_ context.Context, _ string, _ []bloomshipper.BlockRef, callback bloomshipper.ForEachBlockCallback) error {
if s.err != nil {
time.Sleep(s.delay)
return s.err
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (w *worker) stopping(err error) error {
}

func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, bounds v1.FingerprintBounds) error {
return w.shipper.ForEach(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, bounds v1.FingerprintBounds) error {
for _, b := range boundedRefs {
if b.blockRef.Bounds.Equal(bounds) {
return w.processBlock(bq, b.tasks)
Expand Down
128 changes: 55 additions & 73 deletions pkg/storage/stores/shipper/bloomshipper/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package bloomshipper
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -125,120 +124,96 @@ func (f *Fetcher) writeBackMetas(ctx context.Context, metas []Meta) error {
return f.metasCache.Store(ctx, keys, data)
}

func (f *Fetcher) FetchBlocksWithQueue(ctx context.Context, refs []BlockRef) ([]BlockDirectory, error) {
responses := make(chan BlockDirectory, len(refs))
errors := make(chan error, len(refs))
for _, ref := range refs {
f.q.enqueue(downloadTask[BlockRef, BlockDirectory]{
func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef) ([]BlockDirectory, error) {
n := len(refs)

responses := make(chan downloadResponse[BlockDirectory], n)
errors := make(chan error, n)
for i := 0; i < n; i++ {
f.q.enqueue(downloadRequest[BlockRef, BlockDirectory]{
ctx: ctx,
item: ref,
key: f.client.Block(ref).Addr(),
item: refs[i],
key: f.client.Block(refs[i]).Addr(),
idx: i,
results: responses,
errors: errors,
})
}

results := make([]BlockDirectory, len(refs))

outer:
for i := 0; i < len(refs); i++ {
for i := 0; i < n; i++ {
select {
case err := <-errors:
return results, err
case res := <-responses:
for j, ref := range refs {
if res.BlockRef == ref {
results[j] = res
continue outer
}
}
return results, fmt.Errorf("no matching request found for response %s", res)
results[res.idx] = res.item
}
}

return results, nil
}

func (f *Fetcher) processTask(ctx context.Context, task downloadTask[BlockRef, BlockDirectory]) {
func (f *Fetcher) processTask(ctx context.Context, task downloadRequest[BlockRef, BlockDirectory]) {
if ctx.Err() != nil {
task.errors <- ctx.Err()
return
}

refs := []BlockRef{task.item}
results, err := f.FetchBlocks(ctx, refs)
result, err := f.fetchBlock(ctx, task.item)
if err != nil {
task.errors <- err
return
}

for _, res := range results {
task.results <- res
task.results <- downloadResponse[BlockDirectory]{
item: result,
key: task.key,
idx: task.idx,
}
}

// FetchBlocks returns a list of block directories
// It resolves them from three locations:
// fetchBlock resolves a block from three locations:
// 1. from cache
// 2. from file system
// 3. from remote storage
func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef) ([]BlockDirectory, error) {
if ctx.Err() != nil {
return nil, errors.Wrap(ctx.Err(), "fetch Blocks")
}
func (f *Fetcher) fetchBlock(ctx context.Context, ref BlockRef) (BlockDirectory, error) {
var zero BlockDirectory

keys := make([]string, 0, len(refs))
for _, ref := range refs {
keys = append(keys, f.client.Block(ref).Addr())
}
cacheHits, cacheBufs, _, err := f.blocksCache.Fetch(ctx, keys)
if err != nil {
return nil, err
if ctx.Err() != nil {
return zero, errors.Wrap(ctx.Err(), "fetch block")
}

results := make([]BlockDirectory, 0, len(refs))
keys := []string{f.client.Block(ref).Addr()}

fromCache, missing, err := f.processBlocksCacheResponse(ctx, refs, cacheHits, cacheBufs)
_, fromCache, _, err := f.blocksCache.Fetch(ctx, keys)
if err != nil {
return nil, err
return zero, err
}
results = append(results, fromCache...)

fromLocalFS, missing, err := f.loadBlocksFromFS(ctx, missing)
if err != nil {
return nil, err
// item found in cache
if len(fromCache) == 1 {
return fromCache[0], nil
}
results = append(results, fromLocalFS...)

fromStorage, err := f.client.GetBlocks(ctx, missing)
fromLocalFS, _, err := f.loadBlocksFromFS(ctx, []BlockRef{ref})
if err != nil {
return nil, err
return zero, err
}
results = append(results, fromStorage...)

err = f.writeBackBlocks(ctx, fromStorage)
return results, err
}

func (f *Fetcher) processBlocksCacheResponse(_ context.Context, refs []BlockRef, keys []string, entries []BlockDirectory) ([]BlockDirectory, []BlockRef, error) {
found := make(map[string]BlockDirectory, len(refs))
for i, k := range keys {
found[k] = entries[i]
// item found on local file system
if len(fromLocalFS) == 1 {
err = f.writeBackBlocks(ctx, fromLocalFS)
return fromLocalFS[0], err
}

blockDirs := make([]BlockDirectory, 0, len(found))
missing := make([]BlockRef, 0, len(refs)-len(keys))

var lastErr error
for i, ref := range refs {
if raw, ok := found[f.client.Block(ref).Addr()]; ok {
blockDirs = append(blockDirs, raw)
} else {
missing = append(missing, refs[i])
}
fromStorage, err := f.client.GetBlock(ctx, ref)
if err != nil {
return zero, err
}

return blockDirs, missing, lastErr
// item found in storage
err = f.writeBackBlocks(ctx, []BlockDirectory{fromStorage})
return fromStorage, err
}

func (f *Fetcher) loadBlocksFromFS(_ context.Context, refs []BlockRef) ([]BlockDirectory, []BlockRef, error) {
Expand Down Expand Up @@ -289,18 +264,25 @@ func (f *Fetcher) writeBackBlocks(ctx context.Context, blocks []BlockDirectory)
return f.blocksCache.Store(ctx, keys, blocks)
}

type processFunc[T any, R any] func(context.Context, downloadTask[T, R])
type processFunc[T any, R any] func(context.Context, downloadRequest[T, R])

type downloadTask[T any, R any] struct {
type downloadRequest[T any, R any] struct {
ctx context.Context
item T
key string
results chan<- R
idx int
results chan<- downloadResponse[R]
errors chan<- error
}

// downloadResponse carries the outcome of one downloadRequest back to the
// caller that enqueued it on the downloadQueue.
type downloadResponse[R any] struct {
	item R      // the downloaded/resolved result
	key  string // key of the originating request (also used for per-key locking in the queue)
	idx  int    // position of the originating request, so callers can restore request order
}

type downloadQueue[T any, R any] struct {
queue chan downloadTask[T, R]
queue chan downloadRequest[T, R]
mu keymutex.KeyMutex
wg sync.WaitGroup
done chan struct{}
Expand All @@ -310,7 +292,7 @@ type downloadQueue[T any, R any] struct {

func newDownloadQueue[T any, R any](size, workers int, process processFunc[T, R], logger log.Logger) *downloadQueue[T, R] {
q := &downloadQueue[T, R]{
queue: make(chan downloadTask[T, R], size),
queue: make(chan downloadRequest[T, R], size),
mu: keymutex.NewHashed(workers),
done: make(chan struct{}),
process: process,
Expand All @@ -323,7 +305,7 @@ func newDownloadQueue[T any, R any](size, workers int, process processFunc[T, R]
return q
}

func (q *downloadQueue[T, R]) enqueue(t downloadTask[T, R]) {
// enqueue submits a download request for asynchronous processing by the
// queue's workers. It blocks when the queue's buffered channel is full.
func (q *downloadQueue[T, R]) enqueue(t downloadRequest[T, R]) {
	q.queue <- t
}

Expand All @@ -339,7 +321,7 @@ func (q *downloadQueue[T, R]) runWorker() {
}
}

func (q *downloadQueue[T, R]) do(ctx context.Context, task downloadTask[T, R]) {
func (q *downloadQueue[T, R]) do(ctx context.Context, task downloadRequest[T, R]) {
q.mu.LockKey(task.key)
defer func() {
err := q.mu.UnlockKey(task.key)
Expand Down
15 changes: 11 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type ForEachBlockCallback func(bq *v1.BlockQuerier, bounds v1.FingerprintBounds)

type Interface interface {
GetBlockRefs(ctx context.Context, tenant string, interval Interval) ([]BlockRef, error)
Fetch(ctx context.Context, tenant string, blocks []BlockRef, callback ForEachBlockCallback) error
ForEach(ctx context.Context, tenant string, blocks []BlockRef, callback ForEachBlockCallback) error
Stop()
}

Expand Down Expand Up @@ -58,14 +58,21 @@ func (s *Shipper) GetBlockRefs(ctx context.Context, tenantID string, interval In
return blockRefs, nil
}

func (s *Shipper) Fetch(ctx context.Context, _ string, blocks []BlockRef, callback ForEachBlockCallback) error {
func (s *Shipper) ForEach(ctx context.Context, _ string, blocks []BlockRef, callback ForEachBlockCallback) error {
blockDirs, err := s.store.FetchBlocks(ctx, blocks)
if err != nil {
return err
}

for _, dir := range blockDirs {
err := runCallback(callback, dir.BlockQuerier(), dir.BlockRef.Bounds)
if len(blockDirs) != len(blocks) {
return fmt.Errorf("number of responses (%d) does not match number of requests (%d)", len(blockDirs), len(blocks))
}

for i := range blocks {
if blockDirs[i].BlockRef != blocks[i] {
return fmt.Errorf("invalid order of responses: expected: %v, got: %v", blocks[i], blockDirs[i].BlockRef)
}
err := runCallback(callback, blockDirs[i].BlockQuerier(), blockDirs[i].BlockRef.Bounds)
if err != nil {
return err
}
Expand Down
16 changes: 14 additions & 2 deletions pkg/storage/stores/shipper/bloomshipper/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"golang.org/x/exp/slices"

"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
Expand Down Expand Up @@ -107,7 +108,7 @@ func (b *bloomStoreEntry) FetchMetas(ctx context.Context, params MetaSearchParam

// FetchBlocks implements Store.
func (b *bloomStoreEntry) FetchBlocks(ctx context.Context, refs []BlockRef) ([]BlockDirectory, error) {
return b.fetcher.FetchBlocksWithQueue(ctx, refs)
return b.fetcher.FetchBlocks(ctx, refs)
}

// Fetcher implements Store.
Expand Down Expand Up @@ -299,13 +300,24 @@ func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]Bloc

results := make([]BlockDirectory, 0, len(blocks))
for i := range fetchers {
res, err := fetchers[i].FetchBlocksWithQueue(ctx, refs[i])
res, err := fetchers[i].FetchBlocks(ctx, refs[i])
results = append(results, res...)
if err != nil {
return results, err
}
}

// sort responses (results []BlockDirectory) based on requests (blocks []BlockRef)
slices.SortFunc(results, func(a, b BlockDirectory) int {
ia, ib := slices.Index(blocks, a.BlockRef), slices.Index(blocks, b.BlockRef)
if ia < ib {
return -1
} else if ia > ib {
return +1
}
return 0
})

return results, nil
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,8 @@ func TestBloomStore_FetchBlocks(t *testing.T) {
require.NoError(t, err)
require.Len(t, blockDirs, 4)

// Note the order: b1 and b2 come from cache, so they are in the beginning of the response
// Do we need to sort the response based on the request order of block refs?
require.ElementsMatch(t,
[]BlockRef{b1.BlockRef, b3.BlockRef, b2.BlockRef, b4.BlockRef},
require.Equal(t,
[]BlockRef{b1.BlockRef, b2.BlockRef, b3.BlockRef, b4.BlockRef},
[]BlockRef{blockDirs[0].BlockRef, blockDirs[1].BlockRef, blockDirs[2].BlockRef, blockDirs[3].BlockRef},
)
}

0 comments on commit 69d152b

Please sign in to comment.