From 1e3126c58e87325c125979518e7b69325db4d342 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 14 Jan 2025 09:58:46 -0500 Subject: [PATCH] chore(dataobj): add logs tracking and encoding This commit is the equivalent of #15713 for the logs section: log records are accumulated into column builders and then flushed into a section at encoding time. Some minor API changes have been made to sections/streams to ensure consistency with sections/logs. --- pkg/dataobj/internal/sections/logs/iter.go | 136 +++++++++ pkg/dataobj/internal/sections/logs/logs.go | 282 ++++++++++++++++++ .../internal/sections/logs/logs_test.go | 102 +++++++ pkg/dataobj/internal/sections/streams/iter.go | 4 +- .../internal/sections/streams/streams.go | 36 ++- .../internal/sections/streams/streams_test.go | 4 +- 6 files changed, 546 insertions(+), 18 deletions(-) create mode 100644 pkg/dataobj/internal/sections/logs/iter.go create mode 100644 pkg/dataobj/internal/sections/logs/logs.go create mode 100644 pkg/dataobj/internal/sections/logs/logs_test.go diff --git a/pkg/dataobj/internal/sections/logs/iter.go b/pkg/dataobj/internal/sections/logs/iter.go new file mode 100644 index 0000000000000..4aa77168dfff6 --- /dev/null +++ b/pkg/dataobj/internal/sections/logs/iter.go @@ -0,0 +1,136 @@ +package logs + +import ( + "context" + "fmt" + "time" + + "github.com/grafana/loki/pkg/push" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" + "github.com/grafana/loki/v3/pkg/dataobj/internal/encoding" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" +) + +// Iter iterates over records in the provided decoder. All logs sections are +// iterated over in order. +func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Record] { + return result.Iter(func(yield func(Record) bool) error { + sections, err := dec.Sections(ctx) + if err != nil { + return err + } + + logsDec := dec.LogsDecoder() + + for _, section := range sections { + if section.Type != filemd.SECTION_TYPE_LOGS { + continue + } + + for result := range iterSection(ctx, logsDec, section) { + if result.Err() != nil || !yield(result.MustValue()) { + return result.Err() + } + } + } + + return nil + }) +} + +func iterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.SectionInfo) result.Seq[Record] { + return result.Iter(func(yield func(Record) bool) error { + // We need to pull the columns twice: once from the dataset implementation + // and once for the metadata to retrieve column type. + // + // TODO(rfratto): find a way to expose this information from + // encoding.StreamsDataset to avoid the double call. + streamsColumns, err := dec.Columns(ctx, section) + if err != nil { + return err + } + + dset := encoding.LogsDataset(dec, section) + + columns, err := result.Collect(dset.ListColumns(ctx)) + if err != nil { + return err + } + + for result := range dataset.Iter(ctx, columns) { + row, err := result.Value() + if err != nil { + return err + } + + record, err := decodeRecord(streamsColumns, row) + if err != nil { + return err + } else if !yield(record) { + return nil + } + } + + return nil + }) +} + +func decodeRecord(columns []*logsmd.ColumnDesc, row dataset.Row) (Record, error) { + record := Record{ + // Preallocate metadata to exact number of metadata columns to avoid + // oversizing. 
+		Metadata: make(push.LabelsAdapter, 0, metadataColumns(columns)),
+	}
+
+	for columnIndex, columnValue := range row.Values {
+		if columnValue.IsNil() || columnValue.IsZero() {
+			continue
+		}
+
+		column := columns[columnIndex]
+		switch column.Type {
+		case logsmd.COLUMN_TYPE_STREAM_ID:
+			if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
+				return Record{}, fmt.Errorf("invalid type %s for %s", ty, column.Type)
+			}
+			record.StreamID = columnValue.Int64()
+
+		case logsmd.COLUMN_TYPE_TIMESTAMP:
+			if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 {
+				return Record{}, fmt.Errorf("invalid type %s for %s", ty, column.Type)
+			}
+			record.Timestamp = time.Unix(0, columnValue.Int64()).UTC()
+
+		case logsmd.COLUMN_TYPE_METADATA:
+			if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_STRING {
+				return Record{}, fmt.Errorf("invalid type %s for %s", ty, column.Type)
+			}
+			record.Metadata = append(record.Metadata, push.LabelAdapter{
+				Name:  column.Info.Name,
+				Value: columnValue.String(),
+			})
+
+		case logsmd.COLUMN_TYPE_MESSAGE:
+			if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_STRING {
+				return Record{}, fmt.Errorf("invalid type %s for %s", ty, column.Type)
+			}
+			record.Line = columnValue.String()
+		}
+	}
+
+	return record, nil
+}
+
+func metadataColumns(columns []*logsmd.ColumnDesc) int {
+	var count int
+	for _, column := range columns {
+		if column.Type == logsmd.COLUMN_TYPE_METADATA {
+			count++
+		}
+	}
+	return count
+}
diff --git a/pkg/dataobj/internal/sections/logs/logs.go b/pkg/dataobj/internal/sections/logs/logs.go
new file mode 100644
index 0000000000000..f70e2bcc209c1
--- /dev/null
+++ b/pkg/dataobj/internal/sections/logs/logs.go
@@ -0,0 +1,282 @@
+// Package logs defines types used for the data object logs section. The logs
+// section holds a list of log records across multiple streams.
+package logs
+
+import (
+	"cmp"
+	"context"
+	"errors"
+	"fmt"
+	"slices"
+	"time"
+
+	"github.com/grafana/loki/pkg/push"
+
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
+	"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
+)
+
+// A Record is an individual log record within the logs section.
+type Record struct {
+	StreamID  int64
+	Timestamp time.Time
+	Metadata  push.LabelsAdapter
+	Line      string
+}
+
+// Logs accumulate a set of [Record]s within a data object.
+type Logs struct {
+	rows     int
+	pageSize int
+
+	streamIDs  *dataset.ColumnBuilder
+	timestamps *dataset.ColumnBuilder
+
+	metadatas      []*dataset.ColumnBuilder
+	metadataLookup map[string]int // map of metadata key to index in metadatas
+
+	messages *dataset.ColumnBuilder
+}
+
+// New creates a new Logs section. The pageSize argument specifies how large
+// pages should be.
+func New(pageSize int) *Logs {
+	// We control the Value/Encoding tuple so creating column builders can't
+	// fail; if it does, we're left in an unrecoverable state where nothing can
+	// be encoded properly so we panic.
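+	//
+	// Stream IDs and timestamps below are stored as delta-encoded int64
+	// columns; log lines (and the metadata columns created lazily in
+	// getMetadataColumn) are stored as plain strings compressed with zstd.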
+ streamIDs, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{ + PageSizeHint: pageSize, + Value: datasetmd.VALUE_TYPE_INT64, + Encoding: datasetmd.ENCODING_TYPE_DELTA, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + }) + if err != nil { + panic(fmt.Sprintf("creating stream ID column: %v", err)) + } + + timestamps, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{ + PageSizeHint: pageSize, + Value: datasetmd.VALUE_TYPE_INT64, + Encoding: datasetmd.ENCODING_TYPE_DELTA, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + }) + if err != nil { + panic(fmt.Sprintf("creating timestamp column: %v", err)) + } + + messages, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{ + PageSizeHint: pageSize, + Value: datasetmd.VALUE_TYPE_STRING, + Encoding: datasetmd.ENCODING_TYPE_PLAIN, + Compression: datasetmd.COMPRESSION_TYPE_ZSTD, + }) + if err != nil { + panic(fmt.Sprintf("creating message column: %v", err)) + } + + return &Logs{ + pageSize: pageSize, + + streamIDs: streamIDs, + timestamps: timestamps, + + metadataLookup: make(map[string]int), + + messages: messages, + } +} + +// Append adds a new entry to the set of Logs. +func (l *Logs) Append(entry Record) { + // Sort metadata to ensure consistent encoding. Metadata is sorted by key. + // While keys must be unique, we sort by value if two keys match; this + // ensures that the same value always gets encoded for duplicate keys. + slices.SortFunc(entry.Metadata, func(a, b push.LabelAdapter) int { + if res := cmp.Compare(a.Name, b.Name); res != 0 { + return res + } + return cmp.Compare(a.Value, b.Value) + }) + + // We ignore the errors below; they only fail if given out-of-order data + // (where the row number is less than the previous row number), which can't + // ever happen here. + + _ = l.streamIDs.Append(l.rows, dataset.Int64Value(entry.StreamID)) + _ = l.timestamps.Append(l.rows, dataset.Int64Value(entry.Timestamp.UnixNano())) + _ = l.messages.Append(l.rows, dataset.StringValue(entry.Line)) + + for _, m := range entry.Metadata { + col := l.getMetadataColumn(m.Name) + _ = col.Append(l.rows, dataset.StringValue(m.Value)) + } + + l.rows++ +} + +func (l *Logs) getMetadataColumn(key string) *dataset.ColumnBuilder { + idx, ok := l.metadataLookup[key] + if !ok { + col, err := dataset.NewColumnBuilder(key, dataset.BuilderOptions{ + PageSizeHint: l.pageSize, + Value: datasetmd.VALUE_TYPE_STRING, + Encoding: datasetmd.ENCODING_TYPE_PLAIN, + Compression: datasetmd.COMPRESSION_TYPE_ZSTD, + }) + if err != nil { + // We control the Value/Encoding tuple so this can't fail; if it does, + // we're left in an unrecoverable state where nothing can be encoded + // properly so we panic. + panic(fmt.Sprintf("creating metadata column: %v", err)) + } + + l.metadatas = append(l.metadatas, col) + l.metadataLookup[key] = len(l.metadatas) - 1 + return col + } + return l.metadatas[idx] +} + +// EncodeTo encodes the set of logs to the provided encoder. Before encoding, +// log records are sorted by StreamID and Timestamp. +// +// EncodeTo may generate multiple sections if the list of log records is too +// big to fit into a single section. +// +// After encoding successfully, [Logs.Reset] is called. +func (l *Logs) EncodeTo(enc *encoding.Encoder) error { + // TODO(rfratto): handle one section becoming too large. This can happen when + // the number of columns is very wide, due to a lot of metadata columns. + // There are two approaches to handle this: + // + // 1. Split streams into multiple sections. + // 2. 
Move some columns into an aggregated column which holds multiple label + // keys and values. + + // Create a sorted dataset for us to encode. + dset, err := l.sort() + if err != nil { + return fmt.Errorf("sorting logs: %w", err) + } + cols, err := result.Collect(dset.ListColumns(context.Background())) // dset is in memory; "real" context not needed. + if err != nil { + return fmt.Errorf("listing columns: %w", err) + } + + logsEnc, err := enc.OpenLogs() + if err != nil { + return fmt.Errorf("opening logs section: %w", err) + } + defer func() { + // Discard on defer for safety. This will return an error if we + // successfully committed. + _ = logsEnc.Discard() + }() + + // Encode our columns. The slice order here *must* match the order in + // [Logs.sort]! + { + errs := make([]error, 0, len(cols)) + errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_STREAM_ID, cols[0])) + errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_TIMESTAMP, cols[1])) + for _, mdCol := range cols[2 : len(cols)-1] { + errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_METADATA, mdCol)) + } + errs = append(errs, encodeColumn(logsEnc, logsmd.COLUMN_TYPE_MESSAGE, cols[len(cols)-1])) + if err := errors.Join(errs...); err != nil { + return fmt.Errorf("encoding columns: %w", err) + } + } + + if err := logsEnc.Commit(); err != nil { + return err + } + l.Reset() + return nil +} + +func (l *Logs) sort() (dataset.Dataset, error) { + // Our columns are ordered as follows: + // + // 1. StreamID + // 2. Timestamp + // 3. Metadata columns + // 4. Message + // + // Do *not* change this order without updating [Logs.EncodeTo]! + // + // TODO(rfratto): find a clean way to decorate columns with additional + // metadata so we don't have to rely on order. + columns := make([]*dataset.MemColumn, 0, 3+len(l.metadatas)) + + // Flush never returns an error so we ignore it here to keep the code simple. + // + // TODO(rfratto): remove error return from Flush to clean up code. + streamID, _ := l.streamIDs.Flush() + timestamp, _ := l.timestamps.Flush() + columns = append(columns, streamID, timestamp) + + for _, mdBuilder := range l.metadatas { + mdBuilder.Backfill(l.rows) + + mdColumn, _ := mdBuilder.Flush() + columns = append(columns, mdColumn) + } + + messages, _ := l.messages.Flush() + columns = append(columns, messages) + + // dset is in memory, so we don't need a "real" context in dataset.Sort. + dset := dataset.FromMemory(columns) + return dataset.Sort(context.Background(), dset, []dataset.Column{streamID, timestamp}, l.pageSize) +} + +func encodeColumn(enc *encoding.LogsEncoder, columnType logsmd.ColumnType, column dataset.Column) error { + columnEnc, err := enc.OpenColumn(columnType, column.ColumnInfo()) + if err != nil { + return fmt.Errorf("opening %s column encoder: %w", columnType, err) + } + defer func() { + // Discard on defer for safety. This will return an error if we + // successfully committed. + _ = columnEnc.Discard() + }() + + // Our column is in memory, so we don't need a "real" context in the calls + // below. 
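+	//
+	// Each page of the in-memory column is read in full and re-wrapped as a
+	// MemPage before being appended to the section encoder.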
+ for result := range column.ListPages(context.Background()) { + page, err := result.Value() + if err != nil { + return fmt.Errorf("getting %s page: %w", columnType, err) + } + + data, err := page.ReadPage(context.Background()) + if err != nil { + return fmt.Errorf("reading %s page: %w", columnType, err) + } + + memPage := &dataset.MemPage{ + Info: *page.PageInfo(), + Data: data, + } + if err := columnEnc.AppendPage(memPage); err != nil { + return fmt.Errorf("appending %s page: %w", columnType, err) + } + } + + return columnEnc.Commit() +} + +// Reset resets all state, allowing Logs to be reused. +func (l *Logs) Reset() { + l.rows = 0 + + l.streamIDs.Reset() + l.timestamps.Reset() + l.metadatas = l.metadatas[:0] + clear(l.metadataLookup) + l.messages.Reset() +} diff --git a/pkg/dataobj/internal/sections/logs/logs_test.go b/pkg/dataobj/internal/sections/logs/logs_test.go new file mode 100644 index 0000000000000..0f4ab08c3f904 --- /dev/null +++ b/pkg/dataobj/internal/sections/logs/logs_test.go @@ -0,0 +1,102 @@ +package logs_test + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/grafana/loki/pkg/push" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/encoding" + "github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs" +) + +func Test(t *testing.T) { + records := []logs.Record{ + { + StreamID: 1, + Timestamp: time.Unix(10, 0).UTC(), + Metadata: nil, + Line: "hello world", + }, + { + StreamID: 2, + Timestamp: time.Unix(100, 0).UTC(), + Metadata: push.LabelsAdapter{ + {Name: "cluster", Value: "test"}, + {Name: "app", Value: "bar"}, + }, + Line: "goodbye world", + }, + { + StreamID: 1, + Timestamp: time.Unix(5, 0).UTC(), + Metadata: push.LabelsAdapter{ + {Name: "cluster", Value: "test"}, + {Name: "app", Value: "foo"}, + }, + Line: "foo bar", + }, + } + + tracker := logs.New(1024) + for _, record := range records { + tracker.Append(record) + } + + buf, err := buildObject(tracker) + require.NoError(t, err) + + // The order of records should be sorted by stream ID then timestamp, and all + // metadata should be sorted by key then value. 
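+	// Note that nil metadata on input is decoded back as an empty, non-nil
+	// LabelsAdapter, since decodeRecord always preallocates the metadata
+	// slice.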
+ expect := []logs.Record{ + { + StreamID: 1, + Timestamp: time.Unix(5, 0).UTC(), + Metadata: push.LabelsAdapter{ + {Name: "app", Value: "foo"}, + {Name: "cluster", Value: "test"}, + }, + Line: "foo bar", + }, + { + StreamID: 1, + Timestamp: time.Unix(10, 0).UTC(), + Metadata: push.LabelsAdapter{}, + Line: "hello world", + }, + { + StreamID: 2, + Timestamp: time.Unix(100, 0).UTC(), + Metadata: push.LabelsAdapter{ + {Name: "app", Value: "bar"}, + {Name: "cluster", Value: "test"}, + }, + Line: "goodbye world", + }, + } + + dec := encoding.ReadSeekerDecoder(bytes.NewReader(buf)) + + var actual []logs.Record + for result := range logs.Iter(context.Background(), dec) { + record, err := result.Value() + require.NoError(t, err) + actual = append(actual, record) + } + + require.Equal(t, expect, actual) +} + +func buildObject(lt *logs.Logs) ([]byte, error) { + var buf bytes.Buffer + enc := encoding.NewEncoder(&buf) + if err := lt.EncodeTo(enc); err != nil { + return nil, err + } else if err := enc.Flush(); err != nil { + return nil, err + } + return buf.Bytes(), nil +} diff --git a/pkg/dataobj/internal/sections/streams/iter.go b/pkg/dataobj/internal/sections/streams/iter.go index 50d3d2f27403a..4443aa10eb646 100644 --- a/pkg/dataobj/internal/sections/streams/iter.go +++ b/pkg/dataobj/internal/sections/streams/iter.go @@ -15,8 +15,8 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/result" ) -// Iter iterates over streams in the provided decoder. All sections are -// iterated over. +// Iter iterates over streams in the provided decoder. All streams sections are +// iterated over in order. func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Stream] { return result.Iter(func(yield func(Stream) bool) error { sections, err := dec.Sections(ctx) diff --git a/pkg/dataobj/internal/sections/streams/streams.go b/pkg/dataobj/internal/sections/streams/streams.go index 8e4ec6eed6a02..2a3f8f7323833 100644 --- a/pkg/dataobj/internal/sections/streams/streams.go +++ b/pkg/dataobj/internal/sections/streams/streams.go @@ -32,19 +32,21 @@ type Stream struct { // Streams tracks information about streams in a data object. type Streams struct { - lastID atomic.Int64 - - lookup map[uint64][]*Stream + pageSize int + lastID atomic.Int64 + lookup map[uint64][]*Stream // orderedStreams is used for consistently iterating over the list of // streams. It contains streamed added in append order. ordered []*Stream } -// New creates a new Streams section. -func New() *Streams { +// New creates a new Streams section. The pageSize argument specifies how large +// pages should be. +func New(pageSize int) *Streams { return &Streams{ - lookup: make(map[uint64][]*Stream), + pageSize: pageSize, + lookup: make(map[uint64][]*Stream), } } @@ -113,10 +115,12 @@ func (s *Streams) StreamID(streamLabels labels.Labels) int64 { } // EncodeTo encodes the list of recorded streams to the provided encoder. -// pageSize controls the target sizes for pages and metadata respectively. +// // EncodeTo may generate multiple sections if the list of streams is too big to // fit into a single section. -func (s *Streams) EncodeTo(enc *encoding.Encoder, pageSize int) error { +// +// After encoding successfully, [Streams.Reset] is called. +func (s *Streams) EncodeTo(enc *encoding.Encoder) error { // TODO(rfratto): handle one section becoming too large. This can happen when // the number of columns is very wide. 
There are two approaches to handle // this: @@ -125,19 +129,19 @@ func (s *Streams) EncodeTo(enc *encoding.Encoder, pageSize int) error { // 2. Move some columns into an aggregated column which holds multiple label // keys and values. - idBuilder, err := numberColumnBuilder(pageSize) + idBuilder, err := numberColumnBuilder(s.pageSize) if err != nil { return fmt.Errorf("creating ID column: %w", err) } - minTimestampBuilder, err := numberColumnBuilder(pageSize) + minTimestampBuilder, err := numberColumnBuilder(s.pageSize) if err != nil { return fmt.Errorf("creating minimum timestamp column: %w", err) } - maxTimestampBuilder, err := numberColumnBuilder(pageSize) + maxTimestampBuilder, err := numberColumnBuilder(s.pageSize) if err != nil { return fmt.Errorf("creating maximum timestamp column: %w", err) } - rowsCountBuilder, err := numberColumnBuilder(pageSize) + rowsCountBuilder, err := numberColumnBuilder(s.pageSize) if err != nil { return fmt.Errorf("creating rows column: %w", err) } @@ -154,7 +158,7 @@ func (s *Streams) EncodeTo(enc *encoding.Encoder, pageSize int) error { } builder, err := dataset.NewColumnBuilder(name, dataset.BuilderOptions{ - PageSizeHint: pageSize, + PageSizeHint: s.pageSize, Value: datasetmd.VALUE_TYPE_STRING, Encoding: datasetmd.ENCODING_TYPE_PLAIN, Compression: datasetmd.COMPRESSION_TYPE_ZSTD, @@ -220,7 +224,11 @@ func (s *Streams) EncodeTo(enc *encoding.Encoder, pageSize int) error { } } - return streamsEnc.Commit() + if err := streamsEnc.Commit(); err != nil { + return err + } + s.Reset() + return nil } func numberColumnBuilder(pageSize int) (*dataset.ColumnBuilder, error) { diff --git a/pkg/dataobj/internal/sections/streams/streams_test.go b/pkg/dataobj/internal/sections/streams/streams_test.go index e998afe0de22f..2d6329615bf0a 100644 --- a/pkg/dataobj/internal/sections/streams/streams_test.go +++ b/pkg/dataobj/internal/sections/streams/streams_test.go @@ -26,7 +26,7 @@ func Test(t *testing.T) { {labels.FromStrings("cluster", "test", "app", "foo"), time.Unix(9, 0).UTC()}, } - tracker := streams.New() + tracker := streams.New(1024) for _, tc := range tt { tracker.Record(tc.Labels, tc.Time) } @@ -66,7 +66,7 @@ func Test(t *testing.T) { func buildObject(st *streams.Streams) ([]byte, error) { var buf bytes.Buffer enc := encoding.NewEncoder(&buf) - if err := st.EncodeTo(enc, 1024); err != nil { + if err := st.EncodeTo(enc); err != nil { return nil, err } else if err := enc.Flush(); err != nil { return nil, err