Commit

Merge branch 'main' into priyaggopa-patch-1
JStickler authored Jan 14, 2025
2 parents 66b8208 + 45b0752 commit 3f664d7
Showing 22 changed files with 1,980 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deploy-pr-preview.yml
@@ -11,7 +11,7 @@ on:

jobs:
deploy-pr-preview:
- if: ! github.event.pull_request.head.repo.fork
+ if: ${{ ! github.event.pull_request.head.repo.fork }}
uses: grafana/writers-toolkit/.github/workflows/deploy-preview.yml@main
with:
sha: ${{ github.event.pull_request.head.sha }}
14 changes: 11 additions & 3 deletions docs/sources/configure/bp-configure.md
@@ -23,22 +23,22 @@ One issue many people have with Loki is their client receiving errors for out of

There are a few things to dissect from that statement. The first is that this restriction is per stream. Let’s look at an example:

- ```
+ ```bash
{job="syslog"} 00:00:00 i'm a syslog!
{job="syslog"} 00:00:01 i'm a syslog!
```

If Loki received these two lines which are for the same stream, everything would be fine. But what about this case:

- ```
+ ```bash
{job="syslog"} 00:00:00 i'm a syslog!
{job="syslog"} 00:00:02 i'm a syslog!
{job="syslog"} 00:00:01 i'm a syslog! <- Rejected out of order!
```
What can you do about this? What if this was because the sources of these logs were different systems? You can solve this with an additional label which is unique per system:
- ```
+ ```bash
{job="syslog", instance="host1"} 00:00:00 i'm a syslog!
{job="syslog", instance="host1"} 00:00:02 i'm a syslog!
{job="syslog", instance="host2"} 00:00:01 i'm a syslog! <- Accepted, this is a new stream!
@@ -50,6 +50,14 @@ But what if the application itself generated logs that were out of order? Well,

It's also worth noting that the batching nature of the Loki push API can lead to some out of order errors that are really false positives: a batch may partially succeed, in which case retrying it returns out of order errors for everything that previously succeeded, while anything new is accepted.
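For example, here is a hypothetical partially successful batch and its retry, using the same notation as the examples above:

```bash
{job="syslog"} 00:00:01 i'm a syslog!  <- Accepted
{job="syslog"} 00:00:02 i'm a syslog!  <- Accepted, then the push request fails mid-batch
--- the client retries the whole batch ---
{job="syslog"} 00:00:01 i'm a syslog!  <- Rejected out of order! (false positive, already stored)
{job="syslog"} 00:00:02 i'm a syslog!  <- Rejected out of order! (false positive, already stored)
{job="syslog"} 00:00:03 i'm a syslog!  <- Accepted, this entry is new
```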

## Use `snappy` compression algorithm

`Snappy` is currently the Loki compression algorithm of choice. It is much faster than `gzip`, though not as efficient in storage, which was an acceptable tradeoff.

Grafana Labs found that `gzip` compressed very well but was very slow, which led to slow query responses.

`LZ4` is a good compromise between speed and compression performance. While it compresses slightly more slowly than `snappy`, its compression ratio is higher, resulting in smaller chunks in object storage.
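A minimal sketch of the corresponding Loki configuration, using the `chunk_encoding` option in the ingester block (verify the option name and placement against your Loki version):

```yaml
ingester:
  # Chunk compression algorithm; snappy trades some storage efficiency for speed.
  chunk_encoding: snappy
```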

## Use `chunk_target_size`

Using `chunk_target_size` instructs Loki to try to fill all chunks to a target _compressed_ size of 1.5MB. These larger chunks are more efficient for Loki to process.
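As a sketch, the target is expressed in bytes in the ingester block (roughly 1.5MB here; the exact value and placement may vary by Loki version):

```yaml
ingester:
  # Try to fill each chunk with ~1.5MB of compressed data before flushing.
  chunk_target_size: 1536000
```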
2 changes: 1 addition & 1 deletion go.mod
@@ -22,7 +22,7 @@ require (
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/aws/aws-sdk-go v1.55.5
github.com/baidubce/bce-sdk-go v0.9.213
- github.com/bmatcuk/doublestar/v4 v4.7.1
+ github.com/bmatcuk/doublestar/v4 v4.8.0
github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500
github.com/cespare/xxhash/v2 v2.3.0
github.com/containerd/fifo v1.1.0
4 changes: 2 additions & 2 deletions go.sum
@@ -238,8 +238,8 @@ github.com/bitly/go-hostpool v0.1.0 h1:XKmsF6k5el6xHG3WPJ8U0Ku/ye7njX7W81Ng7O2io
github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENUpMkpg42fw=
github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
- github.com/bmatcuk/doublestar/v4 v4.7.1 h1:fdDeAqgT47acgwd9bd9HxJRDmc9UAmPpc+2m0CXv75Q=
- github.com/bmatcuk/doublestar/v4 v4.7.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
+ github.com/bmatcuk/doublestar/v4 v4.8.0 h1:DSXtrypQddoug1459viM9X9D3dp1Z7993fw36I2kNcQ=
+ github.com/bmatcuk/doublestar/v4 v4.8.0/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
152 changes: 152 additions & 0 deletions pkg/dataobj/internal/encoding/dataset_logs.go
@@ -0,0 +1,152 @@
package encoding

import (
"context"
"fmt"

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// LogsDataset returns a [dataset.Dataset] from a [LogsDecoder] for
// the given section.
func LogsDataset(dec LogsDecoder, sec *filemd.SectionInfo) dataset.Dataset {
return &logsDataset{dec: dec, sec: sec}
}

type logsDataset struct {
dec LogsDecoder
sec *filemd.SectionInfo
}

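// ListColumns returns a sequence over the columns of the logs section,
// wrapping each column description from the underlying decoder.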
func (ds *logsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Column] {
return result.Iter(func(yield func(dataset.Column) bool) error {
columns, err := ds.dec.Columns(ctx, ds.sec)
if err != nil {
return err
}

for _, column := range columns {
if !yield(&logsDatasetColumn{dec: ds.dec, desc: column}) {
return nil
}
}

return nil
})
}

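// ListPages returns a sequence of page sets, one per provided column, in the
// same order as the columns argument.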
func (ds *logsDataset) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] {
// TODO(rfratto): Switch to batch retrieval instead of iterating over each column.
return result.Iter(func(yield func(dataset.Pages) bool) error {
for _, column := range columns {
pages, err := result.Collect(column.ListPages(ctx))
if err != nil {
return err
} else if !yield(pages) {
return nil
}
}

return nil
})
}

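// ReadPages reads the data for each of the provided pages, yielding results
// in the order the pages were given.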
func (ds *logsDataset) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] {
// TODO(rfratto): Switch to batch retrieval instead of iterating over each page.
return result.Iter(func(yield func(dataset.PageData) bool) error {
for _, page := range pages {
data, err := page.ReadPage(ctx)
if err != nil {
return err
} else if !yield(data) {
return nil
}
}

return nil
})
}

type logsDatasetColumn struct {
dec LogsDecoder
desc *logsmd.ColumnDesc

info *dataset.ColumnInfo
}

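// ColumnInfo returns metadata about the column, converting the decoder's
// description on first use and caching the result.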
func (col *logsDatasetColumn) ColumnInfo() *dataset.ColumnInfo {
if col.info != nil {
return col.info
}

col.info = &dataset.ColumnInfo{
Name: col.desc.Info.Name,
Type: col.desc.Info.ValueType,
Compression: col.desc.Info.Compression,

RowsCount: int(col.desc.Info.RowsCount),
CompressedSize: int(col.desc.Info.CompressedSize),
UncompressedSize: int(col.desc.Info.UncompressedSize),

Statistics: col.desc.Info.Statistics,
}
return col.info
}

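// ListPages returns a sequence of the pages that make up the column.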
func (col *logsDatasetColumn) ListPages(ctx context.Context) result.Seq[dataset.Page] {
return result.Iter(func(yield func(dataset.Page) bool) error {
pageSets, err := result.Collect(col.dec.Pages(ctx, []*logsmd.ColumnDesc{col.desc}))
if err != nil {
return err
} else if len(pageSets) != 1 {
return fmt.Errorf("unexpected number of page sets: got=%d want=1", len(pageSets))
}

for _, page := range pageSets[0] {
if !yield(&logsDatasetPage{dec: col.dec, desc: page}) {
return nil
}
}

return nil
})
}

type logsDatasetPage struct {
dec LogsDecoder
desc *logsmd.PageDesc

info *dataset.PageInfo
}

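// PageInfo returns metadata about the page, converting the decoder's
// description on first use and caching the result.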
func (p *logsDatasetPage) PageInfo() *dataset.PageInfo {
if p.info != nil {
return p.info
}

p.info = &dataset.PageInfo{
UncompressedSize: int(p.desc.Info.UncompressedSize),
CompressedSize: int(p.desc.Info.CompressedSize),
CRC32: p.desc.Info.Crc32,
RowCount: int(p.desc.Info.RowsCount),

Encoding: p.desc.Info.Encoding,
Stats: p.desc.Info.Statistics,
}
return p.info
}

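// ReadPage reads the page's data through the decoder, expecting exactly one
// result for the single requested page.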
func (p *logsDatasetPage) ReadPage(ctx context.Context) (dataset.PageData, error) {
pages, err := result.Collect(p.dec.ReadPages(ctx, []*logsmd.PageDesc{p.desc}))
if err != nil {
return nil, err
} else if len(pages) != 1 {
return nil, fmt.Errorf("unexpected number of pages: got=%d want=1", len(pages))
}

return pages[0], nil
}
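Taken together, these types expose a logs section as a generic dataset. As a rough illustration (`readLogsSection` is a hypothetical helper, and the package is internal to Loki, so this only sketches the flow), a caller could walk the section like this:

```go
// readLogsSection walks every column of a logs section and reads each of its
// pages, relying only on the Dataset, Column, and Page implementations above.
func readLogsSection(ctx context.Context, dec LogsDecoder, sec *filemd.SectionInfo) error {
	ds := LogsDataset(dec, sec)

	// Collect the column handles for the section.
	columns, err := result.Collect(ds.ListColumns(ctx))
	if err != nil {
		return err
	}

	for _, column := range columns {
		// Enumerate the pages of this column, then read each one.
		pages, err := result.Collect(column.ListPages(ctx))
		if err != nil {
			return err
		}
		for _, page := range pages {
			data, err := page.ReadPage(ctx)
			if err != nil {
				return err
			}
			_ = data // compressed page bytes; row decoding happens elsewhere
		}
	}
	return nil
}
```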
21 changes: 21 additions & 0 deletions pkg/dataobj/internal/encoding/decoder.go
@@ -5,6 +5,7 @@ import (

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
@@ -20,6 +21,9 @@

// StreamsDecoder returns a decoder for streams sections.
StreamsDecoder() StreamsDecoder

// LogsDecoder returns a decoder for logs sections.
LogsDecoder() LogsDecoder
}

// StreamsDecoder supports decoding data within a streams section.
@@ -38,4 +42,21 @@
// pages, an error is emitted and iteration stops.
ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData]
}

// LogsDecoder supports decoding data within a logs section.
LogsDecoder interface {
// Columns describes the set of columns in the provided section.
Columns(ctx context.Context, section *filemd.SectionInfo) ([]*logsmd.ColumnDesc, error)

// Pages retrieves the set of pages for the provided columns. The order of
// page lists emitted by the sequence matches the order of columns
// provided: the first page list corresponds to the first column, and so
// on.
Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc]

// ReadPages reads the provided set of pages, iterating over their data
// matching the argument order. If an error is encountered while retrieving
// pages, an error is emitted and iteration stops.
ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData]
}
)
28 changes: 27 additions & 1 deletion pkg/dataobj/internal/encoding/decoder_metadata.go
@@ -9,6 +9,7 @@ import (
"github.com/gogo/protobuf/proto"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
)
@@ -31,7 +32,7 @@ func decodeFileMetadata(r streamio.Reader) (*filemd.Metadata, error) {
return &md, nil
}

- // decodeStreamsMetadata decodes stream section metadta from r.
+ // decodeStreamsMetadata decodes stream section metadata from r.
func decodeStreamsMetadata(r streamio.Reader) (*streamsmd.Metadata, error) {
gotVersion, err := streamio.ReadUvarint(r)
if err != nil {
@@ -56,6 +57,31 @@ func decodeStreamsColumnMetadata(r streamio.Reader) (*streamsmd.ColumnMetadata,
return &metadata, nil
}

// decodeLogsMetadata decodes logs section metadata from r.
func decodeLogsMetadata(r streamio.Reader) (*logsmd.Metadata, error) {
gotVersion, err := streamio.ReadUvarint(r)
if err != nil {
return nil, fmt.Errorf("read streams section format version: %w", err)
} else if gotVersion != streamsFormatVersion {
return nil, fmt.Errorf("unexpected streams section format version: got=%d want=%d", gotVersion, streamsFormatVersion)
}

var md logsmd.Metadata
if err := decodeProto(r, &md); err != nil {
return nil, fmt.Errorf("streams section metadata: %w", err)
}
return &md, nil
}

// decodeLogsColumnMetadata decodes logs column metadata from r.
func decodeLogsColumnMetadata(r streamio.Reader) (*logsmd.ColumnMetadata, error) {
var metadata logsmd.ColumnMetadata
if err := decodeProto(r, &metadata); err != nil {
return nil, fmt.Errorf("streams column metadata: %w", err)
}
return &metadata, nil
}

// decodeProto decodes a proto message from r and stores it in pb. Proto
// messages are expected to be encoded with their size, followed by the proto
// bytes.
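//
// The function body is truncated in this view; a sketch of the decode the
// comment describes (an assumption, not necessarily the actual code, with
// error handling omitted):
//
//	size, _ := streamio.ReadUvarint(r) // length prefix
//	buf := make([]byte, size)
//	_, _ = io.ReadFull(r, buf)         // exactly size bytes of message data
//	_ = proto.Unmarshal(buf, pb)       // decode into pb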