chore(dataobj): add streams tracking and encoding #15713

Merged 3 commits on Jan 14, 2025
Changes from 1 commit
133 changes: 133 additions & 0 deletions pkg/dataobj/internal/sections/streams/iter.go
@@ -0,0 +1,133 @@
package streams

import (
"context"
"fmt"
"time"

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// Iter iterates over the streams in the provided decoder. All streams
// sections in the data object are iterated over.
func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Stream] {
return result.Iter(func(yield func(Stream) bool) error {
sections, err := dec.Sections(ctx)
if err != nil {
return err
}

streamsDec := dec.StreamsDecoder()

for _, section := range sections {
if section.Type != filemd.SECTION_TYPE_STREAMS {
continue
}

for result := range iterSection(ctx, streamsDec, section) {
if result.Err() != nil || !yield(result.MustValue()) {
return result.Err()
}
}
}

return nil
})
}

func iterSection(ctx context.Context, dec encoding.StreamsDecoder, section *filemd.SectionInfo) result.Seq[Stream] {
return result.Iter(func(yield func(Stream) bool) error {
// We need to pull the columns twice: once from the dataset implementation
// and once from the decoder metadata to retrieve the column types.
//
// TODO(rfratto): find a way to expose this information from
// encoding.StreamsDataset to avoid the double call.
streamsColumns, err := dec.Columns(ctx, section)
if err != nil {
return err
}

dset := encoding.StreamsDataset(dec, section)

columns, err := result.Collect(dset.ListColumns(ctx))
if err != nil {
return err
}

for result := range dataset.Iter(ctx, columns) {
row, err := result.Value()
if err != nil {
return err
}

stream, err := decodeRow(streamsColumns, row)
if err != nil {
return err
} else if !yield(stream) {
return nil
}
}

return nil
})
}

func decodeRow(columns []*streamsmd.ColumnDesc, row dataset.Row) (Stream, error) {
var stream Stream

for columnIndex, columnValue := range row.Values {
if columnValue.IsNil() || columnValue.IsZero() {
continue
}

column := columns[columnIndex]
switch column.Type {
case streamsmd.COLUMN_TYPE_STREAM_ID:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
return stream, fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
stream.ID = columnValue.Int64()

case streamsmd.COLUMN_TYPE_MIN_TIMESTAMP:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
return stream, fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
stream.MinTimestamp = time.Unix(0, columnValue.Int64()).UTC()

case streamsmd.COLUMN_TYPE_MAX_TIMESTAMP:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
return stream, fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
stream.MaxTimestamp = time.Unix(0, columnValue.Int64()).UTC()

case streamsmd.COLUMN_TYPE_ROWS:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
return stream, fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
stream.Rows = int(columnValue.Int64())

case streamsmd.COLUMN_TYPE_LABEL:
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_STRING {
return stream, fmt.Errorf("invalid type %s for %s", ty, column.Type)
}
stream.Labels = append(stream.Labels, labels.Label{
Name: column.Info.Name,
Value: columnValue.String(),
})

default:
// TODO(rfratto): We probably don't want to return an error on unexpected
// columns because it breaks forward compatibility. Should we log
// something here?
}
}

return stream, nil
}
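
For illustration only (not code from this PR): a minimal sketch of how a caller might consume Iter, assuming an encoding.Decoder has already been constructed elsewhere. readObjectStreams is a hypothetical helper; it relies on result.Collect, which the diff above also uses to drain a result.Seq.

import (
	"context"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams"
)

// readObjectStreams collects every stream tracked in a data object into a
// slice. How the decoder is constructed is outside the scope of this sketch.
func readObjectStreams(ctx context.Context, dec encoding.Decoder) ([]streams.Stream, error) {
	// result.Collect drains the sequence, stopping at the first error.
	return result.Collect(streams.Iter(ctx, dec))
}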
263 changes: 263 additions & 0 deletions pkg/dataobj/internal/sections/streams/streams.go
@@ -0,0 +1,263 @@
// Package streams defines types used for the data object streams section. The
// streams section holds a list of streams present in the data object.
package streams

import (
"errors"
"fmt"
"sort"
"time"

"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
)

// A Stream is an individual stream within a data object.
type Stream struct {
// ID to uniquely represent a stream in a data object. Valid IDs start at 1.
// IDs are used to track streams across multiple sections in the same data
// object.
ID int64

Labels labels.Labels // Stream labels.
MinTimestamp time.Time // Minimum timestamp in the stream.
MaxTimestamp time.Time // Maximum timestamp in the stream.
Rows int // Number of rows in the stream.
}

// Streams tracks information about streams in a data object.
type Streams struct {
lastID atomic.Int64

lookup map[uint64][]*Stream

// ordered is used for consistently iterating over the list of
// streams. It contains streams in append order.
ordered []*Stream
}

// New creates a new Streams section.
func New() *Streams {
return &Streams{
lookup: make(map[uint64][]*Stream),
}
}

// Record adds a stream record to the Streams section. The provided timestamp
// is used to track the minimum and maximum timestamp of a stream. The number
// of calls to Record is used to track the number of rows for a stream.
func (s *Streams) Record(streamLabels labels.Labels, ts time.Time) {
ts = ts.UTC()

stream := s.getOrAddStream(streamLabels)
if stream.MinTimestamp.IsZero() || ts.Before(stream.MinTimestamp) {
stream.MinTimestamp = ts
}
if stream.MaxTimestamp.IsZero() || ts.After(stream.MaxTimestamp) {
stream.MaxTimestamp = ts
}
stream.Rows++
}

func (s *Streams) getOrAddStream(streamLabels labels.Labels) *Stream {
hash := streamLabels.Hash()
matches, ok := s.lookup[hash]
if !ok {
return s.addStream(hash, streamLabels)
}

for _, stream := range matches {
if labels.Equal(stream.Labels, streamLabels) {
return stream
}
}

return s.addStream(hash, streamLabels)
}

func (s *Streams) addStream(hash uint64, streamLabels labels.Labels) *Stream {
// Sort streamLabels before adding so that column ordering is consistent.
sort.Sort(streamLabels)

newStream := &Stream{ID: s.lastID.Add(1), Labels: streamLabels}
s.lookup[hash] = append(s.lookup[hash], newStream)
s.ordered = append(s.ordered, newStream)
return newStream
}

// StreamID returns the stream ID for the provided streamLabels. If the stream
// has not been recorded, StreamID returns 0.
func (s *Streams) StreamID(streamLabels labels.Labels) int64 {
hash := streamLabels.Hash()
matches, ok := s.lookup[hash]
if !ok {
return 0
}

for _, stream := range matches {
if labels.Equal(stream.Labels, streamLabels) {
return stream.ID
}
}

return 0
}

// EncodeTo encodes the list of recorded streams to the provided encoder.
// pageSize controls the target size of pages for encoded columns. EncodeTo
// may generate multiple sections if the list of streams is too big to fit
// into a single section.
func (s *Streams) EncodeTo(enc *encoding.Encoder, pageSize int) error {
// TODO(rfratto): handle one section becoming too large. This can happen when
// the number of columns is very wide. There are two approaches to handle
// this:
//
// 1. Split streams into multiple sections.
// 2. Move some columns into an aggregated column which holds multiple label
// keys and values.

idBuilder, err := numberColumnBuilder(pageSize)
if err != nil {
return fmt.Errorf("creating id column: %w", err)
}
minTimestampBuilder, err := numberColumnBuilder(pageSize)
if err != nil {
return fmt.Errorf("creating minimum ts column: %w", err)
}
maxTimestampBuilder, err := numberColumnBuilder(pageSize)
if err != nil {
return fmt.Errorf("creating maximum ts column: %w", err)
}
rowsCountBuilder, err := numberColumnBuilder(pageSize)
if err != nil {
return fmt.Errorf("creating rows column: %w", err)
}

var (
labelBuilders []*dataset.ColumnBuilder
labelBuilderLookup = map[string]int{} // Name to index
)

getLabelColumn := func(name string) (*dataset.ColumnBuilder, error) {
idx, ok := labelBuilderLookup[name]
if ok {
return labelBuilders[idx], nil
}

builder, err := dataset.NewColumnBuilder(name, dataset.BuilderOptions{
PageSizeHint: pageSize,
Value: datasetmd.VALUE_TYPE_STRING,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
})
if err != nil {
return nil, fmt.Errorf("creating label column: %w", err)
}

labelBuilders = append(labelBuilders, builder)
labelBuilderLookup[name] = len(labelBuilders) - 1
return builder, nil
}

// Populate our column builders.
for i, stream := range s.ordered {
// Append only fails if the rows are out-of-order, which can't happen here.
_ = idBuilder.Append(i, dataset.Int64Value(stream.ID))
_ = minTimestampBuilder.Append(i, dataset.Int64Value(stream.MinTimestamp.UnixNano()))
_ = maxTimestampBuilder.Append(i, dataset.Int64Value(stream.MaxTimestamp.UnixNano()))
_ = rowsCountBuilder.Append(i, dataset.Int64Value(int64(stream.Rows)))

for _, label := range stream.Labels {
builder, err := getLabelColumn(label.Name)
if err != nil {
return fmt.Errorf("getting label column: %w", err)
}
_ = builder.Append(i, dataset.StringValue(label.Value))
}
}

// Encode our builders to sections. We ignore errors after enc.OpenStreams
// (which may fail due to a caller error) since we guarantee correct usage of
// the encoding API.
streamsEnc, err := enc.OpenStreams()
if err != nil {
return fmt.Errorf("opening streams section: %w", err)
}
defer func() {
// Discard on defer for safety. This will return an error if we
// successfully committed.
_ = streamsEnc.Discard()
}()

{
var errs []error
errs = append(errs, encodeColumn(streamsEnc, streamsmd.COLUMN_TYPE_STREAM_ID, idBuilder))
errs = append(errs, encodeColumn(streamsEnc, streamsmd.COLUMN_TYPE_MIN_TIMESTAMP, minTimestampBuilder))
errs = append(errs, encodeColumn(streamsEnc, streamsmd.COLUMN_TYPE_MAX_TIMESTAMP, maxTimestampBuilder))
errs = append(errs, encodeColumn(streamsEnc, streamsmd.COLUMN_TYPE_ROWS, rowsCountBuilder))
if err := errors.Join(errs...); err != nil {
return fmt.Errorf("encoding columns: %w", err)
}
}

for _, labelBuilder := range labelBuilders {
// For consistency we'll make sure each label builder has the same number
// of rows as the other columns (which is the number of streams).
labelBuilder.Backfill(len(s.ordered))

err := encodeColumn(streamsEnc, streamsmd.COLUMN_TYPE_LABEL, labelBuilder)
if err != nil {
return fmt.Errorf("encoding label column: %w", err)
}
}

return streamsEnc.Commit()
}

func numberColumnBuilder(pageSize int) (*dataset.ColumnBuilder, error) {
return dataset.NewColumnBuilder("", dataset.BuilderOptions{
PageSizeHint: pageSize,
Value: datasetmd.VALUE_TYPE_INT64,
Encoding: datasetmd.ENCODING_TYPE_DELTA,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
})
}

func encodeColumn(enc *encoding.StreamsEncoder, columnType streamsmd.ColumnType, builder *dataset.ColumnBuilder) error {
column, err := builder.Flush()
if err != nil {
return fmt.Errorf("flushing column: %w", err)
}

columnEnc, err := enc.OpenColumn(columnType, &column.Info)
if err != nil {
return fmt.Errorf("opening column encoder: %w", err)
}
defer func() {
// Discard on defer for safety. This will return an error if we
// successfully committed.
_ = columnEnc.Discard()
}()

for _, page := range column.Pages {
err := columnEnc.AppendPage(page)
if err != nil {
return fmt.Errorf("appending page: %w", err)
}
}

return columnEnc.Commit()
}

// Reset resets all state, allowing Streams to be reused.
func (s *Streams) Reset() {
s.lastID.Store(0)
clear(s.lookup)
s.ordered = s.ordered[:0]
}
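
For illustration only (not code from this PR): a rough sketch of the intended write path, New → Record → EncodeTo → Reset. buildStreamsSection and the entry type are hypothetical, and the 4096-byte page size is an arbitrary value chosen for the sketch; the encoder is assumed to be constructed by the caller.

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams"
)

// entry is a hypothetical (labels, timestamp) pair to record.
type entry struct {
	Labels labels.Labels
	Time   time.Time
}

// buildStreamsSection records one entry per pair and flushes the resulting
// streams section through the provided encoder.
func buildStreamsSection(enc *encoding.Encoder, entries []entry) error {
	s := streams.New()
	for _, e := range entries {
		// Record tracks min/max timestamps and row counts per unique label set.
		s.Record(e.Labels, e.Time)
	}

	// 4096 is a page size hint chosen only for this sketch.
	if err := s.EncodeTo(enc, 4096); err != nil {
		return fmt.Errorf("encoding streams section: %w", err)
	}

	s.Reset() // Streams can be reused for the next data object.
	return nil
}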