forked from grafana/loki
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore(dataobj): add logs tracking and encoding
This commit is the equivalent of grafana#15713 for the logs section: log records are accumulated into column builders and then flushed into a section at encoding time. Some minor API changes have been made to sections/streams to ensure consistency with sections/logs.
- Loading branch information
Showing
6 changed files
with
546 additions
and
18 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
package logs | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/grafana/loki/pkg/push" | ||
|
||
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" | ||
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding" | ||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" | ||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" | ||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" | ||
"github.com/grafana/loki/v3/pkg/dataobj/internal/result" | ||
) | ||
|
||
// Iter iterates over records in the provided decoder. All logs sections are | ||
// iterated over in order. | ||
func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Record] { | ||
return result.Iter(func(yield func(Record) bool) error { | ||
sections, err := dec.Sections(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
logsDec := dec.LogsDecoder() | ||
|
||
for _, section := range sections { | ||
if section.Type != filemd.SECTION_TYPE_LOGS { | ||
continue | ||
} | ||
|
||
for result := range iterSection(ctx, logsDec, section) { | ||
if result.Err() != nil || !yield(result.MustValue()) { | ||
return result.Err() | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
}) | ||
} | ||
|
||
func iterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.SectionInfo) result.Seq[Record] { | ||
return result.Iter(func(yield func(Record) bool) error { | ||
// We need to pull the columns twice: once from the dataset implementation | ||
// and once for the metadata to retrieve column type. | ||
// | ||
// TODO(rfratto): find a way to expose this information from | ||
// encoding.StreamsDataset to avoid the double call. | ||
streamsColumns, err := dec.Columns(ctx, section) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
dset := encoding.LogsDataset(dec, section) | ||
|
||
columns, err := result.Collect(dset.ListColumns(ctx)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for result := range dataset.Iter(ctx, columns) { | ||
row, err := result.Value() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
record, err := decodeRecord(streamsColumns, row) | ||
if err != nil { | ||
return err | ||
} else if !yield(record) { | ||
return nil | ||
} | ||
} | ||
|
||
return nil | ||
}) | ||
} | ||
|
||
func decodeRecord(columns []*logsmd.ColumnDesc, row dataset.Row) (Record, error) { | ||
record := Record{ | ||
// Preallocate metadata to exact number of metadata columns to avoid | ||
// oversizing. | ||
Metadata: make(push.LabelsAdapter, 0, metadataColumns(columns)), | ||
} | ||
|
||
for columnIndex, columnValue := range row.Values { | ||
if columnValue.IsNil() || columnValue.IsZero() { | ||
continue | ||
} | ||
|
||
column := columns[columnIndex] | ||
switch column.Type { | ||
case logsmd.COLUMN_TYPE_STREAM_ID: | ||
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 { | ||
return Record{}, fmt.Errorf("invalid type %s for %s", ty, column.Type) | ||
} | ||
record.StreamID = columnValue.Int64() | ||
|
||
case logsmd.COLUMN_TYPE_TIMESTAMP: | ||
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_INT64 { | ||
return Record{}, fmt.Errorf("invalid type %s for %s", ty, column.Type) | ||
} | ||
record.Timestamp = time.Unix(0, columnValue.Int64()).UTC() | ||
|
||
case logsmd.COLUMN_TYPE_METADATA: | ||
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_STRING { | ||
return Record{}, fmt.Errorf("invalid type %s for %s", ty, column.Type) | ||
} | ||
record.Metadata = append(record.Metadata, push.LabelAdapter{ | ||
Name: column.Info.Name, | ||
Value: columnValue.String(), | ||
}) | ||
|
||
case logsmd.COLUMN_TYPE_MESSAGE: | ||
if ty := columnValue.Type(); ty != datasetmd.VALUE_TYPE_STRING { | ||
return Record{}, fmt.Errorf("invalid type %s for %s", ty, column.Type) | ||
} | ||
record.Line = columnValue.String() | ||
} | ||
} | ||
|
||
return record, nil | ||
} | ||
|
||
func metadataColumns(columns []*logsmd.ColumnDesc) int { | ||
var count int | ||
for _, column := range columns { | ||
if column.Type == logsmd.COLUMN_TYPE_METADATA { | ||
count++ | ||
} | ||
} | ||
return count | ||
} |
Oops, something went wrong.