Skip to content

Commit

Permalink
shard parsing returns verison
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <[email protected]>
  • Loading branch information
owen-d committed Mar 19, 2024
1 parent 21a2edd commit b9f6121
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 19 deletions.
34 changes: 24 additions & 10 deletions pkg/logql/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ func (s Shard) String() string {

type Shards []Shard

type ShardVersion uint8

const (
PowerOfTwoVersion ShardVersion = iota
BoundedVersion
)

func (xs Shards) Encode() (encoded []string) {
for _, shard := range xs {
encoded = append(encoded, shard.String())
Expand All @@ -134,38 +141,45 @@ func (xs Shards) Encode() (encoded []string) {
}

// ParseShards parses a list of string encoded shards
func ParseShards(strs []string) (Shards, error) {
func ParseShards(strs []string) (Shards, ShardVersion, error) {
if len(strs) == 0 {
return nil, nil
return nil, PowerOfTwoVersion, nil
}
shards := make(Shards, 0, len(strs))

for _, str := range strs {
shard, err := ParseShard(str)
var prevVersion ShardVersion
for i, str := range strs {
shard, version, err := ParseShard(str)
if err != nil {
return nil, err
return nil, PowerOfTwoVersion, err
}

if i == 0 {
prevVersion = version
} else if prevVersion != version {
return nil, PowerOfTwoVersion, errors.New("shards must be of the same version")
}
shards = append(shards, shard)
}
return shards, nil
return shards, prevVersion, nil
}

func ParseShard(s string) (Shard, error) {
func ParseShard(s string) (Shard, ShardVersion, error) {

var bounded logproto.Shard
v2Err := json.Unmarshal([]byte(s), &bounded)
if v2Err == nil {
return Shard{Bounded: &bounded}, nil
return Shard{Bounded: &bounded}, BoundedVersion, nil
}

old, v1Err := astmapper.ParseShard(s)
if v1Err == nil {
return Shard{PowerOfTwo: &old}, nil
return Shard{PowerOfTwo: &old}, PowerOfTwoVersion, nil
}

err := errors.Wrap(
multierror.New(v1Err, v2Err).Err(),
"failed to parse shard",
)
return Shard{}, err
return Shard{}, PowerOfTwoVersion, err
}
75 changes: 69 additions & 6 deletions pkg/logql/shards_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ func TestShardString(t *testing.T) {

func TestParseShard(t *testing.T) {
for _, rc := range []struct {
str string
exp Shard
str string
version ShardVersion
exp Shard
}{
{
str: "1_of_2",
str: "1_of_2",
version: PowerOfTwoVersion,
exp: Shard{
PowerOfTwo: &astmapper.ShardAnnotation{
Shard: 1,
Expand All @@ -90,7 +92,8 @@ func TestParseShard(t *testing.T) {
},
},
{
str: `{"bounds":{"min":1,"max":2},"stats":null}`,
str: `{"bounds":{"min":1,"max":2},"stats":null}`,
version: BoundedVersion,
exp: Shard{
Bounded: &logproto.Shard{
Bounds: logproto.FPBounds{
Expand All @@ -101,7 +104,8 @@ func TestParseShard(t *testing.T) {
},
},
{
str: `{"bounds":{"min":1,"max":2},"stats":{"streams":0,"chunks":0,"bytes":1,"entries":0}}`,
str: `{"bounds":{"min":1,"max":2},"stats":{"streams":0,"chunks":0,"bytes":1,"entries":0}}`,
version: BoundedVersion,
exp: Shard{
Bounded: &logproto.Shard{
Stats: &logproto.IndexStatsResponse{
Expand All @@ -116,9 +120,68 @@ func TestParseShard(t *testing.T) {
},
} {
t.Run(rc.str, func(t *testing.T) {
shard, err := ParseShard(rc.str)
shard, version, err := ParseShard(rc.str)
require.NoError(t, err)
require.Equal(t, rc.version, version)
require.Equal(t, rc.exp, shard)
})
}
}

func TestParseShards(t *testing.T) {
for _, rc := range []struct {
strs []string
version ShardVersion
exp Shards
err bool
}{
{
strs: []string{"1_of_2", "1_of_2"},
version: PowerOfTwoVersion,
exp: Shards{
NewPowerOfTwoShard(astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
}),
NewPowerOfTwoShard(astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
}),
},
},
{
strs: []string{`{"bounds":{"min":1,"max":2},"stats":null}`, `{"bounds":{"min":1,"max":2},"stats":null}`},
version: BoundedVersion,
exp: Shards{
NewBoundedShard(logproto.Shard{
Bounds: logproto.FPBounds{
Min: 1,
Max: 2,
},
}),
NewBoundedShard(logproto.Shard{
Bounds: logproto.FPBounds{
Min: 1,
Max: 2,
},
}),
},
},
{
strs: []string{`{"bounds":{"min":1,"max":2},"stats":null}`, "1_of_2"},
version: PowerOfTwoVersion,
err: true,
},
} {
t.Run(fmt.Sprintf("%+v", rc.strs), func(t *testing.T) {
shards, version, err := ParseShards(rc.strs)
if rc.err {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, rc.version, version)
require.Equal(t, rc.exp, shards)
})
}
}
6 changes: 3 additions & 3 deletions pkg/logql/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ type MockQuerier struct {
}

func (q MockQuerier) extractOldShard(xs []string) (*astmapper.ShardAnnotation, error) {
parsed, err := ParseShards(xs)
parsed, version, err := ParseShards(xs)
if err != nil {
return nil, err
}

if parsed[0].PowerOfTwo == nil {
return nil, fmt.Errorf("shard is not a power of two")
if version != PowerOfTwoVersion {
return nil, fmt.Errorf("unsupported shard version: %d", version)
}

return parsed[0].PowerOfTwo, nil
Expand Down

0 comments on commit b9f6121

Please sign in to comment.