Skip to content

Commit

Permalink
Merge pull request #370 from parca-dev/storage-iterator-ranges
Browse files Browse the repository at this point in the history
pkg/storage: Fix iterators to only get index range within chunks
  • Loading branch information
brancz authored Oct 25, 2021
2 parents 3ab314d + 3735c66 commit a6867a7
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 7 deletions.
3 changes: 3 additions & 0 deletions pkg/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ func (q *Query) QueryRange(ctx context.Context, req *pb.QueryRangeRequest) (*pb.
it := series.Iterator()
for it.Next() {
p := it.At()
if p.ProfileMeta().Timestamp == 0 {
return nil, status.Error(codes.Internal, "profile's timestamp is 0")
}
metricsSeries.Samples = append(metricsSeries.Samples, &pb.MetricsSample{
Timestamp: timestamppb.New(timestamp.Time(p.ProfileMeta().Timestamp)),
Value: p.ProfileTree().RootCumulativeValue(),
Expand Down
8 changes: 6 additions & 2 deletions pkg/storage/series_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,17 +289,21 @@ func (n *MemSeriesIteratorTreeNode) FlatValues() []*ProfileTreeValueNode {
return res
}

func getIndexRange(it MemSeriesValuesIterator, numSamples uint16, mint, maxt int64) (uint64, uint64, error) {
func getIndexRange(it MemSeriesValuesIterator, numSamples uint64, mint, maxt int64) (uint64, uint64, error) {
// figure out the index of the first sample > mint and the last sample < maxt
start := uint64(0)
end := uint64(0)
i := uint16(0)
i := uint64(0)
for it.Next() {
if i == numSamples {
end++
break
}
t := it.At()
// MultiChunkIterator might return sparse values - shouldn't usually happen though.
if t == 0 {
break
}
if t < mint {
start++
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/series_iterator_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,18 @@ func (ms *MemMergeSeries) Iterator() ProfileSeriesIterator {
maxt = ms.s.maxTime
}

var numSamples uint64

chunkStart, chunkEnd := ms.s.timestamps.indexRange(mint, maxt)
timestamps := make([]chunkenc.Chunk, 0, chunkEnd-chunkStart)
for _, t := range ms.s.timestamps[chunkStart:chunkEnd] {
numSamples += uint64(t.chunk.NumSamples())
timestamps = append(timestamps, t.chunk)
}

sl := &SliceProfileSeriesIterator{i: -1}

start, end, err := getIndexRange(NewMultiChunkIterator(timestamps), ms.s.numSamples, mint, maxt)
start, end, err := getIndexRange(NewMultiChunkIterator(timestamps), numSamples, mint, maxt)
if err != nil {
sl.err = err
return sl
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/series_iterator_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,17 @@ func (rs *MemRangeSeries) Iterator() ProfileSeriesIterator {
rs.s.mu.RLock()
defer rs.s.mu.RUnlock()

var numSamples uint64

chunkStart, chunkEnd := rs.s.timestamps.indexRange(rs.mint, rs.maxt)
timestamps := make([]chunkenc.Chunk, 0, chunkEnd-chunkStart)
for _, t := range rs.s.timestamps[chunkStart:chunkEnd] {
numSamples += uint64(t.chunk.NumSamples())
timestamps = append(timestamps, t.chunk)
}

it := NewMultiChunkIterator(timestamps)
start, end, err := getIndexRange(it, rs.s.numSamples, rs.mint, rs.maxt)
start, end, err := getIndexRange(it, numSamples, rs.mint, rs.maxt)
if err != nil {
return &MemRangeSeriesIterator{err: err}
}
Expand Down Expand Up @@ -111,7 +114,6 @@ func (rs *MemRangeSeries) Iterator() ProfileSeriesIterator {
periodsIterator.Seek(start)
}

numSamples := uint64(rs.s.numSamples)
if end-start < numSamples {
numSamples = end - start - 1
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/series_iterator_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ func (rs *MemRootSeries) Iterator() ProfileSeriesIterator {
rs.s.mu.RLock()
defer rs.s.mu.RUnlock()

var numSamples uint64

chunkStart, chunkEnd := rs.s.timestamps.indexRange(rs.mint, rs.maxt)
timestamps := make([]chunkenc.Chunk, 0, chunkEnd-chunkStart)
for _, t := range rs.s.timestamps[chunkStart:chunkEnd] {
numSamples += uint64(t.chunk.NumSamples())
timestamps = append(timestamps, t.chunk)
}

it := NewMultiChunkIterator(timestamps)
start, end, err := getIndexRange(it, rs.s.numSamples, rs.mint, rs.maxt)
start, end, err := getIndexRange(it, numSamples, rs.mint, rs.maxt)
if start == end {
return &MemRootSeriesIterator{err: fmt.Errorf("no samples within the time range")}
}
Expand All @@ -58,7 +61,7 @@ func (rs *MemRootSeries) Iterator() ProfileSeriesIterator {
rootIterator.Seek(start)
}

numSamples := uint64(rs.s.numSamples)
// Set numSamples correctly if only subset selected.
if end-start < numSamples {
numSamples = end - start - 1
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/series_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ func TestGetIndexRange(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(2), start)
require.Equal(t, uint64(4), end)

start, end, err = getIndexRange(NewMultiChunkIterator([]chunkenc.Chunk{c}), 123, 1, 12)
require.NoError(t, err)
require.Equal(t, uint64(0), start)
require.Equal(t, uint64(5), end)
}

func TestIteratorRangeSum(t *testing.T) {
Expand Down

0 comments on commit a6867a7

Please sign in to comment.