fix(blooms): Match series to newest block only (#15481)
**What this PR does / why we need it**:

While running bloom filters in production, we noticed that some Loki clusters showed a very high percentage of missing chunks when querying blooms, resulting in a lower filtering rate.

The reason is that old, superseded blocks are still considered up-to-date, because they cover parts of the keyspace that are not covered by newer blocks, whose keyspace ranges are smaller (since the individual series blooms have grown larger).

```
            | series fingerprint keyspace
------------+----------------------------------------------------------------
            |    o     o   o  o  o     o    o o    o       o    o   o    o
------------+----------------------------------------------------------------
iteration 1 |    111111111111111111111111111111111111111111111111111111111
iteration 2 |    22222222222222             3333333333333333        444444
iteration 3 |    5555555   6666666     77777777    888888888    9999999999
...
up-to-date  |    555555522266666661111177777777333388888888811119999999999
------------+----------------------------------------------------------------
            |          x
```

The chart shows the different blocks, marked with the numbers 1 to 9, for a subset of the full series fingerprint keyspace. The blocks are generated in multiple successive bloom building iterations. The first block covers a larger keyspace (more series), because the individual blooms in the blocks are smaller at the beginning of the day. Later, the blooms get larger and therefore the block fingerprint ranges get smaller. However, since we are dealing with fingerprint ranges, not individual fingerprints, the newer blocks leave "gaps" in the range of the previously larger keyspace. In the case above, every block except block 4 is considered up-to-date, since each of them covers part of the keyspace that is otherwise not covered.

When resolving blocks for a series at query time, we consider all up-to-date blocks referenced by the meta files. The series `x` in the chart falls within the range of three up-to-date blocks: 1, 2, and 5. However, only the newest block (5) can actually contain the requested series.
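
To make this concrete, here is a minimal sketch of what pure bounds-based matching does for series `x`. The `blockRef` type, the bounds, and the timestamps are made up for illustration and are not the actual Loki types:

```go
package main

import "fmt"

// blockRef is a simplified stand-in for a bloom block reference: only its
// fingerprint bounds and the timestamp of the TSDB it was built from matter
// for this example.
type blockRef struct {
	name         string
	minFp, maxFp uint64
	sourceTS     int64
}

func main() {
	// Up-to-date blocks from the chart: block 1 (iteration 1) still covers
	// gaps left between the narrower blocks of later iterations.
	blocks := []blockRef{
		{name: "block-1", minFp: 0x00, maxFp: 0xff, sourceTS: 1},
		{name: "block-2", minFp: 0x00, maxFp: 0x3f, sourceTS: 2},
		{name: "block-5", minFp: 0x00, maxFp: 0x1f, sourceTS: 3},
	}

	const seriesFp = 0x10 // the series marked `x` in the chart

	// Matching purely on fingerprint bounds returns every overlapping block,
	// even though only the newest one can actually contain the series.
	for _, b := range blocks {
		if seriesFp >= b.minFp && seriesFp <= b.maxFp {
			fmt.Printf("series %#x matches %s (sourceTS=%d)\n", seriesFp, b.name, b.sourceTS)
		}
	}
}
```

All three blocks match, which is exactly the over-matching that leads to the missing-chunk responses described above.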

This PR changes the block resolver on the index gateway to match only the newest block to a series, based on the timestamp of the TSDB from which the blocks were generated.
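
The fix boils down to keeping, per series, only the block built from the newest TSDB. Below is a condensed sketch of that selection, reusing the illustrative `blockRef` type from the sketch above; the actual implementation is the change to `pkg/bloomgateway/resolver.go` shown in the diff further down:

```go
// newestBlockFor returns, among the blocks whose fingerprint bounds contain
// the series, the one built from the newest TSDB. Helper name and types are
// illustrative only.
func newestBlockFor(seriesFp uint64, blocks []blockRef) (blockRef, bool) {
	var (
		newest blockRef
		found  bool
	)
	for _, b := range blocks {
		if seriesFp < b.minFp || seriesFp > b.maxFp {
			continue // block does not cover this series
		}
		if !found || b.sourceTS > newest.sourceTS {
			newest = b
			found = true
		}
	}
	return newest, found
}
```

For the series `x` this returns only block 5; blocks 1 and 2 are no longer queried for it.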

---
Signed-off-by: Christian Haudum <[email protected]>

chaudum authored Dec 19, 2024
1 parent 60f031a commit 5846ea2
Showing 3 changed files with 60 additions and 30 deletions.

pkg/bloombuild/builder/builder.go (3 changes: 1 addition & 2 deletions)

```diff
@@ -31,7 +31,6 @@ import (
     "github.com/grafana/loki/v3/pkg/storage/config"
     "github.com/grafana/loki/v3/pkg/storage/stores"
     "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
-    "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
     utillog "github.com/grafana/loki/v3/pkg/util/log"
     "github.com/grafana/loki/v3/pkg/util/ring"
 )
@@ -415,7 +414,6 @@ func (b *Builder) processTask(
                 Bounds: gap.Bounds,
             },
         },
-        Sources: []tsdb.SingleTenantTSDBIdentifier{task.TSDB},
     }
 
     // Fetch blocks that aren't up to date but are in the desired fingerprint range
@@ -492,6 +490,7 @@ func (b *Builder) processTask(
         level.Debug(logger).Log("msg", "uploaded block", "progress_pct", fmt.Sprintf("%.2f", pct))
 
         meta.Blocks = append(meta.Blocks, built.BlockRef)
+        meta.Sources = append(meta.Sources, task.TSDB)
     }
 
     if err := newBlocks.Err(); err != nil {
```

pkg/bloomgateway/resolver.go (70 changes: 45 additions & 25 deletions)

```diff
@@ -2,6 +2,7 @@ package bloomgateway
 
 import (
     "context"
+    "slices"
     "sort"
     "time"
 
@@ -61,36 +62,55 @@ func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, inter
 }
 
 func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) []blockWithSeries {
-    result := make([]blockWithSeries, 0, len(metas))
-
-    for _, meta := range metas {
-        for _, block := range meta.Blocks {
-            // skip blocks that are not within time interval
-            if !interval.Overlaps(block.Interval()) {
-                continue
-            }
-
-            min := sort.Search(len(series), func(i int) bool {
-                return block.Cmp(series[i].Fingerprint) > v1.Before
-            })
-
-            max := sort.Search(len(series), func(i int) bool {
-                return block.Cmp(series[i].Fingerprint) == v1.After
-            })
-
-            // All fingerprints fall outside of the consumer's range
-            if min == len(series) || max == 0 || min == max {
-                continue
-            }
-
-            // At least one fingerprint is within bounds of the blocks
-            // so append to results
-            dst := make([]*logproto.GroupedChunkRefs, max-min)
-            _ = copy(dst, series[min:max])
-
+    slices.SortFunc(series, func(a, b *logproto.GroupedChunkRefs) int { return int(a.Fingerprint - b.Fingerprint) })
+
+    result := make([]blockWithSeries, 0, len(metas))
+    cache := make(map[bloomshipper.BlockRef]int)
+
+    // find the newest block for each series
+    for _, s := range series {
+        var b *bloomshipper.BlockRef
+        var newestTs time.Time
+
+        for i := range metas {
+            for j := range metas[i].Blocks {
+                block := metas[i].Blocks[j]
+                // To keep backwards compatibility, we can only look at the source at index 0
+                // because in the past the slice had always length 1, see
+                // https://github.com/grafana/loki/blob/b4060154d198e17bef8ba0fbb1c99bb5c93a412d/pkg/bloombuild/builder/builder.go#L418
+                sourceTs := metas[i].Sources[0].TS
+                // Newer metas have len(Sources) == len(Blocks)
+                if len(metas[i].Sources) > j {
+                    sourceTs = metas[i].Sources[j].TS
+                }
+                // skip blocks that are not within time interval
+                if !interval.Overlaps(block.Interval()) {
+                    continue
+                }
+                // skip blocks that do not contain the series
+                if block.Cmp(s.Fingerprint) != v1.Overlap {
+                    continue
+                }
+                // only use the block if it is newer than the previous
+                if sourceTs.After(newestTs) {
+                    b = &block
+                    newestTs = sourceTs
+                }
+            }
+        }
+
+        if b == nil {
+            continue
+        }
+
+        idx, ok := cache[*b]
+        if ok {
+            result[idx].series = append(result[idx].series, s)
+        } else {
+            cache[*b] = len(result)
             result = append(result, blockWithSeries{
-                block: block,
-                series: dst,
+                block: *b,
+                series: []*logproto.GroupedChunkRefs{s},
             })
         }
     }
```
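
One detail of the resolver change worth spelling out is the backwards-compatibility handling of `Sources`: metas written before this change carry a single TSDB source for all of their blocks, while metas written by the updated builder carry one source per block (`len(Sources) == len(Blocks)`). A sketch of that lookup, with the helper name `sourceTimestampFor` chosen purely for illustration (the real code inlines this logic in `blocksMatchingSeries`):

```go
package bloomgateway

import (
	"time"

	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

// sourceTimestampFor picks the TSDB timestamp to attribute to the block at
// blockIdx within meta, falling back to the single source that older metas
// always carried.
func sourceTimestampFor(meta bloomshipper.Meta, blockIdx int) time.Time {
	// Older metas always carried exactly one source, so fall back to index 0.
	ts := meta.Sources[0].TS
	// Metas written by the updated builder have one source per block, so the
	// source for this specific block can be addressed directly.
	if len(meta.Sources) > blockIdx {
		ts = meta.Sources[blockIdx].TS
	}
	return ts
}
```
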

pkg/bloomgateway/resolver_test.go (17 changes: 14 additions & 3 deletions)

```diff
@@ -9,6 +9,7 @@ import (
     "github.com/grafana/loki/v3/pkg/logproto"
     v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
     "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
+    "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
 )
 
 func makeBlockRef(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshipper.BlockRef {
@@ -28,6 +29,9 @@ func makeMeta(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshi
         Blocks: []bloomshipper.BlockRef{
             makeBlockRef(minFp, maxFp, from, through),
         },
+        Sources: []tsdb.SingleTenantTSDBIdentifier{
+            {TS: through.Time()},
+        },
     }
 }
 
@@ -100,14 +104,21 @@ func TestBlockResolver_BlocksMatchingSeries(t *testing.T) {
 
     t.Run("multiple overlapping blocks within time range covering full keyspace", func(t *testing.T) {
         metas := []bloomshipper.Meta{
-            makeMeta(0x00, 0xdf, 1000, 1999),
-            makeMeta(0xc0, 0xff, 1000, 1999),
+            // 2 series overlap
+            makeMeta(0x00, 0xdf, 1000, 1499), // "old" meta covers first 4 series
+            makeMeta(0xc0, 0xff, 1500, 1999), // "new" meta covers last 4 series
         }
         res := blocksMatchingSeries(metas, interval, series)
+        for i := range res {
+            t.Logf("%s", res[i].block)
+            for j := range res[i].series {
+                t.Logf(" %016x", res[i].series[j].Fingerprint)
+            }
+        }
         expected := []blockWithSeries{
             {
                 block: metas[0].Blocks[0],
-                series: series[0:4],
+                series: series[0:2], // series 0x00c0 and 0x00d0 are covered in the newer block
             },
             {
                 block: metas[1].Blocks[0],
```
