Skip to content

Commit

Permalink
shardResolver.GetShardingRanges support
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <[email protected]>
  • Loading branch information
owen-d committed Mar 18, 2024
1 parent fdbe1fa commit 691ba04
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 15 deletions.
15 changes: 10 additions & 5 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,27 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/util/math"
"github.com/grafana/loki/pkg/logproto"

"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
util_log "github.com/grafana/loki/pkg/util/log"
)

type ShardResolver interface {
Shards(expr syntax.Expr) (int, uint64, error)
ShardingRanges(expr syntax.Expr, targetBytesPerShard uint64) ([]logproto.Shard, error)
GetStats(e syntax.Expr) (stats.Stats, error)
}

type ConstantShards int

func (s ConstantShards) Shards(_ syntax.Expr) (int, uint64, error) { return int(s), 0, nil }
func (s ConstantShards) Shards(_ syntax.Expr) (int, uint64, error) { return int(s), 0, nil }
func (s ConstantShards) ShardingRanges(_ syntax.Expr, _ uint64) ([]logproto.Shard, error) {
return sharding.LinearShards(int(s), 0), nil
}
func (s ConstantShards) GetStats(_ syntax.Expr) (stats.Stats, error) { return stats.Stats{}, nil }

const (
Expand Down Expand Up @@ -148,7 +153,7 @@ func (m ShardMapper) mapBinOpExpr(e *syntax.BinOpExpr, r *downstreamRecorder) (*
e.RHS = rhsSampleExpr

// We take the maximum bytes per shard of both sides of the operation
bytesPerShard := uint64(math.Max(int(lhsBytesPerShard), int(rhsBytesPerShard)))
bytesPerShard := uint64(max(int(lhsBytesPerShard), int(rhsBytesPerShard)))

return e, bytesPerShard, nil
}
Expand Down Expand Up @@ -273,7 +278,7 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr
}

// We take the maximum bytes per shard of both sides of the operation
bytesPerShard := uint64(math.Max(int(lhsBytesPerShard), int(rhsBytesPerShard)))
bytesPerShard := uint64(max(int(lhsBytesPerShard), int(rhsBytesPerShard)))

return &syntax.BinOpExpr{
SampleExpr: lhs,
Expand Down Expand Up @@ -442,7 +447,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
}

// We take the maximum bytes per shard of both sides of the operation
bytesPerShard := uint64(math.Max(int(lhsBytesPerShard), int(rhsBytesPerShard)))
bytesPerShard := uint64(max(int(lhsBytesPerShard), int(rhsBytesPerShard)))

return &syntax.BinOpExpr{
SampleExpr: lhs,
Expand Down
77 changes: 77 additions & 0 deletions pkg/querier/queryrange/shard_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/dustin/go-humanize"
"github.com/efficientgo/core/errors"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
Expand Down Expand Up @@ -208,3 +209,79 @@ func (r *dynamicShardResolver) Shards(e syntax.Expr) (int, uint64, error) {
)
return factor, bytesPerShard, nil
}

func (r *dynamicShardResolver) ShardingRanges(expr syntax.Expr, targetBytesPerShard uint64) ([]logproto.Shard, error) {
sp, ctx := opentracing.StartSpanFromContext(r.ctx, "dynamicShardResolver.ShardingRanges")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
defer log.Finish()

adjustedFrom := r.from

// NB(owen-d): there should only ever be 1 matcher group passed
// to this call as we call it separately for different legs
// of binary ops, but I'm putting in the loop for completion
grps, err := syntax.MatcherGroups(expr)
if err != nil {
return nil, err
}

for _, grp := range grps {
diff := grp.Interval + grp.Offset

// For instant queries, when start == end,
// we have a default lookback which we add here
if grp.Interval == 0 {
diff = diff + r.defaultLookback
}

// use the oldest adjustedFrom
if r.from.Add(-diff).Before(adjustedFrom) {
adjustedFrom = r.from.Add(-diff)
}
}

exprStr := expr.String()
// try to get shards for the given expression
// if it fails, fallback to linearshards based on stats
resp, err := r.handler.Do(ctx, &logproto.ShardsRequest{
From: adjustedFrom,
Through: r.through,
Query: expr.String(),
TargetBytesPerShard: targetBytesPerShard,
})

if err != nil {
// TODO(owen-d): check unimplemented to fallback
if false {
n, bytesPerShard, err := r.Shards(expr)
if err != nil {
return nil, errors.Wrap(err, "falling back to building linear shards from stats")
}
level.Debug(log).Log(
"msg", "falling back to building linear shards from stats",
"bytes_per_shard", bytesPerShard,
"shards", n,
"query", exprStr,
)
return sharding.LinearShards(n, uint64(n)*bytesPerShard), nil
}

return nil, errors.Wrapf(err, "failed to get shards for expression, got %T: %+v", err, err)

}

casted, ok := resp.(*ShardsResponse)
if !ok {
return nil, fmt.Errorf("expected *ShardsResponse while querying index, got %T", resp)
}

level.Debug(log).Log(
"msg", "retrieved sharding ranges",
"target_bytes_per_shard", targetBytesPerShard,
"shards", len(casted.Response.Shards),
"query", exprStr,
)

return casted.Response.Shards, err
}
39 changes: 31 additions & 8 deletions pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
"github.com/grafana/loki/pkg/validation"
"github.com/prometheus/common/model"
)

const (
DefaultTSDBMaxBytesPerShard = 600 << 20 // 600MB
)

// PowerOfTwoSharding is a slimmed down legacy sharding implementation
// designed for use as a fallback when the newer impls aren't found
// (i.e. during a system upgrade to support the new impl)
Expand All @@ -31,13 +34,32 @@ func (p PowerOfTwoSharding) ShardsFor(bytes uint64, maxBytesPerShard uint64) []l
}}
}

bytesPerShard := bytes
if factor > 0 {
bytesPerShard = bytes / uint64(factor)
return LinearShards(factor, bytes)

}

// LinearShards is a sharding implementation that splits the data into
// equal sized shards covering the entire keyspace. It populates
// the `bytes` of each shard's stats with a proportional estimation
func LinearShards(n int, bytes uint64) []logproto.Shard {
if n < 2 {
return []logproto.Shard{
{
Bounds: logproto.FPBounds{
Min: 0,
Max: math.MaxUint64,
},
Stats: &stats.Stats{
Bytes: bytes,
},
},
}
}
fpPerShard := model.Fingerprint(math.MaxUint64) / model.Fingerprint(factor)

shards := make([]logproto.Shard, factor)
bytesPerShard := bytes / uint64(n)
fpPerShard := model.Fingerprint(math.MaxUint64) / model.Fingerprint(n)

shards := make([]logproto.Shard, n)
for i := range shards {
shards[i] = logproto.Shard{
Bounds: logproto.FPBounds{
Expand All @@ -53,11 +75,12 @@ func (p PowerOfTwoSharding) ShardsFor(bytes uint64, maxBytesPerShard uint64) []l
// and the max bound should be math.MaxUint64
// NB(owen-d): this can only happen when maxShards is used
// and the maxShards isn't a factor of 2
shards[len(shards)-1].Stats.Bytes += bytes % uint64(factor)
shards[len(shards)-1].Stats.Bytes += bytes % uint64(n)
shards[len(shards)-1].Bounds.Max = math.MaxUint64
}

return shards

}

// Since we shard by powers of two and we increase shard factor
Expand All @@ -69,7 +92,7 @@ func GuessShardFactor(bytes, maxBytesPerShard uint64, maxShards int) int {
// If maxBytesPerShard is 0, we use the default value
// to avoid division by zero
if maxBytesPerShard < 1 {
maxBytesPerShard = validation.DefaultTSDBMaxBytesPerShard
maxBytesPerShard = DefaultTSDBMaxBytesPerShard
}

minShards := float64(bytes) / float64(maxBytesPerShard)
Expand Down
5 changes: 3 additions & 2 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/grafana/loki/pkg/logql/syntax"
ruler_config "github.com/grafana/loki/pkg/ruler/config"
"github.com/grafana/loki/pkg/ruler/util"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
"github.com/grafana/loki/pkg/util/flagext"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/validation"
Expand All @@ -48,8 +49,8 @@ const (

bytesInMB = 1048576

defaultPerStreamRateLimit = 3 << 20 // 3MB
DefaultTSDBMaxBytesPerShard = 600 << 20 // 600MB
defaultPerStreamRateLimit = 3 << 20 // 3MB
DefaultTSDBMaxBytesPerShard = sharding.DefaultTSDBMaxBytesPerShard
defaultPerStreamBurstLimit = 5 * defaultPerStreamRateLimit

DefaultPerTenantQueryTimeout = "1m"
Expand Down

0 comments on commit 691ba04

Please sign in to comment.