Skip to content

Commit

Permalink
Merge pull request #507 from parca-dev/fix-sample-index
Browse files Browse the repository at this point in the history
pkg/storage: Fix bug so samples seek to correct index
  • Loading branch information
brancz committed Dec 21, 2021
2 parents 3d5c692 + ea085ac commit 26dc2f1
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 21 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/series_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ type MemSeriesInstantFlatProfile struct {
durationsIterator MemSeriesValuesIterator
periodsIterator MemSeriesValuesIterator

sampleIterators map[string]MemSeriesValuesIterator
sampleIterators map[string]*MultiChunksIterator
locations map[string][]*metastore.Location
}

Expand Down
16 changes: 13 additions & 3 deletions pkg/storage/series_iterator_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (rs *MemRangeSeries) Iterator() ProfileSeriesIterator {
rootIt.Seek(start)
}

var sampleIterators map[string]MemSeriesValuesIterator
var sampleIterators map[string]*MultiChunksIterator

root := &MemSeriesIteratorTreeNode{}
if rs.trees {
Expand Down Expand Up @@ -109,7 +109,7 @@ func (rs *MemRangeSeries) Iterator() ProfileSeriesIterator {
memItStack.Pop()
}
} else {
sampleIterators = make(map[string]MemSeriesValuesIterator, len(rs.s.samples))
sampleIterators = make(map[string]*MultiChunksIterator, len(rs.s.samples))
for key, chunks := range rs.s.samples {
sampleIterators[key] = NewMultiChunkIterator(chunks)
}
Expand All @@ -123,6 +123,12 @@ func (rs *MemRangeSeries) Iterator() ProfileSeriesIterator {
timestampIterator.Seek(start)
durationsIterator.Seek(start)
periodsIterator.Seek(start)

if !rs.trees {
for _, sampleIterator := range sampleIterators {
sampleIterator.Seek(start)
}
}
}

if end-start < numSamples {
Expand Down Expand Up @@ -159,7 +165,7 @@ type MemRangeSeriesIterator struct {
durationsIterator MemSeriesValuesIterator
periodsIterator MemSeriesValuesIterator

sampleIterators map[string]MemSeriesValuesIterator
sampleIterators map[string]*MultiChunksIterator

numSamples uint64 // uint16 might not be enough for many chunks (~500+)
err error
Expand Down Expand Up @@ -251,6 +257,10 @@ func (it *MemRangeSeriesIterator) Next() bool {
it.err = errors.New("unexpected end of samples iterator")
return false
}
if sread := sit.Read(); sread != read {
it.err = fmt.Errorf("sample iterator in wrong iteration, expected %d got %d", read, sread)
return false
}
}
}

Expand Down
32 changes: 15 additions & 17 deletions pkg/storage/series_iterator_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"
"time"

"github.com/google/uuid"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/require"
)
Expand All @@ -28,39 +29,36 @@ func TestMemRangeSeries_Iterator(t *testing.T) {
app, err := s.Appender()
require.NoError(t, err)

s1 := makeSample(2, []uuid.UUID{
uuid.MustParse("00000000-0000-0000-0000-000000000002"),
uuid.MustParse("00000000-0000-0000-0000-000000000001"),
})
k1 := makeStacktraceKey(s1)

for i := 1; i <= 500; i++ {
p := Profile{
s1.Value = int64(i)
p := &FlatProfile{
Meta: InstantProfileMeta{
Timestamp: int64(i),
Duration: time.Second.Nanoseconds(),
Period: time.Second.Nanoseconds(),
},
Tree: &ProfileTree{
Roots: &ProfileTreeRootNode{
ProfileTreeNode: &ProfileTreeNode{
flatValues: []*ProfileTreeValueNode{{Value: int64(i)}},
},
},
samples: map[string]*Sample{
string(k1): s1,
},
}
err = app.Append(ctx, &p)
err = app.AppendFlat(ctx, p)
require.NoError(t, err)
}

it := (&MemRangeSeries{s: s, mint: 74, maxt: 420, trees: true}).Iterator()
it := (&MemRangeSeries{s: s, mint: 74, maxt: 420}).Iterator()

seen := int64(75)
for it.Next() {
p := it.At()
require.Equal(t, seen, p.ProfileMeta().Timestamp)

itt := p.ProfileTree().Iterator()
for itt.HasMore() {
if itt.NextChild() {
require.Equal(t, seen, itt.At().FlatValues()[0].Value)
itt.StepInto()
}
itt.StepUp()
for _, sample := range p.Samples() {
require.Equal(t, seen, sample.Value)
}
seen++
}
Expand Down

0 comments on commit 26dc2f1

Please sign in to comment.